summaryrefslogtreecommitdiffstats
path: root/chromium/base/task/thread_pool/job_task_source.h
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/base/task/thread_pool/job_task_source.h')
-rw-r--r--chromium/base/task/thread_pool/job_task_source.h140
1 files changed, 133 insertions, 7 deletions
diff --git a/chromium/base/task/thread_pool/job_task_source.h b/chromium/base/task/thread_pool/job_task_source.h
index ddaca19b343..57838679e7a 100644
--- a/chromium/base/task/thread_pool/job_task_source.h
+++ b/chromium/base/task/thread_pool/job_task_source.h
@@ -38,10 +38,33 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
RepeatingCallback<size_t()> max_concurrency_callback,
PooledTaskRunnerDelegate* delegate);
+ static experimental::JobHandle CreateJobHandle(
+ scoped_refptr<internal::JobTaskSource> task_source) {
+ return experimental::JobHandle(std::move(task_source));
+ }
+
// Notifies this task source that max concurrency was increased, and the
// number of worker should be adjusted.
void NotifyConcurrencyIncrease();
+ // Informs this JobTaskSource that the current thread would like to join and
+ // contribute to running |worker_task|. Returns true if the joining thread can
+ // contribute (RunJoinTask() can be called), or false if joining was completed
+ // and all other workers returned because either there's no work remaining or
+ // Job was cancelled.
+ bool WillJoin();
+
+ // Contributes to running |worker_task| and returns true if the joining thread
+ // can contribute again (RunJoinTask() can be called again), or false if
+ // joining was completed and all other workers returned because either there's
+ // no work remaining or Job was cancelled. This should be called only after
+ // WillJoin() or RunJoinTask() previously returned true.
+ bool RunJoinTask();
+
+ // Cancels this JobTaskSource, causing all workers to yield and WillRunTask()
+ // to return RunStatus::kDisallowed.
+ void Cancel(TaskSource::Transaction* transaction = nullptr);
+
// TaskSource:
ExecutionEnvironment GetExecutionEnvironment() override;
size_t GetRemainingConcurrency() const override;
@@ -50,6 +73,12 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
// concurrently.
size_t GetMaxConcurrency() const;
+ // Returns true if a worker should return from the worker task on the current
+ // thread ASAP.
+ bool ShouldYield();
+
+ PooledTaskRunnerDelegate* delegate() const { return delegate_; }
+
#if DCHECK_IS_ON()
size_t GetConcurrencyIncreaseVersion() const;
// Returns true if the concurrency version was updated above
@@ -58,11 +87,98 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
#endif // DCHECK_IS_ON()
private:
- static constexpr size_t kInvalidWorkerCount =
- std::numeric_limits<size_t>::max();
+ // Atomic internal state to track the number of workers running a task from
+ // this JobTaskSource and whether this JobTaskSource is canceled.
+ class State {
+ public:
+ static constexpr size_t kCanceledMask = 1;
+ static constexpr size_t kWorkerCountBitOffset = 1;
+ static constexpr size_t kWorkerCountIncrement = 1 << kWorkerCountBitOffset;
+
+ struct Value {
+ size_t worker_count() const { return value >> kWorkerCountBitOffset; }
+ // Returns true if canceled.
+ bool is_canceled() const { return value & kCanceledMask; }
+
+ uint32_t value;
+ };
+
+ State();
+ ~State();
+
+ // Sets as canceled using std::memory_order_relaxed. Returns the state
+ // before the operation.
+ Value Cancel();
+
+ // Increments the worker count by 1 if smaller than |max_concurrency| and if
+ // |!is_canceled()|, using std::memory_order_release, and returns the state
+ // before the operation. Equivalent to Load() otherwise.
+ Value TryIncrementWorkerCountFromWorkerRelease(size_t max_concurrency);
+
+ // Decrements the worker count by 1 using std::memory_order_acquire. Returns
+ // the state before the operation.
+ Value DecrementWorkerCountFromWorkerAcquire();
+
+ // Increments the worker count by 1 using std::memory_order_relaxed. Returns
+ // the state before the operation.
+ Value IncrementWorkerCountFromJoiningThread();
+
+ // Decrements the worker count by 1 using std::memory_order_relaxed. Returns
+ // the state before the operation.
+ Value DecrementWorkerCountFromJoiningThread();
+
+ // Loads and returns the state, using std::memory_order_relaxed.
+ Value Load() const;
+
+ private:
+ std::atomic<uint32_t> value_{0};
+ };
+
+ // Atomic flag that indicates if the joining thread is currently waiting on
+ // another worker to yield or to signal.
+ class JoinFlag {
+ public:
+ static constexpr uint32_t kNotWaiting = 0;
+ static constexpr uint32_t kWaitingForWorkerToSignal = 1;
+ static constexpr uint32_t kWaitingForWorkerToYield = 3;
+ // kWaitingForWorkerToYield is 3 because the impl relies on the following
+ // property.
+ static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) ==
+ kWaitingForWorkerToSignal,
+ "");
+
+ JoinFlag();
+ ~JoinFlag();
+
+ // Sets the status as kWaitingForWorkerToYield using
+ // std::memory_order_relaxed.
+ void SetWaiting();
+
+ // If the flag is kWaitingForWorkerToYield, returns true indicating that the
+ // worker should yield, and atomically updates to kWaitingForWorkerToSignal
+ // (using std::memory_order_relaxed) to ensure that a single worker yields
+ // in response to SetWaiting().
+ bool ShouldWorkerYield();
+
+ // If the flag is kWaiting*, returns true indicating that the worker should
+ // signal, and atomically updates to kNotWaiting (using
+ // std::memory_order_relaxed) to ensure that a single worker signals in
+ // response to SetWaiting().
+ bool ShouldWorkerSignal();
+
+ private:
+ std::atomic<uint32_t> value_{kNotWaiting};
+ };
~JobTaskSource() override;
+ // Called from the joining thread. Waits for the worker count to be below or
+ // equal to max concurrency (will happen when a worker calls
+ // DidProcessTask()). Returns true if the joining thread should run a task, or
+ // false if joining was completed and all other workers returned because
+ // either there's no work remaining or Job was cancelled.
+ bool WaitForParticipationOpportunity();
+
// TaskSource:
RunStatus WillRunTask() override;
Optional<Task> TakeTask(TaskSource::Transaction* transaction) override;
@@ -70,13 +186,23 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
bool DidProcessTask(TaskSource::Transaction* transaction) override;
SequenceSortKey GetSortKey() const override;
- // The current number of workers concurrently running tasks from this
- // TaskSource.
- std::atomic_size_t worker_count_{0U};
+ // Current atomic state.
+ State state_;
+ // Normally, |join_flag_| is protected by |lock_|, except in ShouldYield()
+ // hence the use of atomics.
+ JoinFlag join_flag_ GUARDED_BY(lock_);
+ // Signaled when |join_flag_| is kWaiting* and a worker returns.
+ std::unique_ptr<ConditionVariable> worker_released_condition_
+ GUARDED_BY(lock_);
const Location from_here_;
- base::RepeatingCallback<size_t()> max_concurrency_callback_;
- base::RepeatingClosure worker_task_;
+ RepeatingCallback<size_t()> max_concurrency_callback_;
+
+ // Worker task set by the job owner.
+ RepeatingCallback<void(experimental::JobDelegate*)> worker_task_;
+ // Task returned from TakeTask(), that calls |worker_task_| internally.
+ RepeatingClosure primary_task_;
+
const TimeTicks queue_time_;
PooledTaskRunnerDelegate* delegate_;