diff options
Diffstat (limited to 'chromium/base/task/thread_pool/job_task_source.h')
-rw-r--r-- | chromium/base/task/thread_pool/job_task_source.h | 140 |
1 files changed, 133 insertions, 7 deletions
diff --git a/chromium/base/task/thread_pool/job_task_source.h b/chromium/base/task/thread_pool/job_task_source.h index ddaca19b343..57838679e7a 100644 --- a/chromium/base/task/thread_pool/job_task_source.h +++ b/chromium/base/task/thread_pool/job_task_source.h @@ -38,10 +38,33 @@ class BASE_EXPORT JobTaskSource : public TaskSource { RepeatingCallback<size_t()> max_concurrency_callback, PooledTaskRunnerDelegate* delegate); + static experimental::JobHandle CreateJobHandle( + scoped_refptr<internal::JobTaskSource> task_source) { + return experimental::JobHandle(std::move(task_source)); + } + // Notifies this task source that max concurrency was increased, and the // number of worker should be adjusted. void NotifyConcurrencyIncrease(); + // Informs this JobTaskSource that the current thread would like to join and + // contribute to running |worker_task|. Returns true if the joining thread can + // contribute (RunJoinTask() can be called), or false if joining was completed + // and all other workers returned because either there's no work remaining or + // Job was cancelled. + bool WillJoin(); + + // Contributes to running |worker_task| and returns true if the joining thread + // can contribute again (RunJoinTask() can be called again), or false if + // joining was completed and all other workers returned because either there's + // no work remaining or Job was cancelled. This should be called only after + // WillJoin() or RunJoinTask() previously returned true. + bool RunJoinTask(); + + // Cancels this JobTaskSource, causing all workers to yield and WillRunTask() + // to return RunStatus::kDisallowed. + void Cancel(TaskSource::Transaction* transaction = nullptr); + // TaskSource: ExecutionEnvironment GetExecutionEnvironment() override; size_t GetRemainingConcurrency() const override; @@ -50,6 +73,12 @@ class BASE_EXPORT JobTaskSource : public TaskSource { // concurrently. size_t GetMaxConcurrency() const; + // Returns true if a worker should return from the worker task on the current + // thread ASAP. + bool ShouldYield(); + + PooledTaskRunnerDelegate* delegate() const { return delegate_; } + #if DCHECK_IS_ON() size_t GetConcurrencyIncreaseVersion() const; // Returns true if the concurrency version was updated above @@ -58,11 +87,98 @@ class BASE_EXPORT JobTaskSource : public TaskSource { #endif // DCHECK_IS_ON() private: - static constexpr size_t kInvalidWorkerCount = - std::numeric_limits<size_t>::max(); + // Atomic internal state to track the number of workers running a task from + // this JobTaskSource and whether this JobTaskSource is canceled. + class State { + public: + static constexpr size_t kCanceledMask = 1; + static constexpr size_t kWorkerCountBitOffset = 1; + static constexpr size_t kWorkerCountIncrement = 1 << kWorkerCountBitOffset; + + struct Value { + size_t worker_count() const { return value >> kWorkerCountBitOffset; } + // Returns true if canceled. + bool is_canceled() const { return value & kCanceledMask; } + + uint32_t value; + }; + + State(); + ~State(); + + // Sets as canceled using std::memory_order_relaxed. Returns the state + // before the operation. + Value Cancel(); + + // Increments the worker count by 1 if smaller than |max_concurrency| and if + // |!is_canceled()|, using std::memory_order_release, and returns the state + // before the operation. Equivalent to Load() otherwise. + Value TryIncrementWorkerCountFromWorkerRelease(size_t max_concurrency); + + // Decrements the worker count by 1 using std::memory_order_acquire. Returns + // the state before the operation. + Value DecrementWorkerCountFromWorkerAcquire(); + + // Increments the worker count by 1 using std::memory_order_relaxed. Returns + // the state before the operation. + Value IncrementWorkerCountFromJoiningThread(); + + // Decrements the worker count by 1 using std::memory_order_relaxed. Returns + // the state before the operation. + Value DecrementWorkerCountFromJoiningThread(); + + // Loads and returns the state, using std::memory_order_relaxed. + Value Load() const; + + private: + std::atomic<uint32_t> value_{0}; + }; + + // Atomic flag that indicates if the joining thread is currently waiting on + // another worker to yield or to signal. + class JoinFlag { + public: + static constexpr uint32_t kNotWaiting = 0; + static constexpr uint32_t kWaitingForWorkerToSignal = 1; + static constexpr uint32_t kWaitingForWorkerToYield = 3; + // kWaitingForWorkerToYield is 3 because the impl relies on the following + // property. + static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) == + kWaitingForWorkerToSignal, + ""); + + JoinFlag(); + ~JoinFlag(); + + // Sets the status as kWaitingForWorkerToYield using + // std::memory_order_relaxed. + void SetWaiting(); + + // If the flag is kWaitingForWorkerToYield, returns true indicating that the + // worker should yield, and atomically updates to kWaitingForWorkerToSignal + // (using std::memory_order_relaxed) to ensure that a single worker yields + // in response to SetWaiting(). + bool ShouldWorkerYield(); + + // If the flag is kWaiting*, returns true indicating that the worker should + // signal, and atomically updates to kNotWaiting (using + // std::memory_order_relaxed) to ensure that a single worker signals in + // response to SetWaiting(). + bool ShouldWorkerSignal(); + + private: + std::atomic<uint32_t> value_{kNotWaiting}; + }; ~JobTaskSource() override; + // Called from the joining thread. Waits for the worker count to be below or + // equal to max concurrency (will happen when a worker calls + // DidProcessTask()). Returns true if the joining thread should run a task, or + // false if joining was completed and all other workers returned because + // either there's no work remaining or Job was cancelled. + bool WaitForParticipationOpportunity(); + // TaskSource: RunStatus WillRunTask() override; Optional<Task> TakeTask(TaskSource::Transaction* transaction) override; @@ -70,13 +186,23 @@ class BASE_EXPORT JobTaskSource : public TaskSource { bool DidProcessTask(TaskSource::Transaction* transaction) override; SequenceSortKey GetSortKey() const override; - // The current number of workers concurrently running tasks from this - // TaskSource. - std::atomic_size_t worker_count_{0U}; + // Current atomic state. + State state_; + // Normally, |join_flag_| is protected by |lock_|, except in ShouldYield() + // hence the use of atomics. + JoinFlag join_flag_ GUARDED_BY(lock_); + // Signaled when |join_flag_| is kWaiting* and a worker returns. + std::unique_ptr<ConditionVariable> worker_released_condition_ + GUARDED_BY(lock_); const Location from_here_; - base::RepeatingCallback<size_t()> max_concurrency_callback_; - base::RepeatingClosure worker_task_; + RepeatingCallback<size_t()> max_concurrency_callback_; + + // Worker task set by the job owner. + RepeatingCallback<void(experimental::JobDelegate*)> worker_task_; + // Task returned from TakeTask(), that calls |worker_task_| internally. + RepeatingClosure primary_task_; + const TimeTicks queue_time_; PooledTaskRunnerDelegate* delegate_; |