summaryrefslogtreecommitdiffstats
path: root/chromium/base/task
diff options
context:
space:
mode:
authorAllan Sandfeld Jensen <allan.jensen@qt.io>2020-01-23 17:21:03 +0100
committerAllan Sandfeld Jensen <allan.jensen@qt.io>2020-01-23 16:25:15 +0000
commitc551f43206405019121bd2b2c93714319a0a3300 (patch)
tree1f48c30631c421fd4bbb3c36da20183c8a2ed7d7 /chromium/base/task
parent7961cea6d1041e3e454dae6a1da660b453efd238 (diff)
BASELINE: Update Chromium to 79.0.3945.139
Change-Id: I336b7182fab9bca80b709682489c07db112eaca5 Reviewed-by: Allan Sandfeld Jensen <allan.jensen@qt.io>
Diffstat (limited to 'chromium/base/task')
-rw-r--r--chromium/base/task/post_job.cc94
-rw-r--r--chromium/base/task/post_job.h100
-rw-r--r--chromium/base/task/post_job_unittest.cc40
-rw-r--r--chromium/base/task/post_task.cc57
-rw-r--r--chromium/base/task/post_task.h47
-rw-r--r--chromium/base/task/post_task_unittest.cc157
-rw-r--r--chromium/base/task/promise/abstract_promise.cc67
-rw-r--r--chromium/base/task/promise/abstract_promise.h208
-rw-r--r--chromium/base/task/promise/abstract_promise_unittest.cc13
-rw-r--r--chromium/base/task/promise/dependent_list.h2
-rw-r--r--chromium/base/task/promise/finally_executor.h12
-rw-r--r--chromium/base/task/promise/helpers.cc50
-rw-r--r--chromium/base/task/promise/helpers.h146
-rw-r--r--chromium/base/task/promise/helpers_unittest.cc17
-rw-r--r--chromium/base/task/promise/no_op_promise_executor.cc13
-rw-r--r--chromium/base/task/promise/no_op_promise_executor.h8
-rw-r--r--chromium/base/task/promise/post_task_executor.h12
-rw-r--r--chromium/base/task/promise/post_task_executor_unittest.cc22
-rw-r--r--chromium/base/task/promise/promise.h335
-rw-r--r--chromium/base/task/promise/promise_unittest.cc88
-rw-r--r--chromium/base/task/promise/promise_value.h6
-rw-r--r--chromium/base/task/promise/then_and_catch_executor.cc18
-rw-r--r--chromium/base/task/promise/then_and_catch_executor.h87
-rw-r--r--chromium/base/task/sequence_manager/sequence_manager.h19
-rw-r--r--chromium/base/task/sequence_manager/sequence_manager_impl.cc84
-rw-r--r--chromium/base/task/sequence_manager/sequence_manager_impl.h19
-rw-r--r--chromium/base/task/sequence_manager/sequence_manager_impl_unittest.cc16
-rw-r--r--chromium/base/task/sequence_manager/sequenced_task_source.h8
-rw-r--r--chromium/base/task/sequence_manager/task_queue_impl.cc14
-rw-r--r--chromium/base/task/sequence_manager/task_queue_selector.cc9
-rw-r--r--chromium/base/task/sequence_manager/task_queue_selector.h6
-rw-r--r--chromium/base/task/sequence_manager/task_queue_selector_unittest.cc45
-rw-r--r--chromium/base/task/sequence_manager/tasks.cc15
-rw-r--r--chromium/base/task/sequence_manager/tasks.h14
-rw-r--r--chromium/base/task/sequence_manager/thread_controller_impl.cc4
-rw-r--r--chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl.cc4
-rw-r--r--chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl_unittest.cc18
-rw-r--r--chromium/base/task/sequence_manager/work_queue.cc10
-rw-r--r--chromium/base/task/sequence_manager/work_queue.h5
-rw-r--r--chromium/base/task/sequence_manager/work_queue_sets.cc13
-rw-r--r--chromium/base/task/sequence_manager/work_queue_sets.h6
-rw-r--r--chromium/base/task/sequence_manager/work_queue_sets_unittest.cc30
-rw-r--r--chromium/base/task/sequence_manager/work_queue_unittest.cc30
-rw-r--r--chromium/base/task/simple_task_executor.cc62
-rw-r--r--chromium/base/task/simple_task_executor.h52
-rw-r--r--chromium/base/task/single_thread_task_executor.cc3
-rw-r--r--chromium/base/task/single_thread_task_executor.h2
-rw-r--r--chromium/base/task/single_thread_task_executor_unittest.cc58
-rw-r--r--chromium/base/task/task_executor.cc19
-rw-r--r--chromium/base/task/task_executor.h7
-rw-r--r--chromium/base/task/task_traits.h38
-rw-r--r--chromium/base/task/task_traits_unittest.nc2
-rw-r--r--chromium/base/task/thread_pool/historical_histogram_data.md92
-rw-r--r--chromium/base/task/thread_pool/initialization_util.cc3
-rw-r--r--chromium/base/task/thread_pool/job_task_source.cc330
-rw-r--r--chromium/base/task/thread_pool/job_task_source.h140
-rw-r--r--chromium/base/task/thread_pool/job_task_source_unittest.cc119
-rw-r--r--chromium/base/task/thread_pool/pooled_task_runner_delegate.h4
-rw-r--r--chromium/base/task/thread_pool/priority_queue.cc8
-rw-r--r--chromium/base/task/thread_pool/priority_queue.h2
-rw-r--r--chromium/base/task/thread_pool/priority_queue_unittest.cc14
-rw-r--r--chromium/base/task/thread_pool/sequence.cc26
-rw-r--r--chromium/base/task/thread_pool/sequence.h3
-rw-r--r--chromium/base/task/thread_pool/sequence_unittest.cc3
-rw-r--r--chromium/base/task/thread_pool/service_thread.cc28
-rw-r--r--chromium/base/task/thread_pool/service_thread_unittest.cc8
-rw-r--r--chromium/base/task/thread_pool/task_source.cc6
-rw-r--r--chromium/base/task/thread_pool/task_source.h23
-rw-r--r--chromium/base/task/thread_pool/task_tracker.cc316
-rw-r--r--chromium/base/task/thread_pool/task_tracker.h46
-rw-r--r--chromium/base/task/thread_pool/task_tracker_unittest.cc16
-rw-r--r--chromium/base/task/thread_pool/test_utils.cc5
-rw-r--r--chromium/base/task/thread_pool/test_utils.h1
-rw-r--r--chromium/base/task/thread_pool/thread_group.cc4
-rw-r--r--chromium/base/task/thread_pool/thread_group.h2
-rw-r--r--chromium/base/task/thread_pool/thread_group_impl.cc123
-rw-r--r--chromium/base/task/thread_pool/thread_group_impl.h5
-rw-r--r--chromium/base/task/thread_pool/thread_group_unittest.cc76
-rw-r--r--chromium/base/task/thread_pool/thread_pool_impl.cc27
-rw-r--r--chromium/base/task/thread_pool/thread_pool_impl.h5
80 files changed, 2676 insertions, 970 deletions
diff --git a/chromium/base/task/post_job.cc b/chromium/base/task/post_job.cc
index e57b32e4b40..3c83068995a 100644
--- a/chromium/base/task/post_job.cc
+++ b/chromium/base/task/post_job.cc
@@ -4,8 +4,11 @@
#include "base/task/post_job.h"
+#include "base/task/scoped_set_task_priority_for_current_thread.h"
#include "base/task/thread_pool/job_task_source.h"
#include "base/task/thread_pool/pooled_task_runner_delegate.h"
+#include "base/task/thread_pool/thread_pool_impl.h"
+#include "base/task/thread_pool/thread_pool_instance.h"
namespace base {
namespace experimental {
@@ -16,7 +19,6 @@ JobDelegate::JobDelegate(
: task_source_(task_source),
pooled_task_runner_delegate_(pooled_task_runner_delegate) {
DCHECK(task_source_);
- DCHECK(pooled_task_runner_delegate_);
#if DCHECK_IS_ON()
recorded_increase_version_ = task_source_->GetConcurrencyIncreaseVersion();
// Record max concurrency before running the worker task.
@@ -42,7 +44,9 @@ bool JobDelegate::ShouldYield() {
AssertExpectedConcurrency(recorded_max_concurrency_);
#endif // DCHECK_IS_ON()
const bool should_yield =
- pooled_task_runner_delegate_->ShouldYield(task_source_);
+ task_source_->ShouldYield() ||
+ (pooled_task_runner_delegate_ &&
+ pooled_task_runner_delegate_->ShouldYield(task_source_));
#if DCHECK_IS_ON()
last_should_yield_ = should_yield;
@@ -98,5 +102,91 @@ void JobDelegate::AssertExpectedConcurrency(size_t expected_max_concurrency) {
#endif // DCHECK_IS_ON()
}
+JobHandle::JobHandle() = default;
+
+JobHandle::JobHandle(scoped_refptr<internal::JobTaskSource> task_source)
+ : task_source_(std::move(task_source)) {}
+
+JobHandle::~JobHandle() {
+ DCHECK(!task_source_)
+ << "The Job must be cancelled, detached or joined before its "
+ "JobHandle is destroyed.";
+}
+
+JobHandle::JobHandle(JobHandle&&) = default;
+
+JobHandle& JobHandle::operator=(JobHandle&& other) {
+ DCHECK(!task_source_)
+ << "The Job must be cancelled, detached or joined before its "
+ "JobHandle is re-assigned.";
+ task_source_ = std::move(other.task_source_);
+ return *this;
+}
+
+void JobHandle::UpdatePriority(TaskPriority new_priority) {
+ task_source_->delegate()->UpdatePriority(task_source_, new_priority);
+}
+
+void JobHandle::NotifyConcurrencyIncrease() {
+ task_source_->NotifyConcurrencyIncrease();
+}
+
+void JobHandle::Join() {
+ DCHECK_GE(internal::GetTaskPriorityForCurrentThread(),
+ task_source_->priority_racy())
+ << "Join may not be called on Job with higher priority than the current "
+ "one.";
+ UpdatePriority(internal::GetTaskPriorityForCurrentThread());
+ bool must_run = task_source_->WillJoin();
+ while (must_run)
+ must_run = task_source_->RunJoinTask();
+ // Remove |task_source_| from the ThreadPool to prevent access to
+ // |max_concurrency_callback| after Join().
+ task_source_->delegate()->RemoveJobTaskSource(task_source_);
+ task_source_ = nullptr;
+}
+
+void JobHandle::Cancel() {
+ task_source_->Cancel();
+ Join();
+}
+
+void JobHandle::CancelAndDetach() {
+ task_source_->Cancel();
+ Detach();
+}
+
+void JobHandle::Detach() {
+ DCHECK(task_source_);
+ task_source_ = nullptr;
+}
+
+JobHandle PostJob(const Location& from_here,
+ const TaskTraits& traits,
+ RepeatingCallback<void(JobDelegate*)> worker_task,
+ RepeatingCallback<size_t()> max_concurrency_callback) {
+ DCHECK(ThreadPoolInstance::Get())
+ << "Ref. Prerequisite section of post_task.h.\n\n"
+ "Hint: if this is in a unit test, you're likely merely missing a "
+ "base::test::TaskEnvironment member in your fixture.\n";
+ DCHECK(traits.use_thread_pool())
+ << "The base::ThreadPool() trait is mandatory with PostJob().";
+ DCHECK_EQ(traits.extension_id(),
+ TaskTraitsExtensionStorage::kInvalidExtensionId)
+ << "Extension traits cannot be used with PostJob().";
+ TaskTraits adjusted_traits = traits;
+ adjusted_traits.InheritPriority(internal::GetTaskPriorityForCurrentThread());
+ auto task_source = base::MakeRefCounted<internal::JobTaskSource>(
+ from_here, adjusted_traits, std::move(worker_task),
+ std::move(max_concurrency_callback),
+ static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get()));
+ const bool queued =
+ static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get())
+ ->EnqueueJobTaskSource(task_source);
+ if (queued)
+ return internal::JobTaskSource::CreateJobHandle(std::move(task_source));
+ return JobHandle();
+}
+
} // namespace experimental
} // namespace base \ No newline at end of file
diff --git a/chromium/base/task/post_job.h b/chromium/base/task/post_job.h
index de6b2d66ac3..cf8ca1aec70 100644
--- a/chromium/base/task/post_job.h
+++ b/chromium/base/task/post_job.h
@@ -6,8 +6,12 @@
#define BASE_TASK_POST_JOB_H_
#include "base/base_export.h"
+#include "base/callback.h"
+#include "base/location.h"
#include "base/logging.h"
#include "base/macros.h"
+#include "base/memory/ref_counted.h"
+#include "base/task/task_traits.h"
#include "base/time/time.h"
namespace base {
@@ -23,8 +27,9 @@ class BASE_EXPORT JobDelegate {
public:
// A JobDelegate is instantiated for each worker task that is run.
// |task_source| is the task source whose worker task is running with this
- // delegate and |pooled_task_runner_delegate| provides communication with the
- // thread pool.
+ // delegate and |pooled_task_runner_delegate| is used by ShouldYield() to
+ // check whether the pool wants this worker task to yield (null if this worker
+ // should never yield -- e.g. when the main thread is a worker).
JobDelegate(internal::JobTaskSource* task_source,
internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate);
~JobDelegate();
@@ -42,7 +47,7 @@ class BASE_EXPORT JobDelegate {
void YieldIfNeeded();
// Notifies the scheduler that max concurrency was increased, and the number
- // of worker should be adjusted.
+ // of worker should be adjusted accordingly. See PostJob() for more details.
void NotifyConcurrencyIncrease();
private:
@@ -67,6 +72,95 @@ class BASE_EXPORT JobDelegate {
DISALLOW_COPY_AND_ASSIGN(JobDelegate);
};
+// Handle returned when posting a Job. Provides methods to control execution of
+// the posted Job.
+class BASE_EXPORT JobHandle {
+ public:
+ JobHandle();
+ // A job must either be joined, canceled or detached before the JobHandle is
+ // destroyed.
+ ~JobHandle();
+
+ JobHandle(JobHandle&&);
+ JobHandle& operator=(JobHandle&&);
+
+ // Update this Job's priority.
+ void UpdatePriority(TaskPriority new_priority);
+
+ // Notifies the scheduler that max concurrency was increased, and the number
+ // of workers should be adjusted accordingly. See PostJob() for more details.
+ void NotifyConcurrencyIncrease();
+
+ // Contributes to the job on this thread. Doesn't return until all tasks have
+ // completed and max concurrency becomes 0. This also promotes this Job's
+ // priority to be at least as high as the calling thread's priority.
+ void Join();
+
+ // Forces all existing workers to yield ASAP. Waits until they have all
+ // returned from the Job's callback before returning.
+ void Cancel();
+
+ // Forces all existing workers to yield ASAP but doesn’t wait for them.
+ // Warning, this is dangerous if the Job's callback is bound to or has access
+ // to state which may be deleted after this call.
+ void CancelAndDetach();
+
+ // Can be invoked before ~JobHandle() to avoid waiting on the job completing.
+ void Detach();
+
+ private:
+ friend class internal::JobTaskSource;
+
+ explicit JobHandle(scoped_refptr<internal::JobTaskSource> task_source);
+
+ scoped_refptr<internal::JobTaskSource> task_source_;
+
+ DISALLOW_COPY_AND_ASSIGN(JobHandle);
+};
+
+// Posts a repeating |worker_task| with specific |traits| to run in parallel.
+// Returns a JobHandle associated with the Job, which can be joined, canceled or
+// detached.
+// To avoid scheduling overhead, |worker_task| should do as much work as
+// possible in a loop when invoked, and JobDelegate::ShouldYield() should be
+// periodically invoked to conditionally exit and let the scheduler prioritize
+// work.
+//
+// A canonical implementation of |worker_task| looks like:
+// void WorkerTask(JobDelegate* job_delegate) {
+// while (!job_delegate->ShouldYield()) {
+// auto work_item = worker_queue.TakeWorkItem(); // Smallest unit of work.
+// if (!work_item)
+// return:
+// ProcessWork(work_item);
+// }
+// }
+//
+// |max_concurrency_callback| controls the maximum number of threads calling
+// |worker_task| concurrently. |worker_task| is only invoked if the number of
+// threads previously running |worker_task| was less than the value returned by
+// |max_concurrency_callback|. In general, |max_concurrency_callback| should
+// return the latest number of incomplete work items (smallest unit of work)
+// left to processed. JobHandle/JobDelegate::NotifyConcurrencyIncrease() *must*
+// be invoked shortly after |max_concurrency_callback| starts returning a value
+// larger than previously returned values. This usually happens when new work
+// items are added and the API user wants additional threads to invoke
+// |worker_task| concurrently. The callbacks may be called concurrently on any
+// thread until the job is complete. If the job handle is detached, the
+// callbacks may still be called, so they must not access global state that
+// could be destroyed.
+//
+// |traits| requirements:
+// - base::ThreadPool() must be specified.
+// - Extension traits (e.g. BrowserThread) cannot be specified.
+// - base::ThreadPolicy must be specified if the priority of the task runner
+// will ever be increased from BEST_EFFORT.
+JobHandle BASE_EXPORT
+PostJob(const Location& from_here,
+ const TaskTraits& traits,
+ RepeatingCallback<void(JobDelegate*)> worker_task,
+ RepeatingCallback<size_t()> max_concurrency_callback);
+
} // namespace experimental
} // namespace base
diff --git a/chromium/base/task/post_job_unittest.cc b/chromium/base/task/post_job_unittest.cc
new file mode 100644
index 00000000000..3fc3d6d4b07
--- /dev/null
+++ b/chromium/base/task/post_job_unittest.cc
@@ -0,0 +1,40 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task/post_job.h"
+
+#include <atomic>
+
+#include "base/task/test_task_traits_extension.h"
+#include "base/test/bind_test_util.h"
+#include "base/test/gtest_util.h"
+#include "base/test/task_environment.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+
+TEST(PostJobTest, PostJobSimple) {
+ test::TaskEnvironment task_environment;
+ std::atomic_size_t num_tasks_to_run(4);
+ auto handle = experimental::PostJob(
+ FROM_HERE, ThreadPool(),
+ BindLambdaForTesting(
+ [&](experimental::JobDelegate* delegate) { --num_tasks_to_run; }),
+ BindLambdaForTesting([&]() -> size_t { return num_tasks_to_run; }));
+ handle.Join();
+ DCHECK_EQ(num_tasks_to_run, 0U);
+}
+
+TEST(PostJobTest, PostJobExtension) {
+ testing::FLAGS_gtest_death_test_style = "threadsafe";
+ EXPECT_DCHECK_DEATH({
+ auto handle = experimental::PostJob(
+ FROM_HERE, TestExtensionBoolTrait(),
+ BindRepeating([](experimental::JobDelegate* delegate) {}),
+ BindRepeating([]() -> size_t { return 0; }));
+ });
+}
+
+} // namespace base \ No newline at end of file
diff --git a/chromium/base/task/post_task.cc b/chromium/base/task/post_task.cc
index 76415fbfe56..244e5f6adc8 100644
--- a/chromium/base/task/post_task.cc
+++ b/chromium/base/task/post_task.cc
@@ -41,6 +41,13 @@ TaskTraits GetTaskTraitsWithExplicitPriority(TaskTraits traits) {
}
TaskExecutor* GetTaskExecutorForTraits(const TaskTraits& traits) {
+ if (traits.use_current_thread()) {
+ TaskExecutor* executor = GetTaskExecutorForCurrentThread();
+ DCHECK(executor) << "Couldn't find a TaskExecutor for this thread. Note "
+ "you can't use base::CurrentThread in a one-off "
+ "base::ThreadPool task.";
+ return executor;
+ }
TaskExecutor* executor = GetRegisteredTaskExecutorForTraits(traits);
DCHECK(executor || ThreadPoolInstance::Get())
<< "Ref. Prerequisite section of post_task.h.\n\n"
@@ -139,54 +146,4 @@ scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunner(
}
#endif // defined(OS_WIN)
-// TODO(crbug.com/968047): Update all call sites and remove these forwarding
-// wrappers.
-bool PostTaskWithTraits(const Location& from_here,
- const TaskTraits& traits,
- OnceClosure task) {
- return PostTask(from_here, traits, std::move(task));
-}
-
-bool PostDelayedTaskWithTraits(const Location& from_here,
- const TaskTraits& traits,
- OnceClosure task,
- TimeDelta delay) {
- return PostDelayedTask(from_here, traits, std::move(task), delay);
-}
-
-bool PostTaskWithTraitsAndReply(const Location& from_here,
- const TaskTraits& traits,
- OnceClosure task,
- OnceClosure reply) {
- return PostTaskAndReply(from_here, traits, std::move(task), std::move(reply));
-}
-
-scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits(const TaskTraits& traits) {
- return CreateTaskRunner(traits);
-}
-
-scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunnerWithTraits(
- const TaskTraits& traits) {
- return CreateSequencedTaskRunner(traits);
-}
-
-scoped_refptr<UpdateableSequencedTaskRunner>
-CreateUpdateableSequencedTaskRunnerWithTraits(const TaskTraits& traits) {
- return CreateUpdateableSequencedTaskRunner(traits);
-}
-
-scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunnerWithTraits(
- const TaskTraits& traits,
- SingleThreadTaskRunnerThreadMode thread_mode) {
- return CreateSingleThreadTaskRunner(traits, thread_mode);
-}
-
-#if defined(OS_WIN)
-scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunnerWithTraits(
- const TaskTraits& traits,
- SingleThreadTaskRunnerThreadMode thread_mode) {
- return CreateCOMSTATaskRunner(traits, thread_mode);
-}
-#endif // defined(OS_WIN)
-
} // namespace base
diff --git a/chromium/base/task/post_task.h b/chromium/base/task/post_task.h
index b02fd65a032..9f9b5a24fa3 100644
--- a/chromium/base/task/post_task.h
+++ b/chromium/base/task/post_task.h
@@ -171,21 +171,6 @@ bool PostTaskAndReplyWithResult(const Location& from_here,
std::move(reply), Owned(result)));
}
-// Temporary wrapper for PostTaskAndReplyWithResult.
-// TODO(crbug.com/968047): Update all call sites and remove.
-template <template <typename> class CallbackType,
- typename TaskReturnType,
- typename ReplyArgType,
- typename = EnableIfIsBaseCallback<CallbackType>>
-bool PostTaskWithTraitsAndReplyWithResult(
- const Location& from_here,
- const TaskTraits& traits,
- CallbackType<TaskReturnType()> task,
- CallbackType<void(ReplyArgType)> reply) {
- return PostTaskAndReplyWithResult(from_here, traits, std::move(task),
- std::move(reply));
-}
-
// Returns a TaskRunner whose PostTask invocations result in scheduling tasks
// using |traits|. Tasks may run in any order and in parallel.
BASE_EXPORT scoped_refptr<TaskRunner> CreateTaskRunner(
@@ -247,38 +232,6 @@ BASE_EXPORT scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunner(
SingleThreadTaskRunnerThreadMode::SHARED);
#endif // defined(OS_WIN)
-// Temporary wrappers for the task posting APIs while we remove the "WithTraits"
-// suffix.
-// TODO(crbug.com/968047): Update all call sites and remove.
-BASE_EXPORT bool PostTaskWithTraits(const Location& from_here,
- const TaskTraits& traits,
- OnceClosure task);
-BASE_EXPORT bool PostDelayedTaskWithTraits(const Location& from_here,
- const TaskTraits& traits,
- OnceClosure task,
- TimeDelta delay);
-BASE_EXPORT bool PostTaskWithTraitsAndReply(const Location& from_here,
- const TaskTraits& traits,
- OnceClosure task,
- OnceClosure reply);
-BASE_EXPORT scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits(
- const TaskTraits& traits);
-BASE_EXPORT scoped_refptr<SequencedTaskRunner>
-CreateSequencedTaskRunnerWithTraits(const TaskTraits& traits);
-BASE_EXPORT scoped_refptr<UpdateableSequencedTaskRunner>
-CreateUpdateableSequencedTaskRunnerWithTraits(const TaskTraits& traits);
-BASE_EXPORT scoped_refptr<SingleThreadTaskRunner>
-CreateSingleThreadTaskRunnerWithTraits(
- const TaskTraits& traits,
- SingleThreadTaskRunnerThreadMode thread_mode =
- SingleThreadTaskRunnerThreadMode::SHARED);
-#if defined(OS_WIN)
-BASE_EXPORT scoped_refptr<SingleThreadTaskRunner>
-CreateCOMSTATaskRunnerWithTraits(const TaskTraits& traits,
- SingleThreadTaskRunnerThreadMode thread_mode =
- SingleThreadTaskRunnerThreadMode::SHARED);
-#endif // defined(OS_WIN)
-
} // namespace base
#endif // BASE_TASK_POST_TASK_H_
diff --git a/chromium/base/task/post_task_unittest.cc b/chromium/base/task/post_task_unittest.cc
index ed7fb3db3bb..798eba8e580 100644
--- a/chromium/base/task/post_task_unittest.cc
+++ b/chromium/base/task/post_task_unittest.cc
@@ -5,9 +5,11 @@
#include "base/task/post_task.h"
#include "base/bind_helpers.h"
+#include "base/run_loop.h"
#include "base/task/scoped_set_task_priority_for_current_thread.h"
#include "base/task/task_executor.h"
#include "base/task/test_task_traits_extension.h"
+#include "base/test/bind_test_util.h"
#include "base/test/gtest_util.h"
#include "base/test/task_environment.h"
#include "base/test/test_simple_task_runner.h"
@@ -17,6 +19,8 @@
using ::testing::_;
using ::testing::Invoke;
+using ::testing::IsNull;
+using ::testing::NotNull;
using ::testing::Return;
namespace base {
@@ -177,6 +181,159 @@ TEST_F(PostTaskTestWithExecutor, PostTaskToTaskExecutor) {
}
}
+TEST_F(PostTaskTestWithExecutor,
+ ThreadPoolTaskRunnerGetTaskExecutorForCurrentThread) {
+ auto task_runner = CreateTaskRunner({ThreadPool()});
+ RunLoop run_loop;
+
+ EXPECT_TRUE(task_runner->PostTask(
+ FROM_HERE, BindLambdaForTesting([&]() {
+ // We don't have an executor for a ThreadPool task runner becuse they
+ // are for one shot tasks.
+ EXPECT_THAT(GetTaskExecutorForCurrentThread(), IsNull());
+ run_loop.Quit();
+ })));
+
+ run_loop.Run();
+}
+
+TEST_F(PostTaskTestWithExecutor,
+ ThreadPoolSequencedTaskRunnerGetTaskExecutorForCurrentThread) {
+ auto sequenced_task_runner = CreateSequencedTaskRunner({ThreadPool()});
+ RunLoop run_loop;
+
+ EXPECT_TRUE(sequenced_task_runner->PostTask(
+ FROM_HERE, BindLambdaForTesting([&]() {
+ EXPECT_THAT(GetTaskExecutorForCurrentThread(), NotNull());
+ run_loop.Quit();
+ })));
+
+ run_loop.Run();
+}
+
+TEST_F(PostTaskTestWithExecutor,
+ ThreadPoolSingleThreadTaskRunnerGetTaskExecutorForCurrentThread) {
+ auto single_thread_task_runner = CreateSingleThreadTaskRunner({ThreadPool()});
+ RunLoop run_loop;
+
+ EXPECT_TRUE(single_thread_task_runner->PostTask(
+ FROM_HERE, BindLambdaForTesting([&]() {
+ EXPECT_THAT(GetTaskExecutorForCurrentThread(), NotNull());
+ run_loop.Quit();
+ })));
+
+ run_loop.Run();
+}
+
+TEST_F(PostTaskTestWithExecutor, ThreadPoolTaskRunnerCurrentThreadTrait) {
+ auto task_runner = CreateTaskRunner({ThreadPool()});
+ RunLoop run_loop;
+
+ EXPECT_TRUE(task_runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
+ // CurrentThread is meaningless in this
+ // context.
+ EXPECT_DCHECK_DEATH(
+ PostTask(FROM_HERE, {CurrentThread()},
+ DoNothing()));
+ run_loop.Quit();
+ })));
+
+ run_loop.Run();
+}
+
+TEST_F(PostTaskTestWithExecutor,
+ ThreadPoolSequencedTaskRunnerCurrentThreadTrait) {
+ auto sequenced_task_runner = CreateSequencedTaskRunner({ThreadPool()});
+ RunLoop run_loop;
+
+ auto current_thread_task = BindLambdaForTesting([&]() {
+ EXPECT_TRUE(sequenced_task_runner->RunsTasksInCurrentSequence());
+ run_loop.Quit();
+ });
+
+ EXPECT_TRUE(sequenced_task_runner->PostTask(
+ FROM_HERE, BindLambdaForTesting([&]() {
+ EXPECT_TRUE(
+ PostTask(FROM_HERE, {CurrentThread()}, current_thread_task));
+ })));
+
+ run_loop.Run();
+}
+
+TEST_F(PostTaskTestWithExecutor,
+ ThreadPoolSingleThreadTaskRunnerCurrentThreadTrait) {
+ auto single_thread_task_runner = CreateSingleThreadTaskRunner({ThreadPool()});
+ RunLoop run_loop;
+
+ auto current_thread_task = BindLambdaForTesting([&]() {
+ EXPECT_TRUE(single_thread_task_runner->RunsTasksInCurrentSequence());
+ run_loop.Quit();
+ });
+
+ EXPECT_TRUE(single_thread_task_runner->PostTask(
+ FROM_HERE, BindLambdaForTesting([&]() {
+ EXPECT_TRUE(
+ PostTask(FROM_HERE, {CurrentThread()}, current_thread_task));
+ })));
+
+ run_loop.Run();
+}
+
+TEST_F(PostTaskTestWithExecutor, ThreadPoolCurrentThreadChangePriority) {
+ auto single_thread_task_runner =
+ CreateSingleThreadTaskRunner({ThreadPool(), TaskPriority::USER_BLOCKING});
+ RunLoop run_loop;
+
+ auto current_thread_task = BindLambdaForTesting([&]() {
+ EXPECT_TRUE(single_thread_task_runner->RunsTasksInCurrentSequence());
+ run_loop.Quit();
+ });
+
+ EXPECT_TRUE(single_thread_task_runner->PostTask(
+ FROM_HERE, BindLambdaForTesting([&]() {
+ // We should be able to request a priority change, although it may be
+ // ignored.
+ EXPECT_TRUE(PostTask(FROM_HERE,
+ {CurrentThread(), TaskPriority::USER_VISIBLE},
+ current_thread_task));
+ })));
+
+ run_loop.Run();
+}
+
+TEST_F(PostTaskTestWithExecutor,
+ ThreadPoolCurrentThreadCantChangeShutdownBehavior) {
+ auto single_thread_task_runner = CreateSingleThreadTaskRunner(
+ {ThreadPool(), TaskShutdownBehavior::SKIP_ON_SHUTDOWN});
+ RunLoop run_loop;
+
+ EXPECT_TRUE(single_thread_task_runner->PostTask(
+ FROM_HERE, BindLambdaForTesting([&]() {
+ EXPECT_DCHECK_DEATH(PostTask(
+ FROM_HERE, {CurrentThread(), TaskShutdownBehavior::BLOCK_SHUTDOWN},
+ DoNothing()));
+ run_loop.Quit();
+ })));
+
+ run_loop.Run();
+}
+
+TEST_F(PostTaskTestWithExecutor,
+ ThreadPoolCurrentThreadCantSetSyncPrimitivesInNonSyncTaskRunner) {
+ auto single_thread_task_runner = CreateSingleThreadTaskRunner({ThreadPool()});
+ RunLoop run_loop;
+
+ EXPECT_TRUE(single_thread_task_runner->PostTask(
+ FROM_HERE, BindLambdaForTesting([&]() {
+ EXPECT_DCHECK_DEATH(
+ PostTask(FROM_HERE, {CurrentThread(), WithBaseSyncPrimitives()},
+ DoNothing()));
+ run_loop.Quit();
+ })));
+
+ run_loop.Run();
+}
+
TEST_F(PostTaskTestWithExecutor, RegisterExecutorTwice) {
testing::FLAGS_gtest_death_test_style = "threadsafe";
EXPECT_DCHECK_DEATH(
diff --git a/chromium/base/task/promise/abstract_promise.cc b/chromium/base/task/promise/abstract_promise.cc
index f48ba8517e2..315cfd30bfc 100644
--- a/chromium/base/task/promise/abstract_promise.cc
+++ b/chromium/base/task/promise/abstract_promise.cc
@@ -8,6 +8,7 @@
#include "base/lazy_instance.h"
#include "base/sequenced_task_runner.h"
#include "base/task/promise/dependent_list.h"
+#include "base/task/promise/post_task_executor.h"
#include "base/threading/sequenced_task_runner_handle.h"
namespace base {
@@ -65,6 +66,10 @@ AbstractPromise::~AbstractPromise() {
OnCanceled();
}
+void AbstractPromise::EmplaceResolvedVoid() {
+ emplace(Resolved<void>());
+}
+
bool AbstractPromise::IsCanceled() const {
if (dependents_.IsCanceled())
return true;
@@ -290,7 +295,7 @@ AbstractPromise* AbstractPromise::GetCurriedPromise() {
const PromiseExecutor* AbstractPromise::GetExecutor() const {
if (!value_.ContainsPromiseExecutor())
return nullptr;
- return value_.Get<internal::PromiseExecutor>();
+ return value_.Get<PromiseExecutor>();
}
PromiseExecutor::PrerequisitePolicy AbstractPromise::GetPrerequisitePolicy() {
@@ -509,7 +514,7 @@ void AbstractPromise::OnRejectMakeDependantsUseCurriedPrerequisite(
void AbstractPromise::DispatchPromise() {
if (task_runner_) {
- task_runner_->PostPromiseInternal(this, TimeDelta());
+ task_runner_->PostPromiseInternal(WrappedPromise(this), TimeDelta());
} else {
Execute();
}
@@ -670,14 +675,66 @@ void AbstractPromise::AdjacencyList::Clear() {
prerequisite_list_.clear();
} else {
// If there's multiple prerequisites we can't do that because the
- // DependentList::Nodes may still be in use by some of them. Instead we
- // release our prerequisite references and rely on refcounting to release
- // the owning AbstractPromise.
+ // DependentList::Nodes may still be in use by some of them.
+ // Instead we release our prerequisite references and rely on refcounting to
+ // release the owning AbstractPromise.
for (DependentList::Node& node : prerequisite_list_) {
node.ClearPrerequisite();
}
}
}
+BasePromise::BasePromise() = default;
+
+BasePromise::BasePromise(
+ scoped_refptr<internal::AbstractPromise> abstract_promise)
+ : abstract_promise_(std::move(abstract_promise)) {}
+
+BasePromise::BasePromise(const BasePromise& other) = default;
+BasePromise::BasePromise(BasePromise&& other) = default;
+
+BasePromise& BasePromise::operator=(const BasePromise& other) = default;
+BasePromise& BasePromise::operator=(BasePromise&& other) = default;
+
+BasePromise::~BasePromise() = default;
+
} // namespace internal
+
+WrappedPromise::WrappedPromise() = default;
+
+WrappedPromise::WrappedPromise(scoped_refptr<internal::AbstractPromise> promise)
+ : promise_(std::move(promise)) {}
+
+WrappedPromise::WrappedPromise(internal::PassedPromise&& passed_promise)
+ : promise_(passed_promise.Release(), subtle::kAdoptRefTag) {
+ DCHECK(promise_);
+}
+
+WrappedPromise::WrappedPromise(const Location& from_here, OnceClosure task)
+ : WrappedPromise(internal::AbstractPromise::CreateNoPrerequisitePromise(
+ from_here,
+ RejectPolicy::kMustCatchRejection,
+ internal::DependentList::ConstructUnresolved(),
+ internal::PromiseExecutor::Data(
+ base::in_place_type_t<internal::PostTaskExecutor<void>>(),
+ std::move(task)))) {}
+
+WrappedPromise::WrappedPromise(const WrappedPromise& other) = default;
+WrappedPromise::WrappedPromise(WrappedPromise&& other) = default;
+
+WrappedPromise& WrappedPromise::operator=(const WrappedPromise& other) =
+ default;
+WrappedPromise& WrappedPromise::operator=(WrappedPromise&& other) = default;
+
+WrappedPromise::~WrappedPromise() = default;
+
+void WrappedPromise::Execute() {
+ DCHECK(promise_);
+ promise_->Execute();
+}
+
+void WrappedPromise::Clear() {
+ promise_ = nullptr;
+}
+
} // namespace base
diff --git a/chromium/base/task/promise/abstract_promise.h b/chromium/base/task/promise/abstract_promise.h
index 9f67c688bd4..c16132c187c 100644
--- a/chromium/base/task/promise/abstract_promise.h
+++ b/chromium/base/task/promise/abstract_promise.h
@@ -23,6 +23,9 @@ class TaskRunner;
template <typename ResolveType, typename RejectType>
class ManualPromiseResolver;
+template <typename ResolveType, typename RejectType>
+class Promise;
+
// AbstractPromise Memory Management.
//
// Consider a chain of promises: P1, P2 & P3
@@ -265,13 +268,71 @@ enum class RejectPolicy {
kCatchNotRequired,
};
+class WrappedPromise;
+
namespace internal {
template <typename T, typename... Args>
class PromiseCallbackHelper;
-class PromiseHolder;
+class AbstractPromise;
class AbstractPromiseTest;
+class BasePromise;
+
+// A binary size optimization to reduce the overhead of passing a scoped_refptr
+// to Promise<> returned by PostTask. There are many thousands of PostTasks so
+// even a single extra instruction (such as the scoped_refptr move constructor
+// clearing the pointer) adds up. This is why we're not constructing a Promise<>
+// with a scoped_refptr.
+//
+// The constructor calls AddRef, it's up to the owner of this object to either
+// call Clear (which calls Release) or AbstractPromise in order to pass
+// ownership onto a WrappedPromise.
+class BASE_EXPORT PassedPromise {
+ public:
+ explicit inline PassedPromise(const scoped_refptr<AbstractPromise>& promise);
+
+ PassedPromise() : promise_(nullptr) {}
+
+ PassedPromise(const PassedPromise&) = delete;
+ PassedPromise& operator=(const PassedPromise&) = delete;
+
+#if DCHECK_IS_ON()
+ PassedPromise(PassedPromise&& other) noexcept : promise_(other.promise_) {
+ DCHECK(promise_);
+ other.promise_ = nullptr;
+ }
+
+ PassedPromise& operator=(PassedPromise&& other) noexcept {
+ DCHECK(!promise_);
+ promise_ = other.promise_;
+ DCHECK(promise_);
+ other.promise_ = nullptr;
+ return *this;
+ }
+
+ ~PassedPromise() {
+ DCHECK(!promise_) << "The PassedPromise must be Cleared or passed onto a "
+ "Wrapped Promise";
+ }
+#else
+ PassedPromise(PassedPromise&&) noexcept = default;
+ PassedPromise& operator=(PassedPromise&&) noexcept = default;
+#endif
+
+ AbstractPromise* Release() {
+ AbstractPromise* promise = promise_;
+#if DCHECK_IS_ON()
+ promise_ = nullptr;
+#endif
+ return promise;
+ }
+
+ AbstractPromise* get() const { return promise_; }
+
+ private:
+ AbstractPromise* promise_;
+};
// Internal promise representation, maintains a graph of dependencies and posts
// promises as they become ready. In debug builds various sanity checks are
@@ -308,12 +369,11 @@ class BASE_EXPORT AbstractPromise
RejectPolicy reject_policy,
ConstructType tag,
PromiseExecutor::Data&& executor_data) noexcept {
- scoped_refptr<AbstractPromise> promise =
- subtle::AdoptRefIfNeeded(new internal::AbstractPromise(
- nullptr, from_here, nullptr, reject_policy,
- tag, std::move(executor_data)),
- AbstractPromise::kRefCountPreference);
- return promise;
+ return subtle::AdoptRefIfNeeded(
+ new internal::AbstractPromise(nullptr, from_here, nullptr,
+ reject_policy, tag,
+ std::move(executor_data)),
+ AbstractPromise::kRefCountPreference);
}
AbstractPromise(const AbstractPromise&) = delete;
@@ -347,7 +407,9 @@ class BASE_EXPORT AbstractPromise
public:
PromiseValue& value() { return value_; }
+#if DCHECK_IS_ON()
~ValueHandle() { value_.reset(); }
+#endif
private:
friend class AbstractPromise;
@@ -357,6 +419,8 @@ class BASE_EXPORT AbstractPromise
PromiseValue& value_;
};
+ // Used for promise results that require move semantics. E.g. a promise chain
+ // involving a std::unique_ptr<>.
ValueHandle TakeValue() { return ValueHandle(value_); }
// Returns nullptr if there isn't a curried promise.
@@ -380,6 +444,10 @@ class BASE_EXPORT AbstractPromise
"Use scoped_refptr<AbstractPromise> instead");
}
+ // An out-of line emplace(Resolved<void>()); Useful for reducing binary
+ // bloat in executor templates.
+ void EmplaceResolvedVoid();
+
// This is separate from AbstractPromise to reduce the memory footprint of
// regular PostTask without promise chains.
class BASE_EXPORT AdjacencyList {
@@ -457,9 +525,14 @@ class BASE_EXPORT AbstractPromise
void IgnoreUncaughtCatchForTesting();
- private:
- friend class AbstractPromiseTest;
+ // Signals that this promise was cancelled. If executor hasn't run yet, this
+ // will prevent it from running and cancels any dependent promises unless they
+ // have PrerequisitePolicy::kAny, in which case they will only be canceled if
+ // all of their prerequisites are canceled. If OnCanceled() or OnResolved() or
+ // OnRejected() has already run, this does nothing.
+ void OnCanceled();
+ private:
friend base::RefCountedThreadSafe<AbstractPromise>;
friend class AbstractPromiseTest;
@@ -470,8 +543,6 @@ class BASE_EXPORT AbstractPromise
template <typename T, typename... Args>
friend class PromiseCallbackHelper;
- friend class PromiseHolder;
-
template <typename ConstructType>
AbstractPromise(const scoped_refptr<TaskRunner>& task_runner,
const Location& from_here,
@@ -520,13 +591,6 @@ class BASE_EXPORT AbstractPromise
// have been canceled, in which case null is returned.
AbstractPromise* FindCurriedAncestor();
- // Signals that this promise was cancelled. If executor hasn't run yet, this
- // will prevent it from running and cancels any dependent promises unless they
- // have PrerequisitePolicy::kAny, in which case they will only be canceled if
- // all of their prerequisites are canceled. If OnCanceled() or OnResolved() or
- // OnRejected() has already run, this does nothing.
- void OnCanceled();
-
// Signals that |value_| now contains a resolve value. Dependent promises may
// scheduled for execution.
void OnResolved();
@@ -714,7 +778,115 @@ class BASE_EXPORT AbstractPromise
std::unique_ptr<AdjacencyList> prerequisites_;
};
+PassedPromise::PassedPromise(const scoped_refptr<AbstractPromise>& promise)
+ : promise_(promise.get()) {
+ promise_->AddRef();
+}
+
+// Non-templatized base class of the Promise<> template. This is a binary size
+// optimization, letting us use an out of line destructor in the template
+// instead of the more complex scoped_refptr<> destructor.
+class BASE_EXPORT BasePromise {
+ public:
+ BasePromise();
+
+ BasePromise(const BasePromise& other);
+ BasePromise(BasePromise&& other) noexcept;
+
+ BasePromise& operator=(const BasePromise& other);
+ BasePromise& operator=(BasePromise&& other) noexcept;
+
+ // We want an out of line destructor to reduce binary size.
+ ~BasePromise();
+
+ // Returns true if the promise is not null.
+ operator bool() const { return abstract_promise_.get(); }
+
+ protected:
+ struct InlineConstructor {};
+
+ explicit BasePromise(
+ scoped_refptr<internal::AbstractPromise> abstract_promise);
+
+ // We want this to be inlined to reduce binary size for the Promise<>
+ // constructor. Its a template to bypass ChromiumStyle plugin which otherwise
+ // insists this is out of line.
+ template <typename T>
+ explicit BasePromise(internal::PassedPromise&& passed_promise,
+ T InlineConstructor)
+ : abstract_promise_(passed_promise.Release(), subtle::kAdoptRefTag) {}
+
+ scoped_refptr<internal::AbstractPromise> abstract_promise_;
+};
+
} // namespace internal
+
+// Wrapper around scoped_refptr<base::internal::AbstractPromise> which is
+// intended for use by TaskRunner implementations.
+class BASE_EXPORT WrappedPromise {
+ public:
+ WrappedPromise();
+
+ explicit WrappedPromise(scoped_refptr<internal::AbstractPromise> promise);
+
+ WrappedPromise(const WrappedPromise& other);
+ WrappedPromise(WrappedPromise&& other) noexcept;
+
+ WrappedPromise& operator=(const WrappedPromise& other);
+ WrappedPromise& operator=(WrappedPromise&& other) noexcept;
+
+ explicit WrappedPromise(internal::PassedPromise&& passed_promise);
+
+ // Constructs a promise to run |task|.
+ WrappedPromise(const Location& from_here, OnceClosure task);
+
+ // If the WrappedPromise hasn't been executed, cleared or taken by
+ // TakeForTesting, it will be canceled to prevent memory leaks of dependent
+ // tasks that will never run.
+ ~WrappedPromise();
+
+ // Returns true if the promise is not null.
+ operator bool() const { return promise_.get(); }
+
+ bool IsCanceled() const {
+ DCHECK(promise_);
+ return promise_->IsCanceled();
+ }
+
+ void OnCanceled() {
+ DCHECK(promise_);
+ promise_->OnCanceled();
+ }
+
+ // Can only be called once, clears |promise_| after execution.
+ void Execute();
+
+ // Clears |promise_|.
+ void Clear();
+
+ const Location& from_here() const {
+ DCHECK(promise_);
+ return promise_->from_here();
+ }
+
+ scoped_refptr<internal::AbstractPromise>& GetForTesting() { return promise_; }
+
+ scoped_refptr<internal::AbstractPromise> TakeForTesting() {
+ return std::move(promise_);
+ }
+
+ private:
+ template <typename ResolveType, typename RejectType>
+ friend class Promise;
+
+ template <typename T, typename... Args>
+ friend class internal::PromiseCallbackHelper;
+
+ friend class Promises;
+
+ scoped_refptr<internal::AbstractPromise> promise_;
+};
+
} // namespace base
#endif // BASE_TASK_PROMISE_ABSTRACT_PROMISE_H_
diff --git a/chromium/base/task/promise/abstract_promise_unittest.cc b/chromium/base/task/promise/abstract_promise_unittest.cc
index ea8d31fd898..5cbe4498d54 100644
--- a/chromium/base/task/promise/abstract_promise_unittest.cc
+++ b/chromium/base/task/promise/abstract_promise_unittest.cc
@@ -238,10 +238,13 @@ class AbstractPromiseTest : public testing::Test {
#endif
std::move(settings.callback));
- return AbstractPromise::Create(
- settings.task_runner, settings.from_here,
- std::move(settings.prerequisites), settings.reject_policy,
- DependentList::ConstructUnresolved(), std::move(executor_data));
+ return WrappedPromise(AbstractPromise::Create(
+ settings.task_runner, settings.from_here,
+ std::move(settings.prerequisites),
+ settings.reject_policy,
+ DependentList::ConstructUnresolved(),
+ std::move(executor_data)))
+ .TakeForTesting();
}
PromiseSettings settings;
@@ -293,7 +296,7 @@ class AbstractPromiseTest : public testing::Test {
PromiseSettingsBuilder AllPromise(
Location from_here,
- std::vector<internal::DependentList::Node> prerequisite_list) {
+ std::vector<DependentList::Node> prerequisite_list) {
PromiseSettingsBuilder builder(
from_here, std::make_unique<AbstractPromise::AdjacencyList>(
std::move(prerequisite_list)));
diff --git a/chromium/base/task/promise/dependent_list.h b/chromium/base/task/promise/dependent_list.h
index 020bdbfc77f..3245c1cb48f 100644
--- a/chromium/base/task/promise/dependent_list.h
+++ b/chromium/base/task/promise/dependent_list.h
@@ -59,7 +59,7 @@ class BASE_EXPORT DependentList {
// Align Node on an 8-byte boundary to ensure the first 3 bits are 0 and can
// be used to store additional state (see static_asserts below).
- class BASE_EXPORT alignas(8) Node {
+ class BASE_EXPORT ALIGNAS(8) Node {
public:
Node();
explicit Node(Node&& other) noexcept;
diff --git a/chromium/base/task/promise/finally_executor.h b/chromium/base/task/promise/finally_executor.h
index 7dc1c798a5c..a80f81c5ee9 100644
--- a/chromium/base/task/promise/finally_executor.h
+++ b/chromium/base/task/promise/finally_executor.h
@@ -43,9 +43,15 @@ class FinallyExecutor {
void Execute(AbstractPromise* promise) {
AbstractPromise* prerequisite = promise->GetOnlyPrerequisite();
- CallbackT* resolve_executor = static_cast<CallbackT*>(&common_.callback_);
- RunHelper<CallbackT, void, ResolveStorage, RejectStorage>::Run(
- std::move(*resolve_executor), prerequisite, promise);
+ // Internally RunHelper uses const RepeatingCallback<>& to avoid the
+ // binary size overhead of moving a scoped_refptr<> about. We respect
+ // the onceness of the callback and RunHelper will overwrite the callback
+ // with the result.
+ using RepeatingCB = typename ToRepeatingCallback<CallbackT>::value;
+ RepeatingCB* resolve_executor =
+ static_cast<RepeatingCB*>(&common_.callback_);
+ RunHelper<RepeatingCB, void, ResolveStorage, RejectStorage>::Run(
+ *resolve_executor, prerequisite, promise);
}
#if DCHECK_IS_ON()
diff --git a/chromium/base/task/promise/helpers.cc b/chromium/base/task/promise/helpers.cc
index 8a68a123956..f100cac0c29 100644
--- a/chromium/base/task/promise/helpers.cc
+++ b/chromium/base/task/promise/helpers.cc
@@ -11,32 +11,11 @@
namespace base {
namespace internal {
-PromiseHolder::PromiseHolder(scoped_refptr<AbstractPromise> promise)
- : promise_(std::move(promise)) {}
-
-PromiseHolder::~PromiseHolder() {
- // Detect if the promise was not executed and if so cancel to ensure memory
- // is released.
- if (promise_)
- promise_->OnCanceled();
-}
-
-PromiseHolder::PromiseHolder(PromiseHolder&& other)
- : promise_(std::move(other.promise_)) {}
-
-scoped_refptr<AbstractPromise> PromiseHolder::Unwrap() const {
- return std::move(promise_);
-}
-
-scoped_refptr<TaskRunner> GetCurrentSequence() {
- return SequencedTaskRunnerHandle::Get();
-}
-
DoNothing ToCallbackBase(DoNothing task) {
return task;
}
-scoped_refptr<AbstractPromise> ConstructAbstractPromiseWithSinglePrerequisite(
+PassedPromise ConstructAbstractPromiseWithSinglePrerequisite(
const scoped_refptr<TaskRunner>& task_runner,
const Location& from_here,
AbstractPromise* prerequisite,
@@ -46,26 +25,35 @@ scoped_refptr<AbstractPromise> ConstructAbstractPromiseWithSinglePrerequisite(
if (!prerequisite) {
// Ensure the destructor for |executor_data| runs.
PromiseExecutor dummy_executor(std::move(executor_data));
- return nullptr;
+ return PassedPromise();
}
- return AbstractPromise::Create(
+ return PassedPromise(AbstractPromise::Create(
task_runner, from_here,
std::make_unique<AbstractPromise::AdjacencyList>(prerequisite),
RejectPolicy::kMustCatchRejection,
- internal::DependentList::ConstructUnresolved(), std::move(executor_data));
+ internal::DependentList::ConstructUnresolved(),
+ std::move(executor_data)));
}
-scoped_refptr<AbstractPromise> ConstructManualPromiseResolverPromise(
+PassedPromise ConstructHereAbstractPromiseWithSinglePrerequisite(
const Location& from_here,
- RejectPolicy reject_policy,
- bool can_resolve,
- bool can_reject) {
- return AbstractPromise::CreateNoPrerequisitePromise(
+ AbstractPromise* prerequisite,
+ internal::PromiseExecutor::Data&& executor_data) noexcept {
+ return ConstructAbstractPromiseWithSinglePrerequisite(
+ SequencedTaskRunnerHandle::Get(), from_here, prerequisite,
+ std::move(executor_data));
+}
+
+PassedPromise ConstructManualPromiseResolverPromise(const Location& from_here,
+ RejectPolicy reject_policy,
+ bool can_resolve,
+ bool can_reject) {
+ return PassedPromise(AbstractPromise::CreateNoPrerequisitePromise(
from_here, reject_policy, internal::DependentList::ConstructUnresolved(),
internal::PromiseExecutor::Data(
in_place_type_t<internal::NoOpPromiseExecutor>(), can_resolve,
- can_reject));
+ can_reject)));
}
} // namespace internal
diff --git a/chromium/base/task/promise/helpers.h b/chromium/base/task/promise/helpers.h
index 29cb49011ab..87d9ff64bec 100644
--- a/chromium/base/task/promise/helpers.h
+++ b/chromium/base/task/promise/helpers.h
@@ -19,11 +19,6 @@ class DoNothing;
namespace internal {
-// A wrapper around SequencedTaskRunnerHandle::Get(). This file is included by
-// base/task_runner.h which means we can't include anything that depends on
-// that!
-scoped_refptr<TaskRunner> BASE_EXPORT GetCurrentSequence();
-
template <typename T>
using ToNonVoidT = std::conditional_t<std::is_void<T>::value, Void, T>;
@@ -412,9 +407,26 @@ class ArgMoveSemanticsHelper {
}
};
+// Helper for converting a callback to its repeating variant.
+template <typename Cb>
+struct ToRepeatingCallback;
+
+template <typename Cb>
+struct ToRepeatingCallback<OnceCallback<Cb>> {
+ using value = RepeatingCallback<Cb>;
+};
+
+template <typename Cb>
+struct ToRepeatingCallback<RepeatingCallback<Cb>> {
+ using value = RepeatingCallback<Cb>;
+};
+
// Helper for running a promise callback and storing the result if any.
//
-// Callback = signature of the callback to execute,
+// Callback = signature of the callback to execute. Note we use repeating
+// callbacks to avoid the binary size overhead of a once callback which will
+// generate a destructor which is redundant because we overwrite the executor
+// with the promise result which also triggers the destructor.
// ArgStorageType = type of the callback parameter (or void if none)
// ResolveStorage = type to use for resolve, usually Resolved<T>.
// RejectStorage = type to use for reject, usually Rejected<T>.
@@ -431,18 +443,18 @@ template <typename CbResult,
typename ArgStorageType,
typename ResolveStorage,
typename RejectStorage>
-struct RunHelper<OnceCallback<CbResult(CbArg)>,
+struct RunHelper<RepeatingCallback<CbResult(CbArg)>,
ArgStorageType,
ResolveStorage,
RejectStorage> {
- using Callback = OnceCallback<CbResult(CbArg)>;
+ using Callback = RepeatingCallback<CbResult(CbArg)>;
- static void Run(Callback&& executor,
+ static void Run(const Callback& executor,
AbstractPromise* arg,
AbstractPromise* result) {
EmplaceHelper<ResolveStorage, RejectStorage>::Emplace(
- result, std::move(executor).Run(
- ArgMoveSemanticsHelper<CbArg, ArgStorageType>::Get(arg)));
+ result,
+ executor.Run(ArgMoveSemanticsHelper<CbArg, ArgStorageType>::Get(arg)));
}
};
@@ -451,19 +463,18 @@ template <typename CbArg,
typename ArgStorageType,
typename ResolveStorage,
typename RejectStorage>
-struct RunHelper<OnceCallback<void(CbArg)>,
+struct RunHelper<RepeatingCallback<void(CbArg)>,
ArgStorageType,
ResolveStorage,
RejectStorage> {
- using Callback = OnceCallback<void(CbArg)>;
+ using Callback = RepeatingCallback<void(CbArg)>;
- static void Run(Callback&& executor,
+ static void Run(const Callback& executor,
AbstractPromise* arg,
AbstractPromise* result) {
static_assert(std::is_void<typename ResolveStorage::Type>::value, "");
- std::move(executor).Run(
- ArgMoveSemanticsHelper<CbArg, ArgStorageType>::Get(arg));
- result->emplace(Resolved<void>());
+ executor.Run(ArgMoveSemanticsHelper<CbArg, ArgStorageType>::Get(arg));
+ result->EmplaceResolvedVoid();
}
};
@@ -472,17 +483,17 @@ template <typename CbResult,
typename ArgStorageType,
typename ResolveStorage,
typename RejectStorage>
-struct RunHelper<OnceCallback<CbResult()>,
+struct RunHelper<RepeatingCallback<CbResult()>,
ArgStorageType,
ResolveStorage,
RejectStorage> {
- using Callback = OnceCallback<CbResult()>;
+ using Callback = RepeatingCallback<CbResult()>;
- static void Run(Callback&& executor,
+ static void Run(const Callback& executor,
AbstractPromise* arg,
AbstractPromise* result) {
- EmplaceHelper<ResolveStorage, RejectStorage>::Emplace(
- result, std::move(executor).Run());
+ EmplaceHelper<ResolveStorage, RejectStorage>::Emplace(result,
+ executor.Run());
}
};
@@ -490,16 +501,16 @@ struct RunHelper<OnceCallback<CbResult()>,
template <typename ArgStorageType,
typename ResolveStorage,
typename RejectStorage>
-struct RunHelper<OnceCallback<void()>,
+struct RunHelper<RepeatingCallback<void()>,
ArgStorageType,
ResolveStorage,
RejectStorage> {
- static void Run(OnceCallback<void()>&& executor,
+ static void Run(const RepeatingCallback<void()>& executor,
AbstractPromise* arg,
AbstractPromise* result) {
static_assert(std::is_void<typename ResolveStorage::Type>::value, "");
- std::move(executor).Run();
- result->emplace(Resolved<void>());
+ executor.Run();
+ result->EmplaceResolvedVoid();
}
};
@@ -538,79 +549,51 @@ template <typename CbResult,
typename... CbArgs,
typename ResolveStorage,
typename RejectStorage>
-struct RunHelper<OnceCallback<CbResult(CbArgs...)>,
+struct RunHelper<RepeatingCallback<CbResult(CbArgs...)>,
Resolved<std::tuple<CbArgs...>>,
ResolveStorage,
RejectStorage> {
- using Callback = OnceCallback<CbResult(CbArgs...)>;
+ using Callback = RepeatingCallback<CbResult(CbArgs...)>;
using StorageType = Resolved<std::tuple<CbArgs...>>;
using IndexSequence = std::index_sequence_for<CbArgs...>;
- static void Run(Callback&& executor,
+ static void Run(const Callback& executor,
AbstractPromise* arg,
AbstractPromise* result) {
AbstractPromise::ValueHandle value = arg->TakeValue();
std::tuple<CbArgs...>& tuple = value.value().Get<StorageType>()->value;
- RunInternal(std::move(executor), tuple, result,
+ RunInternal(executor, tuple, result,
std::integral_constant<bool, std::is_void<CbResult>::value>(),
IndexSequence{});
}
private:
template <typename Callback, size_t... Indices>
- static void RunInternal(Callback&& executor,
+ static void RunInternal(const Callback& executor,
std::tuple<CbArgs...>& tuple,
AbstractPromise* result,
std::false_type void_result,
std::index_sequence<Indices...>) {
- EmplaceHelper<ResolveStorage, RejectStorage>::Emplace(
- std::move(executor).Run(
- TupleArgMoveSemanticsHelper<Callback, std::tuple<CbArgs...>,
- Indices>::Get(tuple)...));
+ EmplaceHelper<ResolveStorage, RejectStorage>::Emplace(executor.Run(
+ TupleArgMoveSemanticsHelper<Callback, std::tuple<CbArgs...>,
+ Indices>::Get(tuple)...));
}
template <typename Callback, size_t... Indices>
- static void RunInternal(Callback&& executor,
+ static void RunInternal(const Callback& executor,
std::tuple<CbArgs...>& tuple,
AbstractPromise* result,
std::true_type void_result,
std::index_sequence<Indices...>) {
- std::move(executor).Run(
- TupleArgMoveSemanticsHelper<Callback, std::tuple<CbArgs...>,
- Indices>::Get(tuple)...);
- result->emplace(Resolved<void>());
+ executor.Run(TupleArgMoveSemanticsHelper<Callback, std::tuple<CbArgs...>,
+ Indices>::Get(tuple)...);
+ result->EmplaceResolvedVoid();
}
};
-// For use with base::Bind*. Cancels the promise if the callback was not run by
-// the time the callback is deleted.
-class BASE_EXPORT PromiseHolder {
- public:
- explicit PromiseHolder(scoped_refptr<internal::AbstractPromise> promise);
-
- ~PromiseHolder();
-
- PromiseHolder(PromiseHolder&& other);
-
- scoped_refptr<internal::AbstractPromise> Unwrap() const;
-
- private:
- mutable scoped_refptr<internal::AbstractPromise> promise_;
-};
-
-} // namespace internal
-
-template <>
-struct BindUnwrapTraits<internal::PromiseHolder> {
- static scoped_refptr<internal::AbstractPromise> Unwrap(
- const internal::PromiseHolder& o) {
- return o.Unwrap();
- }
-};
-
-namespace internal {
-
-// Used by ManualPromiseResolver<> to generate callbacks.
+// Used by ManualPromiseResolver<> to generate callbacks. Note the use of
+// WrappedPromise, this is necessary because we want to cancel the promise (to
+// release memory) if the callback gets deleted without having being run.
template <typename T, typename... Args>
class PromiseCallbackHelper {
public:
@@ -624,7 +607,7 @@ class PromiseCallbackHelper {
std::forward<Args>(args)...);
promise->OnResolved();
},
- PromiseHolder(promise));
+ promise);
}
static RepeatingCallback GetRepeatingResolveCallback(
@@ -635,7 +618,7 @@ class PromiseCallbackHelper {
std::forward<Args>(args)...);
promise->OnResolved();
},
- PromiseHolder(promise));
+ promise);
}
static Callback GetRejectCallback(scoped_refptr<AbstractPromise>& promise) {
@@ -645,7 +628,7 @@ class PromiseCallbackHelper {
std::forward<Args>(args)...);
promise->OnRejected();
},
- PromiseHolder(promise));
+ promise);
}
static RepeatingCallback GetRepeatingRejectCallback(
@@ -656,7 +639,7 @@ class PromiseCallbackHelper {
std::forward<Args>(args)...);
promise->OnRejected();
},
- PromiseHolder(promise));
+ promise);
}
};
@@ -679,8 +662,7 @@ struct IsValidPromiseArg<PromiseType&, CallbackArgType> {
// rejection storage type.
template <typename RejectT>
struct AllPromiseRejectHelper {
- static void Reject(AbstractPromise* result,
- const scoped_refptr<AbstractPromise>& prerequisite) {
+ static void Reject(AbstractPromise* result, AbstractPromise* prerequisite) {
result->emplace(scoped_refptr<AbstractPromise>(prerequisite));
}
};
@@ -709,14 +691,20 @@ CallbackBase&& ToCallbackBase(const CallbackT&& task) {
// Helps reduce template bloat by moving AbstractPromise construction out of
// line.
-scoped_refptr<AbstractPromise> BASE_EXPORT
-ConstructAbstractPromiseWithSinglePrerequisite(
+PassedPromise BASE_EXPORT ConstructAbstractPromiseWithSinglePrerequisite(
const scoped_refptr<TaskRunner>& task_runner,
const Location& from_here,
AbstractPromise* prerequsite,
- internal::PromiseExecutor::Data&& executor_data) noexcept;
+ PromiseExecutor::Data&& executor_data) noexcept;
+
+// Like ConstructAbstractPromiseWithSinglePrerequisite except tasks are posted
+// onto SequencedTaskRunnerHandle::Get().
+PassedPromise BASE_EXPORT ConstructHereAbstractPromiseWithSinglePrerequisite(
+ const Location& from_here,
+ AbstractPromise* prerequsite,
+ PromiseExecutor::Data&& executor_data) noexcept;
-scoped_refptr<AbstractPromise> BASE_EXPORT
+PassedPromise BASE_EXPORT
ConstructManualPromiseResolverPromise(const Location& from_here,
RejectPolicy reject_policy,
bool can_resolve,
diff --git a/chromium/base/task/promise/helpers_unittest.cc b/chromium/base/task/promise/helpers_unittest.cc
index afdc9e7079e..f13fb5c8c7a 100644
--- a/chromium/base/task/promise/helpers_unittest.cc
+++ b/chromium/base/task/promise/helpers_unittest.cc
@@ -187,8 +187,9 @@ TEST(EmplaceHelper, EmplacePromiseResult) {
TEST(EmplaceHelper, EmplacePromise) {
scoped_refptr<AbstractPromise> promise =
DoNothingPromiseBuilder(FROM_HERE).SetCanResolve(true);
- scoped_refptr<AbstractPromise> curried = DoNothingPromiseBuilder(FROM_HERE);
+ PassedPromise curried = NoOpPromiseExecutor::Create(
+ FROM_HERE, false, false, RejectPolicy::kCatchNotRequired);
EmplaceHelper<Resolved<int>, Rejected<NoReject>>::Emplace(
promise.get(), Promise<int>(std::move(curried)));
@@ -243,8 +244,8 @@ TEST(RunHelper, CallbackVoidArgumentIntResult) {
scoped_refptr<AbstractPromise> result =
DoNothingPromiseBuilder(FROM_HERE).SetCanResolve(true);
- RunHelper<OnceCallback<int()>, Resolved<void>, Resolved<int>,
- Rejected<std::string>>::Run(BindOnce([]() { return 123; }),
+ RunHelper<RepeatingCallback<int()>, Resolved<void>, Resolved<int>,
+ Rejected<std::string>>::Run(BindRepeating([]() { return 123; }),
arg.get(), result.get());
EXPECT_EQ(result->value().template Get<Resolved<int>>()->value, 123);
@@ -255,8 +256,8 @@ TEST(RunHelper, CallbackVoidArgumentVoidResult) {
scoped_refptr<AbstractPromise> result =
DoNothingPromiseBuilder(FROM_HERE).SetCanResolve(true);
- RunHelper<OnceCallback<void()>, Resolved<void>, Resolved<void>,
- Rejected<std::string>>::Run(BindOnce([]() {}), arg.get(),
+ RunHelper<RepeatingCallback<void()>, Resolved<void>, Resolved<void>,
+ Rejected<std::string>>::Run(BindRepeating([]() {}), arg.get(),
result.get());
EXPECT_TRUE(result->value().ContainsResolved());
@@ -268,8 +269,8 @@ TEST(RunHelper, CallbackIntArgumentIntResult) {
DoNothingPromiseBuilder(FROM_HERE).SetCanResolve(true);
arg->emplace(Resolved<int>(123));
- RunHelper<OnceCallback<int(int)>, Resolved<int>, Resolved<int>,
- Rejected<std::string>>::Run(BindOnce([](int value) {
+ RunHelper<RepeatingCallback<int(int)>, Resolved<int>, Resolved<int>,
+ Rejected<std::string>>::Run(BindRepeating([](int value) {
return value + 1;
}),
arg.get(), result.get());
@@ -284,7 +285,7 @@ TEST(RunHelper, CallbackIntArgumentArgumentVoidResult) {
arg->emplace(Resolved<int>(123));
int value;
- RunHelper<OnceCallback<void(int)>, Resolved<int>, Resolved<void>,
+ RunHelper<RepeatingCallback<void(int)>, Resolved<int>, Resolved<void>,
Rejected<std::string>>::Run(BindLambdaForTesting([&](int arg) {
value = arg;
}),
diff --git a/chromium/base/task/promise/no_op_promise_executor.cc b/chromium/base/task/promise/no_op_promise_executor.cc
index 74a5bcc57c8..e168ed91007 100644
--- a/chromium/base/task/promise/no_op_promise_executor.cc
+++ b/chromium/base/task/promise/no_op_promise_executor.cc
@@ -45,15 +45,14 @@ bool NoOpPromiseExecutor::CanReject() const {
void NoOpPromiseExecutor::Execute(AbstractPromise* promise) {}
// static
-scoped_refptr<internal::AbstractPromise> NoOpPromiseExecutor::Create(
- Location from_here,
- bool can_resolve,
- bool can_reject,
- RejectPolicy reject_policy) {
- return AbstractPromise::CreateNoPrerequisitePromise(
+PassedPromise NoOpPromiseExecutor::Create(Location from_here,
+ bool can_resolve,
+ bool can_reject,
+ RejectPolicy reject_policy) {
+ return PassedPromise(AbstractPromise::CreateNoPrerequisitePromise(
from_here, reject_policy, DependentList::ConstructUnresolved(),
PromiseExecutor::Data(in_place_type_t<NoOpPromiseExecutor>(), can_resolve,
- can_reject));
+ can_reject)));
}
} // namespace internal
diff --git a/chromium/base/task/promise/no_op_promise_executor.h b/chromium/base/task/promise/no_op_promise_executor.h
index 005ee8c0130..13f8ef0891b 100644
--- a/chromium/base/task/promise/no_op_promise_executor.h
+++ b/chromium/base/task/promise/no_op_promise_executor.h
@@ -21,10 +21,10 @@ class BASE_EXPORT NoOpPromiseExecutor {
static constexpr PromiseExecutor::PrerequisitePolicy kPrerequisitePolicy =
PromiseExecutor::PrerequisitePolicy::kNever;
- static scoped_refptr<AbstractPromise> Create(Location from_here,
- bool can_resolve,
- bool can_reject,
- RejectPolicy reject_policy);
+ static PassedPromise Create(Location from_here,
+ bool can_resolve,
+ bool can_reject,
+ RejectPolicy reject_policy);
PromiseExecutor::PrerequisitePolicy GetPrerequisitePolicy() const;
bool IsCancelled() const;
diff --git a/chromium/base/task/promise/post_task_executor.h b/chromium/base/task/promise/post_task_executor.h
index c018113de63..e3882ba1712 100644
--- a/chromium/base/task/promise/post_task_executor.h
+++ b/chromium/base/task/promise/post_task_executor.h
@@ -52,10 +52,14 @@ class PostTaskExecutor {
static_assert(sizeof(CallbackBase) == sizeof(OnceCallback<ReturnType()>),
"We assume it's possible to cast from CallbackBase to "
"OnceCallback<ReturnType()>");
- OnceCallback<ReturnType()>* task =
- static_cast<OnceCallback<ReturnType()>*>(&task_);
- internal::RunHelper<OnceCallback<ReturnType()>, void, ResolveStorage,
- RejectStorage>::Run(std::move(*task), nullptr, promise);
+ // Internally RunHelper uses const RepeatingCallback<>& to avoid the
+ // binary size overhead of moving a scoped_refptr<> about. We respect
+ // the onceness of the callback and RunHelper will overwrite the callback
+ // with the result.
+ RepeatingCallback<ReturnType()>* task =
+ static_cast<RepeatingCallback<ReturnType()>*>(&task_);
+ internal::RunHelper<RepeatingCallback<ReturnType()>, void, ResolveStorage,
+ RejectStorage>::Run(*task, nullptr, promise);
}
private:
diff --git a/chromium/base/task/promise/post_task_executor_unittest.cc b/chromium/base/task/promise/post_task_executor_unittest.cc
index 8e86aaefad4..1d3fc57e4d0 100644
--- a/chromium/base/task/promise/post_task_executor_unittest.cc
+++ b/chromium/base/task/promise/post_task_executor_unittest.cc
@@ -16,9 +16,8 @@ namespace internal {
class PostTaskExecutorTest : public testing::Test {
public:
template <typename CallbackT>
- scoped_refptr<internal::AbstractPromise> CreatePostTaskPromise(
- const Location& from_here,
- CallbackT&& task) {
+ WrappedPromise CreatePostTaskPromise(const Location& from_here,
+ CallbackT&& task) {
// Extract properties from |task| callback.
using CallbackTraits = CallbackTraits<std::decay_t<CallbackT>>;
@@ -27,20 +26,20 @@ class PostTaskExecutorTest : public testing::Test {
internal::PostTaskExecutor<typename CallbackTraits::ReturnType>>(),
internal::ToCallbackBase(std::move(task)));
- return AbstractPromise::CreateNoPrerequisitePromise(
+ return WrappedPromise(AbstractPromise::CreateNoPrerequisitePromise(
from_here, RejectPolicy::kMustCatchRejection,
internal::DependentList::ConstructUnresolved(),
- std::move(executor_data));
+ std::move(executor_data)));
}
};
TEST_F(PostTaskExecutorTest, OnceClosure) {
bool run = false;
- scoped_refptr<AbstractPromise> p = CreatePostTaskPromise(
+ WrappedPromise p = CreatePostTaskPromise(
FROM_HERE, BindOnce([](bool* run) { *run = true; }, &run));
- p->Execute();
+ p.Execute();
EXPECT_TRUE(run);
}
@@ -48,20 +47,19 @@ TEST_F(PostTaskExecutorTest, OnceClosure) {
TEST_F(PostTaskExecutorTest, RepeatingClosure) {
bool run = false;
- scoped_refptr<AbstractPromise> p = CreatePostTaskPromise(
+ WrappedPromise p = CreatePostTaskPromise(
FROM_HERE, BindRepeating([](bool* run) { *run = true; }, &run));
- p->Execute();
+ p.Execute();
EXPECT_TRUE(run);
}
TEST_F(PostTaskExecutorTest, DoNothing) {
// Check it compiles and the executor doesn't crash when run.
- scoped_refptr<AbstractPromise> p =
- CreatePostTaskPromise(FROM_HERE, DoNothing());
+ WrappedPromise p = CreatePostTaskPromise(FROM_HERE, DoNothing());
- p->Execute();
+ p.Execute();
}
} // namespace internal
diff --git a/chromium/base/task/promise/promise.h b/chromium/base/task/promise/promise.h
index 4f2275ea183..467f74cb8e4 100644
--- a/chromium/base/task/promise/promise.h
+++ b/chromium/base/task/promise/promise.h
@@ -30,12 +30,12 @@ BASE_EXPORT scoped_refptr<TaskRunner> CreateTaskRunner(
// callback will be posted immediately, otherwise it has to wait.
//
// Promise<> is copyable, moveable and thread safe. Under the hood
-// internal::AbstractPromise is refcounted so retaining multiple Promises<> will
+// AbstractPromise is refcounted so retaining multiple Promises<> will
// prevent that part of the promise graph from being released.
template <typename ResolveType, typename RejectType = NoReject>
-class Promise {
+class Promise : public internal::BasePromise {
public:
- Promise() : abstract_promise_(nullptr) {}
+ Promise() = default;
static_assert(
!std::is_reference<ResolveType>::value ||
@@ -49,16 +49,17 @@ class Promise {
explicit Promise(
scoped_refptr<internal::AbstractPromise> abstract_promise) noexcept
- : abstract_promise_(std::move(abstract_promise)) {}
+ : BasePromise(std::move(abstract_promise)) {}
- ~Promise() = default;
+ // Every PostTask calls this constructor so we need to be careful to avoid
+ // unnecessary binary bloat.
+ explicit Promise(internal::PassedPromise passed_promise) noexcept
+ : BasePromise(std::move(passed_promise),
+ BasePromise::InlineConstructor()) {}
- operator bool() const { return !!abstract_promise_; }
+ ~Promise() = default;
- bool IsCancelledForTesting() const {
- DCHECK(abstract_promise_);
- return abstract_promise_->IsCanceled();
- }
+ bool IsCancelledForTesting() const { return abstract_promise_->IsCanceled(); }
// Waits until the promise has settled and if resolved it returns the resolved
// value.
@@ -75,8 +76,10 @@ class Promise {
}
DCHECK(abstract_promise_->IsResolved())
<< "Can't take resolved value, promise wasn't resolved.";
- return std::move(
- abstract_promise_->TakeValue().value().Get<Resolved<T>>()->value);
+ return std::move(abstract_promise_->TakeValue()
+ .value()
+ .template Get<Resolved<T>>()
+ ->value);
}
// Waits until the promise has settled and if rejected it returns the rejected
@@ -95,8 +98,10 @@ class Promise {
abstract_promise_->IgnoreUncaughtCatchForTesting();
DCHECK(abstract_promise_->IsRejected())
<< "Can't take rejected value, promise wasn't rejected.";
- return std::move(
- abstract_promise_->TakeValue().value().Get<Rejected<T>>()->value);
+ return std::move(abstract_promise_->TakeValue()
+ .value()
+ .template Get<Rejected<T>>()
+ ->value);
}
bool IsResolvedForTesting() const {
@@ -122,22 +127,22 @@ class Promise {
//
// |task_runner| is const-ref to avoid bloat due the destructor (which posts a
// task).
- template <typename RejectCb>
+ template <typename CatchCb>
auto CatchOn(const scoped_refptr<TaskRunner>& task_runner,
const Location& from_here,
- RejectCb on_reject) noexcept {
+ CatchCb on_reject) noexcept {
DCHECK(!on_reject.is_null());
// Extract properties from the |on_reject| callback.
- using RejectCallbackTraits = internal::CallbackTraits<RejectCb>;
- using RejectCallbackArgT = typename RejectCallbackTraits::ArgType;
+ using CatchCallbackTraits = internal::CallbackTraits<CatchCb>;
+ using CatchCallbackArgT = typename CatchCallbackTraits::ArgType;
// Compute the resolve and reject types of the returned Promise.
using ReturnedPromiseTraits =
internal::PromiseCombiner<ResolveType,
NoReject, // We've caught the reject case.
- typename RejectCallbackTraits::ResolveType,
- typename RejectCallbackTraits::RejectType>;
+ typename CatchCallbackTraits::ResolveType,
+ typename CatchCallbackTraits::RejectType>;
using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType;
using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType;
@@ -148,13 +153,13 @@ class Promise {
static_assert(ReturnedPromiseTraits::valid,
"Ambiguous promise resolve type");
static_assert(
- internal::IsValidPromiseArg<RejectType, RejectCallbackArgT>::value ||
- std::is_void<RejectCallbackArgT>::value,
+ internal::IsValidPromiseArg<RejectType, CatchCallbackArgT>::value ||
+ std::is_void<CatchCallbackArgT>::value,
"|on_reject| callback must accept Promise::RejectType or void.");
static_assert(
- !std::is_reference<RejectCallbackArgT>::value ||
- std::is_const<std::remove_reference_t<RejectCallbackArgT>>::value,
+ !std::is_reference<CatchCallbackArgT>::value ||
+ std::is_const<std::remove_reference_t<CatchCallbackArgT>>::value,
"Google C++ Style: References in function parameters must be const.");
return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>(
@@ -163,7 +168,7 @@ class Promise {
internal::PromiseExecutor::Data(
in_place_type_t<internal::ThenAndCatchExecutor<
OnceClosure, // Never called.
- OnceCallback<typename RejectCallbackTraits::SignatureType>,
+ OnceCallback<typename CatchCallbackTraits::SignatureType>,
internal::NoCallback, RejectType,
Resolved<ReturnedPromiseResolveT>,
Rejected<ReturnedPromiseRejectT>>>(),
@@ -171,18 +176,59 @@ class Promise {
internal::ToCallbackBase(std::move(on_reject)))));
}
- template <typename RejectCb>
+ template <typename CatchCb>
auto CatchOn(const TaskTraits& traits,
const Location& from_here,
- RejectCb&& on_reject) noexcept {
+ CatchCb&& on_reject) noexcept {
return CatchOn(CreateTaskRunner(traits), from_here,
- std::forward<RejectCb>(on_reject));
+ std::forward<CatchCb>(on_reject));
}
- template <typename RejectCb>
- auto CatchHere(const Location& from_here, RejectCb&& on_reject) noexcept {
- return CatchOn(internal::GetCurrentSequence(), from_here,
- std::forward<RejectCb>(on_reject));
+ template <typename CatchCb>
+ auto CatchHere(const Location& from_here, CatchCb&& on_reject) noexcept {
+ DCHECK(!on_reject.is_null());
+
+ // Extract properties from the |on_reject| callback.
+ using CatchCallbackTraits = internal::CallbackTraits<CatchCb>;
+ using CatchCallbackArgT = typename CatchCallbackTraits::ArgType;
+
+ // Compute the resolve and reject types of the returned Promise.
+ using ReturnedPromiseTraits =
+ internal::PromiseCombiner<ResolveType,
+ NoReject, // We've caught the reject case.
+ typename CatchCallbackTraits::ResolveType,
+ typename CatchCallbackTraits::RejectType>;
+ using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType;
+ using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType;
+
+ static_assert(!std::is_same<NoReject, RejectType>::value,
+ "Can't catch a NoReject promise.");
+
+ // Check we wouldn't need to return Promise<Variant<...>, ...>
+ static_assert(ReturnedPromiseTraits::valid,
+ "Ambiguous promise resolve type");
+ static_assert(
+ internal::IsValidPromiseArg<RejectType, CatchCallbackArgT>::value ||
+ std::is_void<CatchCallbackArgT>::value,
+ "|on_reject| callback must accept Promise::RejectType or void.");
+
+ static_assert(
+ !std::is_reference<CatchCallbackArgT>::value ||
+ std::is_const<std::remove_reference_t<CatchCallbackArgT>>::value,
+ "Google C++ Style: References in function parameters must be const.");
+
+ return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>(
+ ConstructHereAbstractPromiseWithSinglePrerequisite(
+ from_here, abstract_promise_.get(),
+ internal::PromiseExecutor::Data(
+ in_place_type_t<internal::ThenAndCatchExecutor<
+ OnceClosure, // Never called.
+ OnceCallback<typename CatchCallbackTraits::SignatureType>,
+ internal::NoCallback, RejectType,
+ Resolved<ReturnedPromiseResolveT>,
+ Rejected<ReturnedPromiseRejectT>>>(),
+ OnceClosure(),
+ internal::ToCallbackBase(std::move(on_reject)))));
}
// A task to execute |on_resolve| is posted on |task_runner| as soon as this
@@ -198,23 +244,22 @@ class Promise {
//
// |task_runner| is const-ref to avoid bloat due the destructor (which posts a
// task).
- template <typename ResolveCb>
+ template <typename ThenCb>
auto ThenOn(const scoped_refptr<TaskRunner>& task_runner,
const Location& from_here,
- ResolveCb on_resolve) noexcept {
+ ThenCb on_resolve) noexcept {
DCHECK(!on_resolve.is_null());
// Extract properties from the |on_resolve| callback.
- using ResolveCallbackTraits =
- internal::CallbackTraits<std::decay_t<ResolveCb>>;
- using ResolveCallbackArgT = typename ResolveCallbackTraits::ArgType;
+ using ThenCallbackTraits = internal::CallbackTraits<std::decay_t<ThenCb>>;
+ using ThenCallbackArgT = typename ThenCallbackTraits::ArgType;
// Compute the resolve and reject types of the returned Promise.
using ReturnedPromiseTraits =
internal::PromiseCombiner<NoResolve, // We've caught the resolve case.
RejectType,
- typename ResolveCallbackTraits::ResolveType,
- typename ResolveCallbackTraits::RejectType>;
+ typename ThenCallbackTraits::ResolveType,
+ typename ThenCallbackTraits::RejectType>;
using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType;
using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType;
@@ -223,13 +268,13 @@ class Promise {
"Ambiguous promise reject type");
static_assert(
- internal::IsValidPromiseArg<ResolveType, ResolveCallbackArgT>::value ||
- std::is_void<ResolveCallbackArgT>::value,
+ internal::IsValidPromiseArg<ResolveType, ThenCallbackArgT>::value ||
+ std::is_void<ThenCallbackArgT>::value,
"|on_resolve| callback must accept Promise::ResolveType or void.");
static_assert(
- !std::is_reference<ResolveCallbackArgT>::value ||
- std::is_const<std::remove_reference_t<ResolveCallbackArgT>>::value,
+ !std::is_reference<ThenCallbackArgT>::value ||
+ std::is_const<std::remove_reference_t<ThenCallbackArgT>>::value,
"Google C++ Style: References in function parameters must be const.");
return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>(
@@ -237,7 +282,7 @@ class Promise {
task_runner, from_here, abstract_promise_.get(),
internal::PromiseExecutor::Data(
in_place_type_t<internal::ThenAndCatchExecutor<
- OnceCallback<typename ResolveCallbackTraits::SignatureType>,
+ OnceCallback<typename ThenCallbackTraits::SignatureType>,
OnceClosure, ResolveType, internal::NoCallback,
Resolved<ReturnedPromiseResolveT>,
Rejected<ReturnedPromiseRejectT>>>(),
@@ -245,18 +290,56 @@ class Promise {
OnceClosure())));
}
- template <typename ResolveCb>
+ template <typename ThenCb>
auto ThenOn(const TaskTraits& traits,
const Location& from_here,
- ResolveCb&& on_resolve) noexcept {
+ ThenCb&& on_resolve) noexcept {
return ThenOn(CreateTaskRunner(traits), from_here,
- std::forward<ResolveCb>(on_resolve));
+ std::forward<ThenCb>(on_resolve));
}
- template <typename ResolveCb>
- auto ThenHere(const Location& from_here, ResolveCb&& on_resolve) noexcept {
- return ThenOn(internal::GetCurrentSequence(), from_here,
- std::forward<ResolveCb>(on_resolve));
+ template <typename ThenCb>
+ auto ThenHere(const Location& from_here, ThenCb&& on_resolve) noexcept {
+ DCHECK(!on_resolve.is_null());
+
+ // Extract properties from the |on_resolve| callback.
+ using ThenCallbackTraits = internal::CallbackTraits<std::decay_t<ThenCb>>;
+ using ThenCallbackArgT = typename ThenCallbackTraits::ArgType;
+
+ // Compute the resolve and reject types of the returned Promise.
+ using ReturnedPromiseTraits =
+ internal::PromiseCombiner<NoResolve, // We've caught the resolve case.
+ RejectType,
+ typename ThenCallbackTraits::ResolveType,
+ typename ThenCallbackTraits::RejectType>;
+ using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType;
+ using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType;
+
+ // Check we wouldn't need to return Promise<..., Variant<...>>
+ static_assert(ReturnedPromiseTraits::valid,
+ "Ambiguous promise reject type");
+
+ static_assert(
+ internal::IsValidPromiseArg<ResolveType, ThenCallbackArgT>::value ||
+ std::is_void<ThenCallbackArgT>::value,
+ "|on_resolve| callback must accept Promise::ResolveType or void.");
+
+ static_assert(
+ !std::is_reference<ThenCallbackArgT>::value ||
+ std::is_const<std::remove_reference_t<ThenCallbackArgT>>::value,
+ "Google C++ Style: References in function parameters must be const.");
+
+ return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>(
+ ConstructHereAbstractPromiseWithSinglePrerequisite(
+ from_here, abstract_promise_.get(),
+ internal::PromiseExecutor::Data(
+ in_place_type_t<internal::ThenAndCatchExecutor<
+ OnceCallback<typename ThenCallbackTraits::SignatureType>,
+ OnceClosure, ResolveType, internal::NoCallback,
+ Resolved<ReturnedPromiseResolveT>,
+ Rejected<ReturnedPromiseRejectT>>>(),
+ internal::ToCallbackBase(std::move(on_resolve)),
+ OnceClosure())));
}
// A task to execute |on_reject| is posted on |task_runner| as soon as this
@@ -279,26 +362,26 @@ class Promise {
//
// |task_runner| is const-ref to avoid bloat due the destructor (which posts a
// task).
- template <typename ResolveCb, typename RejectCb>
+ template <typename ThenCb, typename CatchCb>
auto ThenOn(const scoped_refptr<TaskRunner>& task_runner,
const Location& from_here,
- ResolveCb on_resolve,
- RejectCb on_reject) noexcept {
+ ThenCb on_resolve,
+ CatchCb on_reject) noexcept {
DCHECK(!on_resolve.is_null());
DCHECK(!on_reject.is_null());
// Extract properties from the |on_resolve| and |on_reject| callbacks.
- using ResolveCallbackTraits = internal::CallbackTraits<ResolveCb>;
- using RejectCallbackTraits = internal::CallbackTraits<RejectCb>;
- using ResolveCallbackArgT = typename ResolveCallbackTraits::ArgType;
- using RejectCallbackArgT = typename RejectCallbackTraits::ArgType;
+ using ThenCallbackTraits = internal::CallbackTraits<ThenCb>;
+ using CatchCallbackTraits = internal::CallbackTraits<CatchCb>;
+ using ThenCallbackArgT = typename ThenCallbackTraits::ArgType;
+ using CatchCallbackArgT = typename CatchCallbackTraits::ArgType;
// Compute the resolve and reject types of the returned Promise.
using ReturnedPromiseTraits =
- internal::PromiseCombiner<typename ResolveCallbackTraits::ResolveType,
- typename ResolveCallbackTraits::RejectType,
- typename RejectCallbackTraits::ResolveType,
- typename RejectCallbackTraits::RejectType>;
+ internal::PromiseCombiner<typename ThenCallbackTraits::ResolveType,
+ typename ThenCallbackTraits::RejectType,
+ typename CatchCallbackTraits::ResolveType,
+ typename CatchCallbackTraits::RejectType>;
using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType;
using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType;
@@ -310,23 +393,23 @@ class Promise {
"compatible types.");
static_assert(
- internal::IsValidPromiseArg<ResolveType, ResolveCallbackArgT>::value ||
- std::is_void<ResolveCallbackArgT>::value,
+ internal::IsValidPromiseArg<ResolveType, ThenCallbackArgT>::value ||
+ std::is_void<ThenCallbackArgT>::value,
"|on_resolve| callback must accept Promise::ResolveType or void.");
static_assert(
- internal::IsValidPromiseArg<RejectType, RejectCallbackArgT>::value ||
- std::is_void<RejectCallbackArgT>::value,
+ internal::IsValidPromiseArg<RejectType, CatchCallbackArgT>::value ||
+ std::is_void<CatchCallbackArgT>::value,
"|on_reject| callback must accept Promise::RejectType or void.");
static_assert(
- !std::is_reference<ResolveCallbackArgT>::value ||
- std::is_const<std::remove_reference_t<ResolveCallbackArgT>>::value,
+ !std::is_reference<ThenCallbackArgT>::value ||
+ std::is_const<std::remove_reference_t<ThenCallbackArgT>>::value,
"Google C++ Style: References in function parameters must be const.");
static_assert(
- !std::is_reference<RejectCallbackArgT>::value ||
- std::is_const<std::remove_reference_t<RejectCallbackArgT>>::value,
+ !std::is_reference<CatchCallbackArgT>::value ||
+ std::is_const<std::remove_reference_t<CatchCallbackArgT>>::value,
"Google C++ Style: References in function parameters must be const.");
return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>(
@@ -334,31 +417,84 @@ class Promise {
task_runner, from_here, abstract_promise_.get(),
internal::PromiseExecutor::Data(
in_place_type_t<internal::ThenAndCatchExecutor<
- OnceCallback<typename ResolveCallbackTraits::SignatureType>,
- OnceCallback<typename RejectCallbackTraits::SignatureType>,
+ OnceCallback<typename ThenCallbackTraits::SignatureType>,
+ OnceCallback<typename CatchCallbackTraits::SignatureType>,
ResolveType, RejectType, Resolved<ReturnedPromiseResolveT>,
Rejected<ReturnedPromiseRejectT>>>(),
internal::ToCallbackBase(std::move(on_resolve)),
internal::ToCallbackBase(std::move(on_reject)))));
}
- template <typename ResolveCb, typename RejectCb>
+ template <typename ThenCb, typename CatchCb>
auto ThenOn(const TaskTraits& traits,
const Location& from_here,
- ResolveCb on_resolve,
- RejectCb on_reject) noexcept {
+ ThenCb on_resolve,
+ CatchCb on_reject) noexcept {
return ThenOn(CreateTaskRunner(traits), from_here,
- std::forward<ResolveCb>(on_resolve),
- std::forward<RejectCb>(on_reject));
+ std::forward<ThenCb>(on_resolve),
+ std::forward<CatchCb>(on_reject));
}
- template <typename ResolveCb, typename RejectCb>
+ template <typename ThenCb, typename CatchCb>
auto ThenHere(const Location& from_here,
- ResolveCb on_resolve,
- RejectCb on_reject) noexcept {
- return ThenOn(internal::GetCurrentSequence(), from_here,
- std::forward<ResolveCb>(on_resolve),
- std::forward<RejectCb>(on_reject));
+ ThenCb on_resolve,
+ CatchCb on_reject) noexcept {
+ DCHECK(!on_resolve.is_null());
+ DCHECK(!on_reject.is_null());
+
+ // Extract properties from the |on_resolve| and |on_reject| callbacks.
+ using ThenCallbackTraits = internal::CallbackTraits<ThenCb>;
+ using CatchCallbackTraits = internal::CallbackTraits<CatchCb>;
+ using ThenCallbackArgT = typename ThenCallbackTraits::ArgType;
+ using CatchCallbackArgT = typename CatchCallbackTraits::ArgType;
+
+ // Compute the resolve and reject types of the returned Promise.
+ using ReturnedPromiseTraits =
+ internal::PromiseCombiner<typename ThenCallbackTraits::ResolveType,
+ typename ThenCallbackTraits::RejectType,
+ typename CatchCallbackTraits::ResolveType,
+ typename CatchCallbackTraits::RejectType>;
+ using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType;
+ using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType;
+
+ static_assert(!std::is_same<NoReject, RejectType>::value,
+ "Can't catch a NoReject promise.");
+
+ static_assert(ReturnedPromiseTraits::valid,
+ "|on_resolve| callback and |on_resolve| callback must return "
+ "compatible types.");
+
+ static_assert(
+ internal::IsValidPromiseArg<ResolveType, ThenCallbackArgT>::value ||
+ std::is_void<ThenCallbackArgT>::value,
+ "|on_resolve| callback must accept Promise::ResolveType or void.");
+
+ static_assert(
+ internal::IsValidPromiseArg<RejectType, CatchCallbackArgT>::value ||
+ std::is_void<CatchCallbackArgT>::value,
+ "|on_reject| callback must accept Promise::RejectType or void.");
+
+ static_assert(
+ !std::is_reference<ThenCallbackArgT>::value ||
+ std::is_const<std::remove_reference_t<ThenCallbackArgT>>::value,
+ "Google C++ Style: References in function parameters must be const.");
+
+ static_assert(
+ !std::is_reference<CatchCallbackArgT>::value ||
+ std::is_const<std::remove_reference_t<CatchCallbackArgT>>::value,
+ "Google C++ Style: References in function parameters must be const.");
+
+ return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>(
+ ConstructHereAbstractPromiseWithSinglePrerequisite(
+ from_here, abstract_promise_.get(),
+ internal::PromiseExecutor::Data(
+ in_place_type_t<internal::ThenAndCatchExecutor<
+ OnceCallback<typename ThenCallbackTraits::SignatureType>,
+ OnceCallback<typename CatchCallbackTraits::SignatureType>,
+ ResolveType, RejectType, Resolved<ReturnedPromiseResolveT>,
+ Rejected<ReturnedPromiseRejectT>>>(),
+ internal::ToCallbackBase(std::move(on_resolve)),
+ internal::ToCallbackBase(std::move(on_reject)))));
}
// A task to execute |finally_callback| on |task_runner| is posted after the
@@ -405,8 +541,24 @@ class Promise {
template <typename FinallyCb>
auto FinallyHere(const Location& from_here,
FinallyCb finally_callback) noexcept {
- return FinallyOn(internal::GetCurrentSequence(), from_here,
- std::move(finally_callback));
+ // Extract properties from |finally_callback| callback.
+ using CallbackTraits = internal::CallbackTraits<FinallyCb>;
+ using ReturnedPromiseResolveT = typename CallbackTraits::ResolveType;
+ using ReturnedPromiseRejectT = typename CallbackTraits::RejectType;
+
+ using CallbackArgT = typename CallbackTraits::ArgType;
+ static_assert(std::is_void<CallbackArgT>::value,
+ "|finally_callback| callback must have no arguments");
+
+ return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>(
+ ConstructHereAbstractPromiseWithSinglePrerequisite(
+ from_here, abstract_promise_.get(),
+ internal::PromiseExecutor::Data(
+ in_place_type_t<internal::FinallyExecutor<
+ OnceCallback<typename CallbackTraits::ReturnType()>,
+ Resolved<ReturnedPromiseResolveT>,
+ Rejected<ReturnedPromiseRejectT>>>(),
+ internal::ToCallbackBase(std::move(finally_callback)))));
}
template <typename... Args>
@@ -442,8 +594,6 @@ class Promise {
nullptr, from_here, nullptr, RejectPolicy::kMustCatchRejection,
internal::DependentList::ConstructResolved(),
std::move(executor_data)));
- promise->emplace(in_place_type_t<Rejected<RejectType>>(),
- std::forward<Args>(args)...);
return Promise<ResolveType, RejectType>(std::move(promise));
}
@@ -454,6 +604,11 @@ class Promise {
abstract_promise_->IgnoreUncaughtCatchForTesting();
}
+ const scoped_refptr<internal::AbstractPromise>& GetScopedRefptrForTesting()
+ const {
+ return abstract_promise_;
+ }
+
private:
template <typename A, typename B>
friend class Promise;
@@ -471,8 +626,6 @@ class Promise {
template <typename A, typename B>
friend class ManualPromiseResolver;
-
- scoped_refptr<internal::AbstractPromise> abstract_promise_;
};
// Used for manually resolving and rejecting a Promise. This is for
@@ -624,8 +777,8 @@ class Promises {
std::vector<internal::DependentList::Node> prerequisite_list(
sizeof...(promises));
int i = 0;
- for (auto&& p : {promises.abstract_promise_...}) {
- prerequisite_list[i++].SetPrerequisite(p.get());
+ for (auto&& p : {promises.abstract_promise_.get()...}) {
+ prerequisite_list[i++].SetPrerequisite(p);
}
internal::PromiseExecutor::Data executor_data(
diff --git a/chromium/base/task/promise/promise_unittest.cc b/chromium/base/task/promise/promise_unittest.cc
index bb3dda41cb3..d81adacb1a0 100644
--- a/chromium/base/task/promise/promise_unittest.cc
+++ b/chromium/base/task/promise/promise_unittest.cc
@@ -106,6 +106,86 @@ TEST(PromiseMemoryLeakTest, TargetTaskRunnerClearsTasks) {
EXPECT_TRUE(delete_reply_flag);
}
+TEST(PromiseMemoryLeakTest, GetResolveCallbackNeverRun) {
+ test::TaskEnvironment task_environment_;
+ OnceCallback<void(int)> cb;
+ MockObject mock_object;
+ bool delete_task_flag = false;
+
+ {
+ ManualPromiseResolver<int> p(FROM_HERE);
+ cb = p.GetResolveCallback();
+
+ p.promise().ThenHere(
+ FROM_HERE, BindOnce(&MockObject::Task, Unretained(&mock_object),
+ MakeRefCounted<ObjectToDelete>(&delete_task_flag)));
+ }
+
+ EXPECT_FALSE(delete_task_flag);
+ cb = OnceCallback<void(int)>();
+ EXPECT_TRUE(delete_task_flag);
+}
+
+TEST(PromiseMemoryLeakTest, GetRepeatingResolveCallbackNeverRun) {
+ test::TaskEnvironment task_environment_;
+ RepeatingCallback<void(int)> cb;
+ MockObject mock_object;
+ bool delete_task_flag = false;
+
+ {
+ ManualPromiseResolver<int> p(FROM_HERE);
+ cb = p.GetRepeatingResolveCallback();
+
+ p.promise().ThenHere(
+ FROM_HERE, BindOnce(&MockObject::Task, Unretained(&mock_object),
+ MakeRefCounted<ObjectToDelete>(&delete_task_flag)));
+ }
+
+ EXPECT_FALSE(delete_task_flag);
+ cb = RepeatingCallback<void(int)>();
+ EXPECT_TRUE(delete_task_flag);
+}
+
+TEST(PromiseMemoryLeakTest, GetRejectCallbackNeverRun) {
+ test::TaskEnvironment task_environment_;
+ OnceCallback<void(int)> cb;
+ MockObject mock_object;
+ bool delete_task_flag = false;
+
+ {
+ ManualPromiseResolver<void, int> p(FROM_HERE);
+ cb = p.GetRejectCallback();
+
+ p.promise().CatchHere(
+ FROM_HERE, BindOnce(&MockObject::Task, Unretained(&mock_object),
+ MakeRefCounted<ObjectToDelete>(&delete_task_flag)));
+ }
+
+ EXPECT_FALSE(delete_task_flag);
+ cb = OnceCallback<void(int)>();
+ EXPECT_TRUE(delete_task_flag);
+}
+
+TEST(PromiseMemoryLeakTest, GetRepeatingRejectCallbackNeverRun) {
+ test::TaskEnvironment task_environment_;
+ RepeatingCallback<void(int)> cb;
+ MockObject mock_object;
+ bool delete_task_flag = false;
+
+ {
+ ManualPromiseResolver<void, int> p(FROM_HERE);
+ cb = p.GetRepeatingRejectCallback();
+
+ p.promise().CatchHere(
+ FROM_HERE, BindOnce(&MockObject::Task, Unretained(&mock_object),
+ MakeRefCounted<ObjectToDelete>(&delete_task_flag)));
+ }
+
+ EXPECT_FALSE(delete_task_flag);
+ cb = RepeatingCallback<void(int)>();
+ EXPECT_TRUE(delete_task_flag);
+}
+
TEST_F(PromiseTest, GetResolveCallbackThen) {
ManualPromiseResolver<int> p(FROM_HERE);
p.GetResolveCallback().Run(123);
@@ -1602,13 +1682,17 @@ TEST_F(PromiseTest, MoveOnlyTypeMultipleCatchesNotAllowed) {
auto p = Promise<void, std::unique_ptr<int>>::CreateRejected(
FROM_HERE, std::make_unique<int>(123));
- p.CatchHere(FROM_HERE,
- BindOnce([](std::unique_ptr<int> i) { EXPECT_EQ(123, *i); }));
+ auto r = p.CatchHere(
+ FROM_HERE, BindOnce([](std::unique_ptr<int> i) { EXPECT_EQ(123, *i); }));
EXPECT_DCHECK_DEATH({
p.CatchHere(FROM_HERE,
BindOnce([](std::unique_ptr<int> i) { EXPECT_EQ(123, *i); }));
});
+
+ // TODO(alexclarke): Temporary, remove when SequenceManager handles promises
+ // natively.
+ r.GetScopedRefptrForTesting()->OnCanceled();
#endif
}
diff --git a/chromium/base/task/promise/promise_value.h b/chromium/base/task/promise/promise_value.h
index 32ec758716d..3b1cfc43464 100644
--- a/chromium/base/task/promise/promise_value.h
+++ b/chromium/base/task/promise/promise_value.h
@@ -40,9 +40,6 @@ struct Resolved {
"Can't have Resolved<NoResolve>");
}
- Resolved(const Resolved& other) = default;
- Resolved(Resolved&& other) = default;
-
// Conversion constructor accepts any arguments except Resolved<T>.
template <
typename... Args,
@@ -75,9 +72,6 @@ struct Rejected {
"Can't have Rejected<NoReject>");
}
- Rejected(const Rejected& other) = default;
- Rejected(Rejected&& other) = default;
-
// Conversion constructor accepts any arguments except Rejected<T>.
template <
typename... Args,
diff --git a/chromium/base/task/promise/then_and_catch_executor.cc b/chromium/base/task/promise/then_and_catch_executor.cc
index 5f7f8f09ad0..5eb38e99b78 100644
--- a/chromium/base/task/promise/then_and_catch_executor.cc
+++ b/chromium/base/task/promise/then_and_catch_executor.cc
@@ -8,14 +8,14 @@ namespace base {
namespace internal {
bool ThenAndCatchExecutorCommon::IsCancelled() const {
- if (!resolve_callback_.is_null()) {
+ if (!then_callback_.is_null()) {
// If there is both a resolve and a reject executor they must be canceled
// at the same time.
- DCHECK(reject_callback_.is_null() ||
- reject_callback_.IsCancelled() == resolve_callback_.IsCancelled());
- return resolve_callback_.IsCancelled();
+ DCHECK(catch_callback_.is_null() ||
+ catch_callback_.IsCancelled() == then_callback_.IsCancelled());
+ return then_callback_.IsCancelled();
}
- return reject_callback_.IsCancelled();
+ return catch_callback_.IsCancelled();
}
void ThenAndCatchExecutorCommon::Execute(AbstractPromise* promise,
@@ -23,16 +23,16 @@ void ThenAndCatchExecutorCommon::Execute(AbstractPromise* promise,
ExecuteCallback execute_catch) {
AbstractPromise* prerequisite = promise->GetOnlyPrerequisite();
if (prerequisite->IsResolved()) {
- if (ProcessNullCallback(resolve_callback_, prerequisite, promise))
+ if (ProcessNullCallback(then_callback_, prerequisite, promise))
return;
- execute_then(prerequisite, promise, &resolve_callback_);
+ execute_then(prerequisite, promise, &then_callback_);
} else {
DCHECK(prerequisite->IsRejected());
- if (ProcessNullCallback(reject_callback_, prerequisite, promise))
+ if (ProcessNullCallback(catch_callback_, prerequisite, promise))
return;
- execute_catch(prerequisite, promise, &reject_callback_);
+ execute_catch(prerequisite, promise, &catch_callback_);
}
}
diff --git a/chromium/base/task/promise/then_and_catch_executor.h b/chromium/base/task/promise/then_and_catch_executor.h
index 56d30ba2a11..7668fc1fbcb 100644
--- a/chromium/base/task/promise/then_and_catch_executor.h
+++ b/chromium/base/task/promise/then_and_catch_executor.h
@@ -17,11 +17,11 @@ namespace internal {
// Exists to reduce template bloat.
class BASE_EXPORT ThenAndCatchExecutorCommon {
public:
- ThenAndCatchExecutorCommon(internal::CallbackBase&& resolve_executor,
- internal::CallbackBase&& reject_executor) noexcept
- : resolve_callback_(std::move(resolve_executor)),
- reject_callback_(std::move(reject_executor)) {
- DCHECK(!resolve_callback_.is_null() || !reject_callback_.is_null());
+ ThenAndCatchExecutorCommon(internal::CallbackBase&& then_callback,
+ internal::CallbackBase&& catch_callback) noexcept
+ : then_callback_(std::move(then_callback)),
+ catch_callback_(std::move(catch_callback)) {
+ DCHECK(!then_callback_.is_null() || !catch_callback_.is_null());
}
~ThenAndCatchExecutorCommon() = default;
@@ -44,24 +44,23 @@ class BASE_EXPORT ThenAndCatchExecutorCommon {
AbstractPromise* arg,
AbstractPromise* result);
- CallbackBase resolve_callback_;
- CallbackBase reject_callback_;
+ CallbackBase then_callback_;
+ CallbackBase catch_callback_;
};
// Tag signals no callback which is used to eliminate dead code.
struct NoCallback {};
-template <typename ResolveOnceCallback,
- typename RejectOnceCallback,
+template <typename ThenOnceCallback,
+ typename CatchOnceCallback,
typename ArgResolve,
typename ArgReject,
typename ResolveStorage,
typename RejectStorage>
class ThenAndCatchExecutor {
public:
- using ResolveReturnT =
- typename CallbackTraits<ResolveOnceCallback>::ReturnType;
- using RejectReturnT = typename CallbackTraits<RejectOnceCallback>::ReturnType;
+ using ThenReturnT = typename CallbackTraits<ThenOnceCallback>::ReturnType;
+ using CatchReturnT = typename CallbackTraits<CatchOnceCallback>::ReturnType;
using PrerequisiteCouldResolve =
std::integral_constant<bool,
!std::is_same<ArgResolve, NoCallback>::value>;
@@ -69,8 +68,8 @@ class ThenAndCatchExecutor {
std::integral_constant<bool, !std::is_same<ArgReject, NoCallback>::value>;
ThenAndCatchExecutor(CallbackBase&& resolve_callback,
- CallbackBase&& reject_callback) noexcept
- : common_(std::move(resolve_callback), std::move(reject_callback)) {}
+ CallbackBase&& catch_callback) noexcept
+ : common_(std::move(resolve_callback), std::move(catch_callback)) {}
bool IsCancelled() const { return common_.IsCancelled(); }
@@ -85,29 +84,29 @@ class ThenAndCatchExecutor {
#if DCHECK_IS_ON()
PromiseExecutor::ArgumentPassingType ResolveArgumentPassingType() const {
- return common_.resolve_callback_.is_null()
+ return common_.then_callback_.is_null()
? PromiseExecutor::ArgumentPassingType::kNoCallback
- : CallbackTraits<ResolveOnceCallback>::argument_passing_type;
+ : CallbackTraits<ThenOnceCallback>::argument_passing_type;
}
PromiseExecutor::ArgumentPassingType RejectArgumentPassingType() const {
- return common_.reject_callback_.is_null()
+ return common_.catch_callback_.is_null()
? PromiseExecutor::ArgumentPassingType::kNoCallback
- : CallbackTraits<RejectOnceCallback>::argument_passing_type;
+ : CallbackTraits<CatchOnceCallback>::argument_passing_type;
}
bool CanResolve() const {
- return (!common_.resolve_callback_.is_null() &&
- PromiseCallbackTraits<ResolveReturnT>::could_resolve) ||
- (!common_.reject_callback_.is_null() &&
- PromiseCallbackTraits<RejectReturnT>::could_resolve);
+ return (!common_.then_callback_.is_null() &&
+ PromiseCallbackTraits<ThenReturnT>::could_resolve) ||
+ (!common_.catch_callback_.is_null() &&
+ PromiseCallbackTraits<CatchReturnT>::could_resolve);
}
bool CanReject() const {
- return (!common_.resolve_callback_.is_null() &&
- PromiseCallbackTraits<ResolveReturnT>::could_reject) ||
- (!common_.reject_callback_.is_null() &&
- PromiseCallbackTraits<RejectReturnT>::could_reject);
+ return (!common_.then_callback_.is_null() &&
+ PromiseCallbackTraits<ThenReturnT>::could_reject) ||
+ (!common_.catch_callback_.is_null() &&
+ PromiseCallbackTraits<CatchReturnT>::could_reject);
}
#endif
@@ -121,8 +120,8 @@ class ThenAndCatchExecutor {
static void ExecuteCatch(AbstractPromise* prerequisite,
AbstractPromise* promise,
- CallbackBase* reject_callback) {
- ExecuteCatchInternal(prerequisite, promise, reject_callback,
+ CallbackBase* catch_callback) {
+ ExecuteCatchInternal(prerequisite, promise, catch_callback,
PrerequisiteCouldReject());
}
@@ -130,10 +129,16 @@ class ThenAndCatchExecutor {
AbstractPromise* promise,
CallbackBase* resolve_callback,
std::true_type can_resolve) {
- RunHelper<ResolveOnceCallback, Resolved<ArgResolve>, ResolveStorage,
- RejectStorage>::
- Run(std::move(*static_cast<ResolveOnceCallback*>(resolve_callback)),
- prerequisite, promise);
+ // Internally RunHelper uses const RepeatingCallback<>& to avoid the
+ // binary size overhead of moving a scoped_refptr<> about. We respect
+ // the onceness of the callback and RunHelper will overwrite the callback
+ // with the result.
+ using RepeatingThenCB =
+ typename ToRepeatingCallback<ThenOnceCallback>::value;
+ RunHelper<
+ RepeatingThenCB, Resolved<ArgResolve>, ResolveStorage,
+ RejectStorage>::Run(*static_cast<RepeatingThenCB*>(resolve_callback),
+ prerequisite, promise);
}
static void ExecuteThenInternal(AbstractPromise* prerequisite,
@@ -145,17 +150,23 @@ class ThenAndCatchExecutor {
static void ExecuteCatchInternal(AbstractPromise* prerequisite,
AbstractPromise* promise,
- CallbackBase* reject_callback,
+ CallbackBase* catch_callback,
std::true_type can_reject) {
- RunHelper<RejectOnceCallback, Rejected<ArgReject>, ResolveStorage,
- RejectStorage>::
- Run(std::move(*static_cast<RejectOnceCallback*>(reject_callback)),
- prerequisite, promise);
+ // Internally RunHelper uses const RepeatingCallback<>& to avoid the
+ // binary size overhead of moving a scoped_refptr<> about. We respect
+ // the onceness of the callback and RunHelper will overwrite the callback
+ // with the result.
+ using RepeatingCatchCB =
+ typename ToRepeatingCallback<CatchOnceCallback>::value;
+ RunHelper<
+ RepeatingCatchCB, Rejected<ArgReject>, ResolveStorage,
+ RejectStorage>::Run(*static_cast<RepeatingCatchCB*>(catch_callback),
+ prerequisite, promise);
}
static void ExecuteCatchInternal(AbstractPromise* prerequisite,
AbstractPromise* promise,
- CallbackBase* reject_callback,
+ CallbackBase* catch_callback,
std::false_type can_reject) {
// |prerequisite| can't reject so don't generate dead code.
}
diff --git a/chromium/base/task/sequence_manager/sequence_manager.h b/chromium/base/task/sequence_manager/sequence_manager.h
index cd2163d7793..6e1cb0b752b 100644
--- a/chromium/base/task/sequence_manager/sequence_manager.h
+++ b/chromium/base/task/sequence_manager/sequence_manager.h
@@ -11,6 +11,7 @@
#include "base/macros.h"
#include "base/message_loop/message_pump_type.h"
#include "base/message_loop/timer_slack.h"
+#include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h"
#include "base/task/sequence_manager/task_queue_impl.h"
#include "base/task/sequence_manager/task_time_observer.h"
@@ -19,6 +20,7 @@
namespace base {
class MessagePump;
+class TaskObserver;
namespace sequence_manager {
@@ -102,6 +104,11 @@ class BASE_EXPORT SequenceManager {
kNone,
kEnabled,
kEnabledWithBacktrace,
+
+ // Logs high priority tasks and the lower priority tasks they skipped
+ // past. Useful for debugging test failures caused by scheduler policy
+ // changes.
+ kReorderedOnly,
};
TaskLogging task_execution_logging = TaskLogging::kNone;
@@ -141,6 +148,10 @@ class BASE_EXPORT SequenceManager {
// performs this initialization automatically.
virtual void BindToCurrentThread() = 0;
+ // Returns the task runner the current task was posted on. Returns null if no
+ // task is currently running. Must be called on the bound thread.
+ virtual scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() = 0;
+
// Finishes the initialization for a SequenceManager created via
// CreateUnboundSequenceManagerWithPump(). Must not be called in any other
// circumstances. The ownership of the pump is transferred to SequenceManager.
@@ -238,6 +249,14 @@ class BASE_EXPORT SequenceManager {
virtual std::unique_ptr<NativeWorkHandle> OnNativeWorkPending(
TaskQueue::QueuePriority priority) = 0;
+ // Adds an observer which reports task execution. Can only be called on the
+ // same thread that |this| is running on.
+ virtual void AddTaskObserver(TaskObserver* task_observer) = 0;
+
+ // Removes an observer which reports task execution. Can only be called on the
+ // same thread that |this| is running on.
+ virtual void RemoveTaskObserver(TaskObserver* task_observer) = 0;
+
protected:
virtual std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl(
const TaskQueue::Spec& spec) = 0;
diff --git a/chromium/base/task/sequence_manager/sequence_manager_impl.cc b/chromium/base/task/sequence_manager/sequence_manager_impl.cc
index 26977d6687b..7eea742aeb2 100644
--- a/chromium/base/task/sequence_manager/sequence_manager_impl.cc
+++ b/chromium/base/task/sequence_manager/sequence_manager_impl.cc
@@ -320,6 +320,16 @@ void SequenceManagerImpl::BindToCurrentThread(
BindToMessagePump(std::move(pump));
}
+scoped_refptr<SequencedTaskRunner>
+SequenceManagerImpl::GetTaskRunnerForCurrentTask() {
+ DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
+ if (main_thread_only().task_execution_stack.empty())
+ return nullptr;
+ return main_thread_only()
+ .task_execution_stack.back()
+ .pending_task.task_runner;
+}
+
void SequenceManagerImpl::CompleteInitializationOnBoundThread() {
controller_->AddNestingObserver(this);
main_thread_only().nesting_observer_registered_ = true;
@@ -488,10 +498,10 @@ const char* RunTaskTraceNameForPriority(TaskQueue::QueuePriority priority) {
} // namespace
-Optional<Task> SequenceManagerImpl::TakeTask() {
- Optional<Task> task = TakeTaskImpl();
+Task* SequenceManagerImpl::SelectNextTask() {
+ Task* task = SelectNextTaskImpl();
if (!task)
- return base::nullopt;
+ return nullptr;
ExecutingTask& executing_task =
*main_thread_only().task_execution_stack.rbegin();
@@ -503,62 +513,70 @@ Optional<Task> SequenceManagerImpl::TakeTask() {
"task_type", executing_task.task_type);
TRACE_EVENT_BEGIN0("sequence_manager", executing_task.task_queue_name);
-#if DCHECK_IS_ON() && !defined(OS_NACL)
- LogTaskDebugInfo(executing_task);
-#endif
-
return task;
}
#if DCHECK_IS_ON() && !defined(OS_NACL)
void SequenceManagerImpl::LogTaskDebugInfo(
- const ExecutingTask& executing_task) {
+ const WorkQueue* selected_work_queue) const {
+ const Task* task = selected_work_queue->GetFrontTask();
switch (settings_.task_execution_logging) {
case Settings::TaskLogging::kNone:
break;
case Settings::TaskLogging::kEnabled:
- LOG(INFO) << "#"
- << static_cast<uint64_t>(
- executing_task.pending_task.enqueue_order())
- << " " << executing_task.task_queue_name
- << (executing_task.pending_task.cross_thread_
- ? " Run crossthread "
- : " Run ")
- << executing_task.pending_task.posted_from.ToString();
+ LOG(INFO) << "#" << static_cast<uint64_t>(task->enqueue_order()) << " "
+ << selected_work_queue->task_queue()->GetName()
+ << (task->cross_thread_ ? " Run crossthread " : " Run ")
+ << task->posted_from.ToString();
break;
case Settings::TaskLogging::kEnabledWithBacktrace: {
std::array<const void*, PendingTask::kTaskBacktraceLength + 1> task_trace;
- task_trace[0] = executing_task.pending_task.posted_from.program_counter();
- std::copy(executing_task.pending_task.task_backtrace.begin(),
- executing_task.pending_task.task_backtrace.end(),
+ task_trace[0] = task->posted_from.program_counter();
+ std::copy(task->task_backtrace.begin(), task->task_backtrace.end(),
task_trace.begin() + 1);
size_t length = 0;
while (length < task_trace.size() && task_trace[length])
++length;
if (length == 0)
break;
- LOG(INFO) << "#"
- << static_cast<uint64_t>(
- executing_task.pending_task.enqueue_order())
- << " " << executing_task.task_queue_name
- << (executing_task.pending_task.cross_thread_
- ? " Run crossthread "
- : " Run ")
+ LOG(INFO) << "#" << static_cast<uint64_t>(task->enqueue_order()) << " "
+ << selected_work_queue->task_queue()->GetName()
+ << (task->cross_thread_ ? " Run crossthread " : " Run ")
<< debug::StackTrace(task_trace.data(), length);
break;
}
+
+ case Settings::TaskLogging::kReorderedOnly: {
+ std::vector<const Task*> skipped_tasks;
+ main_thread_only().selector.CollectSkippedOverLowerPriorityTasks(
+ selected_work_queue, &skipped_tasks);
+
+ if (skipped_tasks.empty())
+ break;
+
+ LOG(INFO) << "#" << static_cast<uint64_t>(task->enqueue_order()) << " "
+ << selected_work_queue->task_queue()->GetName()
+ << (task->cross_thread_ ? " Run crossthread " : " Run ")
+ << task->posted_from.ToString();
+
+ for (const Task* skipped_task : skipped_tasks) {
+ LOG(INFO) << "# (skipped over) "
+ << static_cast<uint64_t>(skipped_task->enqueue_order()) << " "
+ << skipped_task->posted_from.ToString();
+ }
+ }
}
}
#endif // DCHECK_IS_ON() && !defined(OS_NACL)
-Optional<Task> SequenceManagerImpl::TakeTaskImpl() {
+Task* SequenceManagerImpl::SelectNextTaskImpl() {
CHECK(Validate());
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
- "SequenceManagerImpl::TakeTask");
+ "SequenceManagerImpl::SelectNextTask");
ReloadEmptyWorkQueues();
LazyNow lazy_now(controller_->GetClock());
@@ -579,7 +597,7 @@ Optional<Task> SequenceManagerImpl::TakeTaskImpl() {
this, AsValueWithSelectorResult(work_queue, /* force_verbose */ false));
if (!work_queue)
- return nullopt;
+ return nullptr;
// If the head task was canceled, remove it and run the selector again.
if (UNLIKELY(work_queue->RemoveAllCanceledTasksFromFront()))
@@ -604,9 +622,13 @@ Optional<Task> SequenceManagerImpl::TakeTaskImpl() {
work_queue->task_queue()->GetQueuePriority()))) {
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
"SequenceManager.YieldToNative");
- return nullopt;
+ return nullptr;
}
+#if DCHECK_IS_ON() && !defined(OS_NACL)
+ LogTaskDebugInfo(work_queue);
+#endif // DCHECK_IS_ON() && !defined(OS_NACL)
+
main_thread_only().task_execution_stack.emplace_back(
work_queue->TakeTaskFromWorkQueue(), work_queue->task_queue(),
InitializeTaskTiming(work_queue->task_queue()));
@@ -615,7 +637,7 @@ Optional<Task> SequenceManagerImpl::TakeTaskImpl() {
*main_thread_only().task_execution_stack.rbegin();
NotifyWillProcessTask(&executing_task, &lazy_now);
- return std::move(executing_task.pending_task);
+ return &executing_task.pending_task;
}
}
diff --git a/chromium/base/task/sequence_manager/sequence_manager_impl.h b/chromium/base/task/sequence_manager/sequence_manager_impl.h
index a8c1659f52a..1ee7944a06e 100644
--- a/chromium/base/task/sequence_manager/sequence_manager_impl.h
+++ b/chromium/base/task/sequence_manager/sequence_manager_impl.h
@@ -25,6 +25,7 @@
#include "base/message_loop/message_pump_type.h"
#include "base/pending_task.h"
#include "base/run_loop.h"
+#include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h"
#include "base/synchronization/lock.h"
#include "base/task/common/task_annotator.h"
@@ -41,8 +42,6 @@
namespace base {
-class TaskObserver;
-
namespace trace_event {
class ConvertableToTraceFormat;
} // namespace trace_event
@@ -103,6 +102,7 @@ class BASE_EXPORT SequenceManagerImpl
// SequenceManager implementation:
void BindToCurrentThread() override;
+ scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() override;
void BindToMessagePump(std::unique_ptr<MessagePump> message_pump) override;
void SetObserver(Observer* observer) override;
void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
@@ -126,16 +126,16 @@ class BASE_EXPORT SequenceManagerImpl
std::string DescribeAllPendingTasks() const override;
std::unique_ptr<NativeWorkHandle> OnNativeWorkPending(
TaskQueue::QueuePriority priority) override;
+ void AddTaskObserver(TaskObserver* task_observer) override;
+ void RemoveTaskObserver(TaskObserver* task_observer) override;
// SequencedTaskSource implementation:
- Optional<Task> TakeTask() override;
+ Task* SelectNextTask() override;
void DidRunTask() override;
TimeDelta DelayTillNextTask(LazyNow* lazy_now) const override;
bool HasPendingHighResolutionTasks() override;
bool OnSystemIdle() override;
- void AddTaskObserver(TaskObserver* task_observer);
- void RemoveTaskObserver(TaskObserver* task_observer);
void AddDestructionObserver(
MessageLoopCurrent::DestructionObserver* destruction_observer);
void RemoveDestructionObserver(
@@ -230,7 +230,8 @@ class BASE_EXPORT SequenceManagerImpl
// We have to track rentrancy because we support nested runloops but the
// selector interface is unaware of those. This struct keeps track off all
- // task related state needed to make pairs of TakeTask() / DidRunTask() work.
+ // task related state needed to make pairs of SelectNextTask() / DidRunTask()
+ // work.
struct ExecutingTask {
ExecutingTask(Task&& task,
internal::TaskQueueImpl* task_queue,
@@ -377,8 +378,8 @@ class BASE_EXPORT SequenceManagerImpl
void RecordCrashKeys(const PendingTask&);
// Helper to terminate all scoped trace events to allow starting new ones
- // in TakeTask().
- Optional<Task> TakeTaskImpl();
+ // in SelectNextTask().
+ Task* SelectNextTaskImpl();
// Check if a task of priority |priority| should run given the pending set of
// native work.
@@ -388,7 +389,7 @@ class BASE_EXPORT SequenceManagerImpl
TimeDelta GetDelayTillNextDelayedTask(LazyNow* lazy_now) const;
#if DCHECK_IS_ON()
- void LogTaskDebugInfo(const ExecutingTask& executing_task);
+ void LogTaskDebugInfo(const internal::WorkQueue* work_queue) const;
#endif
// Determines if wall time or thread time should be recorded for the next
diff --git a/chromium/base/task/sequence_manager/sequence_manager_impl_unittest.cc b/chromium/base/task/sequence_manager/sequence_manager_impl_unittest.cc
index cb2f91b8176..3e1bb5dc3db 100644
--- a/chromium/base/task/sequence_manager/sequence_manager_impl_unittest.cc
+++ b/chromium/base/task/sequence_manager/sequence_manager_impl_unittest.cc
@@ -566,6 +566,18 @@ class TestCountUsesTimeSource : public TickClock {
DISALLOW_COPY_AND_ASSIGN(TestCountUsesTimeSource);
};
+TEST_P(SequenceManagerTest, GetCorrectTaskRunnerForCurrentTask) {
+ auto queue = CreateTaskQueue();
+
+ queue->task_runner()->PostTask(
+ FROM_HERE, BindLambdaForTesting([&]() {
+ EXPECT_EQ(queue->task_runner(),
+ sequence_manager()->GetTaskRunnerForCurrentTask());
+ }));
+
+ RunLoop().RunUntilIdle();
+}
+
TEST_P(SequenceManagerTest, NowNotCalledIfUnneeded) {
sequence_manager()->SetWorkBatchSize(6);
@@ -2356,9 +2368,9 @@ TEST_P(SequenceManagerTest, TaskQueueObserver_ImmediateTask) {
Mock::VerifyAndClearExpectations(&observer);
// Unless the immediate work queue is emptied.
- sequence_manager()->TakeTask();
+ sequence_manager()->SelectNextTask();
sequence_manager()->DidRunTask();
- sequence_manager()->TakeTask();
+ sequence_manager()->SelectNextTask();
sequence_manager()->DidRunTask();
EXPECT_CALL(observer, OnPostTask(_, TimeDelta()));
EXPECT_CALL(observer, OnQueueNextWakeUpChanged(_));
diff --git a/chromium/base/task/sequence_manager/sequenced_task_source.h b/chromium/base/task/sequence_manager/sequenced_task_source.h
index b1153fb32e8..5ea8874ab5e 100644
--- a/chromium/base/task/sequence_manager/sequenced_task_source.h
+++ b/chromium/base/task/sequence_manager/sequenced_task_source.h
@@ -19,13 +19,13 @@ class SequencedTaskSource {
public:
virtual ~SequencedTaskSource() = default;
- // Returns the next task to run from this source or nullopt if
+ // Returns the next task to run from this source or nullptr if
// there're no more tasks ready to run. If a task is returned,
- // DidRunTask() must be invoked before the next call to TakeTask().
- virtual Optional<Task> TakeTask() = 0;
+ // DidRunTask() must be invoked before the next call to SelectNextTask().
+ virtual Task* SelectNextTask() = 0;
// Notifies this source that the task previously obtained
- // from TakeTask() has been completed.
+ // from SelectNextTask() has been completed.
virtual void DidRunTask() = 0;
// Returns the delay till the next task or TimeDelta::Max()
diff --git a/chromium/base/task/sequence_manager/task_queue_impl.cc b/chromium/base/task/sequence_manager/task_queue_impl.cc
index d109f915ded..7c22f9c1121 100644
--- a/chromium/base/task/sequence_manager/task_queue_impl.cc
+++ b/chromium/base/task/sequence_manager/task_queue_impl.cc
@@ -78,16 +78,18 @@ TaskQueueImpl::TaskRunner::~TaskRunner() {}
bool TaskQueueImpl::TaskRunner::PostDelayedTask(const Location& location,
OnceClosure callback,
TimeDelta delay) {
- return task_poster_->PostTask(PostedTask(std::move(callback), location, delay,
- Nestable::kNestable, task_type_));
+ return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
+ delay, Nestable::kNestable,
+ task_type_));
}
bool TaskQueueImpl::TaskRunner::PostNonNestableDelayedTask(
const Location& location,
OnceClosure callback,
TimeDelta delay) {
- return task_poster_->PostTask(PostedTask(std::move(callback), location, delay,
- Nestable::kNonNestable, task_type_));
+ return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
+ delay, Nestable::kNonNestable,
+ task_type_));
}
bool TaskQueueImpl::TaskRunner::RunsTasksInCurrentSequence() const {
@@ -423,8 +425,10 @@ void TaskQueueImpl::PushOntoDelayedIncomingQueue(Task pending_task) {
#endif
// TODO(altimin): Add a copy method to Task to capture metadata here.
+ auto task_runner = pending_task.task_runner;
PostImmediateTaskImpl(
- PostedTask(BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
+ PostedTask(std::move(task_runner),
+ BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
Unretained(this), std::move(pending_task)),
FROM_HERE, TimeDelta(), Nestable::kNonNestable,
pending_task.task_type),
diff --git a/chromium/base/task/sequence_manager/task_queue_selector.cc b/chromium/base/task/sequence_manager/task_queue_selector.cc
index 5e3a14a8e80..cb58d9b4f3a 100644
--- a/chromium/base/task/sequence_manager/task_queue_selector.cc
+++ b/chromium/base/task/sequence_manager/task_queue_selector.cc
@@ -159,6 +159,15 @@ void TaskQueueSelector::WorkQueueSetBecameNonEmpty(size_t set_index) {
}
}
+void TaskQueueSelector::CollectSkippedOverLowerPriorityTasks(
+ const internal::WorkQueue* selected_work_queue,
+ std::vector<const Task*>* result) const {
+ delayed_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
+ selected_work_queue, result);
+ immediate_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
+ selected_work_queue, result);
+}
+
#if DCHECK_IS_ON() || !defined(NDEBUG)
bool TaskQueueSelector::CheckContainsQueueForTest(
const internal::TaskQueueImpl* queue) const {
diff --git a/chromium/base/task/sequence_manager/task_queue_selector.h b/chromium/base/task/sequence_manager/task_queue_selector.h
index 2ae9b52ef05..8a79a5e52bd 100644
--- a/chromium/base/task/sequence_manager/task_queue_selector.h
+++ b/chromium/base/task/sequence_manager/task_queue_selector.h
@@ -76,6 +76,12 @@ class BASE_EXPORT TaskQueueSelector : public WorkQueueSets::Observer {
void WorkQueueSetBecameEmpty(size_t set_index) override;
void WorkQueueSetBecameNonEmpty(size_t set_index) override;
+ // Populates |result| with tasks with lower priority than the first task from
+ // |selected_work_queue| which could otherwise run now.
+ void CollectSkippedOverLowerPriorityTasks(
+ const internal::WorkQueue* selected_work_queue,
+ std::vector<const Task*>* result) const;
+
protected:
WorkQueueSets* delayed_work_queue_sets() { return &delayed_work_queue_sets_; }
diff --git a/chromium/base/task/sequence_manager/task_queue_selector_unittest.cc b/chromium/base/task/sequence_manager/task_queue_selector_unittest.cc
index c99366471ba..504477048dd 100644
--- a/chromium/base/task/sequence_manager/task_queue_selector_unittest.cc
+++ b/chromium/base/task/sequence_manager/task_queue_selector_unittest.cc
@@ -74,7 +74,7 @@ class TaskQueueSelectorTestBase : public testing::Test {
EnqueueOrderGenerator enqueue_order_generator;
for (size_t i = 0; i < num_tasks; i++) {
task_queues_[queue_indices[i]]->immediate_work_queue()->Push(
- Task(PostedTask(test_closure_, FROM_HERE), TimeTicks(),
+ Task(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(),
EnqueueOrder(), enqueue_order_generator.GenerateNext()));
}
}
@@ -84,15 +84,15 @@ class TaskQueueSelectorTestBase : public testing::Test {
size_t num_tasks) {
for (size_t i = 0; i < num_tasks; i++) {
task_queues_[queue_indices[i]]->immediate_work_queue()->Push(Task(
- PostedTask(test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder(),
- EnqueueOrder::FromIntForTesting(enqueue_orders[i])));
+ PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(),
+ EnqueueOrder(), EnqueueOrder::FromIntForTesting(enqueue_orders[i])));
}
}
void PushTask(const size_t queue_index, const size_t enqueue_order) {
task_queues_[queue_index]->immediate_work_queue()->Push(
- Task(PostedTask(test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder(),
- EnqueueOrder::FromIntForTesting(enqueue_order)));
+ Task(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(),
+ EnqueueOrder(), EnqueueOrder::FromIntForTesting(enqueue_order)));
}
std::vector<size_t> PopTasksAndReturnQueueIndices() {
@@ -666,8 +666,8 @@ TEST_F(TaskQueueSelectorTest, ChooseWithPriority_Empty) {
TEST_F(TaskQueueSelectorTest, ChooseWithPriority_OnlyDelayed) {
task_queues_[0]->delayed_work_queue()->Push(
- Task(PostedTask(test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder(),
- EnqueueOrder::FromIntForTesting(2)));
+ Task(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(),
+ EnqueueOrder(), EnqueueOrder::FromIntForTesting(2)));
bool chose_delayed_over_immediate = false;
EXPECT_EQ(
@@ -680,8 +680,8 @@ TEST_F(TaskQueueSelectorTest, ChooseWithPriority_OnlyDelayed) {
TEST_F(TaskQueueSelectorTest, ChooseWithPriority_OnlyImmediate) {
task_queues_[0]->immediate_work_queue()->Push(
- Task(PostedTask(test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder(),
- EnqueueOrder::FromIntForTesting(2)));
+ Task(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(),
+ EnqueueOrder(), EnqueueOrder::FromIntForTesting(2)));
bool chose_delayed_over_immediate = false;
EXPECT_EQ(
@@ -706,8 +706,8 @@ TEST_F(TaskQueueSelectorTest, TestObserverWithOneBlockedQueue) {
task_queue->SetQueueEnabled(false);
selector.DisableQueue(task_queue.get());
- Task task(PostedTask(test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder(),
- EnqueueOrder::FromIntForTesting(2));
+ Task task(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(),
+ EnqueueOrder(), EnqueueOrder::FromIntForTesting(2));
task_queue->immediate_work_queue()->Push(std::move(task));
EXPECT_EQ(nullptr, selector.SelectWorkQueueToService());
@@ -736,10 +736,10 @@ TEST_F(TaskQueueSelectorTest, TestObserverWithTwoBlockedQueues) {
selector.SetQueuePriority(task_queue2.get(), TaskQueue::kControlPriority);
- Task task1(PostedTask(test_closure_, FROM_HERE), TimeTicks(),
+ Task task1(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(),
EnqueueOrder::FromIntForTesting(2),
EnqueueOrder::FromIntForTesting(2));
- Task task2(PostedTask(test_closure_, FROM_HERE), TimeTicks(),
+ Task task2(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(),
EnqueueOrder::FromIntForTesting(3),
EnqueueOrder::FromIntForTesting(3));
task_queue->immediate_work_queue()->Push(std::move(task1));
@@ -762,6 +762,21 @@ TEST_F(TaskQueueSelectorTest, TestObserverWithTwoBlockedQueues) {
task_queue2->UnregisterTaskQueue();
}
+TEST_F(TaskQueueSelectorTest, CollectSkippedOverLowerPriorityTasks) {
+ size_t queue_order[] = {0, 1, 2, 3, 2, 1, 0};
+ PushTasks(queue_order, 7);
+ selector_.SetQueuePriority(task_queues_[3].get(), TaskQueue::kHighPriority);
+
+ std::vector<const Task*> result;
+ selector_.CollectSkippedOverLowerPriorityTasks(
+ task_queues_[3]->immediate_work_queue(), &result);
+
+ ASSERT_EQ(3u, result.size());
+ EXPECT_EQ(2u, result[0]->enqueue_order()); // The order here isn't important.
+ EXPECT_EQ(3u, result[1]->enqueue_order());
+ EXPECT_EQ(4u, result[2]->enqueue_order());
+}
+
class DisabledAntiStarvationLogicTaskQueueSelectorTest
: public TaskQueueSelectorTestBase,
public testing::WithParamInterface<TaskQueue::QueuePriority> {
@@ -839,13 +854,13 @@ class ChooseWithPriorityTest
TEST_P(ChooseWithPriorityTest, RoundRobinTest) {
task_queues_[0]->immediate_work_queue()->Push(Task(
- PostedTask(test_closure_, FROM_HERE), TimeTicks(),
+ PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(),
EnqueueOrder::FromIntForTesting(GetParam().immediate_task_enqueue_order),
EnqueueOrder::FromIntForTesting(
GetParam().immediate_task_enqueue_order)));
task_queues_[0]->delayed_work_queue()->Push(Task(
- PostedTask(test_closure_, FROM_HERE), TimeTicks(),
+ PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(),
EnqueueOrder::FromIntForTesting(GetParam().delayed_task_enqueue_order),
EnqueueOrder::FromIntForTesting(GetParam().delayed_task_enqueue_order)));
diff --git a/chromium/base/task/sequence_manager/tasks.cc b/chromium/base/task/sequence_manager/tasks.cc
index 14bd306b947..a3bd5ce60f6 100644
--- a/chromium/base/task/sequence_manager/tasks.cc
+++ b/chromium/base/task/sequence_manager/tasks.cc
@@ -17,6 +17,7 @@ Task::Task(internal::PostedTask posted_task,
desired_run_time,
posted_task.nestable),
task_type(posted_task.task_type),
+ task_runner(std::move(posted_task.task_runner)),
enqueue_order_(enqueue_order) {
// We use |sequence_num| in DelayedWakeUp for ordering purposes and it
// may wrap around to a negative number during the static cast, hence,
@@ -28,9 +29,15 @@ Task::Task(internal::PostedTask posted_task,
queue_time = posted_task.queue_time;
}
-namespace internal {
+Task::Task(Task&& move_from) = default;
+
+Task::~Task() = default;
-PostedTask::PostedTask(OnceClosure callback,
+Task& Task::operator=(Task&& other) = default;
+
+namespace internal {
+PostedTask::PostedTask(scoped_refptr<SequencedTaskRunner> task_runner,
+ OnceClosure callback,
Location location,
TimeDelta delay,
Nestable nestable,
@@ -39,7 +46,8 @@ PostedTask::PostedTask(OnceClosure callback,
location(location),
delay(delay),
nestable(nestable),
- task_type(task_type) {}
+ task_type(task_type),
+ task_runner(std::move(task_runner)) {}
PostedTask::PostedTask(PostedTask&& move_from) noexcept
: callback(std::move(move_from.callback)),
@@ -47,6 +55,7 @@ PostedTask::PostedTask(PostedTask&& move_from) noexcept
delay(move_from.delay),
nestable(move_from.nestable),
task_type(move_from.task_type),
+ task_runner(std::move(move_from.task_runner)),
queue_time(move_from.queue_time) {}
PostedTask::~PostedTask() = default;
diff --git a/chromium/base/task/sequence_manager/tasks.h b/chromium/base/task/sequence_manager/tasks.h
index 0c886c4b9ca..d1e1912ead2 100644
--- a/chromium/base/task/sequence_manager/tasks.h
+++ b/chromium/base/task/sequence_manager/tasks.h
@@ -6,6 +6,7 @@
#define BASE_TASK_SEQUENCE_MANAGER_TASKS_H_
#include "base/pending_task.h"
+#include "base/sequenced_task_runner.h"
#include "base/task/sequence_manager/enqueue_order.h"
namespace base {
@@ -21,7 +22,8 @@ enum class WakeUpResolution { kLow, kHigh };
// Wrapper around PostTask method arguments and the assigned task type.
// Eventually it becomes a PendingTask once accepted by a TaskQueueImpl.
struct BASE_EXPORT PostedTask {
- explicit PostedTask(OnceClosure callback = OnceClosure(),
+ explicit PostedTask(scoped_refptr<SequencedTaskRunner> task_runner,
+ OnceClosure callback = OnceClosure(),
Location location = Location(),
TimeDelta delay = TimeDelta(),
Nestable nestable = Nestable::kNestable,
@@ -34,6 +36,9 @@ struct BASE_EXPORT PostedTask {
TimeDelta delay;
Nestable nestable;
TaskType task_type;
+ // The task runner this task is running on. Can be used by task runners that
+ // support posting back to the "current sequence".
+ scoped_refptr<SequencedTaskRunner> task_runner;
// The time at which the task was queued.
TimeTicks queue_time;
@@ -76,6 +81,9 @@ struct BASE_EXPORT Task : public PendingTask {
EnqueueOrder enqueue_order = EnqueueOrder(),
internal::WakeUpResolution wake_up_resolution =
internal::WakeUpResolution::kLow);
+ Task(Task&& move_from);
+ ~Task();
+ Task& operator=(Task&& other);
internal::DelayedWakeUp delayed_wake_up() const {
return internal::DelayedWakeUp{delayed_run_time, sequence_num};
@@ -97,6 +105,10 @@ struct BASE_EXPORT Task : public PendingTask {
TaskType task_type;
+ // The task runner this task is running on. Can be used by task runners that
+ // support posting back to the "current sequence".
+ scoped_refptr<SequencedTaskRunner> task_runner;
+
#if DCHECK_IS_ON()
bool cross_thread_;
#endif
diff --git a/chromium/base/task/sequence_manager/thread_controller_impl.cc b/chromium/base/task/sequence_manager/thread_controller_impl.cc
index 22bffdb8a5b..676b9a6708d 100644
--- a/chromium/base/task/sequence_manager/thread_controller_impl.cc
+++ b/chromium/base/task/sequence_manager/thread_controller_impl.cc
@@ -174,7 +174,7 @@ void ThreadControllerImpl::DoWork(WorkType work_type) {
WeakPtr<ThreadControllerImpl> weak_ptr = weak_factory_.GetWeakPtr();
// TODO(scheduler-dev): Consider moving to a time based work batch instead.
for (int i = 0; i < main_sequence_only().work_batch_size_; i++) {
- Optional<PendingTask> task = sequence_->TakeTask();
+ Task* task = sequence_->SelectNextTask();
if (!task)
break;
@@ -189,7 +189,7 @@ void ThreadControllerImpl::DoWork(WorkType work_type) {
// Trace events should finish before we call DidRunTask to ensure that
// SequenceManager trace events do not interfere with them.
TRACE_TASK_EXECUTION("ThreadControllerImpl::RunTask", *task);
- task_annotator_.RunTask("SequenceManager RunTask", &*task);
+ task_annotator_.RunTask("SequenceManager RunTask", task);
}
if (!weak_ptr)
diff --git a/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl.cc b/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl.cc
index deeda0e4260..9b0a058efbc 100644
--- a/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl.cc
+++ b/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl.cc
@@ -342,7 +342,7 @@ TimeDelta ThreadControllerWithMessagePumpImpl::DoWorkImpl(
DCHECK(main_thread_only().task_source);
for (int i = 0; i < main_thread_only().work_batch_size; i++) {
- Optional<Task> task = main_thread_only().task_source->TakeTask();
+ Task* task = main_thread_only().task_source->SelectNextTask();
if (!task)
break;
@@ -362,7 +362,7 @@ TimeDelta ThreadControllerWithMessagePumpImpl::DoWorkImpl(
// Trace events should finish before we call DidRunTask to ensure that
// SequenceManager trace events do not interfere with them.
TRACE_TASK_EXECUTION("ThreadControllerImpl::RunTask", *task);
- task_annotator_.RunTask("SequenceManager RunTask", &*task);
+ task_annotator_.RunTask("SequenceManager RunTask", task);
}
#if DCHECK_IS_ON()
diff --git a/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl_unittest.cc b/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl_unittest.cc
index 64eb8f7e74b..3c02becf16d 100644
--- a/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl_unittest.cc
+++ b/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl_unittest.cc
@@ -79,17 +79,17 @@ class FakeSequencedTaskSource : public internal::SequencedTaskSource {
explicit FakeSequencedTaskSource(TickClock* clock) : clock_(clock) {}
~FakeSequencedTaskSource() override = default;
- Optional<Task> TakeTask() override {
+ Task* SelectNextTask() override {
if (tasks_.empty())
- return nullopt;
+ return nullptr;
if (tasks_.front().delayed_run_time > clock_->NowTicks())
- return nullopt;
- Task task = std::move(tasks_.front());
+ return nullptr;
+ running_stack_.push_back(std::move(tasks_.front()));
tasks_.pop();
- return task;
+ return &running_stack_.back();
}
- void DidRunTask() override {}
+ void DidRunTask() override { running_stack_.pop_back(); }
TimeDelta DelayTillNextTask(LazyNow* lazy_now) const override {
if (tasks_.empty())
@@ -106,8 +106,9 @@ class FakeSequencedTaskSource : public internal::SequencedTaskSource {
TimeTicks delayed_run_time) {
DCHECK(tasks_.empty() || delayed_run_time.is_null() ||
tasks_.back().delayed_run_time < delayed_run_time);
- tasks_.push(Task(internal::PostedTask(std::move(task), posted_from),
- delayed_run_time, EnqueueOrder::FromIntForTesting(13)));
+ tasks_.push(
+ Task(internal::PostedTask(nullptr, std::move(task), posted_from),
+ delayed_run_time, EnqueueOrder::FromIntForTesting(13)));
}
bool HasPendingHighResolutionTasks() override { return false; }
@@ -117,6 +118,7 @@ class FakeSequencedTaskSource : public internal::SequencedTaskSource {
private:
TickClock* clock_;
std::queue<Task> tasks_;
+ std::vector<Task> running_stack_;
};
TimeTicks Seconds(int seconds) {
diff --git a/chromium/base/task/sequence_manager/work_queue.cc b/chromium/base/task/sequence_manager/work_queue.cc
index 0d9df4a150a..2dfd04da230 100644
--- a/chromium/base/task/sequence_manager/work_queue.cc
+++ b/chromium/base/task/sequence_manager/work_queue.cc
@@ -305,6 +305,16 @@ void WorkQueue::PopTaskForTesting() {
tasks_.pop_front();
}
+void WorkQueue::CollectTasksOlderThan(EnqueueOrder reference,
+ std::vector<const Task*>* result) const {
+ for (const Task& task : tasks_) {
+ if (task.enqueue_order() >= reference)
+ break;
+
+ result->push_back(&task);
+ }
+}
+
} // namespace internal
} // namespace sequence_manager
} // namespace base
diff --git a/chromium/base/task/sequence_manager/work_queue.h b/chromium/base/task/sequence_manager/work_queue.h
index 849d48cac5d..65fdee4ca28 100644
--- a/chromium/base/task/sequence_manager/work_queue.h
+++ b/chromium/base/task/sequence_manager/work_queue.h
@@ -161,6 +161,11 @@ class BASE_EXPORT WorkQueue {
// Test support function. This should not be used in production code.
void PopTaskForTesting();
+ // Iterates through |tasks_| adding any that are older than |reference| to
+ // |result|.
+ void CollectTasksOlderThan(EnqueueOrder reference,
+ std::vector<const Task*>* result) const;
+
private:
bool InsertFenceImpl(EnqueueOrder fence);
diff --git a/chromium/base/task/sequence_manager/work_queue_sets.cc b/chromium/base/task/sequence_manager/work_queue_sets.cc
index c2f9886271d..68ec9613338 100644
--- a/chromium/base/task/sequence_manager/work_queue_sets.cc
+++ b/chromium/base/task/sequence_manager/work_queue_sets.cc
@@ -237,6 +237,19 @@ bool WorkQueueSets::ContainsWorkQueueForTest(
}
#endif
+void WorkQueueSets::CollectSkippedOverLowerPriorityTasks(
+ const internal::WorkQueue* selected_work_queue,
+ std::vector<const Task*>* result) const {
+ EnqueueOrder selected_enqueue_order;
+ CHECK(selected_work_queue->GetFrontTaskEnqueueOrder(&selected_enqueue_order));
+ for (size_t priority = selected_work_queue->work_queue_set_index() + 1;
+ priority < TaskQueue::kQueuePriorityCount; priority++) {
+ for (const OldestTaskEnqueueOrder& pair : work_queue_heaps_[priority]) {
+ pair.value->CollectTasksOlderThan(selected_enqueue_order, result);
+ }
+ }
+}
+
} // namespace internal
} // namespace sequence_manager
} // namespace base
diff --git a/chromium/base/task/sequence_manager/work_queue_sets.h b/chromium/base/task/sequence_manager/work_queue_sets.h
index 90cc6d789c1..f128c62c369 100644
--- a/chromium/base/task/sequence_manager/work_queue_sets.h
+++ b/chromium/base/task/sequence_manager/work_queue_sets.h
@@ -95,6 +95,12 @@ class BASE_EXPORT WorkQueueSets {
const char* GetName() const { return name_; }
+ // Collects ready tasks which where skipped over when |selected_work_queue|
+ // was selected. Note this is somewhat expensive.
+ void CollectSkippedOverLowerPriorityTasks(
+ const internal::WorkQueue* selected_work_queue,
+ std::vector<const Task*>* result) const;
+
private:
struct OldestTaskEnqueueOrder {
EnqueueOrder key;
diff --git a/chromium/base/task/sequence_manager/work_queue_sets_unittest.cc b/chromium/base/task/sequence_manager/work_queue_sets_unittest.cc
index 30f3bb8fa68..6450573d5b1 100644
--- a/chromium/base/task/sequence_manager/work_queue_sets_unittest.cc
+++ b/chromium/base/task/sequence_manager/work_queue_sets_unittest.cc
@@ -51,14 +51,14 @@ class WorkQueueSetsTest : public testing::Test {
}
Task FakeTaskWithEnqueueOrder(int enqueue_order) {
- Task fake_task(PostedTask(BindOnce([] {}), FROM_HERE), TimeTicks(),
+ Task fake_task(PostedTask(nullptr, BindOnce([] {}), FROM_HERE), TimeTicks(),
EnqueueOrder(),
EnqueueOrder::FromIntForTesting(enqueue_order));
return fake_task;
}
Task FakeNonNestableTaskWithEnqueueOrder(int enqueue_order) {
- Task fake_task(PostedTask(BindOnce([] {}), FROM_HERE), TimeTicks(),
+ Task fake_task(PostedTask(nullptr, BindOnce([] {}), FROM_HERE), TimeTicks(),
EnqueueOrder(),
EnqueueOrder::FromIntForTesting(enqueue_order));
fake_task.nestable = Nestable::kNonNestable;
@@ -331,6 +331,32 @@ TEST_F(WorkQueueSetsTest, PushNonNestableTaskToFront) {
EXPECT_EQ(queue1, work_queue_sets_->GetOldestQueueInSet(set));
}
+TEST_F(WorkQueueSetsTest, CollectSkippedOverLowerPriorityTasks) {
+ WorkQueue* queue1 = NewTaskQueue("queue1");
+ WorkQueue* queue2 = NewTaskQueue("queue2");
+ WorkQueue* queue3 = NewTaskQueue("queue3");
+
+ work_queue_sets_->ChangeSetIndex(queue1, 3);
+ work_queue_sets_->ChangeSetIndex(queue2, 2);
+ work_queue_sets_->ChangeSetIndex(queue3, 1);
+
+ queue1->Push(FakeTaskWithEnqueueOrder(1));
+ queue1->Push(FakeTaskWithEnqueueOrder(2));
+ queue2->Push(FakeTaskWithEnqueueOrder(3));
+ queue3->Push(FakeTaskWithEnqueueOrder(4));
+ queue3->Push(FakeTaskWithEnqueueOrder(5));
+ queue2->Push(FakeTaskWithEnqueueOrder(6));
+ queue1->Push(FakeTaskWithEnqueueOrder(7));
+
+ std::vector<const Task*> result;
+ work_queue_sets_->CollectSkippedOverLowerPriorityTasks(queue3, &result);
+
+ ASSERT_EQ(3u, result.size());
+ EXPECT_EQ(3u, result[0]->enqueue_order()); // The order here isn't important.
+ EXPECT_EQ(1u, result[1]->enqueue_order());
+ EXPECT_EQ(2u, result[2]->enqueue_order());
+}
+
} // namespace internal
} // namespace sequence_manager
} // namespace base
diff --git a/chromium/base/task/sequence_manager/work_queue_unittest.cc b/chromium/base/task/sequence_manager/work_queue_unittest.cc
index 721f60fda7d..1ab88bf5065 100644
--- a/chromium/base/task/sequence_manager/work_queue_unittest.cc
+++ b/chromium/base/task/sequence_manager/work_queue_unittest.cc
@@ -73,23 +73,23 @@ class WorkQueueTest : public testing::Test {
protected:
Task FakeCancelableTaskWithEnqueueOrder(int enqueue_order,
WeakPtr<Cancelable> weak_ptr) {
- Task fake_task(
- PostedTask(BindOnce(&Cancelable::NopTask, weak_ptr), FROM_HERE),
- TimeTicks(), EnqueueOrder(),
- EnqueueOrder::FromIntForTesting(enqueue_order));
+ Task fake_task(PostedTask(nullptr, BindOnce(&Cancelable::NopTask, weak_ptr),
+ FROM_HERE),
+ TimeTicks(), EnqueueOrder(),
+ EnqueueOrder::FromIntForTesting(enqueue_order));
return fake_task;
}
Task FakeTaskWithEnqueueOrder(int enqueue_order) {
- Task fake_task(PostedTask(BindOnce(&NopTask), FROM_HERE), TimeTicks(),
- EnqueueOrder(),
+ Task fake_task(PostedTask(nullptr, BindOnce(&NopTask), FROM_HERE),
+ TimeTicks(), EnqueueOrder(),
EnqueueOrder::FromIntForTesting(enqueue_order));
return fake_task;
}
Task FakeNonNestableTaskWithEnqueueOrder(int enqueue_order) {
- Task fake_task(PostedTask(BindOnce(&NopTask), FROM_HERE), TimeTicks(),
- EnqueueOrder(),
+ Task fake_task(PostedTask(nullptr, BindOnce(&NopTask), FROM_HERE),
+ TimeTicks(), EnqueueOrder(),
EnqueueOrder::FromIntForTesting(enqueue_order));
fake_task.nestable = Nestable::kNonNestable;
return fake_task;
@@ -555,6 +555,20 @@ TEST_F(WorkQueueTest, RemoveAllCanceledTasksFromFrontQueueBlockedByFence) {
EXPECT_FALSE(work_queue_->GetFrontTaskEnqueueOrder(&enqueue_order));
}
+TEST_F(WorkQueueTest, CollectTasksOlderThan) {
+ work_queue_->Push(FakeTaskWithEnqueueOrder(2));
+ work_queue_->Push(FakeTaskWithEnqueueOrder(3));
+ work_queue_->Push(FakeTaskWithEnqueueOrder(4));
+
+ std::vector<const Task*> result;
+ work_queue_->CollectTasksOlderThan(EnqueueOrder::FromIntForTesting(4),
+ &result);
+
+ ASSERT_EQ(2u, result.size());
+ EXPECT_EQ(2u, result[0]->enqueue_order());
+ EXPECT_EQ(3u, result[1]->enqueue_order());
+}
+
} // namespace internal
} // namespace sequence_manager
} // namespace base
diff --git a/chromium/base/task/simple_task_executor.cc b/chromium/base/task/simple_task_executor.cc
new file mode 100644
index 00000000000..ffccf52be86
--- /dev/null
+++ b/chromium/base/task/simple_task_executor.cc
@@ -0,0 +1,62 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task/simple_task_executor.h"
+
+namespace base {
+
+SimpleTaskExecutor::SimpleTaskExecutor(
+ scoped_refptr<SingleThreadTaskRunner> task_queue)
+ : task_queue_(std::move(task_queue)),
+ previous_task_executor_(GetTaskExecutorForCurrentThread()) {
+ DCHECK(task_queue_);
+ // The TaskExecutor API does not expect nesting, but this can happen in tests
+ // so we have to work around it here.
+ if (previous_task_executor_)
+ SetTaskExecutorForCurrentThread(nullptr);
+ SetTaskExecutorForCurrentThread(this);
+}
+
+SimpleTaskExecutor::~SimpleTaskExecutor() {
+ if (previous_task_executor_)
+ SetTaskExecutorForCurrentThread(nullptr);
+ SetTaskExecutorForCurrentThread(previous_task_executor_);
+}
+
+bool SimpleTaskExecutor::PostDelayedTask(const Location& from_here,
+ const TaskTraits& traits,
+ OnceClosure task,
+ TimeDelta delay) {
+ return task_queue_->PostDelayedTask(from_here, std::move(task), delay);
+}
+
+scoped_refptr<TaskRunner> SimpleTaskExecutor::CreateTaskRunner(
+ const TaskTraits& traits) {
+ return task_queue_;
+}
+
+scoped_refptr<SequencedTaskRunner>
+SimpleTaskExecutor::CreateSequencedTaskRunner(const TaskTraits& traits) {
+ return task_queue_;
+}
+
+scoped_refptr<SingleThreadTaskRunner>
+SimpleTaskExecutor::CreateSingleThreadTaskRunner(
+ const TaskTraits& traits,
+ SingleThreadTaskRunnerThreadMode thread_mode) {
+ return task_queue_;
+}
+
+#if defined(OS_WIN)
+scoped_refptr<SingleThreadTaskRunner>
+SimpleTaskExecutor::CreateCOMSTATaskRunner(
+ const TaskTraits& traits,
+ SingleThreadTaskRunnerThreadMode thread_mode) {
+ // It seems pretty unlikely this will be used on a comsta task thread.
+ NOTREACHED();
+ return task_queue_;
+}
+#endif // defined(OS_WIN)
+
+} // namespace base
diff --git a/chromium/base/task/simple_task_executor.h b/chromium/base/task/simple_task_executor.h
new file mode 100644
index 00000000000..7d9a74dc5d2
--- /dev/null
+++ b/chromium/base/task/simple_task_executor.h
@@ -0,0 +1,52 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BASE_TASK_SIMPLE_TASK_EXECUTOR_H_
+#define BASE_TASK_SIMPLE_TASK_EXECUTOR_H_
+
+#include "base/task/task_executor.h"
+#include "build/build_config.h"
+
+namespace base {
+
+// A simple TaskExecutor with exactly one SingleThreadTaskRunner.
+// Must be instantiated and destroyed on the thread that runs tasks for the
+// SingleThreadTaskRunner.
+class BASE_EXPORT SimpleTaskExecutor : public TaskExecutor {
+ public:
+ explicit SimpleTaskExecutor(scoped_refptr<SingleThreadTaskRunner> task_queue);
+
+ ~SimpleTaskExecutor() override;
+
+ bool PostDelayedTask(const Location& from_here,
+ const TaskTraits& traits,
+ OnceClosure task,
+ TimeDelta delay) override;
+
+ scoped_refptr<TaskRunner> CreateTaskRunner(const TaskTraits& traits) override;
+
+ scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunner(
+ const TaskTraits& traits) override;
+
+ scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunner(
+ const TaskTraits& traits,
+ SingleThreadTaskRunnerThreadMode thread_mode) override;
+
+#if defined(OS_WIN)
+ scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunner(
+ const TaskTraits& traits,
+ SingleThreadTaskRunnerThreadMode thread_mode) override;
+#endif // defined(OS_WIN)
+
+ private:
+ const scoped_refptr<SingleThreadTaskRunner> task_queue_;
+
+ // In tests there may already be a TaskExecutor registered for the thread, we
+ // keep tack of the previous TaskExecutor and restored it upon destruction.
+ TaskExecutor* const previous_task_executor_;
+};
+
+} // namespace base
+
+#endif // BASE_TASK_SIMPLE_TASK_EXECUTOR_H_
diff --git a/chromium/base/task/single_thread_task_executor.cc b/chromium/base/task/single_thread_task_executor.cc
index 7e12b784faf..01d993be952 100644
--- a/chromium/base/task/single_thread_task_executor.cc
+++ b/chromium/base/task/single_thread_task_executor.cc
@@ -17,7 +17,8 @@ SingleThreadTaskExecutor::SingleThreadTaskExecutor(MessagePumpType type)
.Build())),
default_task_queue_(sequence_manager_->CreateTaskQueue(
sequence_manager::TaskQueue::Spec("default_tq"))),
- type_(type) {
+ type_(type),
+ simple_task_executor_(task_runner()) {
sequence_manager_->SetDefaultTaskRunner(default_task_queue_->task_runner());
sequence_manager_->BindToMessagePump(MessagePump::Create(type));
diff --git a/chromium/base/task/single_thread_task_executor.h b/chromium/base/task/single_thread_task_executor.h
index d350420d797..c048a830cf1 100644
--- a/chromium/base/task/single_thread_task_executor.h
+++ b/chromium/base/task/single_thread_task_executor.h
@@ -11,6 +11,7 @@
#include "base/memory/scoped_refptr.h"
#include "base/message_loop/message_pump_type.h"
#include "base/single_thread_task_runner.h"
+#include "base/task/simple_task_executor.h"
namespace base {
@@ -40,6 +41,7 @@ class BASE_EXPORT SingleThreadTaskExecutor {
std::unique_ptr<sequence_manager::SequenceManager> sequence_manager_;
scoped_refptr<sequence_manager::TaskQueue> default_task_queue_;
MessagePumpType type_;
+ SimpleTaskExecutor simple_task_executor_;
DISALLOW_COPY_AND_ASSIGN(SingleThreadTaskExecutor);
};
diff --git a/chromium/base/task/single_thread_task_executor_unittest.cc b/chromium/base/task/single_thread_task_executor_unittest.cc
new file mode 100644
index 00000000000..b64294f404f
--- /dev/null
+++ b/chromium/base/task/single_thread_task_executor_unittest.cc
@@ -0,0 +1,58 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task/single_thread_task_executor.h"
+
+#include "base/run_loop.h"
+#include "base/task/post_task.h"
+#include "base/test/bind_test_util.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using ::testing::IsNull;
+using ::testing::NotNull;
+
+namespace base {
+
+TEST(SingleThreadTaskExecutorTest, GetTaskExecutorForCurrentThread) {
+ EXPECT_THAT(GetTaskExecutorForCurrentThread(), IsNull());
+
+ {
+ SingleThreadTaskExecutor single_thread_task_executor;
+ EXPECT_THAT(GetTaskExecutorForCurrentThread(), NotNull());
+ }
+
+ EXPECT_THAT(GetTaskExecutorForCurrentThread(), IsNull());
+}
+
+TEST(SingleThreadTaskExecutorTest,
+ GetTaskExecutorForCurrentThreadInPostedTask) {
+ SingleThreadTaskExecutor single_thread_task_executor;
+ TaskExecutor* task_executor = GetTaskExecutorForCurrentThread();
+
+ EXPECT_THAT(task_executor, NotNull());
+
+ RunLoop run_loop;
+ single_thread_task_executor.task_runner()->PostTask(
+ FROM_HERE, BindLambdaForTesting([&]() {
+ EXPECT_EQ(GetTaskExecutorForCurrentThread(), task_executor);
+ run_loop.Quit();
+ }));
+
+ run_loop.Run();
+}
+
+TEST(SingleThreadTaskExecutorTest, CurrentThread) {
+ SingleThreadTaskExecutor single_thread_task_executor;
+
+ EXPECT_EQ(single_thread_task_executor.task_runner(),
+ base::CreateSingleThreadTaskRunner({base::CurrentThread()}));
+
+ // There's only one task queue so priority is ignored.
+ EXPECT_EQ(single_thread_task_executor.task_runner(),
+ base::CreateSingleThreadTaskRunner(
+ {base::CurrentThread(), base::TaskPriority::BEST_EFFORT}));
+}
+
+} // namespace base
diff --git a/chromium/base/task/task_executor.cc b/chromium/base/task/task_executor.cc
index 8b527b0cd5b..355fa3f445f 100644
--- a/chromium/base/task/task_executor.cc
+++ b/chromium/base/task/task_executor.cc
@@ -6,8 +6,10 @@
#include <type_traits>
+#include "base/no_destructor.h"
#include "base/task/task_traits.h"
#include "base/task/task_traits_extension.h"
+#include "base/threading/thread_local.h"
namespace base {
@@ -30,6 +32,21 @@ static_assert(
} // namespace
+ThreadLocalPointer<TaskExecutor>* GetTLSForCurrentTaskExecutor() {
+ static NoDestructor<ThreadLocalPointer<TaskExecutor>> instance;
+ return instance.get();
+}
+
+void SetTaskExecutorForCurrentThread(TaskExecutor* task_executor) {
+ DCHECK(!task_executor || !GetTLSForCurrentTaskExecutor()->Get() ||
+ GetTLSForCurrentTaskExecutor()->Get() == task_executor);
+ GetTLSForCurrentTaskExecutor()->Set(task_executor);
+}
+
+TaskExecutor* GetTaskExecutorForCurrentThread() {
+ return GetTLSForCurrentTaskExecutor()->Get();
+}
+
void RegisterTaskExecutor(uint8_t extension_id, TaskExecutor* task_executor) {
DCHECK_NE(extension_id, TaskTraitsExtensionStorage::kInvalidExtensionId);
DCHECK_LE(extension_id, TaskTraitsExtensionStorage::kMaxExtensionId);
@@ -57,4 +74,4 @@ TaskExecutor* GetRegisteredTaskExecutorForTraits(const TaskTraits& traits) {
return nullptr;
}
-} // namespace base \ No newline at end of file
+} // namespace base
diff --git a/chromium/base/task/task_executor.h b/chromium/base/task/task_executor.h
index b4e79e14c9a..a062fdc4391 100644
--- a/chromium/base/task/task_executor.h
+++ b/chromium/base/task/task_executor.h
@@ -74,6 +74,13 @@ void BASE_EXPORT RegisterTaskExecutor(uint8_t extension_id,
TaskExecutor* task_executor);
void BASE_EXPORT UnregisterTaskExecutorForTesting(uint8_t extension_id);
+// Stores the provided TaskExecutor in TLS for the current thread, to be used by
+// tasks with the CurrentThread() trait.
+void BASE_EXPORT SetTaskExecutorForCurrentThread(TaskExecutor* task_executor);
+
+// Returns the task executor registered for the current thread.
+BASE_EXPORT TaskExecutor* GetTaskExecutorForCurrentThread();
+
// Determines whether a registered TaskExecutor will handle tasks with the given
// |traits| and, if so, returns a pointer to it. Otherwise, returns |nullptr|.
TaskExecutor* GetRegisteredTaskExecutorForTraits(const TaskTraits& traits);
diff --git a/chromium/base/task/task_traits.h b/chromium/base/task/task_traits.h
index c1e52ed0f07..03abf17913f 100644
--- a/chromium/base/task/task_traits.h
+++ b/chromium/base/task/task_traits.h
@@ -185,6 +185,14 @@ struct WithBaseSyncPrimitives {};
// between tasks, see base::PostTask::CreateSequencedTaskRunner.
struct ThreadPool {};
+// Tasks and task runners with this thread will run tasks on the virtual thread
+// (sequence) they are posted/created from. Other traits may be specified
+// alongside this one to refine properties for the associated tasks
+// (e.g. base::TaskPriority or content::BrowserTaskType) as long as those traits
+// are compatible with the current thread (e.g. cannot specify base::MayBlock()
+// on a non-blocking thread or alter base::TaskShutdownBehavior).
+struct CurrentThread {};
+
// Describes metadata for a single task or a group of tasks.
class BASE_EXPORT TaskTraits {
public:
@@ -196,6 +204,7 @@ class BASE_EXPORT TaskTraits {
ValidTrait(MayBlock);
ValidTrait(WithBaseSyncPrimitives);
ValidTrait(ThreadPool);
+ ValidTrait(CurrentThread);
};
// Invoking this constructor without arguments produces TaskTraits that are
@@ -255,15 +264,22 @@ class BASE_EXPORT TaskTraits {
may_block_(trait_helpers::HasTrait<MayBlock, ArgTypes...>()),
with_base_sync_primitives_(
trait_helpers::HasTrait<WithBaseSyncPrimitives, ArgTypes...>()),
- use_thread_pool_(trait_helpers::HasTrait<ThreadPool, ArgTypes...>()) {
+ use_thread_pool_(trait_helpers::HasTrait<ThreadPool, ArgTypes...>()),
+ use_current_thread_(
+ trait_helpers::HasTrait<CurrentThread, ArgTypes...>()) {
constexpr bool has_thread_pool =
trait_helpers::HasTrait<ThreadPool, ArgTypes...>();
constexpr bool has_extension =
!trait_helpers::AreValidTraits<ValidTrait, ArgTypes...>::value;
+ constexpr bool has_current_thread =
+ trait_helpers::HasTrait<CurrentThread, ArgTypes...>();
+ static_assert(
+ !has_current_thread || !has_thread_pool,
+ "base::CurrentThread is mutually exclusive with base::ThreadPool");
static_assert(
- has_thread_pool ^ has_extension,
+ has_thread_pool ^ has_extension || has_current_thread,
"Traits must explicitly specify a destination (e.g. ThreadPool or a "
- "named thread like BrowserThread)");
+ "named thread like BrowserThread, or CurrentThread)");
}
constexpr TaskTraits(const TaskTraits& other) = default;
@@ -271,14 +287,15 @@ class BASE_EXPORT TaskTraits {
// TODO(eseckler): Default the comparison operator once C++20 arrives.
bool operator==(const TaskTraits& other) const {
- static_assert(sizeof(TaskTraits) == 15,
+ static_assert(sizeof(TaskTraits) == 16,
"Update comparison operator when TaskTraits change");
return extension_ == other.extension_ && priority_ == other.priority_ &&
shutdown_behavior_ == other.shutdown_behavior_ &&
thread_policy_ == other.thread_policy_ &&
may_block_ == other.may_block_ &&
with_base_sync_primitives_ == other.with_base_sync_primitives_ &&
- use_thread_pool_ == other.use_thread_pool_;
+ use_thread_pool_ == other.use_thread_pool_ &&
+ use_current_thread_ == other.use_current_thread_;
}
// Sets the priority of tasks with these traits to |priority|.
@@ -335,6 +352,10 @@ class BASE_EXPORT TaskTraits {
// Returns true if tasks with these traits execute on the thread pool.
constexpr bool use_thread_pool() const { return use_thread_pool_; }
+ // Returns true if tasks with these traits execute on the virtual thread
+ // (sequence) they are posted/created from.
+ constexpr bool use_current_thread() const { return use_current_thread_; }
+
uint8_t extension_id() const { return extension_.extension_id; }
// Access the extension data by parsing it into the provided extension type.
@@ -353,6 +374,7 @@ class BASE_EXPORT TaskTraits {
TaskPriority priority,
bool may_block,
bool use_thread_pool,
+ bool use_current_thread,
TaskTraitsExtensionStorage extension)
: extension_(extension),
priority_(static_cast<uint8_t>(priority) |
@@ -362,8 +384,9 @@ class BASE_EXPORT TaskTraits {
thread_policy_(static_cast<uint8_t>(ThreadPolicy::PREFER_BACKGROUND)),
may_block_(may_block),
with_base_sync_primitives_(false),
- use_thread_pool_(use_thread_pool) {
- static_assert(sizeof(TaskTraits) == 15, "Keep this constructor up to date");
+ use_thread_pool_(use_thread_pool),
+ use_current_thread_(use_current_thread) {
+ static_assert(sizeof(TaskTraits) == 16, "Keep this constructor up to date");
}
// This bit is set in |priority_|, |shutdown_behavior_| and |thread_policy_|
@@ -378,6 +401,7 @@ class BASE_EXPORT TaskTraits {
bool may_block_;
bool with_base_sync_primitives_;
bool use_thread_pool_;
+ bool use_current_thread_;
};
// Returns string literals for the enums defined in this file. These methods
diff --git a/chromium/base/task/task_traits_unittest.nc b/chromium/base/task/task_traits_unittest.nc
index 8755ca028ff..73fc7be5506 100644
--- a/chromium/base/task/task_traits_unittest.nc
+++ b/chromium/base/task/task_traits_unittest.nc
@@ -26,6 +26,8 @@ constexpr TaskTraits traits = {TaskShutdownBehavior::BLOCK_SHUTDOWN,
TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN};
#elif defined(NCTEST_TASK_TRAITS_INVALID_TYPE) // [r"no matching constructor for initialization of 'const base::TaskTraits'"]
constexpr TaskTraits traits = {TaskShutdownBehavior::BLOCK_SHUTDOWN, true};
+#elif defined(NCTEST_TASK_TRAITS_CURRENT_THREAD_AND_THREADPOOL) // [r"base::CurrentThread is mutually exclusive with base::ThreadPool"]
+constexpr TaskTraits traits = {ThreadPool(), CurrentThread()};
#endif
} // namespace base
diff --git a/chromium/base/task/thread_pool/historical_histogram_data.md b/chromium/base/task/thread_pool/historical_histogram_data.md
new file mode 100644
index 00000000000..d253731b6fb
--- /dev/null
+++ b/chromium/base/task/thread_pool/historical_histogram_data.md
@@ -0,0 +1,92 @@
+# Historical Histogram Data
+
+This page presents data captured from `base::ThreadPool` histograms at a given
+point in time so it can be used in future design decisions.
+
+All data is 28-day aggregation on Stable channel.
+
+## Number of tasks between waits
+
+Number of tasks between two waits by a foreground worker thread in a
+browser/renderer process.
+
+Histogram name: ThreadPool.NumTasksBetweenWaits.(Browser/Renderer).Foreground
+Date: August 2019
+Values in tables below are percentiles.
+
+### Windows
+
+| Number of tasks | Browser process | Renderer process |
+|-----------------|-----------------|------------------|
+| 1 | 87 | 92 |
+| 2 | 95 | 98 |
+| 5 | 99 | 100 |
+
+### Mac
+
+| Number of tasks | Browser process | Renderer process |
+|-----------------|-----------------|------------------|
+| 1 | 81 | 90 |
+| 2 | 92 | 97 |
+| 5 | 98 | 100 |
+
+### Android
+
+| Number of tasks | Browser process | Renderer process |
+|-----------------|-----------------|------------------|
+| 1 | 92 | 96 |
+| 2 | 97 | 98 |
+| 5 | 99 | 100 |
+
+
+## Number of tasks run while queueing
+
+Number of tasks run by ThreadPool while task was queuing (from time task was
+posted until time it was run). Recorded for dummy heartbeat tasks in the
+*browser* process. The heartbeat recording avoids dependencies between this
+report and other work in the system.
+
+Histogram name: ThreadPool.NumTasksRunWhileQueuing.Browser.*
+Date: September 2019
+Values in tables below are percentiles.
+
+Note: In *renderer* processes, on all platforms/priorities, 0 tasks are run
+while queuing at 99.5th percentile.
+
+### Windows
+
+| Number of tasks | USER_BLOCKING | USER_VISIBLE | BEST_EFFORT |
+|-----------------|---------------|--------------|-------------|
+| 0 | 95 | 93 | 90 |
+| 1 | 98 | 95 | 92 |
+| 2 | 99 | 96 | 93 |
+| 5 | 100 | 98 | 95 |
+
+### Mac
+
+| Number of tasks | USER_BLOCKING | USER_VISIBLE | BEST_EFFORT |
+|-----------------|---------------|--------------|-------------|
+| 0 | 100 | 100 | 99 |
+| 1 | 100 | 100 | 99 |
+| 2 | 100 | 100 | 99 |
+| 5 | 100 | 100 | 100 |
+
+### Android
+
+| Number of tasks | USER_BLOCKING | USER_VISIBLE | BEST_EFFORT |
+|-----------------|---------------|--------------|-------------|
+| 0 | 99 | 98 | 97 |
+| 1 | 100 | 99 | 99 |
+| 2 | 100 | 99 | 99 |
+| 5 | 100 | 100 | 100 |
+
+### Chrome OS
+
+For all priorities, 0 tasks are run while queueing at 99.5th percentile.
+
+### Analysis
+
+The number of tasks that run while a BEST_EFFORT task is queued is unexpectedly
+low. We should explore creating threads less aggressively, at the expense of
+keeping BEST_EFFORT tasks in the queue for a longer time. See
+[Bug 906079](https://crbug.com/906079).
diff --git a/chromium/base/task/thread_pool/initialization_util.cc b/chromium/base/task/thread_pool/initialization_util.cc
index d70c4c3d27d..6fd197300c9 100644
--- a/chromium/base/task/thread_pool/initialization_util.cc
+++ b/chromium/base/task/thread_pool/initialization_util.cc
@@ -6,6 +6,7 @@
#include <algorithm>
+#include "base/numerics/ranges.h"
#include "base/system/sys_info.h"
namespace base {
@@ -16,7 +17,7 @@ int RecommendedMaxNumberOfThreadsInThreadGroup(int min,
int offset) {
const int num_of_cores = SysInfo::NumberOfProcessors();
const int threads = std::ceil<int>(num_of_cores * cores_multiplier) + offset;
- return std::min(max, std::max(min, threads));
+ return ClampToRange(threads, min, max);
}
} // namespace base
diff --git a/chromium/base/task/thread_pool/job_task_source.cc b/chromium/base/task/thread_pool/job_task_source.cc
index c6e5f9733c5..be52945d0d9 100644
--- a/chromium/base/task/thread_pool/job_task_source.cc
+++ b/chromium/base/task/thread_pool/job_task_source.cc
@@ -19,6 +19,120 @@
namespace base {
namespace internal {
+// Memory ordering on |state_| operations
+//
+// The write operation on |state_| in WillRunTask() uses
+// std::memory_order_release, matched by std::memory_order_acquire on read
+// operations (in DidProcessTask()) to establish a
+// Release-Acquire ordering. When a call to WillRunTask() is caused by an
+// increase of max concurrency followed by an associated
+// NotifyConcurrencyIncrease(), the priority queue lock guarantees an
+// happens-after relation with NotifyConcurrencyIncrease(). This ensures that an
+// increase of max concurrency that happened-before NotifyConcurrencyIncrease()
+// is visible to a read operation that happens-after WillRunTask().
+//
+// In DidProcessTask(), this is necessary to
+// ensure that the task source is always re-enqueued when it needs to. When the
+// task source needs to be queued, either because the current task yielded or
+// because of NotifyConcurrencyIncrease(), one of the following is true:
+// A) DidProcessTask() happens-after WillRunTask():
+// T1: Current task returns (because it is done) or yields.
+// T2: Increases the value returned by GetMaxConcurrency()
+// NotifyConcurrencyIncrease() enqueues the task source
+// T3: WillRunTask(), in response to the concurrency increase - Release
+// Does not keep the TaskSource in PriorityQueue because it is at max
+// concurrency
+// T1: DidProcessTask() - Acquire - Because of memory barrier, sees the same
+// (or newer) max concurrency as T2
+// Re-enqueues the TaskSource because no longer at max concurrency
+// Without the memory barrier, T1 may see an outdated max concurrency that
+// is lower than the actual max concurrency and won't re-enqueue the
+// task source, because it thinks it's already saturated.
+// The task source often needs to be re-enqueued if its task
+// completed because it yielded and |max_concurrency| wasn't decreased.
+// B) DidProcessTask() happens-before WillRunTask():
+// T1: Current task returns (because it is done) or yields
+// T2: Increases the value returned by GetMaxConcurrency()
+// NotifyConcurrencyIncrease() enqueues the task source
+// T1: DidProcessTask() - Acquire (ineffective)
+// Since the task source is already in the queue, it doesn't matter
+// whether T1 re-enqueues the task source or not.
+// Note that stale values the other way around can cause incorrectly
+// re-enqueuing this task_source, which is not an issue because the queues
+// support empty task sources.
+
+JobTaskSource::State::State() = default;
+JobTaskSource::State::~State() = default;
+
+JobTaskSource::State::Value JobTaskSource::State::Cancel() {
+ return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)};
+}
+
+JobTaskSource::State::Value
+JobTaskSource::State::TryIncrementWorkerCountFromWorkerRelease(
+ size_t max_concurrency) {
+ uint32_t value_before_add = value_.load(std::memory_order_relaxed);
+
+ // std::memory_order_release on success to establish Release-Acquire ordering
+ // with DecrementWorkerCountAcquire() (see Memory Ordering comment at top of
+ // the file).
+ while (!(value_before_add & kCanceledMask) &&
+ (value_before_add >> kWorkerCountBitOffset) < max_concurrency &&
+ !value_.compare_exchange_weak(
+ value_before_add, value_before_add + kWorkerCountIncrement,
+ std::memory_order_release, std::memory_order_relaxed)) {
+ }
+ return {value_before_add};
+}
+
+JobTaskSource::State::Value
+JobTaskSource::State::DecrementWorkerCountFromWorkerAcquire() {
+ const size_t value_before_sub =
+ value_.fetch_sub(kWorkerCountIncrement, std::memory_order_acquire);
+ DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
+ return {value_before_sub};
+}
+
+JobTaskSource::State::Value
+JobTaskSource::State::IncrementWorkerCountFromJoiningThread() {
+ size_t value_before_add =
+ value_.fetch_add(kWorkerCountIncrement, std::memory_order_relaxed);
+ return {value_before_add};
+}
+
+JobTaskSource::State::Value
+JobTaskSource::State::DecrementWorkerCountFromJoiningThread() {
+ const size_t value_before_sub =
+ value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed);
+ DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
+ return {value_before_sub};
+}
+
+JobTaskSource::State::Value JobTaskSource::State::Load() const {
+ return {value_.load(std::memory_order_relaxed)};
+}
+
+JobTaskSource::JoinFlag::JoinFlag() = default;
+JobTaskSource::JoinFlag::~JoinFlag() = default;
+
+void JobTaskSource::JoinFlag::SetWaiting() {
+ const auto previous_value =
+ value_.exchange(kWaitingForWorkerToYield, std::memory_order_relaxed);
+ DCHECK(previous_value == kNotWaiting);
+}
+
+bool JobTaskSource::JoinFlag::ShouldWorkerYield() {
+ // The fetch_and() sets the state to kWaitingForWorkerToSignal if it was
+ // previously kWaitingForWorkerToYield, otherwise it leaves it unchanged.
+ return value_.fetch_and(kWaitingForWorkerToSignal,
+ std::memory_order_relaxed) ==
+ kWaitingForWorkerToYield;
+}
+
+bool JobTaskSource::JoinFlag::ShouldWorkerSignal() {
+ return value_.exchange(kNotWaiting, std::memory_order_relaxed) != kNotWaiting;
+}
+
JobTaskSource::JobTaskSource(
const Location& from_here,
const TaskTraits& traits,
@@ -28,64 +142,122 @@ JobTaskSource::JobTaskSource(
: TaskSource(traits, nullptr, TaskSourceExecutionMode::kJob),
from_here_(from_here),
max_concurrency_callback_(std::move(max_concurrency_callback)),
- worker_task_(base::BindRepeating(
- [](JobTaskSource* self,
- const RepeatingCallback<void(experimental::JobDelegate*)>&
- worker_task) {
+ worker_task_(std::move(worker_task)),
+ primary_task_(base::BindRepeating(
+ [](JobTaskSource* self) {
// Each worker task has its own delegate with associated state.
experimental::JobDelegate job_delegate{self, self->delegate_};
- worker_task.Run(&job_delegate);
+ self->worker_task_.Run(&job_delegate);
},
- base::Unretained(this),
- std::move(worker_task))),
+ base::Unretained(this))),
queue_time_(TimeTicks::Now()),
delegate_(delegate) {
DCHECK(delegate_);
}
JobTaskSource::~JobTaskSource() {
-#if DCHECK_IS_ON()
- auto worker_count = worker_count_.load(std::memory_order_relaxed);
// Make sure there's no outstanding active run operation left.
- DCHECK(worker_count == 0U || worker_count == kInvalidWorkerCount)
- << worker_count;
-#endif
+ DCHECK_EQ(state_.Load().worker_count(), 0U);
}
ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() {
return {SequenceToken::Create(), nullptr};
}
+bool JobTaskSource::WillJoin() {
+ {
+ CheckedAutoLock auto_lock(lock_);
+ DCHECK(!worker_released_condition_); // This may only be called once.
+ worker_released_condition_ = lock_.CreateConditionVariable();
+ }
+ // std::memory_order_relaxed on |worker_count_| is sufficient because call to
+ // GetMaxConcurrency() is used for a best effort early exit. Stale values will
+ // only cause WaitForParticipationOpportunity() to be called.
+ const auto state_before_add = state_.IncrementWorkerCountFromJoiningThread();
+
+ if (!state_before_add.is_canceled() &&
+ state_before_add.worker_count() < GetMaxConcurrency()) {
+ return true;
+ }
+ return WaitForParticipationOpportunity();
+}
+
+bool JobTaskSource::RunJoinTask() {
+ experimental::JobDelegate job_delegate{this, nullptr};
+ worker_task_.Run(&job_delegate);
+
+ // std::memory_order_relaxed on |worker_count_| is sufficient because the call
+ // to GetMaxConcurrency() is used for a best effort early exit. Stale values
+ // will only cause WaitForParticipationOpportunity() to be called.
+ const auto state = state_.Load();
+ if (!state.is_canceled() && state.worker_count() <= GetMaxConcurrency())
+ return true;
+
+ return WaitForParticipationOpportunity();
+}
+
+void JobTaskSource::Cancel(TaskSource::Transaction* transaction) {
+ // Sets the kCanceledMask bit on |state_| so that further calls to
+ // WillRunTask() never succeed. std::memory_order_relaxed is sufficient
+ // because this task source never needs to be re-enqueued after Cancel().
+ state_.Cancel();
+}
+
+bool JobTaskSource::WaitForParticipationOpportunity() {
+ CheckedAutoLock auto_lock(lock_);
+
+ // std::memory_order_relaxed is sufficient because no other state is
+ // synchronized with |state_| outside of |lock_|.
+ auto state = state_.Load();
+ size_t max_concurrency = GetMaxConcurrency();
+
+ // Wait until either:
+ // A) |worker_count| is below or equal to max concurrency and state is not
+ // canceled.
+ // B) All other workers returned and |worker_count| is 1.
+ while (!((state.worker_count() <= max_concurrency && !state.is_canceled()) ||
+ state.worker_count() == 1)) {
+ // std::memory_order_relaxed is sufficient because no other state is
+ // synchronized with |join_flag_| outside of |lock_|.
+ join_flag_.SetWaiting();
+
+ // To avoid unnecessarily waiting, if either condition A) or B) change
+ // |lock_| is taken and |worker_released_condition_| signaled if necessary:
+ // 1- In DidProcessTask(), after worker count is decremented.
+ // 2- In NotifyConcurrencyIncrease(), following a max_concurrency increase.
+ worker_released_condition_->Wait();
+ state = state_.Load();
+ max_concurrency = GetMaxConcurrency();
+ }
+ // Case A:
+ if (state.worker_count() <= max_concurrency && !state.is_canceled())
+ return true;
+ // Case B:
+ // Only the joining thread remains.
+ DCHECK_EQ(state.worker_count(), 1U);
+ DCHECK(state.is_canceled() || max_concurrency == 0U);
+ state_.DecrementWorkerCountFromJoiningThread();
+ return false;
+}
+
TaskSource::RunStatus JobTaskSource::WillRunTask() {
- // When this call is caused by an increase of max concurrency followed by an
- // associated NotifyConcurrencyIncrease(), the priority queue lock guarantees
- // an happens-after relation with NotifyConcurrencyIncrease(). The memory
- // operations on |worker_count| below and in DidProcessTask() use
- // std::memory_order_release and std::memory_order_acquire respectively to
- // establish a Release-Acquire ordering. This ensures that all memory
- // side-effects made before this point, including an increase of max
- // concurrency followed by NotifyConcurrencyIncrease() are visible to a
- // DidProcessTask() call which is ordered after this one.
const size_t max_concurrency = GetMaxConcurrency();
- size_t worker_count_before_add =
- worker_count_.load(std::memory_order_relaxed);
-
- // std::memory_order_release on success to make the newest |max_concurrency|
- // visible to a thread that calls DidProcessTask() containing a matching
- // std::memory_order_acquire.
- while (worker_count_before_add < max_concurrency &&
- !worker_count_.compare_exchange_weak(
- worker_count_before_add, worker_count_before_add + 1,
- std::memory_order_release, std::memory_order_relaxed)) {
- }
+ // std::memory_order_release on success to establish Release-Acquire ordering
+ // with read operations (see Memory Ordering comment at top of the file).
+ const auto state_before_add =
+ state_.TryIncrementWorkerCountFromWorkerRelease(max_concurrency);
+
// Don't allow this worker to run the task if either:
- // A) |worker_count_| is already at |max_concurrency|.
- // B) |max_concurrency| was lowered below or to |worker_count_|.
- // C) |worker_count_| was invalidated.
- if (worker_count_before_add >= max_concurrency) {
- // The caller is prevented from running a task from this TaskSource.
+ // A) |state_| was canceled.
+ // B) |worker_count| is already at |max_concurrency|.
+ // C) |max_concurrency| was lowered below or to |worker_count|.
+ // Case A:
+ if (state_before_add.is_canceled())
+ return RunStatus::kDisallowed;
+ const size_t worker_count_before_add = state_before_add.worker_count();
+ // Case B) or C):
+ if (worker_count_before_add >= max_concurrency)
return RunStatus::kDisallowed;
- }
DCHECK_LT(worker_count_before_add, max_concurrency);
return max_concurrency == worker_count_before_add + 1
@@ -96,15 +268,23 @@ TaskSource::RunStatus JobTaskSource::WillRunTask() {
size_t JobTaskSource::GetRemainingConcurrency() const {
// std::memory_order_relaxed is sufficient because no other state is
// synchronized with GetRemainingConcurrency().
- const size_t worker_count = worker_count_.load(std::memory_order_relaxed);
+ const auto state = state_.Load();
const size_t max_concurrency = GetMaxConcurrency();
// Avoid underflows.
- if (worker_count > max_concurrency)
+ if (state.is_canceled() || state.worker_count() > max_concurrency)
return 0;
- return max_concurrency - worker_count;
+ return max_concurrency - state.worker_count();
}
void JobTaskSource::NotifyConcurrencyIncrease() {
+ {
+ // Lock is taken to access |join_flag_| below and signal
+ // |worker_released_condition_|.
+ CheckedAutoLock auto_lock(lock_);
+ if (join_flag_.ShouldWorkerSignal())
+ worker_released_condition_->Signal();
+ }
+
#if DCHECK_IS_ON()
{
AutoLock auto_lock(version_lock_);
@@ -125,6 +305,14 @@ size_t JobTaskSource::GetMaxConcurrency() const {
return max_concurrency_callback_.Run();
}
+bool JobTaskSource::ShouldYield() {
+ // It is safe to read |join_flag_| without a lock since this
+ // variable is atomic, keeping in mind that threads may not immediately see
+ // the new value when it is updated.
+ return TS_UNCHECKED_READ(join_flag_).ShouldWorkerYield() ||
+ state_.Load().is_canceled();
+}
+
#if DCHECK_IS_ON()
size_t JobTaskSource::GetConcurrencyIncreaseVersion() const {
@@ -151,45 +339,36 @@ bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) {
#endif // DCHECK_IS_ON()
Optional<Task> JobTaskSource::TakeTask(TaskSource::Transaction* transaction) {
- DCHECK_GT(worker_count_.load(std::memory_order_relaxed), 0U);
- DCHECK(worker_task_);
- return base::make_optional<Task>(from_here_, worker_task_, TimeDelta());
+ // JobTaskSource members are not lock-protected so no need to acquire a lock
+ // if |transaction| is nullptr.
+ DCHECK_GT(state_.Load().worker_count(), 0U);
+ DCHECK(primary_task_);
+ return base::make_optional<Task>(from_here_, primary_task_, TimeDelta());
}
bool JobTaskSource::DidProcessTask(TaskSource::Transaction* transaction) {
- size_t worker_count_before_sub =
- worker_count_.load(std::memory_order_relaxed);
-
- // std::memory_order_acquire on |worker_count_| is necessary to establish
- // Release-Acquire ordering (see WillRunTask()).
- // When the task source needs to be queued, either because the current task
- // yielded or because of NotifyConcurrencyIncrease(), one of the following is
- // true:
- // A) The JobTaskSource is already in the queue (no worker picked up the
- // extra work yet): Incorrectly returning false is fine and the memory
- // barrier may be ineffective.
- // B) The JobTaskSource() is no longer in the queue: The Release-Acquire
- // ordering with WillRunTask() established by |worker_count| ensures that
- // the upcoming call for GetMaxConcurrency() happens-after any
- // NotifyConcurrencyIncrease() that happened-before WillRunTask(). If
- // this task completed because it yielded, this barrier guarantees that
- // it sees an up-to-date concurrency value and correctly re-enqueues.
- //
- // Note that stale values the other way around (incorrectly re-enqueuing) are
- // not an issue because the queues support empty task sources.
- while (worker_count_before_sub != kInvalidWorkerCount &&
- !worker_count_.compare_exchange_weak(
- worker_count_before_sub, worker_count_before_sub - 1,
- std::memory_order_acquire, std::memory_order_relaxed)) {
- }
- if (worker_count_before_sub == kInvalidWorkerCount)
+ // Lock is needed to access |join_flag_| below and signal
+ // |worker_released_condition_|. If |transaction|, then |lock_| is already
+ // taken.
+ CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
+ AnnotateAcquiredLockAlias annotate(lock_, lock_);
+
+ // std::memory_order_acquire to establish Release-Acquire ordering with
+ // WillRunTask() (see Memory Ordering comment at top of the file).
+ const auto state_before_sub = state_.DecrementWorkerCountFromWorkerAcquire();
+
+ if (join_flag_.ShouldWorkerSignal())
+ worker_released_condition_->Signal();
+
+ // A canceled task source should never get re-enqueued.
+ if (state_before_sub.is_canceled())
return false;
- DCHECK_GT(worker_count_before_sub, 0U);
+ DCHECK_GT(state_before_sub.worker_count(), 0U);
// Re-enqueue the TaskSource if the task ran and the worker count is below the
// max concurrency.
- return worker_count_before_sub <= GetMaxConcurrency();
+ return state_before_sub.worker_count() <= GetMaxConcurrency();
}
SequenceSortKey JobTaskSource::GetSortKey() const {
@@ -197,12 +376,7 @@ SequenceSortKey JobTaskSource::GetSortKey() const {
}
Optional<Task> JobTaskSource::Clear(TaskSource::Transaction* transaction) {
- // Invalidate |worker_count_| so that further calls to WillRunTask() never
- // succeed. std::memory_order_relaxed is sufficient because this task source
- // never needs to be re-enqueued after Clear().
- size_t worker_count_before_store =
- worker_count_.exchange(kInvalidWorkerCount, std::memory_order_relaxed);
- DCHECK_GT(worker_count_before_store, 0U);
+ Cancel();
// Nothing is cleared since other workers might still racily run tasks. For
// simplicity, the destructor will take care of it once all references are
// released.
diff --git a/chromium/base/task/thread_pool/job_task_source.h b/chromium/base/task/thread_pool/job_task_source.h
index ddaca19b343..57838679e7a 100644
--- a/chromium/base/task/thread_pool/job_task_source.h
+++ b/chromium/base/task/thread_pool/job_task_source.h
@@ -38,10 +38,33 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
RepeatingCallback<size_t()> max_concurrency_callback,
PooledTaskRunnerDelegate* delegate);
+ static experimental::JobHandle CreateJobHandle(
+ scoped_refptr<internal::JobTaskSource> task_source) {
+ return experimental::JobHandle(std::move(task_source));
+ }
+
// Notifies this task source that max concurrency was increased, and the
// number of worker should be adjusted.
void NotifyConcurrencyIncrease();
+ // Informs this JobTaskSource that the current thread would like to join and
+ // contribute to running |worker_task|. Returns true if the joining thread can
+ // contribute (RunJoinTask() can be called), or false if joining was completed
+ // and all other workers returned because either there's no work remaining or
+ // Job was cancelled.
+ bool WillJoin();
+
+ // Contributes to running |worker_task| and returns true if the joining thread
+ // can contribute again (RunJoinTask() can be called again), or false if
+ // joining was completed and all other workers returned because either there's
+ // no work remaining or Job was cancelled. This should be called only after
+ // WillJoin() or RunJoinTask() previously returned true.
+ bool RunJoinTask();
+
+ // Cancels this JobTaskSource, causing all workers to yield and WillRunTask()
+ // to return RunStatus::kDisallowed.
+ void Cancel(TaskSource::Transaction* transaction = nullptr);
+
// TaskSource:
ExecutionEnvironment GetExecutionEnvironment() override;
size_t GetRemainingConcurrency() const override;
@@ -50,6 +73,12 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
// concurrently.
size_t GetMaxConcurrency() const;
+ // Returns true if a worker should return from the worker task on the current
+ // thread ASAP.
+ bool ShouldYield();
+
+ PooledTaskRunnerDelegate* delegate() const { return delegate_; }
+
#if DCHECK_IS_ON()
size_t GetConcurrencyIncreaseVersion() const;
// Returns true if the concurrency version was updated above
@@ -58,11 +87,98 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
#endif // DCHECK_IS_ON()
private:
- static constexpr size_t kInvalidWorkerCount =
- std::numeric_limits<size_t>::max();
+ // Atomic internal state to track the number of workers running a task from
+ // this JobTaskSource and whether this JobTaskSource is canceled.
+ class State {
+ public:
+ static constexpr size_t kCanceledMask = 1;
+ static constexpr size_t kWorkerCountBitOffset = 1;
+ static constexpr size_t kWorkerCountIncrement = 1 << kWorkerCountBitOffset;
+
+ struct Value {
+ size_t worker_count() const { return value >> kWorkerCountBitOffset; }
+ // Returns true if canceled.
+ bool is_canceled() const { return value & kCanceledMask; }
+
+ uint32_t value;
+ };
+
+ State();
+ ~State();
+
+ // Sets as canceled using std::memory_order_relaxed. Returns the state
+ // before the operation.
+ Value Cancel();
+
+ // Increments the worker count by 1 if smaller than |max_concurrency| and if
+ // |!is_canceled()|, using std::memory_order_release, and returns the state
+ // before the operation. Equivalent to Load() otherwise.
+ Value TryIncrementWorkerCountFromWorkerRelease(size_t max_concurrency);
+
+ // Decrements the worker count by 1 using std::memory_order_acquire. Returns
+ // the state before the operation.
+ Value DecrementWorkerCountFromWorkerAcquire();
+
+ // Increments the worker count by 1 using std::memory_order_relaxed. Returns
+ // the state before the operation.
+ Value IncrementWorkerCountFromJoiningThread();
+
+ // Decrements the worker count by 1 using std::memory_order_relaxed. Returns
+ // the state before the operation.
+ Value DecrementWorkerCountFromJoiningThread();
+
+ // Loads and returns the state, using std::memory_order_relaxed.
+ Value Load() const;
+
+ private:
+ std::atomic<uint32_t> value_{0};
+ };
+
+ // Atomic flag that indicates if the joining thread is currently waiting on
+ // another worker to yield or to signal.
+ class JoinFlag {
+ public:
+ static constexpr uint32_t kNotWaiting = 0;
+ static constexpr uint32_t kWaitingForWorkerToSignal = 1;
+ static constexpr uint32_t kWaitingForWorkerToYield = 3;
+ // kWaitingForWorkerToYield is 3 because the impl relies on the following
+ // property.
+ static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) ==
+ kWaitingForWorkerToSignal,
+ "");
+
+ JoinFlag();
+ ~JoinFlag();
+
+ // Sets the status as kWaitingForWorkerToYield using
+ // std::memory_order_relaxed.
+ void SetWaiting();
+
+ // If the flag is kWaitingForWorkerToYield, returns true indicating that the
+ // worker should yield, and atomically updates to kWaitingForWorkerToSignal
+ // (using std::memory_order_relaxed) to ensure that a single worker yields
+ // in response to SetWaiting().
+ bool ShouldWorkerYield();
+
+ // If the flag is kWaiting*, returns true indicating that the worker should
+ // signal, and atomically updates to kNotWaiting (using
+ // std::memory_order_relaxed) to ensure that a single worker signals in
+ // response to SetWaiting().
+ bool ShouldWorkerSignal();
+
+ private:
+ std::atomic<uint32_t> value_{kNotWaiting};
+ };
~JobTaskSource() override;
+ // Called from the joining thread. Waits for the worker count to be below or
+ // equal to max concurrency (will happen when a worker calls
+ // DidProcessTask()). Returns true if the joining thread should run a task, or
+ // false if joining was completed and all other workers returned because
+ // either there's no work remaining or Job was cancelled.
+ bool WaitForParticipationOpportunity();
+
// TaskSource:
RunStatus WillRunTask() override;
Optional<Task> TakeTask(TaskSource::Transaction* transaction) override;
@@ -70,13 +186,23 @@ class BASE_EXPORT JobTaskSource : public TaskSource {
bool DidProcessTask(TaskSource::Transaction* transaction) override;
SequenceSortKey GetSortKey() const override;
- // The current number of workers concurrently running tasks from this
- // TaskSource.
- std::atomic_size_t worker_count_{0U};
+ // Current atomic state.
+ State state_;
+ // Normally, |join_flag_| is protected by |lock_|, except in ShouldYield()
+ // hence the use of atomics.
+ JoinFlag join_flag_ GUARDED_BY(lock_);
+ // Signaled when |join_flag_| is kWaiting* and a worker returns.
+ std::unique_ptr<ConditionVariable> worker_released_condition_
+ GUARDED_BY(lock_);
const Location from_here_;
- base::RepeatingCallback<size_t()> max_concurrency_callback_;
- base::RepeatingClosure worker_task_;
+ RepeatingCallback<size_t()> max_concurrency_callback_;
+
+ // Worker task set by the job owner.
+ RepeatingCallback<void(experimental::JobDelegate*)> worker_task_;
+ // Task returned from TakeTask(), that calls |worker_task_| internally.
+ RepeatingClosure primary_task_;
+
const TimeTicks queue_time_;
PooledTaskRunnerDelegate* delegate_;
diff --git a/chromium/base/task/thread_pool/job_task_source_unittest.cc b/chromium/base/task/thread_pool/job_task_source_unittest.cc
index 789cf5ab7a0..413771f2235 100644
--- a/chromium/base/task/thread_pool/job_task_source_unittest.cc
+++ b/chromium/base/task/thread_pool/job_task_source_unittest.cc
@@ -29,6 +29,8 @@ class MockPooledTaskRunnerDelegate : public PooledTaskRunnerDelegate {
MOCK_CONST_METHOD1(ShouldYield, bool(const TaskSource* task_source));
MOCK_METHOD1(EnqueueJobTaskSource,
bool(scoped_refptr<JobTaskSource> task_source));
+ MOCK_METHOD1(RemoveJobTaskSource,
+ void(scoped_refptr<JobTaskSource> task_source));
MOCK_CONST_METHOD1(IsRunningPoolWithTraits, bool(const TaskTraits& traits));
MOCK_METHOD2(UpdatePriority,
void(scoped_refptr<TaskSource> task_source,
@@ -108,13 +110,17 @@ TEST_F(ThreadPoolJobTaskSourceTest, Clear) {
EXPECT_EQ(registered_task_source_d.WillRunTask(),
TaskSource::RunStatus::kAllowedNotSaturated);
+ EXPECT_FALSE(task_source->ShouldYield());
+
{
EXPECT_EQ(1U, task_source->GetRemainingConcurrency());
auto task = registered_task_source_c.Clear();
std::move(task->task).Run();
+ registered_task_source_c.DidProcessTask();
EXPECT_EQ(0U, task_source->GetRemainingConcurrency());
}
// The task source shouldn't allow any further tasks after Clear.
+ EXPECT_TRUE(task_source->ShouldYield());
EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(),
TaskSource::RunStatus::kDisallowed);
@@ -122,6 +128,7 @@ TEST_F(ThreadPoolJobTaskSourceTest, Clear) {
{
auto task = registered_task_source_d.Clear();
std::move(task->task).Run();
+ registered_task_source_d.DidProcessTask();
EXPECT_EQ(0U, task_source->GetRemainingConcurrency());
}
@@ -129,15 +136,53 @@ TEST_F(ThreadPoolJobTaskSourceTest, Clear) {
std::move(task_a->task).Run();
registered_task_source_a.DidProcessTask();
- // A valid outstanding RunStatus can also take & run a task.
+ // A valid outstanding RunStatus can also take and run a task.
{
auto task = registered_task_source_b.TakeTask();
std::move(task->task).Run();
registered_task_source_b.DidProcessTask();
}
- // Sanity check.
+}
+
+// Verifies that a job task source doesn't return an "allowed" RunStatus after
+// Cancel() is called.
+TEST_F(ThreadPoolJobTaskSourceTest, Cancel) {
+ auto job_task = base::MakeRefCounted<test::MockJobTask>(
+ DoNothing(), /* num_tasks_to_run */ 3);
+ scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
+ FROM_HERE, {ThreadPool(), TaskPriority::BEST_EFFORT},
+ &pooled_task_runner_delegate_);
+
+ auto registered_task_source_a =
+ RegisteredTaskSource::CreateForTesting(task_source);
+ EXPECT_EQ(registered_task_source_a.WillRunTask(),
+ TaskSource::RunStatus::kAllowedNotSaturated);
+ auto task_a = registered_task_source_a.TakeTask();
+
+ auto registered_task_source_b =
+ RegisteredTaskSource::CreateForTesting(task_source);
+ EXPECT_EQ(registered_task_source_b.WillRunTask(),
+ TaskSource::RunStatus::kAllowedNotSaturated);
+
+ EXPECT_FALSE(task_source->ShouldYield());
+
+ task_source->Cancel();
+ EXPECT_TRUE(task_source->ShouldYield());
+
+ // The task source shouldn't allow any further tasks after Cancel.
EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(),
TaskSource::RunStatus::kDisallowed);
+
+ // A task that was already acquired can still run.
+ std::move(task_a->task).Run();
+ registered_task_source_a.DidProcessTask();
+
+ // A RegisteredTaskSource that's ready can also take and run a task.
+ {
+ auto task = registered_task_source_b.TakeTask();
+ std::move(task->task).Run();
+ registered_task_source_b.DidProcessTask();
+ }
}
// Verifies that multiple tasks can run in parallel up to |max_concurrency|.
@@ -183,6 +228,68 @@ TEST_F(ThreadPoolJobTaskSourceTest, RunTasksInParallel) {
EXPECT_FALSE(registered_task_source_c.DidProcessTask());
}
+// Verifies the normal flow of running the join task until completion.
+TEST_F(ThreadPoolJobTaskSourceTest, RunJoinTask) {
+ auto job_task = base::MakeRefCounted<test::MockJobTask>(
+ DoNothing(), /* num_tasks_to_run */ 2);
+ scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
+ FROM_HERE, ThreadPool(), &pooled_task_runner_delegate_);
+
+ EXPECT_TRUE(task_source->WillJoin());
+ // Intentionally run |worker_task| twice to make sure RunJoinTask() calls
+ // it again. This can happen in production if the joining thread spuriously
+ // return and needs to run again.
+ EXPECT_TRUE(task_source->RunJoinTask());
+ EXPECT_FALSE(task_source->RunJoinTask());
+}
+
+// Verifies that WillJoin() doesn't allow a joining thread to contribute
+// after Cancel() is called.
+TEST_F(ThreadPoolJobTaskSourceTest, CancelJoinTask) {
+ auto job_task = base::MakeRefCounted<test::MockJobTask>(
+ DoNothing(), /* num_tasks_to_run */ 2);
+ scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
+ FROM_HERE, ThreadPool(), &pooled_task_runner_delegate_);
+
+ task_source->Cancel();
+ EXPECT_FALSE(task_source->WillJoin());
+}
+
+// Verifies that RunJoinTask() doesn't allow a joining thread to contribute
+// after Cancel() is called.
+TEST_F(ThreadPoolJobTaskSourceTest, JoinCancelTask) {
+ auto job_task = base::MakeRefCounted<test::MockJobTask>(
+ DoNothing(), /* num_tasks_to_run */ 2);
+ scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
+ FROM_HERE, ThreadPool(), &pooled_task_runner_delegate_);
+
+ EXPECT_TRUE(task_source->WillJoin());
+ task_source->Cancel();
+ EXPECT_FALSE(task_source->RunJoinTask());
+}
+
+// Verifies that the join task can run in parallel with worker tasks up to
+// |max_concurrency|.
+TEST_F(ThreadPoolJobTaskSourceTest, RunJoinTaskInParallel) {
+ auto job_task = base::MakeRefCounted<test::MockJobTask>(
+ DoNothing(), /* num_tasks_to_run */ 2);
+ scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
+ FROM_HERE, ThreadPool(), &pooled_task_runner_delegate_);
+
+ auto registered_task_source =
+ RegisteredTaskSource::CreateForTesting(task_source);
+ EXPECT_EQ(registered_task_source.WillRunTask(),
+ TaskSource::RunStatus::kAllowedNotSaturated);
+ auto worker_task = registered_task_source.TakeTask();
+
+ EXPECT_TRUE(task_source->WillJoin());
+
+ std::move(worker_task->task).Run();
+ EXPECT_FALSE(registered_task_source.DidProcessTask());
+
+ EXPECT_FALSE(task_source->RunJoinTask());
+}
+
// Verifies that a call to NotifyConcurrencyIncrease() calls the delegate
// and allows to run additional tasks.
TEST_F(ThreadPoolJobTaskSourceTest, NotifyConcurrencyIncrease) {
@@ -342,13 +449,9 @@ TEST_F(ThreadPoolJobTaskSourceTest, InvalidDidProcessTask) {
auto registered_task_source =
RegisteredTaskSource::CreateForTesting(task_source);
- EXPECT_EQ(registered_task_source.WillRunTask(),
- TaskSource::RunStatus::kAllowedSaturated);
- // Can not be called before TakeTask().
- EXPECT_DCHECK_DEATH(registered_task_source.DidProcessTask());
- auto task = registered_task_source.TakeTask();
- registered_task_source.DidProcessTask();
+ // Can not be called before WillRunTask().
+ EXPECT_DCHECK_DEATH(registered_task_source.DidProcessTask());
}
} // namespace internal
diff --git a/chromium/base/task/thread_pool/pooled_task_runner_delegate.h b/chromium/base/task/thread_pool/pooled_task_runner_delegate.h
index e00f481c6a5..0ec1b87c10b 100644
--- a/chromium/base/task/thread_pool/pooled_task_runner_delegate.h
+++ b/chromium/base/task/thread_pool/pooled_task_runner_delegate.h
@@ -46,6 +46,10 @@ class BASE_EXPORT PooledTaskRunnerDelegate {
virtual bool EnqueueJobTaskSource(
scoped_refptr<JobTaskSource> task_source) = 0;
+ // Removes |task_source| from the priority queue.
+ virtual void RemoveJobTaskSource(
+ scoped_refptr<JobTaskSource> task_source) = 0;
+
// Invoked when RunsTasksInCurrentSequence() is called on a
// PooledParallelTaskRunner. Returns true if the current thread is part of the
// ThreadGroup associated with |traits|.
diff --git a/chromium/base/task/thread_pool/priority_queue.cc b/chromium/base/task/thread_pool/priority_queue.cc
index 28077b8740e..85eae1fe469 100644
--- a/chromium/base/task/thread_pool/priority_queue.cc
+++ b/chromium/base/task/thread_pool/priority_queue.cc
@@ -139,20 +139,18 @@ RegisteredTaskSource PriorityQueue::PopTaskSource() {
}
RegisteredTaskSource PriorityQueue::RemoveTaskSource(
- scoped_refptr<TaskSource> task_source) {
- DCHECK(task_source);
-
+ const TaskSource& task_source) {
if (IsEmpty())
return nullptr;
- const HeapHandle heap_handle = task_source->heap_handle();
+ const HeapHandle heap_handle = task_source.heap_handle();
if (!heap_handle.IsValid())
return nullptr;
TaskSourceAndSortKey& task_source_and_sort_key =
const_cast<PriorityQueue::TaskSourceAndSortKey&>(
container_.at(heap_handle));
- DCHECK_EQ(task_source_and_sort_key.task_source().get(), task_source);
+ DCHECK_EQ(task_source_and_sort_key.task_source().get(), &task_source);
RegisteredTaskSource registered_task_source =
task_source_and_sort_key.take_task_source();
diff --git a/chromium/base/task/thread_pool/priority_queue.h b/chromium/base/task/thread_pool/priority_queue.h
index fbe391e9384..d4d88842126 100644
--- a/chromium/base/task/thread_pool/priority_queue.h
+++ b/chromium/base/task/thread_pool/priority_queue.h
@@ -49,7 +49,7 @@ class BASE_EXPORT PriorityQueue {
// RegisteredTaskSource which evaluates to true if successful, or false if
// |task_source| is not currently in the PriorityQueue or the PriorityQueue is
// empty.
- RegisteredTaskSource RemoveTaskSource(scoped_refptr<TaskSource> task_source);
+ RegisteredTaskSource RemoveTaskSource(const TaskSource& task_source);
// Updates the sort key of the TaskSource in |transaction| to
// match its current traits. No-ops if the TaskSource is not in the
diff --git a/chromium/base/task/thread_pool/priority_queue_unittest.cc b/chromium/base/task/thread_pool/priority_queue_unittest.cc
index d5af9167457..30ad206ac98 100644
--- a/chromium/base/task/thread_pool/priority_queue_unittest.cc
+++ b/chromium/base/task/thread_pool/priority_queue_unittest.cc
@@ -55,7 +55,7 @@ class PriorityQueueWithSequencesTest : public testing::Test {
}
test::TaskEnvironment task_environment{
- test::ScopedTaskEnvironment::TimeSource::MOCK_TIME};
+ test::TaskEnvironment::TimeSource::MOCK_TIME};
scoped_refptr<TaskSource> sequence_a = MakeSequenceWithTraitsAndTask(
TaskTraits(ThreadPool(), TaskPriority::USER_VISIBLE));
@@ -144,34 +144,34 @@ TEST_F(PriorityQueueWithSequencesTest, RemoveSequence) {
// Remove |sequence_a| from the PriorityQueue. |sequence_b| is still the
// sequence with the highest priority.
- EXPECT_TRUE(pq.RemoveTaskSource(sequence_a).Unregister());
+ EXPECT_TRUE(pq.RemoveTaskSource(*sequence_a).Unregister());
EXPECT_EQ(sort_key_b, pq.PeekSortKey());
ExpectNumSequences(1U, 0U, 2U);
// RemoveTaskSource() should return false if called on a sequence not in the
// PriorityQueue.
- EXPECT_FALSE(pq.RemoveTaskSource(sequence_a).Unregister());
+ EXPECT_FALSE(pq.RemoveTaskSource(*sequence_a).Unregister());
ExpectNumSequences(1U, 0U, 2U);
// Remove |sequence_b| from the PriorityQueue. |sequence_c| becomes the
// sequence with the highest priority.
- EXPECT_TRUE(pq.RemoveTaskSource(sequence_b).Unregister());
+ EXPECT_TRUE(pq.RemoveTaskSource(*sequence_b).Unregister());
EXPECT_EQ(sort_key_c, pq.PeekSortKey());
ExpectNumSequences(1U, 0U, 1U);
// Remove |sequence_d| from the PriorityQueue. |sequence_c| is still the
// sequence with the highest priority.
- EXPECT_TRUE(pq.RemoveTaskSource(sequence_d).Unregister());
+ EXPECT_TRUE(pq.RemoveTaskSource(*sequence_d).Unregister());
EXPECT_EQ(sort_key_c, pq.PeekSortKey());
ExpectNumSequences(0U, 0U, 1U);
// Remove |sequence_c| from the PriorityQueue, making it empty.
- EXPECT_TRUE(pq.RemoveTaskSource(sequence_c).Unregister());
+ EXPECT_TRUE(pq.RemoveTaskSource(*sequence_c).Unregister());
EXPECT_TRUE(pq.IsEmpty());
ExpectNumSequences(0U, 0U, 0U);
// Return false if RemoveTaskSource() is called on an empty PriorityQueue.
- EXPECT_FALSE(pq.RemoveTaskSource(sequence_c).Unregister());
+ EXPECT_FALSE(pq.RemoveTaskSource(*sequence_c).Unregister());
ExpectNumSequences(0U, 0U, 0U);
}
diff --git a/chromium/base/task/thread_pool/sequence.cc b/chromium/base/task/thread_pool/sequence.cc
index 3a4139c551e..ea7ce3053de 100644
--- a/chromium/base/task/thread_pool/sequence.cc
+++ b/chromium/base/task/thread_pool/sequence.cc
@@ -91,6 +91,7 @@ bool Sequence::DidProcessTask(TaskSource::Transaction* transaction) {
// WillRunTask().
DCHECK(has_worker_);
has_worker_ = false;
+ // See comment on TaskSource::task_runner_ for lifetime management details.
if (queue_.empty()) {
ReleaseTaskRunner();
return false;
@@ -108,20 +109,17 @@ SequenceSortKey Sequence::GetSortKey() const {
Optional<Task> Sequence::Clear(TaskSource::Transaction* transaction) {
CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
- has_worker_ = false;
- return base::make_optional<Task>(
- FROM_HERE,
- base::BindOnce(
- [](scoped_refptr<Sequence> self, base::queue<Task> queue) {
- bool queue_was_empty = queue.empty();
- while (!queue.empty())
- queue.pop();
- if (!queue_was_empty) {
- self->ReleaseTaskRunner();
- }
- },
- scoped_refptr<Sequence>(this), std::move(queue_)),
- TimeDelta());
+ // See comment on TaskSource::task_runner_ for lifetime management details.
+ if (!queue_.empty() && !has_worker_)
+ ReleaseTaskRunner();
+ return base::make_optional<Task>(FROM_HERE,
+ base::BindOnce(
+ [](base::queue<Task> queue) {
+ while (!queue.empty())
+ queue.pop();
+ },
+ std::move(queue_)),
+ TimeDelta());
}
void Sequence::ReleaseTaskRunner() {
diff --git a/chromium/base/task/thread_pool/sequence.h b/chromium/base/task/thread_pool/sequence.h
index cbbbda42645..ca505ae84db 100644
--- a/chromium/base/task/thread_pool/sequence.h
+++ b/chromium/base/task/thread_pool/sequence.h
@@ -104,8 +104,7 @@ class BASE_EXPORT Sequence : public TaskSource {
bool DidProcessTask(TaskSource::Transaction* transaction) override;
SequenceSortKey GetSortKey() const override;
- // Releases reference to TaskRunner. This might cause this object to be
- // deleted; therefore, no member access should be made after this method.
+ // Releases reference to TaskRunner.
void ReleaseTaskRunner();
const SequenceToken token_ = SequenceToken::Create();
diff --git a/chromium/base/task/thread_pool/sequence_unittest.cc b/chromium/base/task/thread_pool/sequence_unittest.cc
index 7257a20d6f1..c5166239151 100644
--- a/chromium/base/task/thread_pool/sequence_unittest.cc
+++ b/chromium/base/task/thread_pool/sequence_unittest.cc
@@ -184,7 +184,7 @@ TEST(ThreadPoolSequenceTest, GetSortKeyForeground) {
// Verify that a DCHECK fires if DidProcessTask() is called on a sequence which
// didn't return a Task.
-TEST(ThreadPoolSequenceTest, DidProcessTaskWithoutTakeTask) {
+TEST(ThreadPoolSequenceTest, DidProcessTaskWithoutWillRunTask) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
TaskTraits(ThreadPool()), nullptr, TaskSourceExecutionMode::kParallel);
Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
@@ -193,7 +193,6 @@ TEST(ThreadPoolSequenceTest, DidProcessTaskWithoutTakeTask) {
auto registered_task_source =
RegisteredTaskSource::CreateForTesting(sequence);
EXPECT_DCHECK_DEATH({
- registered_task_source.WillRunTask();
registered_task_source.DidProcessTask(&sequence_transaction);
});
}
diff --git a/chromium/base/task/thread_pool/service_thread.cc b/chromium/base/task/thread_pool/service_thread.cc
index 400fbb0590e..a7e1d21061a 100644
--- a/chromium/base/task/thread_pool/service_thread.cc
+++ b/chromium/base/task/thread_pool/service_thread.cc
@@ -71,39 +71,27 @@ void ServiceThread::PerformHeartbeatLatencyReport() const {
if (!task_tracker_)
return;
- static constexpr TaskTraits kReportedTraits[] = {
- {ThreadPool(), TaskPriority::BEST_EFFORT},
- {ThreadPool(), TaskPriority::BEST_EFFORT, MayBlock()},
- {ThreadPool(), TaskPriority::USER_VISIBLE},
- {ThreadPool(), TaskPriority::USER_VISIBLE, MayBlock()},
- {ThreadPool(), TaskPriority::USER_BLOCKING},
- {ThreadPool(), TaskPriority::USER_BLOCKING, MayBlock()}};
-
- // Only record latency for one set of TaskTraits per report to avoid bias in
- // the order in which tasks are posted (should we record all at once) as well
- // as to avoid spinning up many worker threads to process this report if the
+ // Only record latency for one TaskPriority per report to avoid bias in the
+ // order in which tasks are posted (should we record all at once) as well as
+ // to avoid spinning up many worker threads to process this report if the
// thread pool is currently idle (each thread group keeps at least one idle
// thread so a single task isn't an issue).
// Invoke RandInt() out-of-line to ensure it's obtained before
// TimeTicks::Now().
- const TaskTraits& profiled_traits =
- kReportedTraits[RandInt(0, base::size(kReportedTraits) - 1)];
+ const TaskPriority profiled_priority = static_cast<TaskPriority>(
+ RandInt(static_cast<int>(TaskPriority::LOWEST),
+ static_cast<int>(TaskPriority::HIGHEST)));
// Post through the static API to time the full stack. Use a new Now() for
// every set of traits in case PostTask() itself is slow.
// Bonus: this approach also includes the overhead of BindOnce() in the
// reported latency.
- // TODO(jessemckenna): pass |profiled_traits| directly to
- // RecordHeartbeatLatencyAndTasksRunWhileQueuingHistograms() once compiler
- // error on NaCl is fixed
- TaskPriority task_priority = profiled_traits.priority();
- bool may_block = profiled_traits.may_block();
base::PostTask(
- FROM_HERE, profiled_traits,
+ FROM_HERE, {base::ThreadPool(), profiled_priority},
BindOnce(
&TaskTracker::RecordHeartbeatLatencyAndTasksRunWhileQueuingHistograms,
- Unretained(task_tracker_), task_priority, may_block, TimeTicks::Now(),
+ Unretained(task_tracker_), profiled_priority, TimeTicks::Now(),
task_tracker_->GetNumTasksRun()));
}
diff --git a/chromium/base/task/thread_pool/service_thread_unittest.cc b/chromium/base/task/thread_pool/service_thread_unittest.cc
index ce098456877..7b3d3c21031 100644
--- a/chromium/base/task/thread_pool/service_thread_unittest.cc
+++ b/chromium/base/task/thread_pool/service_thread_unittest.cc
@@ -65,15 +65,9 @@ TEST(ThreadPoolServiceThreadIntegrationTest, HeartbeatLatencyReport) {
"ThreadPool.HeartbeatLatencyMicroseconds.Test."
"UserBlockingTaskPriority",
"ThreadPool.HeartbeatLatencyMicroseconds.Test."
- "UserBlockingTaskPriority_MayBlock",
- "ThreadPool.HeartbeatLatencyMicroseconds.Test."
"UserVisibleTaskPriority",
"ThreadPool.HeartbeatLatencyMicroseconds.Test."
- "UserVisibleTaskPriority_MayBlock",
- "ThreadPool.HeartbeatLatencyMicroseconds.Test."
- "BackgroundTaskPriority",
- "ThreadPool.HeartbeatLatencyMicroseconds.Test."
- "BackgroundTaskPriority_MayBlock"};
+ "BackgroundTaskPriority"};
// Each report hits a single histogram above (randomly selected). But 1000
// reports should touch all histograms at least once the vast majority of the
diff --git a/chromium/base/task/thread_pool/task_source.cc b/chromium/base/task/thread_pool/task_source.cc
index 271988c2400..9674346b962 100644
--- a/chromium/base/task/thread_pool/task_source.cc
+++ b/chromium/base/task/thread_pool/task_source.cc
@@ -131,7 +131,6 @@ Optional<Task> RegisteredTaskSource::TakeTask(
DCHECK(!transaction || transaction->task_source() == get());
#if DCHECK_IS_ON()
DCHECK_EQ(State::kReady, run_step_);
- run_step_ = State::kTaskAcquired;
#endif // DCHECK_IS_ON()
return task_source_->TakeTask(transaction);
}
@@ -139,9 +138,6 @@ Optional<Task> RegisteredTaskSource::TakeTask(
Optional<Task> RegisteredTaskSource::Clear(
TaskSource::Transaction* transaction) {
DCHECK(!transaction || transaction->task_source() == get());
-#if DCHECK_IS_ON()
- run_step_ = State::kInitial;
-#endif // DCHECK_IS_ON()
return task_source_->Clear(transaction);
}
@@ -149,7 +145,7 @@ bool RegisteredTaskSource::DidProcessTask(
TaskSource::Transaction* transaction) {
DCHECK(!transaction || transaction->task_source() == get());
#if DCHECK_IS_ON()
- DCHECK_EQ(State::kTaskAcquired, run_step_);
+ DCHECK_EQ(State::kReady, run_step_);
run_step_ = State::kInitial;
#endif // DCHECK_IS_ON()
return task_source_->DidProcessTask(transaction);
diff --git a/chromium/base/task/thread_pool/task_source.h b/chromium/base/task/thread_pool/task_source.h
index e56b42eda69..4dfa0875885 100644
--- a/chromium/base/task/thread_pool/task_source.h
+++ b/chromium/base/task/thread_pool/task_source.h
@@ -55,9 +55,9 @@ struct BASE_EXPORT ExecutionEnvironment {
// RegisteredTaskSource after obtaining it from the queue:
// 1- Check whether a task can run with WillRunTask() (and register/enqueue the
// task source again if not saturated).
-// 2- Iff (1) determined that a task can run, access the next task with
-// TakeTask().
-// 3- Execute the task.
+// 2- (optional) Iff (1) determined that a task can run, access the next task
+// with TakeTask().
+// 3- (optional) Execute the task.
// 4- Inform the task source that a task was processed with DidProcessTask(),
// and re-enqueue the task source iff requested.
// When a task source is registered multiple times, many overlapping chains of
@@ -86,10 +86,10 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> {
enum class RunStatus {
// TakeTask() cannot be called.
kDisallowed,
- // TakeTask() must be called, and the TaskSource has not reached its maximum
+ // TakeTask() may called, and the TaskSource has not reached its maximum
// concurrency (i.e. the TaskSource still needs to be queued).
kAllowedNotSaturated,
- // TakeTask() must be called, and the TaskSource has reached its maximum
+ // TakeTask() may called, and the TaskSource has reached its maximum
// concurrency (i.e. the TaskSource no longer needs to be queued).
kAllowedSaturated,
};
@@ -218,7 +218,7 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> {
// A pointer to the TaskRunner that posts to this TaskSource, if any. The
// derived class is responsible for calling AddRef() when a TaskSource from
// which no Task is executing becomes non-empty and Release() when
- // DidProcessTask() returns false.
+ // it becomes empty again (e.g. when DidProcessTask() returns false).
TaskRunner* task_runner_;
TaskSourceExecutionMode execution_mode_;
@@ -270,11 +270,11 @@ class BASE_EXPORT RegisteredTaskSource {
Optional<Task> TakeTask(TaskSource::Transaction* transaction = nullptr)
WARN_UNUSED_RESULT;
- // Must be called once the task was run. This resets this RegisteredTaskSource
- // to its initial state so that WillRunTask() may be called again.
- // |transaction| is optional and should only be provided if this operation is
- // already part of a transaction. Returns true if the TaskSource should be
- // queued after this operation.
+ // Must be called after WillRunTask() or once the task was run if TakeTask()
+ // was called. This resets this RegisteredTaskSource to its initial state so
+ // that WillRunTask() may be called again. |transaction| is optional and
+ // should only be provided if this operation is already part of a transaction.
+ // Returns true if the TaskSource should be queued after this operation.
bool DidProcessTask(TaskSource::Transaction* transaction = nullptr);
// Returns a task that clears this TaskSource to make it empty. |transaction|
@@ -293,7 +293,6 @@ class BASE_EXPORT RegisteredTaskSource {
enum class State {
kInitial, // WillRunTask() may be called.
kReady, // After WillRunTask() returned a valid RunStatus.
- kTaskAcquired, // After TakeTask().
};
State run_step_ = State::kInitial;
diff --git a/chromium/base/task/thread_pool/task_tracker.cc b/chromium/base/task/thread_pool/task_tracker.cc
index d21422bc9fa..a74f14003b3 100644
--- a/chromium/base/task/thread_pool/task_tracker.cc
+++ b/chromium/base/task/thread_pool/task_tracker.cc
@@ -20,6 +20,7 @@
#include "base/sequence_token.h"
#include "base/synchronization/condition_variable.h"
#include "base/task/scoped_set_task_priority_for_current_thread.h"
+#include "base/task/task_executor.h"
#include "base/threading/sequence_local_storage_map.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/threading/thread_restrictions.h"
@@ -27,6 +28,7 @@
#include "base/time/time.h"
#include "base/trace_event/trace_event.h"
#include "base/values.h"
+#include "build/build_config.h"
namespace base {
namespace internal {
@@ -75,59 +77,34 @@ void TaskTracingInfo::AppendAsTraceFormat(std::string* out) const {
out->append(tmp);
}
-// Constructs a histogram to track latency which is logging to
-// "ThreadPool.{histogram_name}.{histogram_label}.{task_type_suffix}".
-HistogramBase* GetLatencyHistogram(StringPiece histogram_name,
- StringPiece histogram_label,
- StringPiece task_type_suffix) {
- DCHECK(!histogram_name.empty());
- DCHECK(!histogram_label.empty());
- DCHECK(!task_type_suffix.empty());
- // Mimics the UMA_HISTOGRAM_HIGH_RESOLUTION_CUSTOM_TIMES macro. The minimums
- // and maximums were chosen to place the 1ms mark at around the 70% range
- // coverage for buckets giving us good info for tasks that have a latency
- // below 1ms (most of them) and enough info to assess how bad the latency is
- // for tasks that exceed this threshold.
- const std::string histogram = JoinString(
- {"ThreadPool", histogram_name, histogram_label, task_type_suffix}, ".");
- return Histogram::FactoryMicrosecondsTimeGet(
- histogram, TimeDelta::FromMicroseconds(1),
- TimeDelta::FromMilliseconds(20), 50,
- HistogramBase::kUmaTargetedHistogramFlag);
-}
-
-// Constructs a histogram to track task count which is logging to
-// "ThreadPool.{histogram_name}.{histogram_label}.{task_type_suffix}".
-HistogramBase* GetCountHistogram(StringPiece histogram_name,
- StringPiece histogram_label,
- StringPiece task_type_suffix) {
- DCHECK(!histogram_name.empty());
- DCHECK(!histogram_label.empty());
- DCHECK(!task_type_suffix.empty());
- // Mimics the UMA_HISTOGRAM_CUSTOM_COUNTS macro.
- const std::string histogram = JoinString(
- {"ThreadPool", histogram_name, histogram_label, task_type_suffix}, ".");
- // 500 was chosen as the maximum number of tasks run while queuing because
- // values this high would likely indicate an error, beyond which knowing the
- // actual number of tasks is not informative.
- return Histogram::FactoryGet(histogram, 1, 500, 50,
- HistogramBase::kUmaTargetedHistogramFlag);
-}
-
-// Returns a histogram stored in a 2D array indexed by task priority and
-// whether it may block.
-// TODO(jessemckenna): use the STATIC_HISTOGRAM_POINTER_GROUP macro from
-// histogram_macros.h instead.
-HistogramBase* GetHistogramForTaskTraits(
- TaskTraits task_traits,
- HistogramBase* const (*histograms)[2]) {
- return histograms[static_cast<int>(task_traits.priority())]
- [task_traits.may_block() ||
- task_traits.with_base_sync_primitives()
- ? 1
- : 0];
+const char* GetTaskPrioritySuffix(TaskPriority priority) {
+ switch (priority) {
+ case TaskPriority::BEST_EFFORT:
+ return "BackgroundTaskPriority";
+ case TaskPriority::USER_VISIBLE:
+ return "UserVisibleTaskPriority";
+ case TaskPriority::USER_BLOCKING:
+ return "UserBlockingTaskPriority";
+ }
}
+// Records |time_sample| to the histogram |histogram_name|.|priority suffix|,
+// where |priority_suffix| is derived from |priority|.
+//
+// The minimums and maximums were chosen to place the 1ms mark at around the 70%
+// range coverage for buckets giving us good info for tasks that have a latency
+// below 1ms (most of them) and enough info to assess how bad the latency is for
+// tasks that exceed this threshold.
+#define STATIC_LATENCY_HISTOGRAM_POINTER_GROUP(histogram_name, priority, \
+ time_sample) \
+ STATIC_HISTOGRAM_POINTER_GROUP( \
+ histogram_name, static_cast<int>(priority), \
+ static_cast<int>(TaskPriority::HIGHEST) + 1, AddTime(time_sample), \
+ Histogram::FactoryMicrosecondsTimeGet( \
+ histogram_name, TimeDelta::FromMicroseconds(1), \
+ TimeDelta::FromMilliseconds(20), 50, \
+ HistogramBase::kUmaTargetedHistogramFlag));
+
bool HasLogBestEffortTasksSwitch() {
// The CommandLine might not be initialized if ThreadPool is initialized in a
// dynamic library which doesn't have access to argc/argv.
@@ -136,6 +113,84 @@ bool HasLogBestEffortTasksSwitch() {
switches::kLogBestEffortTasks);
}
+// Needed for PostTaskHere and CurrentThread. This executor lives for the
+// duration of a threadpool task invocation.
+class EphemeralTaskExecutor : public TaskExecutor {
+ public:
+ // |sequenced_task_runner| and |single_thread_task_runner| must outlive this
+ // EphemeralTaskExecutor.
+ EphemeralTaskExecutor(SequencedTaskRunner* sequenced_task_runner,
+ SingleThreadTaskRunner* single_thread_task_runner,
+ const TaskTraits* sequence_traits)
+ : sequenced_task_runner_(sequenced_task_runner),
+ single_thread_task_runner_(single_thread_task_runner),
+ sequence_traits_(sequence_traits) {
+ SetTaskExecutorForCurrentThread(this);
+ }
+
+ ~EphemeralTaskExecutor() override {
+ SetTaskExecutorForCurrentThread(nullptr);
+ }
+
+ // TaskExecutor:
+ bool PostDelayedTask(const Location& from_here,
+ const TaskTraits& traits,
+ OnceClosure task,
+ TimeDelta delay) override {
+ CheckTraitsCompatibleWithSequenceTraits(traits);
+ return sequenced_task_runner_->PostDelayedTask(from_here, std::move(task),
+ delay);
+ }
+
+ scoped_refptr<TaskRunner> CreateTaskRunner(
+ const TaskTraits& traits) override {
+ CheckTraitsCompatibleWithSequenceTraits(traits);
+ return sequenced_task_runner_;
+ }
+
+ scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunner(
+ const TaskTraits& traits) override {
+ CheckTraitsCompatibleWithSequenceTraits(traits);
+ return sequenced_task_runner_;
+ }
+
+ scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunner(
+ const TaskTraits& traits,
+ SingleThreadTaskRunnerThreadMode thread_mode) override {
+ CheckTraitsCompatibleWithSequenceTraits(traits);
+ return single_thread_task_runner_;
+ }
+
+#if defined(OS_WIN)
+ scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunner(
+ const TaskTraits& traits,
+ SingleThreadTaskRunnerThreadMode thread_mode) override {
+ CheckTraitsCompatibleWithSequenceTraits(traits);
+ return single_thread_task_runner_;
+ }
+#endif // defined(OS_WIN)
+
+ private:
+ // Currently ignores |traits.priority()|.
+ void CheckTraitsCompatibleWithSequenceTraits(const TaskTraits& traits) {
+ if (traits.shutdown_behavior_set_explicitly()) {
+ DCHECK_EQ(traits.shutdown_behavior(),
+ sequence_traits_->shutdown_behavior());
+ }
+
+ DCHECK(!traits.may_block() ||
+ traits.may_block() == sequence_traits_->may_block());
+
+ DCHECK(!traits.with_base_sync_primitives() ||
+ traits.with_base_sync_primitives() ==
+ sequence_traits_->with_base_sync_primitives());
+ }
+
+ SequencedTaskRunner* const sequenced_task_runner_;
+ SingleThreadTaskRunner* const single_thread_task_runner_;
+ const TaskTraits* const sequence_traits_;
+};
+
} // namespace
// Atomic internal state used by TaskTracker to track items that are blocking
@@ -228,78 +283,14 @@ class TaskTracker::State {
DISALLOW_COPY_AND_ASSIGN(State);
};
-// TODO(jessemckenna): Write a helper function to avoid code duplication below.
TaskTracker::TaskTracker(StringPiece histogram_label)
- : has_log_best_effort_tasks_switch_(HasLogBestEffortTasksSwitch()),
+ : histogram_label_(histogram_label),
+ has_log_best_effort_tasks_switch_(HasLogBestEffortTasksSwitch()),
state_(new State),
can_run_policy_(CanRunPolicy::kAll),
flush_cv_(flush_lock_.CreateConditionVariable()),
shutdown_lock_(&flush_lock_),
- task_latency_histograms_{
- {GetLatencyHistogram("TaskLatencyMicroseconds",
- histogram_label,
- "BackgroundTaskPriority"),
- GetLatencyHistogram("TaskLatencyMicroseconds",
- histogram_label,
- "BackgroundTaskPriority_MayBlock")},
- {GetLatencyHistogram("TaskLatencyMicroseconds",
- histogram_label,
- "UserVisibleTaskPriority"),
- GetLatencyHistogram("TaskLatencyMicroseconds",
- histogram_label,
- "UserVisibleTaskPriority_MayBlock")},
- {GetLatencyHistogram("TaskLatencyMicroseconds",
- histogram_label,
- "UserBlockingTaskPriority"),
- GetLatencyHistogram("TaskLatencyMicroseconds",
- histogram_label,
- "UserBlockingTaskPriority_MayBlock")}},
- heartbeat_latency_histograms_{
- {GetLatencyHistogram("HeartbeatLatencyMicroseconds",
- histogram_label,
- "BackgroundTaskPriority"),
- GetLatencyHistogram("HeartbeatLatencyMicroseconds",
- histogram_label,
- "BackgroundTaskPriority_MayBlock")},
- {GetLatencyHistogram("HeartbeatLatencyMicroseconds",
- histogram_label,
- "UserVisibleTaskPriority"),
- GetLatencyHistogram("HeartbeatLatencyMicroseconds",
- histogram_label,
- "UserVisibleTaskPriority_MayBlock")},
- {GetLatencyHistogram("HeartbeatLatencyMicroseconds",
- histogram_label,
- "UserBlockingTaskPriority"),
- GetLatencyHistogram("HeartbeatLatencyMicroseconds",
- histogram_label,
- "UserBlockingTaskPriority_MayBlock")}},
- num_tasks_run_while_queuing_histograms_{
- {GetCountHistogram("NumTasksRunWhileQueuing",
- histogram_label,
- "BackgroundTaskPriority"),
- GetCountHistogram("NumTasksRunWhileQueuing",
- histogram_label,
- "BackgroundTaskPriority_MayBlock")},
- {GetCountHistogram("NumTasksRunWhileQueuing",
- histogram_label,
- "UserVisibleTaskPriority"),
- GetCountHistogram("NumTasksRunWhileQueuing",
- histogram_label,
- "UserVisibleTaskPriority_MayBlock")},
- {GetCountHistogram("NumTasksRunWhileQueuing",
- histogram_label,
- "UserBlockingTaskPriority"),
- GetCountHistogram("NumTasksRunWhileQueuing",
- histogram_label,
- "UserBlockingTaskPriority_MayBlock")}},
- tracked_ref_factory_(this) {
- // Confirm that all |task_latency_histograms_| have been initialized above.
- for (TaskPriorityType i = 0; i < kNumTaskPriorities; ++i) {
- for (uint8_t j = 0; j < kNumBlockingModes; ++j) {
- DCHECK(task_latency_histograms_[i][j]);
- }
- }
-}
+ tracked_ref_factory_(this) {}
TaskTracker::~TaskTracker() = default;
@@ -441,7 +432,7 @@ RegisteredTaskSource TaskTracker::RunAndPopNextTask(
RegisteredTaskSource task_source) {
DCHECK(task_source);
- const bool can_run_worker_task =
+ const bool task_is_worker_task =
BeforeRunTask(task_source->shutdown_behavior());
// Run the next task in |task_source|.
@@ -449,7 +440,7 @@ RegisteredTaskSource TaskTracker::RunAndPopNextTask(
TaskTraits traits{ThreadPool()};
{
auto transaction = task_source->BeginTransaction();
- task = can_run_worker_task ? task_source.TakeTask(&transaction)
+ task = task_is_worker_task ? task_source.TakeTask(&transaction)
: task_source.Clear(&transaction);
traits = transaction.traits();
}
@@ -458,13 +449,12 @@ RegisteredTaskSource TaskTracker::RunAndPopNextTask(
// Run the |task| (whether it's a worker task or the Clear() closure).
RunTask(std::move(task.value()), task_source.get(), traits);
}
- if (can_run_worker_task) {
+ if (task_is_worker_task)
AfterRunTask(task_source->shutdown_behavior());
- const bool task_source_must_be_queued = task_source.DidProcessTask();
- // |task_source| should be reenqueued iff requested by DidProcessTask().
- if (task_source_must_be_queued)
- return task_source;
- }
+ const bool task_source_must_be_queued = task_source.DidProcessTask();
+ // |task_source| should be reenqueued iff requested by DidProcessTask().
+ if (task_source_must_be_queued)
+ return task_source;
return nullptr;
}
@@ -477,37 +467,50 @@ bool TaskTracker::IsShutdownComplete() const {
return shutdown_event_ && shutdown_event_->IsSignaled();
}
-void TaskTracker::RecordLatencyHistogram(
- LatencyHistogramType latency_histogram_type,
- TaskTraits task_traits,
- TimeTicks posted_time) const {
- const TimeDelta task_latency = TimeTicks::Now() - posted_time;
+void TaskTracker::RecordLatencyHistogram(TaskPriority priority,
+ TimeTicks posted_time) const {
+ if (histogram_label_.empty())
+ return;
- DCHECK(latency_histogram_type == LatencyHistogramType::TASK_LATENCY ||
- latency_histogram_type == LatencyHistogramType::HEARTBEAT_LATENCY);
- const auto& histograms =
- latency_histogram_type == LatencyHistogramType::TASK_LATENCY
- ? task_latency_histograms_
- : heartbeat_latency_histograms_;
- GetHistogramForTaskTraits(task_traits, histograms)
- ->AddTimeMicrosecondsGranularity(task_latency);
+ auto get_latency_histogram_name = [this, priority]() {
+ return JoinString({"ThreadPool.TaskLatencyMicroseconds", histogram_label_,
+ GetTaskPrioritySuffix(priority)},
+ ".");
+ };
+ STATIC_LATENCY_HISTOGRAM_POINTER_GROUP(get_latency_histogram_name(), priority,
+ TimeTicks::Now() - posted_time);
}
void TaskTracker::RecordHeartbeatLatencyAndTasksRunWhileQueuingHistograms(
- TaskPriority task_priority,
- bool may_block,
+ TaskPriority priority,
TimeTicks posted_time,
int num_tasks_run_when_posted) const {
- TaskTraits task_traits{ThreadPool()};
- if (may_block)
- task_traits = TaskTraits(ThreadPool(), task_priority, MayBlock());
- else
- task_traits = TaskTraits(ThreadPool(), task_priority);
- RecordLatencyHistogram(LatencyHistogramType::HEARTBEAT_LATENCY, task_traits,
- posted_time);
- GetHistogramForTaskTraits(task_traits,
- num_tasks_run_while_queuing_histograms_)
- ->Add(GetNumTasksRun() - num_tasks_run_when_posted);
+ if (histogram_label_.empty())
+ return;
+
+ auto get_heartbeat_latency_histogram_name = [this, priority]() {
+ return JoinString({"ThreadPool.HeartbeatLatencyMicroseconds",
+ histogram_label_, GetTaskPrioritySuffix(priority)},
+ ".");
+ };
+ STATIC_LATENCY_HISTOGRAM_POINTER_GROUP(get_heartbeat_latency_histogram_name(),
+ priority,
+ TimeTicks::Now() - posted_time);
+
+ auto get_num_tasks_run_while_queuing_histogram_name = [this, priority]() {
+ return JoinString({"ThreadPool.NumTasksRunWhileQueuing", histogram_label_,
+ GetTaskPrioritySuffix(priority)},
+ ".");
+ };
+ STATIC_HISTOGRAM_POINTER_GROUP(
+ get_num_tasks_run_while_queuing_histogram_name(),
+ static_cast<int>(priority), static_cast<int>(TaskPriority::HIGHEST) + 1,
+ Add(GetNumTasksRun() - num_tasks_run_when_posted),
+ // 500 was chosen as the maximum number of tasks run while queuing because
+ // values this high would likely indicate an error, beyond which knowing
+ // the actual number of tasks is not informative.
+ Histogram::FactoryGet(get_num_tasks_run_while_queuing_histogram_name(), 1,
+ 500, 50, HistogramBase::kUmaTargetedHistogramFlag));
}
int TaskTracker::GetNumTasksRun() const {
@@ -522,8 +525,7 @@ void TaskTracker::RunTask(Task task,
TaskSource* task_source,
const TaskTraits& traits) {
DCHECK(task_source);
- RecordLatencyHistogram(LatencyHistogramType::TASK_LATENCY, traits,
- task.queue_time);
+ RecordLatencyHistogram(traits.priority(), task.queue_time);
const auto environment = task_source->GetExecutionEnvironment();
@@ -557,6 +559,7 @@ void TaskTracker::RunTask(Task task,
// Set up TaskRunnerHandle as expected for the scope of the task.
Optional<SequencedTaskRunnerHandle> sequenced_task_runner_handle;
Optional<ThreadTaskRunnerHandle> single_thread_task_runner_handle;
+ Optional<EphemeralTaskExecutor> ephemiral_task_executor;
switch (task_source->execution_mode()) {
case TaskSourceExecutionMode::kJob:
case TaskSourceExecutionMode::kParallel:
@@ -565,11 +568,18 @@ void TaskTracker::RunTask(Task task,
DCHECK(task_source->task_runner());
sequenced_task_runner_handle.emplace(
static_cast<SequencedTaskRunner*>(task_source->task_runner()));
+ ephemiral_task_executor.emplace(
+ static_cast<SequencedTaskRunner*>(task_source->task_runner()),
+ nullptr, &traits);
break;
case TaskSourceExecutionMode::kSingleThread:
DCHECK(task_source->task_runner());
single_thread_task_runner_handle.emplace(
static_cast<SingleThreadTaskRunner*>(task_source->task_runner()));
+ ephemiral_task_executor.emplace(
+ static_cast<SequencedTaskRunner*>(task_source->task_runner()),
+ static_cast<SingleThreadTaskRunner*>(task_source->task_runner()),
+ &traits);
break;
}
diff --git a/chromium/base/task/thread_pool/task_tracker.h b/chromium/base/task/thread_pool/task_tracker.h
index bb5296b7da1..59f0726b8eb 100644
--- a/chromium/base/task/thread_pool/task_tracker.h
+++ b/chromium/base/task/thread_pool/task_tracker.h
@@ -31,7 +31,6 @@
namespace base {
class ConditionVariable;
-class HistogramBase;
namespace internal {
@@ -54,7 +53,8 @@ enum class CanRunPolicy {
// and records metrics and trace events. This class is thread-safe.
class BASE_EXPORT TaskTracker {
public:
- // |histogram_label| is used as a suffix for histograms, it must not be empty.
+ // |histogram_label| is used to label histograms. No histograms are recorded
+ // if it is empty.
TaskTracker(StringPiece histogram_label);
virtual ~TaskTracker();
@@ -131,26 +131,15 @@ class BASE_EXPORT TaskTracker {
// no tasks are blocking shutdown).
bool IsShutdownComplete() const;
- enum class LatencyHistogramType {
- // Records the latency of each individual task posted through TaskTracker.
- TASK_LATENCY,
- // Records the latency of heartbeat tasks which are independent of current
- // workload. These avoid a bias towards TASK_LATENCY reporting that high-
- // priority tasks are "slower" than regular tasks because high-priority
- // tasks tend to be correlated with heavy workloads.
- HEARTBEAT_LATENCY,
- };
-
// Records two histograms
// 1. ThreadPool.[label].HeartbeatLatencyMicroseconds.[suffix]:
// Now() - posted_time
// 2. ThreadPool.[label].NumTasksRunWhileQueuing.[suffix]:
// GetNumTasksRun() - num_tasks_run_when_posted.
// [label] is the histogram label provided to the constructor.
- // [suffix] is derived from |task_priority| and |may_block|.
+ // [suffix] is derived from |task_priority|.
void RecordHeartbeatLatencyAndTasksRunWhileQueuingHistograms(
TaskPriority task_priority,
- bool may_block,
TimeTicks posted_time,
int num_tasks_run_when_posted) const;
@@ -213,10 +202,9 @@ class BASE_EXPORT TaskTracker {
// manner.
void CallFlushCallbackForTesting();
- // Records |Now() - posted_time| to the appropriate |latency_histogram_type|
- // based on |task_traits|.
- void RecordLatencyHistogram(LatencyHistogramType latency_histogram_type,
- TaskTraits task_traits,
+ // Records |Now() - posted_time| to the
+ // ThreadPool.TaskLatencyMicroseconds.[label].[priority] histogram.
+ void RecordLatencyHistogram(TaskPriority priority,
TimeTicks posted_time) const;
void IncrementNumTasksRun();
@@ -230,6 +218,9 @@ class BASE_EXPORT TaskTracker {
TaskAnnotator task_annotator_;
+ // Suffix for histograms recorded by this TaskTracker.
+ const std::string histogram_label_;
+
// Indicates whether logging information about TaskPriority::BEST_EFFORT tasks
// was enabled with a command line switch.
const bool has_log_best_effort_tasks_switch_;
@@ -277,25 +268,6 @@ class BASE_EXPORT TaskTracker {
// a task queued to histogram.
std::atomic_int num_tasks_run_{0};
- // ThreadPool.TaskLatencyMicroseconds.*,
- // ThreadPool.HeartbeatLatencyMicroseconds.*, and
- // ThreadPool.NumTasksRunWhileQueuing.* histograms. The first index is
- // a TaskPriority. The second index is 0 for non-blocking tasks, 1 for
- // blocking tasks. Intentionally leaked.
- // TODO(scheduler-dev): Consider using STATIC_HISTOGRAM_POINTER_GROUP for
- // these.
- using TaskPriorityType = std::underlying_type<TaskPriority>::type;
- static constexpr TaskPriorityType kNumTaskPriorities =
- static_cast<TaskPriorityType>(TaskPriority::HIGHEST) + 1;
- static constexpr uint8_t kNumBlockingModes = 2;
- HistogramBase* const task_latency_histograms_[kNumTaskPriorities]
- [kNumBlockingModes];
- HistogramBase* const heartbeat_latency_histograms_[kNumTaskPriorities]
- [kNumBlockingModes];
- HistogramBase* const
- num_tasks_run_while_queuing_histograms_[kNumTaskPriorities]
- [kNumBlockingModes];
-
// Ensures all state (e.g. dangling cleaned up workers) is coalesced before
// destroying the TaskTracker (e.g. in test environments).
// Ref. https://crbug.com/827615.
diff --git a/chromium/base/task/thread_pool/task_tracker_unittest.cc b/chromium/base/task/thread_pool/task_tracker_unittest.cc
index fa231c9b7be..a3ba41cf559 100644
--- a/chromium/base/task/thread_pool/task_tracker_unittest.cc
+++ b/chromium/base/task/thread_pool/task_tracker_unittest.cc
@@ -19,7 +19,6 @@
#include "base/memory/ref_counted.h"
#include "base/metrics/histogram_base.h"
#include "base/metrics/histogram_samples.h"
-#include "base/metrics/statistics_recorder.h"
#include "base/sequence_token.h"
#include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h"
@@ -37,6 +36,7 @@
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "base/threading/simple_thread.h"
+#include "base/threading/thread_restrictions.h"
#include "base/threading/thread_task_runner_handle.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -1244,8 +1244,6 @@ TEST(ThreadPoolTaskTrackerWaitAllowedTest, WaitAllowed) {
// Verify that ThreadPool.TaskLatency.* histograms are correctly recorded
// when a task runs.
TEST(ThreadPoolTaskTrackerHistogramTest, TaskLatency) {
- auto statistics_recorder = StatisticsRecorder::CreateTemporaryForTesting();
-
TaskTracker tracker("Test");
struct {
@@ -1257,28 +1255,28 @@ TEST(ThreadPoolTaskTrackerHistogramTest, TaskLatency) {
"BackgroundTaskPriority"},
{{ThreadPool(), MayBlock(), TaskPriority::BEST_EFFORT},
"ThreadPool.TaskLatencyMicroseconds.Test."
- "BackgroundTaskPriority_MayBlock"},
+ "BackgroundTaskPriority"},
{{ThreadPool(), WithBaseSyncPrimitives(), TaskPriority::BEST_EFFORT},
"ThreadPool.TaskLatencyMicroseconds.Test."
- "BackgroundTaskPriority_MayBlock"},
+ "BackgroundTaskPriority"},
{{ThreadPool(), TaskPriority::USER_VISIBLE},
"ThreadPool.TaskLatencyMicroseconds.Test."
"UserVisibleTaskPriority"},
{{ThreadPool(), MayBlock(), TaskPriority::USER_VISIBLE},
"ThreadPool.TaskLatencyMicroseconds.Test."
- "UserVisibleTaskPriority_MayBlock"},
+ "UserVisibleTaskPriority"},
{{ThreadPool(), WithBaseSyncPrimitives(), TaskPriority::USER_VISIBLE},
"ThreadPool.TaskLatencyMicroseconds.Test."
- "UserVisibleTaskPriority_MayBlock"},
+ "UserVisibleTaskPriority"},
{{ThreadPool(), TaskPriority::USER_BLOCKING},
"ThreadPool.TaskLatencyMicroseconds.Test."
"UserBlockingTaskPriority"},
{{ThreadPool(), MayBlock(), TaskPriority::USER_BLOCKING},
"ThreadPool.TaskLatencyMicroseconds.Test."
- "UserBlockingTaskPriority_MayBlock"},
+ "UserBlockingTaskPriority"},
{{ThreadPool(), WithBaseSyncPrimitives(), TaskPriority::USER_BLOCKING},
"ThreadPool.TaskLatencyMicroseconds.Test."
- "UserBlockingTaskPriority_MayBlock"}};
+ "UserBlockingTaskPriority"}};
for (const auto& test : kTests) {
Task task(FROM_HERE, DoNothing(), TimeDelta());
diff --git a/chromium/base/task/thread_pool/test_utils.cc b/chromium/base/task/thread_pool/test_utils.cc
index a02e24bf3a6..d6236373f84 100644
--- a/chromium/base/task/thread_pool/test_utils.cc
+++ b/chromium/base/task/thread_pool/test_utils.cc
@@ -239,6 +239,11 @@ bool MockPooledTaskRunnerDelegate::EnqueueJobTaskSource(
return true;
}
+void MockPooledTaskRunnerDelegate::RemoveJobTaskSource(
+ scoped_refptr<JobTaskSource> task_source) {
+ thread_group_->RemoveTaskSource(*task_source);
+}
+
bool MockPooledTaskRunnerDelegate::IsRunningPoolWithTraits(
const TaskTraits& traits) const {
// |thread_group_| must be initialized with SetThreadGroup() before
diff --git a/chromium/base/task/thread_pool/test_utils.h b/chromium/base/task/thread_pool/test_utils.h
index 1d85ee8cea2..ebdfbe28467 100644
--- a/chromium/base/task/thread_pool/test_utils.h
+++ b/chromium/base/task/thread_pool/test_utils.h
@@ -62,6 +62,7 @@ class MockPooledTaskRunnerDelegate : public PooledTaskRunnerDelegate {
bool PostTaskWithSequence(Task task,
scoped_refptr<Sequence> sequence) override;
bool EnqueueJobTaskSource(scoped_refptr<JobTaskSource> task_source) override;
+ void RemoveJobTaskSource(scoped_refptr<JobTaskSource> task_source) override;
bool ShouldYield(const TaskSource* task_source) const override;
bool IsRunningPoolWithTraits(const TaskTraits& traits) const override;
void UpdatePriority(scoped_refptr<TaskSource> task_source,
diff --git a/chromium/base/task/thread_pool/thread_group.cc b/chromium/base/task/thread_pool/thread_group.cc
index 7f6dcef2630..8de5b518fde 100644
--- a/chromium/base/task/thread_pool/thread_group.cc
+++ b/chromium/base/task/thread_pool/thread_group.cc
@@ -139,9 +139,9 @@ ThreadGroup::GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired()
}
RegisteredTaskSource ThreadGroup::RemoveTaskSource(
- scoped_refptr<TaskSource> task_source) {
+ const TaskSource& task_source) {
CheckedAutoLock auto_lock(lock_);
- return priority_queue_.RemoveTaskSource(std::move(task_source));
+ return priority_queue_.RemoveTaskSource(task_source);
}
void ThreadGroup::ReEnqueueTaskSourceLockRequired(
diff --git a/chromium/base/task/thread_pool/thread_group.h b/chromium/base/task/thread_pool/thread_group.h
index c00755413f5..4db7bd5f66d 100644
--- a/chromium/base/task/thread_pool/thread_group.h
+++ b/chromium/base/task/thread_pool/thread_group.h
@@ -65,7 +65,7 @@ class BASE_EXPORT ThreadGroup {
// RegisteredTaskSource that evaluats to true if successful, or false if
// |task_source| is not currently in |priority_queue_|, such as when a worker
// is running a task from it.
- RegisteredTaskSource RemoveTaskSource(scoped_refptr<TaskSource> task_source);
+ RegisteredTaskSource RemoveTaskSource(const TaskSource& task_source);
// Updates the position of the TaskSource in |transaction| in this
// ThreadGroup's PriorityQueue based on the TaskSource's current traits.
diff --git a/chromium/base/task/thread_pool/thread_group_impl.cc b/chromium/base/task/thread_pool/thread_group_impl.cc
index 378493afc78..0c17b47d31b 100644
--- a/chromium/base/task/thread_pool/thread_group_impl.cc
+++ b/chromium/base/task/thread_pool/thread_group_impl.cc
@@ -41,25 +41,6 @@
#include "base/win/windows_version.h"
#endif // defined(OS_WIN)
-// Data from deprecated UMA histograms:
-//
-// ThreadPool.NumTasksBetweenWaits.(Browser/Renderer).Foreground, August 2019
-// Number of tasks between two waits by a foreground worker thread in a
-// browser/renderer process.
-//
-// Windows (browser/renderer)
-// 1 at 87th percentile / 92th percentile
-// 2 at 95th percentile / 98th percentile
-// 5 at 99th percentile / 100th percentile
-// Mac (browser/renderer)
-// 1 at 81th percentile / 90th percentile
-// 2 at 92th percentile / 97th percentile
-// 5 at 98th percentile / 100th percentile
-// Android (browser/renderer)
-// 1 at 92th percentile / 96th percentile
-// 2 at 97th percentile / 98th percentile
-// 5 at 99th percentile / 100th percentile
-
namespace base {
namespace internal {
@@ -342,40 +323,57 @@ ThreadGroupImpl::ThreadGroupImpl(StringPiece histogram_label,
priority_hint_(priority_hint),
idle_workers_stack_cv_for_testing_(lock_.CreateConditionVariable()),
// Mimics the UMA_HISTOGRAM_LONG_TIMES macro.
- detach_duration_histogram_(Histogram::FactoryTimeGet(
- JoinString({kDetachDurationHistogramPrefix, histogram_label}, ""),
- TimeDelta::FromMilliseconds(1),
- TimeDelta::FromHours(1),
- 50,
- HistogramBase::kUmaTargetedHistogramFlag)),
+ detach_duration_histogram_(
+ histogram_label.empty()
+ ? nullptr
+ : Histogram::FactoryTimeGet(
+ JoinString(
+ {kDetachDurationHistogramPrefix, histogram_label},
+ ""),
+ TimeDelta::FromMilliseconds(1),
+ TimeDelta::FromHours(1),
+ 50,
+ HistogramBase::kUmaTargetedHistogramFlag)),
// Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more
// than 1000 tasks before detaching, there is no need to know the exact
// number of tasks that ran.
- num_tasks_before_detach_histogram_(Histogram::FactoryGet(
- JoinString({kNumTasksBeforeDetachHistogramPrefix, histogram_label},
- ""),
- 1,
- 1000,
- 50,
- HistogramBase::kUmaTargetedHistogramFlag)),
+ num_tasks_before_detach_histogram_(
+ histogram_label.empty()
+ ? nullptr
+ : Histogram::FactoryGet(
+ JoinString(
+ {kNumTasksBeforeDetachHistogramPrefix, histogram_label},
+ ""),
+ 1,
+ 1000,
+ 50,
+ HistogramBase::kUmaTargetedHistogramFlag)),
// Mimics the UMA_HISTOGRAM_COUNTS_100 macro. A ThreadGroup is
// expected to run between zero and a few tens of workers.
// When it runs more than 100 worker, there is no need to know the exact
// number of workers that ran.
- num_workers_histogram_(Histogram::FactoryGet(
- JoinString({kNumWorkersHistogramPrefix, histogram_label}, ""),
- 1,
- 100,
- 50,
- HistogramBase::kUmaTargetedHistogramFlag)),
- num_active_workers_histogram_(Histogram::FactoryGet(
- JoinString({kNumActiveWorkersHistogramPrefix, histogram_label}, ""),
- 1,
- 100,
- 50,
- HistogramBase::kUmaTargetedHistogramFlag)),
+ num_workers_histogram_(
+ histogram_label.empty()
+ ? nullptr
+ : Histogram::FactoryGet(
+ JoinString({kNumWorkersHistogramPrefix, histogram_label},
+ ""),
+ 1,
+ 100,
+ 50,
+ HistogramBase::kUmaTargetedHistogramFlag)),
+ num_active_workers_histogram_(
+ histogram_label.empty()
+ ? nullptr
+ : Histogram::FactoryGet(
+ JoinString(
+ {kNumActiveWorkersHistogramPrefix, histogram_label},
+ ""),
+ 1,
+ 100,
+ 50,
+ HistogramBase::kUmaTargetedHistogramFlag)),
tracked_ref_factory_(this) {
- DCHECK(!histogram_label.empty());
DCHECK(!thread_group_label_.empty());
}
@@ -484,10 +482,6 @@ void ThreadGroupImpl::WaitForWorkersCleanedUpForTesting(size_t n) {
}
void ThreadGroupImpl::JoinForTesting() {
-#if DCHECK_IS_ON()
- join_for_testing_started_.Set();
-#endif
-
decltype(workers_) workers_copy;
{
CheckedAutoLock auto_lock(lock_);
@@ -496,6 +490,8 @@ void ThreadGroupImpl::JoinForTesting() {
DCHECK_GT(workers_.size(), size_t(0))
<< "Joined an unstarted thread group.";
+ join_for_testing_started_ = true;
+
// Ensure WorkerThreads in |workers_| do not attempt to cleanup while
// being joined.
worker_cleanup_disallowed_for_testing_ = true;
@@ -531,10 +527,13 @@ size_t ThreadGroupImpl::NumberOfIdleWorkersForTesting() const {
void ThreadGroupImpl::ReportHeartbeatMetrics() const {
CheckedAutoLock auto_lock(lock_);
- num_workers_histogram_->Add(workers_.size());
-
- num_active_workers_histogram_->Add(workers_.size() -
- idle_workers_stack_.Size());
+ if (num_workers_histogram_) {
+ num_workers_histogram_->Add(workers_.size());
+ }
+ if (num_active_workers_histogram_) {
+ num_active_workers_histogram_->Add(workers_.size() -
+ idle_workers_stack_.Size());
+ }
}
ThreadGroupImpl::WorkerThreadDelegateImpl::WorkerThreadDelegateImpl(
@@ -710,10 +709,13 @@ bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanCleanupLockRequired(
void ThreadGroupImpl::WorkerThreadDelegateImpl::CleanupLockRequired(
WorkerThread* worker) {
+ DCHECK(!outer_->join_for_testing_started_);
DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
- outer_->num_tasks_before_detach_histogram_->Add(
- worker_only().num_tasks_since_last_detach);
+ if (outer_->num_tasks_before_detach_histogram_) {
+ outer_->num_tasks_before_detach_histogram_->Add(
+ worker_only().num_tasks_since_last_detach);
+ }
outer_->cleanup_timestamps_.push(subtle::TimeTicksNowIgnoringOverride());
worker->Cleanup();
outer_->idle_workers_stack_.Remove(worker);
@@ -756,7 +758,7 @@ void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainExit(
// |workers_| by the time the thread is about to exit. (except in the cases
// where the thread group is no longer going to be used - in which case,
// it's fine for there to be invalid workers in the thread group.
- if (!shutdown_complete && !outer_->join_for_testing_started_.IsSet()) {
+ if (!shutdown_complete && !outer_->join_for_testing_started_) {
DCHECK(!outer_->idle_workers_stack_.Contains(worker));
DCHECK(!ContainsWorker(outer_->workers_, worker));
}
@@ -944,6 +946,7 @@ void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
scoped_refptr<WorkerThread>
ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
ScopedWorkersExecutor* executor) {
+ DCHECK(!join_for_testing_started_);
DCHECK_LT(workers_.size(), max_tasks_);
DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
DCHECK(idle_workers_stack_.IsEmpty());
@@ -962,8 +965,10 @@ ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
DCHECK_LE(workers_.size(), max_tasks_);
if (!cleanup_timestamps_.empty()) {
- detach_duration_histogram_->AddTime(subtle::TimeTicksNowIgnoringOverride() -
- cleanup_timestamps_.top());
+ if (detach_duration_histogram_) {
+ detach_duration_histogram_->AddTime(
+ subtle::TimeTicksNowIgnoringOverride() - cleanup_timestamps_.top());
+ }
cleanup_timestamps_.pop();
}
@@ -1011,7 +1016,7 @@ void ThreadGroupImpl::DidUpdateCanRunPolicy() {
void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
BaseScopedWorkersExecutor* base_executor) {
// Don't do anything if the thread group isn't started.
- if (max_tasks_ == 0)
+ if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_))
return;
ScopedWorkersExecutor* executor =
diff --git a/chromium/base/task/thread_pool/thread_group_impl.h b/chromium/base/task/thread_pool/thread_group_impl.h
index 3df96cfdef0..e0782bf2614 100644
--- a/chromium/base/task/thread_pool/thread_group_impl.h
+++ b/chromium/base/task/thread_pool/thread_group_impl.h
@@ -20,7 +20,6 @@
#include "base/memory/ref_counted.h"
#include "base/optional.h"
#include "base/strings/string_piece.h"
-#include "base/synchronization/atomic_flag.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/thread_pool/task.h"
@@ -329,10 +328,8 @@ class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_
GUARDED_BY(lock_);
-#if DCHECK_IS_ON()
// Set at the start of JoinForTesting().
- AtomicFlag join_for_testing_started_;
-#endif
+ bool join_for_testing_started_ GUARDED_BY(lock_) = false;
// ThreadPool.DetachDuration.[thread group name] histogram. Intentionally
// leaked.
diff --git a/chromium/base/task/thread_pool/thread_group_unittest.cc b/chromium/base/task/thread_pool/thread_group_unittest.cc
index cb4481c3cf1..3ed430a60db 100644
--- a/chromium/base/task/thread_pool/thread_group_unittest.cc
+++ b/chromium/base/task/thread_pool/thread_group_unittest.cc
@@ -53,6 +53,7 @@ using ThreadGroupNativeType =
#endif
constexpr size_t kMaxTasks = 4;
+constexpr size_t kTooManyTasks = 1000;
// By default, tests allow half of the thread group to be used by best-effort
// tasks.
constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
@@ -654,6 +655,50 @@ TEST_P(ThreadGroupTest, ScheduleJobTaskSourceMultipleTime) {
task_tracker_.FlushForTesting();
}
+// Verify that Cancel() on a job stops running the worker task and causes
+// current workers to yield.
+TEST_P(ThreadGroupTest, CancelJobTaskSource) {
+ StartThreadGroup();
+
+ CheckedLock tasks_running_lock;
+ std::unique_ptr<ConditionVariable> tasks_running_cv =
+ tasks_running_lock.CreateConditionVariable();
+ bool tasks_running = false;
+
+ // Schedule a big number of tasks.
+ auto job_task = base::MakeRefCounted<test::MockJobTask>(
+ BindLambdaForTesting([&](experimental::JobDelegate* delegate) {
+ {
+ CheckedAutoLock auto_lock(tasks_running_lock);
+ tasks_running = true;
+ }
+ tasks_running_cv->Signal();
+
+ while (!delegate->ShouldYield()) {
+ }
+ }),
+ /* num_tasks_to_run */ kTooManyTasks);
+ scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
+ FROM_HERE, {ThreadPool()}, &mock_pooled_task_runner_delegate_);
+
+ mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
+ experimental::JobHandle job_handle =
+ internal::JobTaskSource::CreateJobHandle(task_source);
+
+ // Wait for at least 1 task to start running.
+ {
+ CheckedAutoLock auto_lock(tasks_running_lock);
+ while (!tasks_running)
+ tasks_running_cv->Wait();
+ }
+
+ // Cancels pending tasks and unblocks running ones.
+ job_handle.Cancel();
+
+ // This should not block since the job got cancelled.
+ task_tracker_.FlushForTesting();
+}
+
// Verify that calling JobTaskSource::NotifyConcurrencyIncrease() (re-)schedule
// tasks with the intended concurrency.
TEST_P(ThreadGroupTest, JobTaskSourceConcurrencyIncrease) {
@@ -733,6 +778,37 @@ TEST_P(ThreadGroupTest, ScheduleEmptyJobTaskSource) {
task_tracker_.FlushForTesting();
}
+// Verify that Join() on a job contributes to max concurrency and waits for all
+// workers to return.
+TEST_P(ThreadGroupTest, JoinJobTaskSource) {
+ StartThreadGroup();
+
+ WaitableEvent threads_continue;
+ RepeatingClosure threads_continue_barrier = BarrierClosure(
+ kMaxTasks + 1,
+ BindOnce(&WaitableEvent::Signal, Unretained(&threads_continue)));
+
+ auto job_task = base::MakeRefCounted<test::MockJobTask>(
+ BindLambdaForTesting([&](experimental::JobDelegate*) {
+ threads_continue_barrier.Run();
+ test::WaitWithoutBlockingObserver(&threads_continue);
+ }),
+ /* num_tasks_to_run */ kMaxTasks + 1);
+ scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
+ FROM_HERE, {ThreadPool()}, &mock_pooled_task_runner_delegate_);
+
+ mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
+ experimental::JobHandle job_handle =
+ internal::JobTaskSource::CreateJobHandle(task_source);
+ job_handle.Join();
+ // All worker tasks should complete before Join() returns.
+ EXPECT_EQ(0U, job_task->GetMaxConcurrency());
+ thread_group_->JoinForTesting();
+ EXPECT_EQ(1U, task_source->HasOneRef());
+ // Prevent TearDown() from calling JoinForTesting() again.
+ thread_group_ = nullptr;
+}
+
// Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
// in a thread group does not affect JobTaskSource with a priority that was
// increased from BEST_EFFORT to USER_BLOCKING.
diff --git a/chromium/base/task/thread_pool/thread_pool_impl.cc b/chromium/base/task/thread_pool/thread_pool_impl.cc
index 64d580d08ad..ff1bfa78211 100644
--- a/chromium/base/task/thread_pool/thread_pool_impl.cc
+++ b/chromium/base/task/thread_pool/thread_pool_impl.cc
@@ -77,20 +77,23 @@ ThreadPoolImpl::ThreadPoolImpl(StringPiece histogram_label,
&delayed_task_manager_),
has_disable_best_effort_switch_(HasDisableBestEffortTasksSwitch()),
tracked_ref_factory_(this) {
- DCHECK(!histogram_label.empty());
-
foreground_thread_group_ = std::make_unique<ThreadGroupImpl>(
- JoinString(
- {histogram_label, kForegroundPoolEnvironmentParams.name_suffix}, "."),
+ histogram_label.empty()
+ ? std::string()
+ : JoinString(
+ {histogram_label, kForegroundPoolEnvironmentParams.name_suffix},
+ "."),
kForegroundPoolEnvironmentParams.name_suffix,
kForegroundPoolEnvironmentParams.priority_hint,
task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
if (CanUseBackgroundPriorityForWorkerThread()) {
background_thread_group_ = std::make_unique<ThreadGroupImpl>(
- JoinString(
- {histogram_label, kBackgroundPoolEnvironmentParams.name_suffix},
- "."),
+ histogram_label.empty()
+ ? std::string()
+ : JoinString({histogram_label,
+ kBackgroundPoolEnvironmentParams.name_suffix},
+ "."),
kBackgroundPoolEnvironmentParams.name_suffix,
kBackgroundPoolEnvironmentParams.priority_hint,
task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
@@ -423,6 +426,14 @@ bool ThreadPoolImpl::EnqueueJobTaskSource(
return true;
}
+void ThreadPoolImpl::RemoveJobTaskSource(
+ scoped_refptr<JobTaskSource> task_source) {
+ auto transaction = task_source->BeginTransaction();
+ ThreadGroup* const current_thread_group =
+ GetThreadGroupForTraits(transaction.traits());
+ current_thread_group->RemoveTaskSource(*task_source);
+}
+
bool ThreadPoolImpl::IsRunningPoolWithTraits(const TaskTraits& traits) const {
return GetThreadGroupForTraits(traits)->IsBoundToCurrentThread();
}
@@ -455,7 +466,7 @@ void ThreadPoolImpl::UpdatePriority(scoped_refptr<TaskSource> task_source,
// |task_source| is changing thread groups; remove it from its current
// thread group and reenqueue it.
auto registered_task_source =
- current_thread_group->RemoveTaskSource(task_source);
+ current_thread_group->RemoveTaskSource(*task_source);
if (registered_task_source) {
DCHECK(task_source);
new_thread_group->PushTaskSourceAndWakeUpWorkers(
diff --git a/chromium/base/task/thread_pool/thread_pool_impl.h b/chromium/base/task/thread_pool/thread_pool_impl.h
index 049ee6de692..8b3a6760ce9 100644
--- a/chromium/base/task/thread_pool/thread_pool_impl.h
+++ b/chromium/base/task/thread_pool/thread_pool_impl.h
@@ -60,8 +60,8 @@ class BASE_EXPORT ThreadPoolImpl : public ThreadPoolInstance,
TaskTracker;
#endif
- // Creates a ThreadPoolImpl with a production TaskTracker.
- //|histogram_label| is used to label histograms, it must not be empty.
+ // Creates a ThreadPoolImpl with a production TaskTracker. |histogram_label|
+ // is used to label histograms. No histograms are recorded if it is empty.
explicit ThreadPoolImpl(StringPiece histogram_label);
// For testing only. Creates a ThreadPoolImpl with a custom TaskTracker.
@@ -103,6 +103,7 @@ class BASE_EXPORT ThreadPoolImpl : public ThreadPoolInstance,
// PooledTaskRunnerDelegate:
bool EnqueueJobTaskSource(scoped_refptr<JobTaskSource> task_source) override;
+ void RemoveJobTaskSource(scoped_refptr<JobTaskSource> task_source) override;
void UpdatePriority(scoped_refptr<TaskSource> task_source,
TaskPriority priority) override;