Diffstat (limited to 'chromium/base/task/thread_pool/job_task_source.cc')
-rw-r--r--  chromium/base/task/thread_pool/job_task_source.cc | 330
1 file changed, 252 insertions(+), 78 deletions(-)
diff --git a/chromium/base/task/thread_pool/job_task_source.cc b/chromium/base/task/thread_pool/job_task_source.cc
index c6e5f9733c5..be52945d0d9 100644
--- a/chromium/base/task/thread_pool/job_task_source.cc
+++ b/chromium/base/task/thread_pool/job_task_source.cc
@@ -19,6 +19,120 @@
namespace base {
namespace internal {
+// Memory ordering on |state_| operations
+//
+// The write operation on |state_| in WillRunTask() uses
+// std::memory_order_release, matched by std::memory_order_acquire on read
+// operations (in DidProcessTask()) to establish a
+// Release-Acquire ordering. When a call to WillRunTask() is caused by an
+// increase of max concurrency followed by an associated
+// NotifyConcurrencyIncrease(), the priority queue lock guarantees a
+// happens-after relation with NotifyConcurrencyIncrease(). This ensures that an
+// increase of max concurrency that happened-before NotifyConcurrencyIncrease()
+// is visible to a read operation that happens-after WillRunTask().
+//
+// In DidProcessTask(), this ordering is necessary to
+// ensure that the task source is always re-enqueued when it needs to be. When the
+// task source needs to be queued, either because the current task yielded or
+// because of NotifyConcurrencyIncrease(), one of the following is true:
+// A) DidProcessTask() happens-after WillRunTask():
+// T1: Current task returns (because it is done) or yields.
+// T2: Increases the value returned by GetMaxConcurrency()
+// NotifyConcurrencyIncrease() enqueues the task source
+// T3: WillRunTask(), in response to the concurrency increase - Release
+// Does not keep the TaskSource in PriorityQueue because it is at max
+// concurrency
+// T1: DidProcessTask() - Acquire - Because of memory barrier, sees the same
+// (or newer) max concurrency as T2
+// Re-enqueues the TaskSource because no longer at max concurrency
+// Without the memory barrier, T1 may see an outdated max concurrency that
+// is lower than the actual max concurrency and won't re-enqueue the
+// task source, because it thinks it's already saturated.
+// The task source often needs to be re-enqueued if its task
+// completed because it yielded and |max_concurrency| wasn't decreased.
+// B) DidProcessTask() happens-before WillRunTask():
+// T1: Current task returns (because it is done) or yields
+// T2: Increases the value returned by GetMaxConcurrency()
+// NotifyConcurrencyIncrease() enqueues the task source
+// T1: DidProcessTask() - Acquire (ineffective)
+// Since the task source is already in the queue, it doesn't matter
+// whether T1 re-enqueues the task source or not.
+// Note that a stale value in the other direction can cause this task source to
+// be incorrectly re-enqueued, which is not an issue because the queues support
+// empty task sources.
+
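// Illustrative sketch, not part of this change: the Release-Acquire pattern
// described above, reduced to two free-standing atomics with made-up names.
// Only the memory orderings mirror WillRunTask() and DidProcessTask().
#include <atomic>
#include <cstddef>
#include <cstdint>

namespace ordering_sketch {

std::atomic<size_t> g_max_concurrency{1};
std::atomic<uint32_t> g_worker_count{0};

// T2 then T3 in case A): raise max concurrency, then register a new worker.
// The release publishes the relaxed store above to any matching acquire.
void IncreaseConcurrencyThenWillRunTask() {
  g_max_concurrency.store(2, std::memory_order_relaxed);
  g_worker_count.fetch_add(1, std::memory_order_release);
}

// T1 in case A): if this acquire reads the value written by the release
// above, the new max concurrency is guaranteed to be visible here, so the
// re-enqueue decision is made with an up-to-date value.
bool DidProcessTaskShouldReenqueue() {
  const uint32_t count_before_sub =
      g_worker_count.fetch_sub(1, std::memory_order_acquire);
  return count_before_sub <=
         g_max_concurrency.load(std::memory_order_relaxed);
}

}  // namespace ordering_sketch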
+JobTaskSource::State::State() = default;
+JobTaskSource::State::~State() = default;
+
+JobTaskSource::State::Value JobTaskSource::State::Cancel() {
+ return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)};
+}
+
+JobTaskSource::State::Value
+JobTaskSource::State::TryIncrementWorkerCountFromWorkerRelease(
+ size_t max_concurrency) {
+ uint32_t value_before_add = value_.load(std::memory_order_relaxed);
+
+ // std::memory_order_release on success to establish Release-Acquire ordering
+  // with DecrementWorkerCountFromWorkerAcquire() (see Memory Ordering comment
+  // at top of the file).
+ while (!(value_before_add & kCanceledMask) &&
+ (value_before_add >> kWorkerCountBitOffset) < max_concurrency &&
+ !value_.compare_exchange_weak(
+ value_before_add, value_before_add + kWorkerCountIncrement,
+ std::memory_order_release, std::memory_order_relaxed)) {
+ }
+ return {value_before_add};
+}
+
+JobTaskSource::State::Value
+JobTaskSource::State::DecrementWorkerCountFromWorkerAcquire() {
+ const size_t value_before_sub =
+ value_.fetch_sub(kWorkerCountIncrement, std::memory_order_acquire);
+ DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
+ return {value_before_sub};
+}
+
+JobTaskSource::State::Value
+JobTaskSource::State::IncrementWorkerCountFromJoiningThread() {
+ size_t value_before_add =
+ value_.fetch_add(kWorkerCountIncrement, std::memory_order_relaxed);
+ return {value_before_add};
+}
+
+JobTaskSource::State::Value
+JobTaskSource::State::DecrementWorkerCountFromJoiningThread() {
+ const size_t value_before_sub =
+ value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed);
+ DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
+ return {value_before_sub};
+}
+
+JobTaskSource::State::Value JobTaskSource::State::Load() const {
+ return {value_.load(std::memory_order_relaxed)};
+}
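// Illustrative sketch, not part of this change: one plausible layout for
// State::Value and the constants used above. The real definitions live in the
// header; the exact bit assignments below are assumptions.
#include <cstddef>
#include <cstdint>

namespace state_sketch {

constexpr uint32_t kCanceledMask = 1;        // low bit: canceled
constexpr size_t kWorkerCountBitOffset = 1;  // worker count: remaining bits
constexpr uint32_t kWorkerCountIncrement = 1u << kWorkerCountBitOffset;

struct Value {
  uint32_t value;
  bool is_canceled() const { return value & kCanceledMask; }
  uint32_t worker_count() const { return value >> kWorkerCountBitOffset; }
};

// With such a layout, fetch_or(kCanceledMask) cancels without disturbing the
// worker count, and fetch_add()/fetch_sub() of kWorkerCountIncrement adjust
// the count without touching the canceled bit (the count is only decremented
// when it is non-zero, so it never borrows into the low bit).

}  // namespace state_sketch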
+
+JobTaskSource::JoinFlag::JoinFlag() = default;
+JobTaskSource::JoinFlag::~JoinFlag() = default;
+
+void JobTaskSource::JoinFlag::SetWaiting() {
+ const auto previous_value =
+ value_.exchange(kWaitingForWorkerToYield, std::memory_order_relaxed);
+ DCHECK(previous_value == kNotWaiting);
+}
+
+bool JobTaskSource::JoinFlag::ShouldWorkerYield() {
+ // The fetch_and() sets the state to kWaitingForWorkerToSignal if it was
+ // previously kWaitingForWorkerToYield, otherwise it leaves it unchanged.
+ return value_.fetch_and(kWaitingForWorkerToSignal,
+ std::memory_order_relaxed) ==
+ kWaitingForWorkerToYield;
+}
+
+bool JobTaskSource::JoinFlag::ShouldWorkerSignal() {
+ return value_.exchange(kNotWaiting, std::memory_order_relaxed) != kNotWaiting;
+}
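// Illustrative sketch, not part of this change: the fetch_and() above works
// because the flag values are chosen so that AND-ing with
// kWaitingForWorkerToSignal turns kWaitingForWorkerToYield into
// kWaitingForWorkerToSignal and leaves the other states unchanged. The
// constants below are assumptions; the real ones live in the header.
#include <atomic>
#include <cstdint>

namespace join_flag_sketch {

enum : uint8_t {
  kNotWaiting = 0,
  kWaitingForWorkerToSignal = 1,
  kWaitingForWorkerToYield = 3,  // also carries the "signal" bit
};

std::atomic<uint8_t> g_join_flag{kNotWaiting};

// 3 & 1 == 1, 1 & 1 == 1, 0 & 1 == 0: only the first worker to observe the
// yield request yields; subsequent workers merely see "signal on return".
bool ShouldWorkerYield() {
  return g_join_flag.fetch_and(kWaitingForWorkerToSignal,
                               std::memory_order_relaxed) ==
         kWaitingForWorkerToYield;
}

}  // namespace join_flag_sketch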
+
JobTaskSource::JobTaskSource(
const Location& from_here,
const TaskTraits& traits,
@@ -28,64 +142,122 @@ JobTaskSource::JobTaskSource(
: TaskSource(traits, nullptr, TaskSourceExecutionMode::kJob),
from_here_(from_here),
max_concurrency_callback_(std::move(max_concurrency_callback)),
- worker_task_(base::BindRepeating(
- [](JobTaskSource* self,
- const RepeatingCallback<void(experimental::JobDelegate*)>&
- worker_task) {
+ worker_task_(std::move(worker_task)),
+ primary_task_(base::BindRepeating(
+ [](JobTaskSource* self) {
// Each worker task has its own delegate with associated state.
experimental::JobDelegate job_delegate{self, self->delegate_};
- worker_task.Run(&job_delegate);
+ self->worker_task_.Run(&job_delegate);
},
- base::Unretained(this),
- std::move(worker_task))),
+ base::Unretained(this))),
queue_time_(TimeTicks::Now()),
delegate_(delegate) {
DCHECK(delegate_);
}
JobTaskSource::~JobTaskSource() {
-#if DCHECK_IS_ON()
- auto worker_count = worker_count_.load(std::memory_order_relaxed);
// Make sure there's no outstanding active run operation left.
- DCHECK(worker_count == 0U || worker_count == kInvalidWorkerCount)
- << worker_count;
-#endif
+ DCHECK_EQ(state_.Load().worker_count(), 0U);
}
ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() {
return {SequenceToken::Create(), nullptr};
}
+bool JobTaskSource::WillJoin() {
+ {
+ CheckedAutoLock auto_lock(lock_);
+ DCHECK(!worker_released_condition_); // This may only be called once.
+ worker_released_condition_ = lock_.CreateConditionVariable();
+ }
+  // std::memory_order_relaxed on |state_| is sufficient because the call to
+  // GetMaxConcurrency() is used for a best-effort early exit. Stale values will
+ // only cause WaitForParticipationOpportunity() to be called.
+ const auto state_before_add = state_.IncrementWorkerCountFromJoiningThread();
+
+ if (!state_before_add.is_canceled() &&
+ state_before_add.worker_count() < GetMaxConcurrency()) {
+ return true;
+ }
+ return WaitForParticipationOpportunity();
+}
+
+bool JobTaskSource::RunJoinTask() {
+ experimental::JobDelegate job_delegate{this, nullptr};
+ worker_task_.Run(&job_delegate);
+
+  // std::memory_order_relaxed on |state_| is sufficient because the call
+  // to GetMaxConcurrency() is used for a best-effort early exit. Stale values
+ // will only cause WaitForParticipationOpportunity() to be called.
+ const auto state = state_.Load();
+ if (!state.is_canceled() && state.worker_count() <= GetMaxConcurrency())
+ return true;
+
+ return WaitForParticipationOpportunity();
+}
+
+void JobTaskSource::Cancel(TaskSource::Transaction* transaction) {
+ // Sets the kCanceledMask bit on |state_| so that further calls to
+ // WillRunTask() never succeed. std::memory_order_relaxed is sufficient
+ // because this task source never needs to be re-enqueued after Cancel().
+ state_.Cancel();
+}
+
+bool JobTaskSource::WaitForParticipationOpportunity() {
+ CheckedAutoLock auto_lock(lock_);
+
+ // std::memory_order_relaxed is sufficient because no other state is
+ // synchronized with |state_| outside of |lock_|.
+ auto state = state_.Load();
+ size_t max_concurrency = GetMaxConcurrency();
+
+ // Wait until either:
+ // A) |worker_count| is below or equal to max concurrency and state is not
+ // canceled.
+ // B) All other workers returned and |worker_count| is 1.
+ while (!((state.worker_count() <= max_concurrency && !state.is_canceled()) ||
+ state.worker_count() == 1)) {
+ // std::memory_order_relaxed is sufficient because no other state is
+ // synchronized with |join_flag_| outside of |lock_|.
+ join_flag_.SetWaiting();
+
+    // To avoid unnecessarily waiting, if either condition A) or B) changes,
+    // |lock_| is taken and |worker_released_condition_| is signaled if
+    // necessary:
+ // 1- In DidProcessTask(), after worker count is decremented.
+ // 2- In NotifyConcurrencyIncrease(), following a max_concurrency increase.
+ worker_released_condition_->Wait();
+ state = state_.Load();
+ max_concurrency = GetMaxConcurrency();
+ }
+ // Case A:
+ if (state.worker_count() <= max_concurrency && !state.is_canceled())
+ return true;
+ // Case B:
+ // Only the joining thread remains.
+ DCHECK_EQ(state.worker_count(), 1U);
+ DCHECK(state.is_canceled() || max_concurrency == 0U);
+ state_.DecrementWorkerCountFromJoiningThread();
+ return false;
+}
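// Illustrative sketch, not part of this change: the wait loop above is the
// usual "re-check the predicate after every wake-up" pattern, reduced here to
// standard-library primitives (the real code uses base's CheckedLock and
// ConditionVariable).
#include <condition_variable>
#include <cstddef>
#include <mutex>

namespace join_wait_sketch {

std::mutex g_lock;
std::condition_variable g_worker_released;
size_t g_worker_count = 2;  // guarded by g_lock
bool g_canceled = false;    // guarded by g_lock

// Returns true if the joining thread may participate, false if it is the last
// registered thread and should back out.
bool WaitForParticipationOpportunity(size_t (*get_max_concurrency)()) {
  std::unique_lock<std::mutex> auto_lock(g_lock);
  while (!((g_worker_count <= get_max_concurrency() && !g_canceled) ||
           g_worker_count == 1)) {
    g_worker_released.wait(auto_lock);  // predicate re-checked on wake-up
  }
  if (g_worker_count <= get_max_concurrency() && !g_canceled)
    return true;
  --g_worker_count;  // only the joining thread remains; deregister it
  return false;
}

}  // namespace join_wait_sketch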
+
TaskSource::RunStatus JobTaskSource::WillRunTask() {
- // When this call is caused by an increase of max concurrency followed by an
- // associated NotifyConcurrencyIncrease(), the priority queue lock guarantees
- // an happens-after relation with NotifyConcurrencyIncrease(). The memory
- // operations on |worker_count| below and in DidProcessTask() use
- // std::memory_order_release and std::memory_order_acquire respectively to
- // establish a Release-Acquire ordering. This ensures that all memory
- // side-effects made before this point, including an increase of max
- // concurrency followed by NotifyConcurrencyIncrease() are visible to a
- // DidProcessTask() call which is ordered after this one.
const size_t max_concurrency = GetMaxConcurrency();
- size_t worker_count_before_add =
- worker_count_.load(std::memory_order_relaxed);
-
- // std::memory_order_release on success to make the newest |max_concurrency|
- // visible to a thread that calls DidProcessTask() containing a matching
- // std::memory_order_acquire.
- while (worker_count_before_add < max_concurrency &&
- !worker_count_.compare_exchange_weak(
- worker_count_before_add, worker_count_before_add + 1,
- std::memory_order_release, std::memory_order_relaxed)) {
- }
+ // std::memory_order_release on success to establish Release-Acquire ordering
+ // with read operations (see Memory Ordering comment at top of the file).
+ const auto state_before_add =
+ state_.TryIncrementWorkerCountFromWorkerRelease(max_concurrency);
+
// Don't allow this worker to run the task if either:
- // A) |worker_count_| is already at |max_concurrency|.
- // B) |max_concurrency| was lowered below or to |worker_count_|.
- // C) |worker_count_| was invalidated.
- if (worker_count_before_add >= max_concurrency) {
- // The caller is prevented from running a task from this TaskSource.
+ // A) |state_| was canceled.
+ // B) |worker_count| is already at |max_concurrency|.
+ // C) |max_concurrency| was lowered below or to |worker_count|.
+ // Case A:
+ if (state_before_add.is_canceled())
+ return RunStatus::kDisallowed;
+ const size_t worker_count_before_add = state_before_add.worker_count();
+ // Case B) or C):
+ if (worker_count_before_add >= max_concurrency)
return RunStatus::kDisallowed;
- }
DCHECK_LT(worker_count_before_add, max_concurrency);
return max_concurrency == worker_count_before_add + 1
@@ -96,15 +268,23 @@ TaskSource::RunStatus JobTaskSource::WillRunTask() {
size_t JobTaskSource::GetRemainingConcurrency() const {
// std::memory_order_relaxed is sufficient because no other state is
// synchronized with GetRemainingConcurrency().
- const size_t worker_count = worker_count_.load(std::memory_order_relaxed);
+ const auto state = state_.Load();
const size_t max_concurrency = GetMaxConcurrency();
// Avoid underflows.
- if (worker_count > max_concurrency)
+ if (state.is_canceled() || state.worker_count() > max_concurrency)
return 0;
- return max_concurrency - worker_count;
+ return max_concurrency - state.worker_count();
}
void JobTaskSource::NotifyConcurrencyIncrease() {
+ {
+ // Lock is taken to access |join_flag_| below and signal
+ // |worker_released_condition_|.
+ CheckedAutoLock auto_lock(lock_);
+ if (join_flag_.ShouldWorkerSignal())
+ worker_released_condition_->Signal();
+ }
+
#if DCHECK_IS_ON()
{
AutoLock auto_lock(version_lock_);
@@ -125,6 +305,14 @@ size_t JobTaskSource::GetMaxConcurrency() const {
return max_concurrency_callback_.Run();
}
+bool JobTaskSource::ShouldYield() {
+ // It is safe to read |join_flag_| without a lock since this
+ // variable is atomic, keeping in mind that threads may not immediately see
+ // the new value when it is updated.
+ return TS_UNCHECKED_READ(join_flag_).ShouldWorkerYield() ||
+ state_.Load().is_canceled();
+}
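// Illustrative sketch, not part of this change: a worker task typically polls
// ShouldYield() between work items so that a joining thread or Cancel() takes
// effect promptly. The delegate below is a stand-in, not the real
// experimental::JobDelegate, and the work-item callbacks are hypothetical.
#include <functional>
#include <optional>

namespace yield_sketch {

struct Delegate {
  std::function<bool()> should_yield;
};

void WorkerTask(Delegate* delegate,
                const std::function<std::optional<int>()>& take_work_item,
                const std::function<void(int)>& process) {
  while (std::optional<int> work_item = take_work_item()) {
    process(*work_item);
    if (delegate->should_yield())
      return;  // DidProcessTask() re-enqueues the source if work remains.
  }
}

}  // namespace yield_sketch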
+
#if DCHECK_IS_ON()
size_t JobTaskSource::GetConcurrencyIncreaseVersion() const {
@@ -151,45 +339,36 @@ bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) {
#endif // DCHECK_IS_ON()
Optional<Task> JobTaskSource::TakeTask(TaskSource::Transaction* transaction) {
- DCHECK_GT(worker_count_.load(std::memory_order_relaxed), 0U);
- DCHECK(worker_task_);
- return base::make_optional<Task>(from_here_, worker_task_, TimeDelta());
+  // The members accessed here are not lock-protected, so there is no need to
+  // acquire a lock when |transaction| is nullptr.
+ DCHECK_GT(state_.Load().worker_count(), 0U);
+ DCHECK(primary_task_);
+ return base::make_optional<Task>(from_here_, primary_task_, TimeDelta());
}
bool JobTaskSource::DidProcessTask(TaskSource::Transaction* transaction) {
- size_t worker_count_before_sub =
- worker_count_.load(std::memory_order_relaxed);
-
- // std::memory_order_acquire on |worker_count_| is necessary to establish
- // Release-Acquire ordering (see WillRunTask()).
- // When the task source needs to be queued, either because the current task
- // yielded or because of NotifyConcurrencyIncrease(), one of the following is
- // true:
- // A) The JobTaskSource is already in the queue (no worker picked up the
- // extra work yet): Incorrectly returning false is fine and the memory
- // barrier may be ineffective.
- // B) The JobTaskSource() is no longer in the queue: The Release-Acquire
- // ordering with WillRunTask() established by |worker_count| ensures that
- // the upcoming call for GetMaxConcurrency() happens-after any
- // NotifyConcurrencyIncrease() that happened-before WillRunTask(). If
- // this task completed because it yielded, this barrier guarantees that
- // it sees an up-to-date concurrency value and correctly re-enqueues.
- //
- // Note that stale values the other way around (incorrectly re-enqueuing) are
- // not an issue because the queues support empty task sources.
- while (worker_count_before_sub != kInvalidWorkerCount &&
- !worker_count_.compare_exchange_weak(
- worker_count_before_sub, worker_count_before_sub - 1,
- std::memory_order_acquire, std::memory_order_relaxed)) {
- }
- if (worker_count_before_sub == kInvalidWorkerCount)
+  // The lock is needed to access |join_flag_| below and to signal
+  // |worker_released_condition_|. If |transaction| is non-null, |lock_| is
+  // already taken.
+ CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
+ AnnotateAcquiredLockAlias annotate(lock_, lock_);
+
+ // std::memory_order_acquire to establish Release-Acquire ordering with
+ // WillRunTask() (see Memory Ordering comment at top of the file).
+ const auto state_before_sub = state_.DecrementWorkerCountFromWorkerAcquire();
+
+ if (join_flag_.ShouldWorkerSignal())
+ worker_released_condition_->Signal();
+
+ // A canceled task source should never get re-enqueued.
+ if (state_before_sub.is_canceled())
return false;
- DCHECK_GT(worker_count_before_sub, 0U);
+ DCHECK_GT(state_before_sub.worker_count(), 0U);
// Re-enqueue the TaskSource if the task ran and the worker count is below the
// max concurrency.
- return worker_count_before_sub <= GetMaxConcurrency();
+ return state_before_sub.worker_count() <= GetMaxConcurrency();
}
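// Illustrative sketch, not part of this change: CheckedAutoLockMaybe above
// acquires |lock_| only when the caller does not already hold it (i.e. when
// |transaction| is null). A plain standard-library equivalent of that pattern:
#include <mutex>

namespace maybe_lock_sketch {

// Locks on construction only if given a non-null mutex, and unlocks on
// destruction; a no-op otherwise.
class AutoLockMaybe {
 public:
  explicit AutoLockMaybe(std::mutex* lock) : lock_(lock) {
    if (lock_)
      lock_->lock();
  }
  ~AutoLockMaybe() {
    if (lock_)
      lock_->unlock();
  }
  AutoLockMaybe(const AutoLockMaybe&) = delete;
  AutoLockMaybe& operator=(const AutoLockMaybe&) = delete;

 private:
  std::mutex* const lock_;
};

}  // namespace maybe_lock_sketch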
SequenceSortKey JobTaskSource::GetSortKey() const {
@@ -197,12 +376,7 @@ SequenceSortKey JobTaskSource::GetSortKey() const {
}
Optional<Task> JobTaskSource::Clear(TaskSource::Transaction* transaction) {
- // Invalidate |worker_count_| so that further calls to WillRunTask() never
- // succeed. std::memory_order_relaxed is sufficient because this task source
- // never needs to be re-enqueued after Clear().
- size_t worker_count_before_store =
- worker_count_.exchange(kInvalidWorkerCount, std::memory_order_relaxed);
- DCHECK_GT(worker_count_before_store, 0U);
+ Cancel();
// Nothing is cleared since other workers might still racily run tasks. For
// simplicity, the destructor will take care of it once all references are
// released.