diff options
author | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2020-01-23 17:21:03 +0100 |
---|---|---|
committer | Allan Sandfeld Jensen <allan.jensen@qt.io> | 2020-01-23 16:25:15 +0000 |
commit | c551f43206405019121bd2b2c93714319a0a3300 (patch) | |
tree | 1f48c30631c421fd4bbb3c36da20183c8a2ed7d7 /chromium/base/task | |
parent | 7961cea6d1041e3e454dae6a1da660b453efd238 (diff) |
BASELINE: Update Chromium to 79.0.3945.139
Change-Id: I336b7182fab9bca80b709682489c07db112eaca5
Reviewed-by: Allan Sandfeld Jensen <allan.jensen@qt.io>
Diffstat (limited to 'chromium/base/task')
80 files changed, 2676 insertions, 970 deletions
diff --git a/chromium/base/task/post_job.cc b/chromium/base/task/post_job.cc index e57b32e4b40..3c83068995a 100644 --- a/chromium/base/task/post_job.cc +++ b/chromium/base/task/post_job.cc @@ -4,8 +4,11 @@ #include "base/task/post_job.h" +#include "base/task/scoped_set_task_priority_for_current_thread.h" #include "base/task/thread_pool/job_task_source.h" #include "base/task/thread_pool/pooled_task_runner_delegate.h" +#include "base/task/thread_pool/thread_pool_impl.h" +#include "base/task/thread_pool/thread_pool_instance.h" namespace base { namespace experimental { @@ -16,7 +19,6 @@ JobDelegate::JobDelegate( : task_source_(task_source), pooled_task_runner_delegate_(pooled_task_runner_delegate) { DCHECK(task_source_); - DCHECK(pooled_task_runner_delegate_); #if DCHECK_IS_ON() recorded_increase_version_ = task_source_->GetConcurrencyIncreaseVersion(); // Record max concurrency before running the worker task. @@ -42,7 +44,9 @@ bool JobDelegate::ShouldYield() { AssertExpectedConcurrency(recorded_max_concurrency_); #endif // DCHECK_IS_ON() const bool should_yield = - pooled_task_runner_delegate_->ShouldYield(task_source_); + task_source_->ShouldYield() || + (pooled_task_runner_delegate_ && + pooled_task_runner_delegate_->ShouldYield(task_source_)); #if DCHECK_IS_ON() last_should_yield_ = should_yield; @@ -98,5 +102,91 @@ void JobDelegate::AssertExpectedConcurrency(size_t expected_max_concurrency) { #endif // DCHECK_IS_ON() } +JobHandle::JobHandle() = default; + +JobHandle::JobHandle(scoped_refptr<internal::JobTaskSource> task_source) + : task_source_(std::move(task_source)) {} + +JobHandle::~JobHandle() { + DCHECK(!task_source_) + << "The Job must be cancelled, detached or joined before its " + "JobHandle is destroyed."; +} + +JobHandle::JobHandle(JobHandle&&) = default; + +JobHandle& JobHandle::operator=(JobHandle&& other) { + DCHECK(!task_source_) + << "The Job must be cancelled, detached or joined before its " + "JobHandle is re-assigned."; + task_source_ = std::move(other.task_source_); + return *this; +} + +void JobHandle::UpdatePriority(TaskPriority new_priority) { + task_source_->delegate()->UpdatePriority(task_source_, new_priority); +} + +void JobHandle::NotifyConcurrencyIncrease() { + task_source_->NotifyConcurrencyIncrease(); +} + +void JobHandle::Join() { + DCHECK_GE(internal::GetTaskPriorityForCurrentThread(), + task_source_->priority_racy()) + << "Join may not be called on Job with higher priority than the current " + "one."; + UpdatePriority(internal::GetTaskPriorityForCurrentThread()); + bool must_run = task_source_->WillJoin(); + while (must_run) + must_run = task_source_->RunJoinTask(); + // Remove |task_source_| from the ThreadPool to prevent access to + // |max_concurrency_callback| after Join(). + task_source_->delegate()->RemoveJobTaskSource(task_source_); + task_source_ = nullptr; +} + +void JobHandle::Cancel() { + task_source_->Cancel(); + Join(); +} + +void JobHandle::CancelAndDetach() { + task_source_->Cancel(); + Detach(); +} + +void JobHandle::Detach() { + DCHECK(task_source_); + task_source_ = nullptr; +} + +JobHandle PostJob(const Location& from_here, + const TaskTraits& traits, + RepeatingCallback<void(JobDelegate*)> worker_task, + RepeatingCallback<size_t()> max_concurrency_callback) { + DCHECK(ThreadPoolInstance::Get()) + << "Ref. Prerequisite section of post_task.h.\n\n" + "Hint: if this is in a unit test, you're likely merely missing a " + "base::test::TaskEnvironment member in your fixture.\n"; + DCHECK(traits.use_thread_pool()) + << "The base::ThreadPool() trait is mandatory with PostJob()."; + DCHECK_EQ(traits.extension_id(), + TaskTraitsExtensionStorage::kInvalidExtensionId) + << "Extension traits cannot be used with PostJob()."; + TaskTraits adjusted_traits = traits; + adjusted_traits.InheritPriority(internal::GetTaskPriorityForCurrentThread()); + auto task_source = base::MakeRefCounted<internal::JobTaskSource>( + from_here, adjusted_traits, std::move(worker_task), + std::move(max_concurrency_callback), + static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get())); + const bool queued = + static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get()) + ->EnqueueJobTaskSource(task_source); + if (queued) + return internal::JobTaskSource::CreateJobHandle(std::move(task_source)); + return JobHandle(); +} + } // namespace experimental } // namespace base
\ No newline at end of file diff --git a/chromium/base/task/post_job.h b/chromium/base/task/post_job.h index de6b2d66ac3..cf8ca1aec70 100644 --- a/chromium/base/task/post_job.h +++ b/chromium/base/task/post_job.h @@ -6,8 +6,12 @@ #define BASE_TASK_POST_JOB_H_ #include "base/base_export.h" +#include "base/callback.h" +#include "base/location.h" #include "base/logging.h" #include "base/macros.h" +#include "base/memory/ref_counted.h" +#include "base/task/task_traits.h" #include "base/time/time.h" namespace base { @@ -23,8 +27,9 @@ class BASE_EXPORT JobDelegate { public: // A JobDelegate is instantiated for each worker task that is run. // |task_source| is the task source whose worker task is running with this - // delegate and |pooled_task_runner_delegate| provides communication with the - // thread pool. + // delegate and |pooled_task_runner_delegate| is used by ShouldYield() to + // check whether the pool wants this worker task to yield (null if this worker + // should never yield -- e.g. when the main thread is a worker). JobDelegate(internal::JobTaskSource* task_source, internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate); ~JobDelegate(); @@ -42,7 +47,7 @@ class BASE_EXPORT JobDelegate { void YieldIfNeeded(); // Notifies the scheduler that max concurrency was increased, and the number - // of worker should be adjusted. + // of worker should be adjusted accordingly. See PostJob() for more details. void NotifyConcurrencyIncrease(); private: @@ -67,6 +72,95 @@ class BASE_EXPORT JobDelegate { DISALLOW_COPY_AND_ASSIGN(JobDelegate); }; +// Handle returned when posting a Job. Provides methods to control execution of +// the posted Job. +class BASE_EXPORT JobHandle { + public: + JobHandle(); + // A job must either be joined, canceled or detached before the JobHandle is + // destroyed. + ~JobHandle(); + + JobHandle(JobHandle&&); + JobHandle& operator=(JobHandle&&); + + // Update this Job's priority. + void UpdatePriority(TaskPriority new_priority); + + // Notifies the scheduler that max concurrency was increased, and the number + // of workers should be adjusted accordingly. See PostJob() for more details. + void NotifyConcurrencyIncrease(); + + // Contributes to the job on this thread. Doesn't return until all tasks have + // completed and max concurrency becomes 0. This also promotes this Job's + // priority to be at least as high as the calling thread's priority. + void Join(); + + // Forces all existing workers to yield ASAP. Waits until they have all + // returned from the Job's callback before returning. + void Cancel(); + + // Forces all existing workers to yield ASAP but doesn’t wait for them. + // Warning, this is dangerous if the Job's callback is bound to or has access + // to state which may be deleted after this call. + void CancelAndDetach(); + + // Can be invoked before ~JobHandle() to avoid waiting on the job completing. + void Detach(); + + private: + friend class internal::JobTaskSource; + + explicit JobHandle(scoped_refptr<internal::JobTaskSource> task_source); + + scoped_refptr<internal::JobTaskSource> task_source_; + + DISALLOW_COPY_AND_ASSIGN(JobHandle); +}; + +// Posts a repeating |worker_task| with specific |traits| to run in parallel. +// Returns a JobHandle associated with the Job, which can be joined, canceled or +// detached. +// To avoid scheduling overhead, |worker_task| should do as much work as +// possible in a loop when invoked, and JobDelegate::ShouldYield() should be +// periodically invoked to conditionally exit and let the scheduler prioritize +// work. +// +// A canonical implementation of |worker_task| looks like: +// void WorkerTask(JobDelegate* job_delegate) { +// while (!job_delegate->ShouldYield()) { +// auto work_item = worker_queue.TakeWorkItem(); // Smallest unit of work. +// if (!work_item) +// return: +// ProcessWork(work_item); +// } +// } +// +// |max_concurrency_callback| controls the maximum number of threads calling +// |worker_task| concurrently. |worker_task| is only invoked if the number of +// threads previously running |worker_task| was less than the value returned by +// |max_concurrency_callback|. In general, |max_concurrency_callback| should +// return the latest number of incomplete work items (smallest unit of work) +// left to processed. JobHandle/JobDelegate::NotifyConcurrencyIncrease() *must* +// be invoked shortly after |max_concurrency_callback| starts returning a value +// larger than previously returned values. This usually happens when new work +// items are added and the API user wants additional threads to invoke +// |worker_task| concurrently. The callbacks may be called concurrently on any +// thread until the job is complete. If the job handle is detached, the +// callbacks may still be called, so they must not access global state that +// could be destroyed. +// +// |traits| requirements: +// - base::ThreadPool() must be specified. +// - Extension traits (e.g. BrowserThread) cannot be specified. +// - base::ThreadPolicy must be specified if the priority of the task runner +// will ever be increased from BEST_EFFORT. +JobHandle BASE_EXPORT +PostJob(const Location& from_here, + const TaskTraits& traits, + RepeatingCallback<void(JobDelegate*)> worker_task, + RepeatingCallback<size_t()> max_concurrency_callback); + } // namespace experimental } // namespace base diff --git a/chromium/base/task/post_job_unittest.cc b/chromium/base/task/post_job_unittest.cc new file mode 100644 index 00000000000..3fc3d6d4b07 --- /dev/null +++ b/chromium/base/task/post_job_unittest.cc @@ -0,0 +1,40 @@ +// Copyright 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/task/post_job.h" + +#include <atomic> + +#include "base/task/test_task_traits_extension.h" +#include "base/test/bind_test_util.h" +#include "base/test/gtest_util.h" +#include "base/test/task_environment.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +TEST(PostJobTest, PostJobSimple) { + test::TaskEnvironment task_environment; + std::atomic_size_t num_tasks_to_run(4); + auto handle = experimental::PostJob( + FROM_HERE, ThreadPool(), + BindLambdaForTesting( + [&](experimental::JobDelegate* delegate) { --num_tasks_to_run; }), + BindLambdaForTesting([&]() -> size_t { return num_tasks_to_run; })); + handle.Join(); + DCHECK_EQ(num_tasks_to_run, 0U); +} + +TEST(PostJobTest, PostJobExtension) { + testing::FLAGS_gtest_death_test_style = "threadsafe"; + EXPECT_DCHECK_DEATH({ + auto handle = experimental::PostJob( + FROM_HERE, TestExtensionBoolTrait(), + BindRepeating([](experimental::JobDelegate* delegate) {}), + BindRepeating([]() -> size_t { return 0; })); + }); +} + +} // namespace base
\ No newline at end of file diff --git a/chromium/base/task/post_task.cc b/chromium/base/task/post_task.cc index 76415fbfe56..244e5f6adc8 100644 --- a/chromium/base/task/post_task.cc +++ b/chromium/base/task/post_task.cc @@ -41,6 +41,13 @@ TaskTraits GetTaskTraitsWithExplicitPriority(TaskTraits traits) { } TaskExecutor* GetTaskExecutorForTraits(const TaskTraits& traits) { + if (traits.use_current_thread()) { + TaskExecutor* executor = GetTaskExecutorForCurrentThread(); + DCHECK(executor) << "Couldn't find a TaskExecutor for this thread. Note " + "you can't use base::CurrentThread in a one-off " + "base::ThreadPool task."; + return executor; + } TaskExecutor* executor = GetRegisteredTaskExecutorForTraits(traits); DCHECK(executor || ThreadPoolInstance::Get()) << "Ref. Prerequisite section of post_task.h.\n\n" @@ -139,54 +146,4 @@ scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunner( } #endif // defined(OS_WIN) -// TODO(crbug.com/968047): Update all call sites and remove these forwarding -// wrappers. -bool PostTaskWithTraits(const Location& from_here, - const TaskTraits& traits, - OnceClosure task) { - return PostTask(from_here, traits, std::move(task)); -} - -bool PostDelayedTaskWithTraits(const Location& from_here, - const TaskTraits& traits, - OnceClosure task, - TimeDelta delay) { - return PostDelayedTask(from_here, traits, std::move(task), delay); -} - -bool PostTaskWithTraitsAndReply(const Location& from_here, - const TaskTraits& traits, - OnceClosure task, - OnceClosure reply) { - return PostTaskAndReply(from_here, traits, std::move(task), std::move(reply)); -} - -scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits(const TaskTraits& traits) { - return CreateTaskRunner(traits); -} - -scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunnerWithTraits( - const TaskTraits& traits) { - return CreateSequencedTaskRunner(traits); -} - -scoped_refptr<UpdateableSequencedTaskRunner> -CreateUpdateableSequencedTaskRunnerWithTraits(const TaskTraits& traits) { - return CreateUpdateableSequencedTaskRunner(traits); -} - -scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunnerWithTraits( - const TaskTraits& traits, - SingleThreadTaskRunnerThreadMode thread_mode) { - return CreateSingleThreadTaskRunner(traits, thread_mode); -} - -#if defined(OS_WIN) -scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunnerWithTraits( - const TaskTraits& traits, - SingleThreadTaskRunnerThreadMode thread_mode) { - return CreateCOMSTATaskRunner(traits, thread_mode); -} -#endif // defined(OS_WIN) - } // namespace base diff --git a/chromium/base/task/post_task.h b/chromium/base/task/post_task.h index b02fd65a032..9f9b5a24fa3 100644 --- a/chromium/base/task/post_task.h +++ b/chromium/base/task/post_task.h @@ -171,21 +171,6 @@ bool PostTaskAndReplyWithResult(const Location& from_here, std::move(reply), Owned(result))); } -// Temporary wrapper for PostTaskAndReplyWithResult. -// TODO(crbug.com/968047): Update all call sites and remove. -template <template <typename> class CallbackType, - typename TaskReturnType, - typename ReplyArgType, - typename = EnableIfIsBaseCallback<CallbackType>> -bool PostTaskWithTraitsAndReplyWithResult( - const Location& from_here, - const TaskTraits& traits, - CallbackType<TaskReturnType()> task, - CallbackType<void(ReplyArgType)> reply) { - return PostTaskAndReplyWithResult(from_here, traits, std::move(task), - std::move(reply)); -} - // Returns a TaskRunner whose PostTask invocations result in scheduling tasks // using |traits|. Tasks may run in any order and in parallel. BASE_EXPORT scoped_refptr<TaskRunner> CreateTaskRunner( @@ -247,38 +232,6 @@ BASE_EXPORT scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunner( SingleThreadTaskRunnerThreadMode::SHARED); #endif // defined(OS_WIN) -// Temporary wrappers for the task posting APIs while we remove the "WithTraits" -// suffix. -// TODO(crbug.com/968047): Update all call sites and remove. -BASE_EXPORT bool PostTaskWithTraits(const Location& from_here, - const TaskTraits& traits, - OnceClosure task); -BASE_EXPORT bool PostDelayedTaskWithTraits(const Location& from_here, - const TaskTraits& traits, - OnceClosure task, - TimeDelta delay); -BASE_EXPORT bool PostTaskWithTraitsAndReply(const Location& from_here, - const TaskTraits& traits, - OnceClosure task, - OnceClosure reply); -BASE_EXPORT scoped_refptr<TaskRunner> CreateTaskRunnerWithTraits( - const TaskTraits& traits); -BASE_EXPORT scoped_refptr<SequencedTaskRunner> -CreateSequencedTaskRunnerWithTraits(const TaskTraits& traits); -BASE_EXPORT scoped_refptr<UpdateableSequencedTaskRunner> -CreateUpdateableSequencedTaskRunnerWithTraits(const TaskTraits& traits); -BASE_EXPORT scoped_refptr<SingleThreadTaskRunner> -CreateSingleThreadTaskRunnerWithTraits( - const TaskTraits& traits, - SingleThreadTaskRunnerThreadMode thread_mode = - SingleThreadTaskRunnerThreadMode::SHARED); -#if defined(OS_WIN) -BASE_EXPORT scoped_refptr<SingleThreadTaskRunner> -CreateCOMSTATaskRunnerWithTraits(const TaskTraits& traits, - SingleThreadTaskRunnerThreadMode thread_mode = - SingleThreadTaskRunnerThreadMode::SHARED); -#endif // defined(OS_WIN) - } // namespace base #endif // BASE_TASK_POST_TASK_H_ diff --git a/chromium/base/task/post_task_unittest.cc b/chromium/base/task/post_task_unittest.cc index ed7fb3db3bb..798eba8e580 100644 --- a/chromium/base/task/post_task_unittest.cc +++ b/chromium/base/task/post_task_unittest.cc @@ -5,9 +5,11 @@ #include "base/task/post_task.h" #include "base/bind_helpers.h" +#include "base/run_loop.h" #include "base/task/scoped_set_task_priority_for_current_thread.h" #include "base/task/task_executor.h" #include "base/task/test_task_traits_extension.h" +#include "base/test/bind_test_util.h" #include "base/test/gtest_util.h" #include "base/test/task_environment.h" #include "base/test/test_simple_task_runner.h" @@ -17,6 +19,8 @@ using ::testing::_; using ::testing::Invoke; +using ::testing::IsNull; +using ::testing::NotNull; using ::testing::Return; namespace base { @@ -177,6 +181,159 @@ TEST_F(PostTaskTestWithExecutor, PostTaskToTaskExecutor) { } } +TEST_F(PostTaskTestWithExecutor, + ThreadPoolTaskRunnerGetTaskExecutorForCurrentThread) { + auto task_runner = CreateTaskRunner({ThreadPool()}); + RunLoop run_loop; + + EXPECT_TRUE(task_runner->PostTask( + FROM_HERE, BindLambdaForTesting([&]() { + // We don't have an executor for a ThreadPool task runner becuse they + // are for one shot tasks. + EXPECT_THAT(GetTaskExecutorForCurrentThread(), IsNull()); + run_loop.Quit(); + }))); + + run_loop.Run(); +} + +TEST_F(PostTaskTestWithExecutor, + ThreadPoolSequencedTaskRunnerGetTaskExecutorForCurrentThread) { + auto sequenced_task_runner = CreateSequencedTaskRunner({ThreadPool()}); + RunLoop run_loop; + + EXPECT_TRUE(sequenced_task_runner->PostTask( + FROM_HERE, BindLambdaForTesting([&]() { + EXPECT_THAT(GetTaskExecutorForCurrentThread(), NotNull()); + run_loop.Quit(); + }))); + + run_loop.Run(); +} + +TEST_F(PostTaskTestWithExecutor, + ThreadPoolSingleThreadTaskRunnerGetTaskExecutorForCurrentThread) { + auto single_thread_task_runner = CreateSingleThreadTaskRunner({ThreadPool()}); + RunLoop run_loop; + + EXPECT_TRUE(single_thread_task_runner->PostTask( + FROM_HERE, BindLambdaForTesting([&]() { + EXPECT_THAT(GetTaskExecutorForCurrentThread(), NotNull()); + run_loop.Quit(); + }))); + + run_loop.Run(); +} + +TEST_F(PostTaskTestWithExecutor, ThreadPoolTaskRunnerCurrentThreadTrait) { + auto task_runner = CreateTaskRunner({ThreadPool()}); + RunLoop run_loop; + + EXPECT_TRUE(task_runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() { + // CurrentThread is meaningless in this + // context. + EXPECT_DCHECK_DEATH( + PostTask(FROM_HERE, {CurrentThread()}, + DoNothing())); + run_loop.Quit(); + }))); + + run_loop.Run(); +} + +TEST_F(PostTaskTestWithExecutor, + ThreadPoolSequencedTaskRunnerCurrentThreadTrait) { + auto sequenced_task_runner = CreateSequencedTaskRunner({ThreadPool()}); + RunLoop run_loop; + + auto current_thread_task = BindLambdaForTesting([&]() { + EXPECT_TRUE(sequenced_task_runner->RunsTasksInCurrentSequence()); + run_loop.Quit(); + }); + + EXPECT_TRUE(sequenced_task_runner->PostTask( + FROM_HERE, BindLambdaForTesting([&]() { + EXPECT_TRUE( + PostTask(FROM_HERE, {CurrentThread()}, current_thread_task)); + }))); + + run_loop.Run(); +} + +TEST_F(PostTaskTestWithExecutor, + ThreadPoolSingleThreadTaskRunnerCurrentThreadTrait) { + auto single_thread_task_runner = CreateSingleThreadTaskRunner({ThreadPool()}); + RunLoop run_loop; + + auto current_thread_task = BindLambdaForTesting([&]() { + EXPECT_TRUE(single_thread_task_runner->RunsTasksInCurrentSequence()); + run_loop.Quit(); + }); + + EXPECT_TRUE(single_thread_task_runner->PostTask( + FROM_HERE, BindLambdaForTesting([&]() { + EXPECT_TRUE( + PostTask(FROM_HERE, {CurrentThread()}, current_thread_task)); + }))); + + run_loop.Run(); +} + +TEST_F(PostTaskTestWithExecutor, ThreadPoolCurrentThreadChangePriority) { + auto single_thread_task_runner = + CreateSingleThreadTaskRunner({ThreadPool(), TaskPriority::USER_BLOCKING}); + RunLoop run_loop; + + auto current_thread_task = BindLambdaForTesting([&]() { + EXPECT_TRUE(single_thread_task_runner->RunsTasksInCurrentSequence()); + run_loop.Quit(); + }); + + EXPECT_TRUE(single_thread_task_runner->PostTask( + FROM_HERE, BindLambdaForTesting([&]() { + // We should be able to request a priority change, although it may be + // ignored. + EXPECT_TRUE(PostTask(FROM_HERE, + {CurrentThread(), TaskPriority::USER_VISIBLE}, + current_thread_task)); + }))); + + run_loop.Run(); +} + +TEST_F(PostTaskTestWithExecutor, + ThreadPoolCurrentThreadCantChangeShutdownBehavior) { + auto single_thread_task_runner = CreateSingleThreadTaskRunner( + {ThreadPool(), TaskShutdownBehavior::SKIP_ON_SHUTDOWN}); + RunLoop run_loop; + + EXPECT_TRUE(single_thread_task_runner->PostTask( + FROM_HERE, BindLambdaForTesting([&]() { + EXPECT_DCHECK_DEATH(PostTask( + FROM_HERE, {CurrentThread(), TaskShutdownBehavior::BLOCK_SHUTDOWN}, + DoNothing())); + run_loop.Quit(); + }))); + + run_loop.Run(); +} + +TEST_F(PostTaskTestWithExecutor, + ThreadPoolCurrentThreadCantSetSyncPrimitivesInNonSyncTaskRunner) { + auto single_thread_task_runner = CreateSingleThreadTaskRunner({ThreadPool()}); + RunLoop run_loop; + + EXPECT_TRUE(single_thread_task_runner->PostTask( + FROM_HERE, BindLambdaForTesting([&]() { + EXPECT_DCHECK_DEATH( + PostTask(FROM_HERE, {CurrentThread(), WithBaseSyncPrimitives()}, + DoNothing())); + run_loop.Quit(); + }))); + + run_loop.Run(); +} + TEST_F(PostTaskTestWithExecutor, RegisterExecutorTwice) { testing::FLAGS_gtest_death_test_style = "threadsafe"; EXPECT_DCHECK_DEATH( diff --git a/chromium/base/task/promise/abstract_promise.cc b/chromium/base/task/promise/abstract_promise.cc index f48ba8517e2..315cfd30bfc 100644 --- a/chromium/base/task/promise/abstract_promise.cc +++ b/chromium/base/task/promise/abstract_promise.cc @@ -8,6 +8,7 @@ #include "base/lazy_instance.h" #include "base/sequenced_task_runner.h" #include "base/task/promise/dependent_list.h" +#include "base/task/promise/post_task_executor.h" #include "base/threading/sequenced_task_runner_handle.h" namespace base { @@ -65,6 +66,10 @@ AbstractPromise::~AbstractPromise() { OnCanceled(); } +void AbstractPromise::EmplaceResolvedVoid() { + emplace(Resolved<void>()); +} + bool AbstractPromise::IsCanceled() const { if (dependents_.IsCanceled()) return true; @@ -290,7 +295,7 @@ AbstractPromise* AbstractPromise::GetCurriedPromise() { const PromiseExecutor* AbstractPromise::GetExecutor() const { if (!value_.ContainsPromiseExecutor()) return nullptr; - return value_.Get<internal::PromiseExecutor>(); + return value_.Get<PromiseExecutor>(); } PromiseExecutor::PrerequisitePolicy AbstractPromise::GetPrerequisitePolicy() { @@ -509,7 +514,7 @@ void AbstractPromise::OnRejectMakeDependantsUseCurriedPrerequisite( void AbstractPromise::DispatchPromise() { if (task_runner_) { - task_runner_->PostPromiseInternal(this, TimeDelta()); + task_runner_->PostPromiseInternal(WrappedPromise(this), TimeDelta()); } else { Execute(); } @@ -670,14 +675,66 @@ void AbstractPromise::AdjacencyList::Clear() { prerequisite_list_.clear(); } else { // If there's multiple prerequisites we can't do that because the - // DependentList::Nodes may still be in use by some of them. Instead we - // release our prerequisite references and rely on refcounting to release - // the owning AbstractPromise. + // DependentList::Nodes may still be in use by some of them. + // Instead we release our prerequisite references and rely on refcounting to + // release the owning AbstractPromise. for (DependentList::Node& node : prerequisite_list_) { node.ClearPrerequisite(); } } } +BasePromise::BasePromise() = default; + +BasePromise::BasePromise( + scoped_refptr<internal::AbstractPromise> abstract_promise) + : abstract_promise_(std::move(abstract_promise)) {} + +BasePromise::BasePromise(const BasePromise& other) = default; +BasePromise::BasePromise(BasePromise&& other) = default; + +BasePromise& BasePromise::operator=(const BasePromise& other) = default; +BasePromise& BasePromise::operator=(BasePromise&& other) = default; + +BasePromise::~BasePromise() = default; + } // namespace internal + +WrappedPromise::WrappedPromise() = default; + +WrappedPromise::WrappedPromise(scoped_refptr<internal::AbstractPromise> promise) + : promise_(std::move(promise)) {} + +WrappedPromise::WrappedPromise(internal::PassedPromise&& passed_promise) + : promise_(passed_promise.Release(), subtle::kAdoptRefTag) { + DCHECK(promise_); +} + +WrappedPromise::WrappedPromise(const Location& from_here, OnceClosure task) + : WrappedPromise(internal::AbstractPromise::CreateNoPrerequisitePromise( + from_here, + RejectPolicy::kMustCatchRejection, + internal::DependentList::ConstructUnresolved(), + internal::PromiseExecutor::Data( + base::in_place_type_t<internal::PostTaskExecutor<void>>(), + std::move(task)))) {} + +WrappedPromise::WrappedPromise(const WrappedPromise& other) = default; +WrappedPromise::WrappedPromise(WrappedPromise&& other) = default; + +WrappedPromise& WrappedPromise::operator=(const WrappedPromise& other) = + default; +WrappedPromise& WrappedPromise::operator=(WrappedPromise&& other) = default; + +WrappedPromise::~WrappedPromise() = default; + +void WrappedPromise::Execute() { + DCHECK(promise_); + promise_->Execute(); +} + +void WrappedPromise::Clear() { + promise_ = nullptr; +} + } // namespace base diff --git a/chromium/base/task/promise/abstract_promise.h b/chromium/base/task/promise/abstract_promise.h index 9f67c688bd4..c16132c187c 100644 --- a/chromium/base/task/promise/abstract_promise.h +++ b/chromium/base/task/promise/abstract_promise.h @@ -23,6 +23,9 @@ class TaskRunner; template <typename ResolveType, typename RejectType> class ManualPromiseResolver; +template <typename ResolveType, typename RejectType> +class Promise; + // AbstractPromise Memory Management. // // Consider a chain of promises: P1, P2 & P3 @@ -265,13 +268,71 @@ enum class RejectPolicy { kCatchNotRequired, }; +class WrappedPromise; + namespace internal { template <typename T, typename... Args> class PromiseCallbackHelper; -class PromiseHolder; +class AbstractPromise; class AbstractPromiseTest; +class BasePromise; + +// A binary size optimization to reduce the overhead of passing a scoped_refptr +// to Promise<> returned by PostTask. There are many thousands of PostTasks so +// even a single extra instruction (such as the scoped_refptr move constructor +// clearing the pointer) adds up. This is why we're not constructing a Promise<> +// with a scoped_refptr. +// +// The constructor calls AddRef, it's up to the owner of this object to either +// call Clear (which calls Release) or AbstractPromise in order to pass +// ownership onto a WrappedPromise. +class BASE_EXPORT PassedPromise { + public: + explicit inline PassedPromise(const scoped_refptr<AbstractPromise>& promise); + + PassedPromise() : promise_(nullptr) {} + + PassedPromise(const PassedPromise&) = delete; + PassedPromise& operator=(const PassedPromise&) = delete; + +#if DCHECK_IS_ON() + PassedPromise(PassedPromise&& other) noexcept : promise_(other.promise_) { + DCHECK(promise_); + other.promise_ = nullptr; + } + + PassedPromise& operator=(PassedPromise&& other) noexcept { + DCHECK(!promise_); + promise_ = other.promise_; + DCHECK(promise_); + other.promise_ = nullptr; + return *this; + } + + ~PassedPromise() { + DCHECK(!promise_) << "The PassedPromise must be Cleared or passed onto a " + "Wrapped Promise"; + } +#else + PassedPromise(PassedPromise&&) noexcept = default; + PassedPromise& operator=(PassedPromise&&) noexcept = default; +#endif + + AbstractPromise* Release() { + AbstractPromise* promise = promise_; +#if DCHECK_IS_ON() + promise_ = nullptr; +#endif + return promise; + } + + AbstractPromise* get() const { return promise_; } + + private: + AbstractPromise* promise_; +}; // Internal promise representation, maintains a graph of dependencies and posts // promises as they become ready. In debug builds various sanity checks are @@ -308,12 +369,11 @@ class BASE_EXPORT AbstractPromise RejectPolicy reject_policy, ConstructType tag, PromiseExecutor::Data&& executor_data) noexcept { - scoped_refptr<AbstractPromise> promise = - subtle::AdoptRefIfNeeded(new internal::AbstractPromise( - nullptr, from_here, nullptr, reject_policy, - tag, std::move(executor_data)), - AbstractPromise::kRefCountPreference); - return promise; + return subtle::AdoptRefIfNeeded( + new internal::AbstractPromise(nullptr, from_here, nullptr, + reject_policy, tag, + std::move(executor_data)), + AbstractPromise::kRefCountPreference); } AbstractPromise(const AbstractPromise&) = delete; @@ -347,7 +407,9 @@ class BASE_EXPORT AbstractPromise public: PromiseValue& value() { return value_; } +#if DCHECK_IS_ON() ~ValueHandle() { value_.reset(); } +#endif private: friend class AbstractPromise; @@ -357,6 +419,8 @@ class BASE_EXPORT AbstractPromise PromiseValue& value_; }; + // Used for promise results that require move semantics. E.g. a promise chain + // involving a std::unique_ptr<>. ValueHandle TakeValue() { return ValueHandle(value_); } // Returns nullptr if there isn't a curried promise. @@ -380,6 +444,10 @@ class BASE_EXPORT AbstractPromise "Use scoped_refptr<AbstractPromise> instead"); } + // An out-of line emplace(Resolved<void>()); Useful for reducing binary + // bloat in executor templates. + void EmplaceResolvedVoid(); + // This is separate from AbstractPromise to reduce the memory footprint of // regular PostTask without promise chains. class BASE_EXPORT AdjacencyList { @@ -457,9 +525,14 @@ class BASE_EXPORT AbstractPromise void IgnoreUncaughtCatchForTesting(); - private: - friend class AbstractPromiseTest; + // Signals that this promise was cancelled. If executor hasn't run yet, this + // will prevent it from running and cancels any dependent promises unless they + // have PrerequisitePolicy::kAny, in which case they will only be canceled if + // all of their prerequisites are canceled. If OnCanceled() or OnResolved() or + // OnRejected() has already run, this does nothing. + void OnCanceled(); + private: friend base::RefCountedThreadSafe<AbstractPromise>; friend class AbstractPromiseTest; @@ -470,8 +543,6 @@ class BASE_EXPORT AbstractPromise template <typename T, typename... Args> friend class PromiseCallbackHelper; - friend class PromiseHolder; - template <typename ConstructType> AbstractPromise(const scoped_refptr<TaskRunner>& task_runner, const Location& from_here, @@ -520,13 +591,6 @@ class BASE_EXPORT AbstractPromise // have been canceled, in which case null is returned. AbstractPromise* FindCurriedAncestor(); - // Signals that this promise was cancelled. If executor hasn't run yet, this - // will prevent it from running and cancels any dependent promises unless they - // have PrerequisitePolicy::kAny, in which case they will only be canceled if - // all of their prerequisites are canceled. If OnCanceled() or OnResolved() or - // OnRejected() has already run, this does nothing. - void OnCanceled(); - // Signals that |value_| now contains a resolve value. Dependent promises may // scheduled for execution. void OnResolved(); @@ -714,7 +778,115 @@ class BASE_EXPORT AbstractPromise std::unique_ptr<AdjacencyList> prerequisites_; }; +PassedPromise::PassedPromise(const scoped_refptr<AbstractPromise>& promise) + : promise_(promise.get()) { + promise_->AddRef(); +} + +// Non-templatized base class of the Promise<> template. This is a binary size +// optimization, letting us use an out of line destructor in the template +// instead of the more complex scoped_refptr<> destructor. +class BASE_EXPORT BasePromise { + public: + BasePromise(); + + BasePromise(const BasePromise& other); + BasePromise(BasePromise&& other) noexcept; + + BasePromise& operator=(const BasePromise& other); + BasePromise& operator=(BasePromise&& other) noexcept; + + // We want an out of line destructor to reduce binary size. + ~BasePromise(); + + // Returns true if the promise is not null. + operator bool() const { return abstract_promise_.get(); } + + protected: + struct InlineConstructor {}; + + explicit BasePromise( + scoped_refptr<internal::AbstractPromise> abstract_promise); + + // We want this to be inlined to reduce binary size for the Promise<> + // constructor. Its a template to bypass ChromiumStyle plugin which otherwise + // insists this is out of line. + template <typename T> + explicit BasePromise(internal::PassedPromise&& passed_promise, + T InlineConstructor) + : abstract_promise_(passed_promise.Release(), subtle::kAdoptRefTag) {} + + scoped_refptr<internal::AbstractPromise> abstract_promise_; +}; + } // namespace internal + +// Wrapper around scoped_refptr<base::internal::AbstractPromise> which is +// intended for use by TaskRunner implementations. +class BASE_EXPORT WrappedPromise { + public: + WrappedPromise(); + + explicit WrappedPromise(scoped_refptr<internal::AbstractPromise> promise); + + WrappedPromise(const WrappedPromise& other); + WrappedPromise(WrappedPromise&& other) noexcept; + + WrappedPromise& operator=(const WrappedPromise& other); + WrappedPromise& operator=(WrappedPromise&& other) noexcept; + + explicit WrappedPromise(internal::PassedPromise&& passed_promise); + + // Constructs a promise to run |task|. + WrappedPromise(const Location& from_here, OnceClosure task); + + // If the WrappedPromise hasn't been executed, cleared or taken by + // TakeForTesting, it will be canceled to prevent memory leaks of dependent + // tasks that will never run. + ~WrappedPromise(); + + // Returns true if the promise is not null. + operator bool() const { return promise_.get(); } + + bool IsCanceled() const { + DCHECK(promise_); + return promise_->IsCanceled(); + } + + void OnCanceled() { + DCHECK(promise_); + promise_->OnCanceled(); + } + + // Can only be called once, clears |promise_| after execution. + void Execute(); + + // Clears |promise_|. + void Clear(); + + const Location& from_here() const { + DCHECK(promise_); + return promise_->from_here(); + } + + scoped_refptr<internal::AbstractPromise>& GetForTesting() { return promise_; } + + scoped_refptr<internal::AbstractPromise> TakeForTesting() { + return std::move(promise_); + } + + private: + template <typename ResolveType, typename RejectType> + friend class Promise; + + template <typename T, typename... Args> + friend class internal::PromiseCallbackHelper; + + friend class Promises; + + scoped_refptr<internal::AbstractPromise> promise_; +}; + } // namespace base #endif // BASE_TASK_PROMISE_ABSTRACT_PROMISE_H_ diff --git a/chromium/base/task/promise/abstract_promise_unittest.cc b/chromium/base/task/promise/abstract_promise_unittest.cc index ea8d31fd898..5cbe4498d54 100644 --- a/chromium/base/task/promise/abstract_promise_unittest.cc +++ b/chromium/base/task/promise/abstract_promise_unittest.cc @@ -238,10 +238,13 @@ class AbstractPromiseTest : public testing::Test { #endif std::move(settings.callback)); - return AbstractPromise::Create( - settings.task_runner, settings.from_here, - std::move(settings.prerequisites), settings.reject_policy, - DependentList::ConstructUnresolved(), std::move(executor_data)); + return WrappedPromise(AbstractPromise::Create( + settings.task_runner, settings.from_here, + std::move(settings.prerequisites), + settings.reject_policy, + DependentList::ConstructUnresolved(), + std::move(executor_data))) + .TakeForTesting(); } PromiseSettings settings; @@ -293,7 +296,7 @@ class AbstractPromiseTest : public testing::Test { PromiseSettingsBuilder AllPromise( Location from_here, - std::vector<internal::DependentList::Node> prerequisite_list) { + std::vector<DependentList::Node> prerequisite_list) { PromiseSettingsBuilder builder( from_here, std::make_unique<AbstractPromise::AdjacencyList>( std::move(prerequisite_list))); diff --git a/chromium/base/task/promise/dependent_list.h b/chromium/base/task/promise/dependent_list.h index 020bdbfc77f..3245c1cb48f 100644 --- a/chromium/base/task/promise/dependent_list.h +++ b/chromium/base/task/promise/dependent_list.h @@ -59,7 +59,7 @@ class BASE_EXPORT DependentList { // Align Node on an 8-byte boundary to ensure the first 3 bits are 0 and can // be used to store additional state (see static_asserts below). - class BASE_EXPORT alignas(8) Node { + class BASE_EXPORT ALIGNAS(8) Node { public: Node(); explicit Node(Node&& other) noexcept; diff --git a/chromium/base/task/promise/finally_executor.h b/chromium/base/task/promise/finally_executor.h index 7dc1c798a5c..a80f81c5ee9 100644 --- a/chromium/base/task/promise/finally_executor.h +++ b/chromium/base/task/promise/finally_executor.h @@ -43,9 +43,15 @@ class FinallyExecutor { void Execute(AbstractPromise* promise) { AbstractPromise* prerequisite = promise->GetOnlyPrerequisite(); - CallbackT* resolve_executor = static_cast<CallbackT*>(&common_.callback_); - RunHelper<CallbackT, void, ResolveStorage, RejectStorage>::Run( - std::move(*resolve_executor), prerequisite, promise); + // Internally RunHelper uses const RepeatingCallback<>& to avoid the + // binary size overhead of moving a scoped_refptr<> about. We respect + // the onceness of the callback and RunHelper will overwrite the callback + // with the result. + using RepeatingCB = typename ToRepeatingCallback<CallbackT>::value; + RepeatingCB* resolve_executor = + static_cast<RepeatingCB*>(&common_.callback_); + RunHelper<RepeatingCB, void, ResolveStorage, RejectStorage>::Run( + *resolve_executor, prerequisite, promise); } #if DCHECK_IS_ON() diff --git a/chromium/base/task/promise/helpers.cc b/chromium/base/task/promise/helpers.cc index 8a68a123956..f100cac0c29 100644 --- a/chromium/base/task/promise/helpers.cc +++ b/chromium/base/task/promise/helpers.cc @@ -11,32 +11,11 @@ namespace base { namespace internal { -PromiseHolder::PromiseHolder(scoped_refptr<AbstractPromise> promise) - : promise_(std::move(promise)) {} - -PromiseHolder::~PromiseHolder() { - // Detect if the promise was not executed and if so cancel to ensure memory - // is released. - if (promise_) - promise_->OnCanceled(); -} - -PromiseHolder::PromiseHolder(PromiseHolder&& other) - : promise_(std::move(other.promise_)) {} - -scoped_refptr<AbstractPromise> PromiseHolder::Unwrap() const { - return std::move(promise_); -} - -scoped_refptr<TaskRunner> GetCurrentSequence() { - return SequencedTaskRunnerHandle::Get(); -} - DoNothing ToCallbackBase(DoNothing task) { return task; } -scoped_refptr<AbstractPromise> ConstructAbstractPromiseWithSinglePrerequisite( +PassedPromise ConstructAbstractPromiseWithSinglePrerequisite( const scoped_refptr<TaskRunner>& task_runner, const Location& from_here, AbstractPromise* prerequisite, @@ -46,26 +25,35 @@ scoped_refptr<AbstractPromise> ConstructAbstractPromiseWithSinglePrerequisite( if (!prerequisite) { // Ensure the destructor for |executor_data| runs. PromiseExecutor dummy_executor(std::move(executor_data)); - return nullptr; + return PassedPromise(); } - return AbstractPromise::Create( + return PassedPromise(AbstractPromise::Create( task_runner, from_here, std::make_unique<AbstractPromise::AdjacencyList>(prerequisite), RejectPolicy::kMustCatchRejection, - internal::DependentList::ConstructUnresolved(), std::move(executor_data)); + internal::DependentList::ConstructUnresolved(), + std::move(executor_data))); } -scoped_refptr<AbstractPromise> ConstructManualPromiseResolverPromise( +PassedPromise ConstructHereAbstractPromiseWithSinglePrerequisite( const Location& from_here, - RejectPolicy reject_policy, - bool can_resolve, - bool can_reject) { - return AbstractPromise::CreateNoPrerequisitePromise( + AbstractPromise* prerequisite, + internal::PromiseExecutor::Data&& executor_data) noexcept { + return ConstructAbstractPromiseWithSinglePrerequisite( + SequencedTaskRunnerHandle::Get(), from_here, prerequisite, + std::move(executor_data)); +} + +PassedPromise ConstructManualPromiseResolverPromise(const Location& from_here, + RejectPolicy reject_policy, + bool can_resolve, + bool can_reject) { + return PassedPromise(AbstractPromise::CreateNoPrerequisitePromise( from_here, reject_policy, internal::DependentList::ConstructUnresolved(), internal::PromiseExecutor::Data( in_place_type_t<internal::NoOpPromiseExecutor>(), can_resolve, - can_reject)); + can_reject))); } } // namespace internal diff --git a/chromium/base/task/promise/helpers.h b/chromium/base/task/promise/helpers.h index 29cb49011ab..87d9ff64bec 100644 --- a/chromium/base/task/promise/helpers.h +++ b/chromium/base/task/promise/helpers.h @@ -19,11 +19,6 @@ class DoNothing; namespace internal { -// A wrapper around SequencedTaskRunnerHandle::Get(). This file is included by -// base/task_runner.h which means we can't include anything that depends on -// that! -scoped_refptr<TaskRunner> BASE_EXPORT GetCurrentSequence(); - template <typename T> using ToNonVoidT = std::conditional_t<std::is_void<T>::value, Void, T>; @@ -412,9 +407,26 @@ class ArgMoveSemanticsHelper { } }; +// Helper for converting a callback to its repeating variant. +template <typename Cb> +struct ToRepeatingCallback; + +template <typename Cb> +struct ToRepeatingCallback<OnceCallback<Cb>> { + using value = RepeatingCallback<Cb>; +}; + +template <typename Cb> +struct ToRepeatingCallback<RepeatingCallback<Cb>> { + using value = RepeatingCallback<Cb>; +}; + // Helper for running a promise callback and storing the result if any. // -// Callback = signature of the callback to execute, +// Callback = signature of the callback to execute. Note we use repeating +// callbacks to avoid the binary size overhead of a once callback which will +// generate a destructor which is redundant because we overwrite the executor +// with the promise result which also triggers the destructor. // ArgStorageType = type of the callback parameter (or void if none) // ResolveStorage = type to use for resolve, usually Resolved<T>. // RejectStorage = type to use for reject, usually Rejected<T>. @@ -431,18 +443,18 @@ template <typename CbResult, typename ArgStorageType, typename ResolveStorage, typename RejectStorage> -struct RunHelper<OnceCallback<CbResult(CbArg)>, +struct RunHelper<RepeatingCallback<CbResult(CbArg)>, ArgStorageType, ResolveStorage, RejectStorage> { - using Callback = OnceCallback<CbResult(CbArg)>; + using Callback = RepeatingCallback<CbResult(CbArg)>; - static void Run(Callback&& executor, + static void Run(const Callback& executor, AbstractPromise* arg, AbstractPromise* result) { EmplaceHelper<ResolveStorage, RejectStorage>::Emplace( - result, std::move(executor).Run( - ArgMoveSemanticsHelper<CbArg, ArgStorageType>::Get(arg))); + result, + executor.Run(ArgMoveSemanticsHelper<CbArg, ArgStorageType>::Get(arg))); } }; @@ -451,19 +463,18 @@ template <typename CbArg, typename ArgStorageType, typename ResolveStorage, typename RejectStorage> -struct RunHelper<OnceCallback<void(CbArg)>, +struct RunHelper<RepeatingCallback<void(CbArg)>, ArgStorageType, ResolveStorage, RejectStorage> { - using Callback = OnceCallback<void(CbArg)>; + using Callback = RepeatingCallback<void(CbArg)>; - static void Run(Callback&& executor, + static void Run(const Callback& executor, AbstractPromise* arg, AbstractPromise* result) { static_assert(std::is_void<typename ResolveStorage::Type>::value, ""); - std::move(executor).Run( - ArgMoveSemanticsHelper<CbArg, ArgStorageType>::Get(arg)); - result->emplace(Resolved<void>()); + executor.Run(ArgMoveSemanticsHelper<CbArg, ArgStorageType>::Get(arg)); + result->EmplaceResolvedVoid(); } }; @@ -472,17 +483,17 @@ template <typename CbResult, typename ArgStorageType, typename ResolveStorage, typename RejectStorage> -struct RunHelper<OnceCallback<CbResult()>, +struct RunHelper<RepeatingCallback<CbResult()>, ArgStorageType, ResolveStorage, RejectStorage> { - using Callback = OnceCallback<CbResult()>; + using Callback = RepeatingCallback<CbResult()>; - static void Run(Callback&& executor, + static void Run(const Callback& executor, AbstractPromise* arg, AbstractPromise* result) { - EmplaceHelper<ResolveStorage, RejectStorage>::Emplace( - result, std::move(executor).Run()); + EmplaceHelper<ResolveStorage, RejectStorage>::Emplace(result, + executor.Run()); } }; @@ -490,16 +501,16 @@ struct RunHelper<OnceCallback<CbResult()>, template <typename ArgStorageType, typename ResolveStorage, typename RejectStorage> -struct RunHelper<OnceCallback<void()>, +struct RunHelper<RepeatingCallback<void()>, ArgStorageType, ResolveStorage, RejectStorage> { - static void Run(OnceCallback<void()>&& executor, + static void Run(const RepeatingCallback<void()>& executor, AbstractPromise* arg, AbstractPromise* result) { static_assert(std::is_void<typename ResolveStorage::Type>::value, ""); - std::move(executor).Run(); - result->emplace(Resolved<void>()); + executor.Run(); + result->EmplaceResolvedVoid(); } }; @@ -538,79 +549,51 @@ template <typename CbResult, typename... CbArgs, typename ResolveStorage, typename RejectStorage> -struct RunHelper<OnceCallback<CbResult(CbArgs...)>, +struct RunHelper<RepeatingCallback<CbResult(CbArgs...)>, Resolved<std::tuple<CbArgs...>>, ResolveStorage, RejectStorage> { - using Callback = OnceCallback<CbResult(CbArgs...)>; + using Callback = RepeatingCallback<CbResult(CbArgs...)>; using StorageType = Resolved<std::tuple<CbArgs...>>; using IndexSequence = std::index_sequence_for<CbArgs...>; - static void Run(Callback&& executor, + static void Run(const Callback& executor, AbstractPromise* arg, AbstractPromise* result) { AbstractPromise::ValueHandle value = arg->TakeValue(); std::tuple<CbArgs...>& tuple = value.value().Get<StorageType>()->value; - RunInternal(std::move(executor), tuple, result, + RunInternal(executor, tuple, result, std::integral_constant<bool, std::is_void<CbResult>::value>(), IndexSequence{}); } private: template <typename Callback, size_t... Indices> - static void RunInternal(Callback&& executor, + static void RunInternal(const Callback& executor, std::tuple<CbArgs...>& tuple, AbstractPromise* result, std::false_type void_result, std::index_sequence<Indices...>) { - EmplaceHelper<ResolveStorage, RejectStorage>::Emplace( - std::move(executor).Run( - TupleArgMoveSemanticsHelper<Callback, std::tuple<CbArgs...>, - Indices>::Get(tuple)...)); + EmplaceHelper<ResolveStorage, RejectStorage>::Emplace(executor.Run( + TupleArgMoveSemanticsHelper<Callback, std::tuple<CbArgs...>, + Indices>::Get(tuple)...)); } template <typename Callback, size_t... Indices> - static void RunInternal(Callback&& executor, + static void RunInternal(const Callback& executor, std::tuple<CbArgs...>& tuple, AbstractPromise* result, std::true_type void_result, std::index_sequence<Indices...>) { - std::move(executor).Run( - TupleArgMoveSemanticsHelper<Callback, std::tuple<CbArgs...>, - Indices>::Get(tuple)...); - result->emplace(Resolved<void>()); + executor.Run(TupleArgMoveSemanticsHelper<Callback, std::tuple<CbArgs...>, + Indices>::Get(tuple)...); + result->EmplaceResolvedVoid(); } }; -// For use with base::Bind*. Cancels the promise if the callback was not run by -// the time the callback is deleted. -class BASE_EXPORT PromiseHolder { - public: - explicit PromiseHolder(scoped_refptr<internal::AbstractPromise> promise); - - ~PromiseHolder(); - - PromiseHolder(PromiseHolder&& other); - - scoped_refptr<internal::AbstractPromise> Unwrap() const; - - private: - mutable scoped_refptr<internal::AbstractPromise> promise_; -}; - -} // namespace internal - -template <> -struct BindUnwrapTraits<internal::PromiseHolder> { - static scoped_refptr<internal::AbstractPromise> Unwrap( - const internal::PromiseHolder& o) { - return o.Unwrap(); - } -}; - -namespace internal { - -// Used by ManualPromiseResolver<> to generate callbacks. +// Used by ManualPromiseResolver<> to generate callbacks. Note the use of +// WrappedPromise, this is necessary because we want to cancel the promise (to +// release memory) if the callback gets deleted without having being run. template <typename T, typename... Args> class PromiseCallbackHelper { public: @@ -624,7 +607,7 @@ class PromiseCallbackHelper { std::forward<Args>(args)...); promise->OnResolved(); }, - PromiseHolder(promise)); + promise); } static RepeatingCallback GetRepeatingResolveCallback( @@ -635,7 +618,7 @@ class PromiseCallbackHelper { std::forward<Args>(args)...); promise->OnResolved(); }, - PromiseHolder(promise)); + promise); } static Callback GetRejectCallback(scoped_refptr<AbstractPromise>& promise) { @@ -645,7 +628,7 @@ class PromiseCallbackHelper { std::forward<Args>(args)...); promise->OnRejected(); }, - PromiseHolder(promise)); + promise); } static RepeatingCallback GetRepeatingRejectCallback( @@ -656,7 +639,7 @@ class PromiseCallbackHelper { std::forward<Args>(args)...); promise->OnRejected(); }, - PromiseHolder(promise)); + promise); } }; @@ -679,8 +662,7 @@ struct IsValidPromiseArg<PromiseType&, CallbackArgType> { // rejection storage type. template <typename RejectT> struct AllPromiseRejectHelper { - static void Reject(AbstractPromise* result, - const scoped_refptr<AbstractPromise>& prerequisite) { + static void Reject(AbstractPromise* result, AbstractPromise* prerequisite) { result->emplace(scoped_refptr<AbstractPromise>(prerequisite)); } }; @@ -709,14 +691,20 @@ CallbackBase&& ToCallbackBase(const CallbackT&& task) { // Helps reduce template bloat by moving AbstractPromise construction out of // line. -scoped_refptr<AbstractPromise> BASE_EXPORT -ConstructAbstractPromiseWithSinglePrerequisite( +PassedPromise BASE_EXPORT ConstructAbstractPromiseWithSinglePrerequisite( const scoped_refptr<TaskRunner>& task_runner, const Location& from_here, AbstractPromise* prerequsite, - internal::PromiseExecutor::Data&& executor_data) noexcept; + PromiseExecutor::Data&& executor_data) noexcept; + +// Like ConstructAbstractPromiseWithSinglePrerequisite except tasks are posted +// onto SequencedTaskRunnerHandle::Get(). +PassedPromise BASE_EXPORT ConstructHereAbstractPromiseWithSinglePrerequisite( + const Location& from_here, + AbstractPromise* prerequsite, + PromiseExecutor::Data&& executor_data) noexcept; -scoped_refptr<AbstractPromise> BASE_EXPORT +PassedPromise BASE_EXPORT ConstructManualPromiseResolverPromise(const Location& from_here, RejectPolicy reject_policy, bool can_resolve, diff --git a/chromium/base/task/promise/helpers_unittest.cc b/chromium/base/task/promise/helpers_unittest.cc index afdc9e7079e..f13fb5c8c7a 100644 --- a/chromium/base/task/promise/helpers_unittest.cc +++ b/chromium/base/task/promise/helpers_unittest.cc @@ -187,8 +187,9 @@ TEST(EmplaceHelper, EmplacePromiseResult) { TEST(EmplaceHelper, EmplacePromise) { scoped_refptr<AbstractPromise> promise = DoNothingPromiseBuilder(FROM_HERE).SetCanResolve(true); - scoped_refptr<AbstractPromise> curried = DoNothingPromiseBuilder(FROM_HERE); + PassedPromise curried = NoOpPromiseExecutor::Create( + FROM_HERE, false, false, RejectPolicy::kCatchNotRequired); EmplaceHelper<Resolved<int>, Rejected<NoReject>>::Emplace( promise.get(), Promise<int>(std::move(curried))); @@ -243,8 +244,8 @@ TEST(RunHelper, CallbackVoidArgumentIntResult) { scoped_refptr<AbstractPromise> result = DoNothingPromiseBuilder(FROM_HERE).SetCanResolve(true); - RunHelper<OnceCallback<int()>, Resolved<void>, Resolved<int>, - Rejected<std::string>>::Run(BindOnce([]() { return 123; }), + RunHelper<RepeatingCallback<int()>, Resolved<void>, Resolved<int>, + Rejected<std::string>>::Run(BindRepeating([]() { return 123; }), arg.get(), result.get()); EXPECT_EQ(result->value().template Get<Resolved<int>>()->value, 123); @@ -255,8 +256,8 @@ TEST(RunHelper, CallbackVoidArgumentVoidResult) { scoped_refptr<AbstractPromise> result = DoNothingPromiseBuilder(FROM_HERE).SetCanResolve(true); - RunHelper<OnceCallback<void()>, Resolved<void>, Resolved<void>, - Rejected<std::string>>::Run(BindOnce([]() {}), arg.get(), + RunHelper<RepeatingCallback<void()>, Resolved<void>, Resolved<void>, + Rejected<std::string>>::Run(BindRepeating([]() {}), arg.get(), result.get()); EXPECT_TRUE(result->value().ContainsResolved()); @@ -268,8 +269,8 @@ TEST(RunHelper, CallbackIntArgumentIntResult) { DoNothingPromiseBuilder(FROM_HERE).SetCanResolve(true); arg->emplace(Resolved<int>(123)); - RunHelper<OnceCallback<int(int)>, Resolved<int>, Resolved<int>, - Rejected<std::string>>::Run(BindOnce([](int value) { + RunHelper<RepeatingCallback<int(int)>, Resolved<int>, Resolved<int>, + Rejected<std::string>>::Run(BindRepeating([](int value) { return value + 1; }), arg.get(), result.get()); @@ -284,7 +285,7 @@ TEST(RunHelper, CallbackIntArgumentArgumentVoidResult) { arg->emplace(Resolved<int>(123)); int value; - RunHelper<OnceCallback<void(int)>, Resolved<int>, Resolved<void>, + RunHelper<RepeatingCallback<void(int)>, Resolved<int>, Resolved<void>, Rejected<std::string>>::Run(BindLambdaForTesting([&](int arg) { value = arg; }), diff --git a/chromium/base/task/promise/no_op_promise_executor.cc b/chromium/base/task/promise/no_op_promise_executor.cc index 74a5bcc57c8..e168ed91007 100644 --- a/chromium/base/task/promise/no_op_promise_executor.cc +++ b/chromium/base/task/promise/no_op_promise_executor.cc @@ -45,15 +45,14 @@ bool NoOpPromiseExecutor::CanReject() const { void NoOpPromiseExecutor::Execute(AbstractPromise* promise) {} // static -scoped_refptr<internal::AbstractPromise> NoOpPromiseExecutor::Create( - Location from_here, - bool can_resolve, - bool can_reject, - RejectPolicy reject_policy) { - return AbstractPromise::CreateNoPrerequisitePromise( +PassedPromise NoOpPromiseExecutor::Create(Location from_here, + bool can_resolve, + bool can_reject, + RejectPolicy reject_policy) { + return PassedPromise(AbstractPromise::CreateNoPrerequisitePromise( from_here, reject_policy, DependentList::ConstructUnresolved(), PromiseExecutor::Data(in_place_type_t<NoOpPromiseExecutor>(), can_resolve, - can_reject)); + can_reject))); } } // namespace internal diff --git a/chromium/base/task/promise/no_op_promise_executor.h b/chromium/base/task/promise/no_op_promise_executor.h index 005ee8c0130..13f8ef0891b 100644 --- a/chromium/base/task/promise/no_op_promise_executor.h +++ b/chromium/base/task/promise/no_op_promise_executor.h @@ -21,10 +21,10 @@ class BASE_EXPORT NoOpPromiseExecutor { static constexpr PromiseExecutor::PrerequisitePolicy kPrerequisitePolicy = PromiseExecutor::PrerequisitePolicy::kNever; - static scoped_refptr<AbstractPromise> Create(Location from_here, - bool can_resolve, - bool can_reject, - RejectPolicy reject_policy); + static PassedPromise Create(Location from_here, + bool can_resolve, + bool can_reject, + RejectPolicy reject_policy); PromiseExecutor::PrerequisitePolicy GetPrerequisitePolicy() const; bool IsCancelled() const; diff --git a/chromium/base/task/promise/post_task_executor.h b/chromium/base/task/promise/post_task_executor.h index c018113de63..e3882ba1712 100644 --- a/chromium/base/task/promise/post_task_executor.h +++ b/chromium/base/task/promise/post_task_executor.h @@ -52,10 +52,14 @@ class PostTaskExecutor { static_assert(sizeof(CallbackBase) == sizeof(OnceCallback<ReturnType()>), "We assume it's possible to cast from CallbackBase to " "OnceCallback<ReturnType()>"); - OnceCallback<ReturnType()>* task = - static_cast<OnceCallback<ReturnType()>*>(&task_); - internal::RunHelper<OnceCallback<ReturnType()>, void, ResolveStorage, - RejectStorage>::Run(std::move(*task), nullptr, promise); + // Internally RunHelper uses const RepeatingCallback<>& to avoid the + // binary size overhead of moving a scoped_refptr<> about. We respect + // the onceness of the callback and RunHelper will overwrite the callback + // with the result. + RepeatingCallback<ReturnType()>* task = + static_cast<RepeatingCallback<ReturnType()>*>(&task_); + internal::RunHelper<RepeatingCallback<ReturnType()>, void, ResolveStorage, + RejectStorage>::Run(*task, nullptr, promise); } private: diff --git a/chromium/base/task/promise/post_task_executor_unittest.cc b/chromium/base/task/promise/post_task_executor_unittest.cc index 8e86aaefad4..1d3fc57e4d0 100644 --- a/chromium/base/task/promise/post_task_executor_unittest.cc +++ b/chromium/base/task/promise/post_task_executor_unittest.cc @@ -16,9 +16,8 @@ namespace internal { class PostTaskExecutorTest : public testing::Test { public: template <typename CallbackT> - scoped_refptr<internal::AbstractPromise> CreatePostTaskPromise( - const Location& from_here, - CallbackT&& task) { + WrappedPromise CreatePostTaskPromise(const Location& from_here, + CallbackT&& task) { // Extract properties from |task| callback. using CallbackTraits = CallbackTraits<std::decay_t<CallbackT>>; @@ -27,20 +26,20 @@ class PostTaskExecutorTest : public testing::Test { internal::PostTaskExecutor<typename CallbackTraits::ReturnType>>(), internal::ToCallbackBase(std::move(task))); - return AbstractPromise::CreateNoPrerequisitePromise( + return WrappedPromise(AbstractPromise::CreateNoPrerequisitePromise( from_here, RejectPolicy::kMustCatchRejection, internal::DependentList::ConstructUnresolved(), - std::move(executor_data)); + std::move(executor_data))); } }; TEST_F(PostTaskExecutorTest, OnceClosure) { bool run = false; - scoped_refptr<AbstractPromise> p = CreatePostTaskPromise( + WrappedPromise p = CreatePostTaskPromise( FROM_HERE, BindOnce([](bool* run) { *run = true; }, &run)); - p->Execute(); + p.Execute(); EXPECT_TRUE(run); } @@ -48,20 +47,19 @@ TEST_F(PostTaskExecutorTest, OnceClosure) { TEST_F(PostTaskExecutorTest, RepeatingClosure) { bool run = false; - scoped_refptr<AbstractPromise> p = CreatePostTaskPromise( + WrappedPromise p = CreatePostTaskPromise( FROM_HERE, BindRepeating([](bool* run) { *run = true; }, &run)); - p->Execute(); + p.Execute(); EXPECT_TRUE(run); } TEST_F(PostTaskExecutorTest, DoNothing) { // Check it compiles and the executor doesn't crash when run. - scoped_refptr<AbstractPromise> p = - CreatePostTaskPromise(FROM_HERE, DoNothing()); + WrappedPromise p = CreatePostTaskPromise(FROM_HERE, DoNothing()); - p->Execute(); + p.Execute(); } } // namespace internal diff --git a/chromium/base/task/promise/promise.h b/chromium/base/task/promise/promise.h index 4f2275ea183..467f74cb8e4 100644 --- a/chromium/base/task/promise/promise.h +++ b/chromium/base/task/promise/promise.h @@ -30,12 +30,12 @@ BASE_EXPORT scoped_refptr<TaskRunner> CreateTaskRunner( // callback will be posted immediately, otherwise it has to wait. // // Promise<> is copyable, moveable and thread safe. Under the hood -// internal::AbstractPromise is refcounted so retaining multiple Promises<> will +// AbstractPromise is refcounted so retaining multiple Promises<> will // prevent that part of the promise graph from being released. template <typename ResolveType, typename RejectType = NoReject> -class Promise { +class Promise : public internal::BasePromise { public: - Promise() : abstract_promise_(nullptr) {} + Promise() = default; static_assert( !std::is_reference<ResolveType>::value || @@ -49,16 +49,17 @@ class Promise { explicit Promise( scoped_refptr<internal::AbstractPromise> abstract_promise) noexcept - : abstract_promise_(std::move(abstract_promise)) {} + : BasePromise(std::move(abstract_promise)) {} - ~Promise() = default; + // Every PostTask calls this constructor so we need to be careful to avoid + // unnecessary binary bloat. + explicit Promise(internal::PassedPromise passed_promise) noexcept + : BasePromise(std::move(passed_promise), + BasePromise::InlineConstructor()) {} - operator bool() const { return !!abstract_promise_; } + ~Promise() = default; - bool IsCancelledForTesting() const { - DCHECK(abstract_promise_); - return abstract_promise_->IsCanceled(); - } + bool IsCancelledForTesting() const { return abstract_promise_->IsCanceled(); } // Waits until the promise has settled and if resolved it returns the resolved // value. @@ -75,8 +76,10 @@ class Promise { } DCHECK(abstract_promise_->IsResolved()) << "Can't take resolved value, promise wasn't resolved."; - return std::move( - abstract_promise_->TakeValue().value().Get<Resolved<T>>()->value); + return std::move(abstract_promise_->TakeValue() + .value() + .template Get<Resolved<T>>() + ->value); } // Waits until the promise has settled and if rejected it returns the rejected @@ -95,8 +98,10 @@ class Promise { abstract_promise_->IgnoreUncaughtCatchForTesting(); DCHECK(abstract_promise_->IsRejected()) << "Can't take rejected value, promise wasn't rejected."; - return std::move( - abstract_promise_->TakeValue().value().Get<Rejected<T>>()->value); + return std::move(abstract_promise_->TakeValue() + .value() + .template Get<Rejected<T>>() + ->value); } bool IsResolvedForTesting() const { @@ -122,22 +127,22 @@ class Promise { // // |task_runner| is const-ref to avoid bloat due the destructor (which posts a // task). - template <typename RejectCb> + template <typename CatchCb> auto CatchOn(const scoped_refptr<TaskRunner>& task_runner, const Location& from_here, - RejectCb on_reject) noexcept { + CatchCb on_reject) noexcept { DCHECK(!on_reject.is_null()); // Extract properties from the |on_reject| callback. - using RejectCallbackTraits = internal::CallbackTraits<RejectCb>; - using RejectCallbackArgT = typename RejectCallbackTraits::ArgType; + using CatchCallbackTraits = internal::CallbackTraits<CatchCb>; + using CatchCallbackArgT = typename CatchCallbackTraits::ArgType; // Compute the resolve and reject types of the returned Promise. using ReturnedPromiseTraits = internal::PromiseCombiner<ResolveType, NoReject, // We've caught the reject case. - typename RejectCallbackTraits::ResolveType, - typename RejectCallbackTraits::RejectType>; + typename CatchCallbackTraits::ResolveType, + typename CatchCallbackTraits::RejectType>; using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType; using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType; @@ -148,13 +153,13 @@ class Promise { static_assert(ReturnedPromiseTraits::valid, "Ambiguous promise resolve type"); static_assert( - internal::IsValidPromiseArg<RejectType, RejectCallbackArgT>::value || - std::is_void<RejectCallbackArgT>::value, + internal::IsValidPromiseArg<RejectType, CatchCallbackArgT>::value || + std::is_void<CatchCallbackArgT>::value, "|on_reject| callback must accept Promise::RejectType or void."); static_assert( - !std::is_reference<RejectCallbackArgT>::value || - std::is_const<std::remove_reference_t<RejectCallbackArgT>>::value, + !std::is_reference<CatchCallbackArgT>::value || + std::is_const<std::remove_reference_t<CatchCallbackArgT>>::value, "Google C++ Style: References in function parameters must be const."); return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>( @@ -163,7 +168,7 @@ class Promise { internal::PromiseExecutor::Data( in_place_type_t<internal::ThenAndCatchExecutor< OnceClosure, // Never called. - OnceCallback<typename RejectCallbackTraits::SignatureType>, + OnceCallback<typename CatchCallbackTraits::SignatureType>, internal::NoCallback, RejectType, Resolved<ReturnedPromiseResolveT>, Rejected<ReturnedPromiseRejectT>>>(), @@ -171,18 +176,59 @@ class Promise { internal::ToCallbackBase(std::move(on_reject))))); } - template <typename RejectCb> + template <typename CatchCb> auto CatchOn(const TaskTraits& traits, const Location& from_here, - RejectCb&& on_reject) noexcept { + CatchCb&& on_reject) noexcept { return CatchOn(CreateTaskRunner(traits), from_here, - std::forward<RejectCb>(on_reject)); + std::forward<CatchCb>(on_reject)); } - template <typename RejectCb> - auto CatchHere(const Location& from_here, RejectCb&& on_reject) noexcept { - return CatchOn(internal::GetCurrentSequence(), from_here, - std::forward<RejectCb>(on_reject)); + template <typename CatchCb> + auto CatchHere(const Location& from_here, CatchCb&& on_reject) noexcept { + DCHECK(!on_reject.is_null()); + + // Extract properties from the |on_reject| callback. + using CatchCallbackTraits = internal::CallbackTraits<CatchCb>; + using CatchCallbackArgT = typename CatchCallbackTraits::ArgType; + + // Compute the resolve and reject types of the returned Promise. + using ReturnedPromiseTraits = + internal::PromiseCombiner<ResolveType, + NoReject, // We've caught the reject case. + typename CatchCallbackTraits::ResolveType, + typename CatchCallbackTraits::RejectType>; + using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType; + using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType; + + static_assert(!std::is_same<NoReject, RejectType>::value, + "Can't catch a NoReject promise."); + + // Check we wouldn't need to return Promise<Variant<...>, ...> + static_assert(ReturnedPromiseTraits::valid, + "Ambiguous promise resolve type"); + static_assert( + internal::IsValidPromiseArg<RejectType, CatchCallbackArgT>::value || + std::is_void<CatchCallbackArgT>::value, + "|on_reject| callback must accept Promise::RejectType or void."); + + static_assert( + !std::is_reference<CatchCallbackArgT>::value || + std::is_const<std::remove_reference_t<CatchCallbackArgT>>::value, + "Google C++ Style: References in function parameters must be const."); + + return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>( + ConstructHereAbstractPromiseWithSinglePrerequisite( + from_here, abstract_promise_.get(), + internal::PromiseExecutor::Data( + in_place_type_t<internal::ThenAndCatchExecutor< + OnceClosure, // Never called. + OnceCallback<typename CatchCallbackTraits::SignatureType>, + internal::NoCallback, RejectType, + Resolved<ReturnedPromiseResolveT>, + Rejected<ReturnedPromiseRejectT>>>(), + OnceClosure(), + internal::ToCallbackBase(std::move(on_reject))))); } // A task to execute |on_resolve| is posted on |task_runner| as soon as this @@ -198,23 +244,22 @@ class Promise { // // |task_runner| is const-ref to avoid bloat due the destructor (which posts a // task). - template <typename ResolveCb> + template <typename ThenCb> auto ThenOn(const scoped_refptr<TaskRunner>& task_runner, const Location& from_here, - ResolveCb on_resolve) noexcept { + ThenCb on_resolve) noexcept { DCHECK(!on_resolve.is_null()); // Extract properties from the |on_resolve| callback. - using ResolveCallbackTraits = - internal::CallbackTraits<std::decay_t<ResolveCb>>; - using ResolveCallbackArgT = typename ResolveCallbackTraits::ArgType; + using ThenCallbackTraits = internal::CallbackTraits<std::decay_t<ThenCb>>; + using ThenCallbackArgT = typename ThenCallbackTraits::ArgType; // Compute the resolve and reject types of the returned Promise. using ReturnedPromiseTraits = internal::PromiseCombiner<NoResolve, // We've caught the resolve case. RejectType, - typename ResolveCallbackTraits::ResolveType, - typename ResolveCallbackTraits::RejectType>; + typename ThenCallbackTraits::ResolveType, + typename ThenCallbackTraits::RejectType>; using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType; using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType; @@ -223,13 +268,13 @@ class Promise { "Ambiguous promise reject type"); static_assert( - internal::IsValidPromiseArg<ResolveType, ResolveCallbackArgT>::value || - std::is_void<ResolveCallbackArgT>::value, + internal::IsValidPromiseArg<ResolveType, ThenCallbackArgT>::value || + std::is_void<ThenCallbackArgT>::value, "|on_resolve| callback must accept Promise::ResolveType or void."); static_assert( - !std::is_reference<ResolveCallbackArgT>::value || - std::is_const<std::remove_reference_t<ResolveCallbackArgT>>::value, + !std::is_reference<ThenCallbackArgT>::value || + std::is_const<std::remove_reference_t<ThenCallbackArgT>>::value, "Google C++ Style: References in function parameters must be const."); return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>( @@ -237,7 +282,7 @@ class Promise { task_runner, from_here, abstract_promise_.get(), internal::PromiseExecutor::Data( in_place_type_t<internal::ThenAndCatchExecutor< - OnceCallback<typename ResolveCallbackTraits::SignatureType>, + OnceCallback<typename ThenCallbackTraits::SignatureType>, OnceClosure, ResolveType, internal::NoCallback, Resolved<ReturnedPromiseResolveT>, Rejected<ReturnedPromiseRejectT>>>(), @@ -245,18 +290,56 @@ class Promise { OnceClosure()))); } - template <typename ResolveCb> + template <typename ThenCb> auto ThenOn(const TaskTraits& traits, const Location& from_here, - ResolveCb&& on_resolve) noexcept { + ThenCb&& on_resolve) noexcept { return ThenOn(CreateTaskRunner(traits), from_here, - std::forward<ResolveCb>(on_resolve)); + std::forward<ThenCb>(on_resolve)); } - template <typename ResolveCb> - auto ThenHere(const Location& from_here, ResolveCb&& on_resolve) noexcept { - return ThenOn(internal::GetCurrentSequence(), from_here, - std::forward<ResolveCb>(on_resolve)); + template <typename ThenCb> + auto ThenHere(const Location& from_here, ThenCb&& on_resolve) noexcept { + DCHECK(!on_resolve.is_null()); + + // Extract properties from the |on_resolve| callback. + using ThenCallbackTraits = internal::CallbackTraits<std::decay_t<ThenCb>>; + using ThenCallbackArgT = typename ThenCallbackTraits::ArgType; + + // Compute the resolve and reject types of the returned Promise. + using ReturnedPromiseTraits = + internal::PromiseCombiner<NoResolve, // We've caught the resolve case. + RejectType, + typename ThenCallbackTraits::ResolveType, + typename ThenCallbackTraits::RejectType>; + using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType; + using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType; + + // Check we wouldn't need to return Promise<..., Variant<...>> + static_assert(ReturnedPromiseTraits::valid, + "Ambiguous promise reject type"); + + static_assert( + internal::IsValidPromiseArg<ResolveType, ThenCallbackArgT>::value || + std::is_void<ThenCallbackArgT>::value, + "|on_resolve| callback must accept Promise::ResolveType or void."); + + static_assert( + !std::is_reference<ThenCallbackArgT>::value || + std::is_const<std::remove_reference_t<ThenCallbackArgT>>::value, + "Google C++ Style: References in function parameters must be const."); + + return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>( + ConstructHereAbstractPromiseWithSinglePrerequisite( + from_here, abstract_promise_.get(), + internal::PromiseExecutor::Data( + in_place_type_t<internal::ThenAndCatchExecutor< + OnceCallback<typename ThenCallbackTraits::SignatureType>, + OnceClosure, ResolveType, internal::NoCallback, + Resolved<ReturnedPromiseResolveT>, + Rejected<ReturnedPromiseRejectT>>>(), + internal::ToCallbackBase(std::move(on_resolve)), + OnceClosure()))); } // A task to execute |on_reject| is posted on |task_runner| as soon as this @@ -279,26 +362,26 @@ class Promise { // // |task_runner| is const-ref to avoid bloat due the destructor (which posts a // task). - template <typename ResolveCb, typename RejectCb> + template <typename ThenCb, typename CatchCb> auto ThenOn(const scoped_refptr<TaskRunner>& task_runner, const Location& from_here, - ResolveCb on_resolve, - RejectCb on_reject) noexcept { + ThenCb on_resolve, + CatchCb on_reject) noexcept { DCHECK(!on_resolve.is_null()); DCHECK(!on_reject.is_null()); // Extract properties from the |on_resolve| and |on_reject| callbacks. - using ResolveCallbackTraits = internal::CallbackTraits<ResolveCb>; - using RejectCallbackTraits = internal::CallbackTraits<RejectCb>; - using ResolveCallbackArgT = typename ResolveCallbackTraits::ArgType; - using RejectCallbackArgT = typename RejectCallbackTraits::ArgType; + using ThenCallbackTraits = internal::CallbackTraits<ThenCb>; + using CatchCallbackTraits = internal::CallbackTraits<CatchCb>; + using ThenCallbackArgT = typename ThenCallbackTraits::ArgType; + using CatchCallbackArgT = typename CatchCallbackTraits::ArgType; // Compute the resolve and reject types of the returned Promise. using ReturnedPromiseTraits = - internal::PromiseCombiner<typename ResolveCallbackTraits::ResolveType, - typename ResolveCallbackTraits::RejectType, - typename RejectCallbackTraits::ResolveType, - typename RejectCallbackTraits::RejectType>; + internal::PromiseCombiner<typename ThenCallbackTraits::ResolveType, + typename ThenCallbackTraits::RejectType, + typename CatchCallbackTraits::ResolveType, + typename CatchCallbackTraits::RejectType>; using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType; using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType; @@ -310,23 +393,23 @@ class Promise { "compatible types."); static_assert( - internal::IsValidPromiseArg<ResolveType, ResolveCallbackArgT>::value || - std::is_void<ResolveCallbackArgT>::value, + internal::IsValidPromiseArg<ResolveType, ThenCallbackArgT>::value || + std::is_void<ThenCallbackArgT>::value, "|on_resolve| callback must accept Promise::ResolveType or void."); static_assert( - internal::IsValidPromiseArg<RejectType, RejectCallbackArgT>::value || - std::is_void<RejectCallbackArgT>::value, + internal::IsValidPromiseArg<RejectType, CatchCallbackArgT>::value || + std::is_void<CatchCallbackArgT>::value, "|on_reject| callback must accept Promise::RejectType or void."); static_assert( - !std::is_reference<ResolveCallbackArgT>::value || - std::is_const<std::remove_reference_t<ResolveCallbackArgT>>::value, + !std::is_reference<ThenCallbackArgT>::value || + std::is_const<std::remove_reference_t<ThenCallbackArgT>>::value, "Google C++ Style: References in function parameters must be const."); static_assert( - !std::is_reference<RejectCallbackArgT>::value || - std::is_const<std::remove_reference_t<RejectCallbackArgT>>::value, + !std::is_reference<CatchCallbackArgT>::value || + std::is_const<std::remove_reference_t<CatchCallbackArgT>>::value, "Google C++ Style: References in function parameters must be const."); return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>( @@ -334,31 +417,84 @@ class Promise { task_runner, from_here, abstract_promise_.get(), internal::PromiseExecutor::Data( in_place_type_t<internal::ThenAndCatchExecutor< - OnceCallback<typename ResolveCallbackTraits::SignatureType>, - OnceCallback<typename RejectCallbackTraits::SignatureType>, + OnceCallback<typename ThenCallbackTraits::SignatureType>, + OnceCallback<typename CatchCallbackTraits::SignatureType>, ResolveType, RejectType, Resolved<ReturnedPromiseResolveT>, Rejected<ReturnedPromiseRejectT>>>(), internal::ToCallbackBase(std::move(on_resolve)), internal::ToCallbackBase(std::move(on_reject))))); } - template <typename ResolveCb, typename RejectCb> + template <typename ThenCb, typename CatchCb> auto ThenOn(const TaskTraits& traits, const Location& from_here, - ResolveCb on_resolve, - RejectCb on_reject) noexcept { + ThenCb on_resolve, + CatchCb on_reject) noexcept { return ThenOn(CreateTaskRunner(traits), from_here, - std::forward<ResolveCb>(on_resolve), - std::forward<RejectCb>(on_reject)); + std::forward<ThenCb>(on_resolve), + std::forward<CatchCb>(on_reject)); } - template <typename ResolveCb, typename RejectCb> + template <typename ThenCb, typename CatchCb> auto ThenHere(const Location& from_here, - ResolveCb on_resolve, - RejectCb on_reject) noexcept { - return ThenOn(internal::GetCurrentSequence(), from_here, - std::forward<ResolveCb>(on_resolve), - std::forward<RejectCb>(on_reject)); + ThenCb on_resolve, + CatchCb on_reject) noexcept { + DCHECK(!on_resolve.is_null()); + DCHECK(!on_reject.is_null()); + + // Extract properties from the |on_resolve| and |on_reject| callbacks. + using ThenCallbackTraits = internal::CallbackTraits<ThenCb>; + using CatchCallbackTraits = internal::CallbackTraits<CatchCb>; + using ThenCallbackArgT = typename ThenCallbackTraits::ArgType; + using CatchCallbackArgT = typename CatchCallbackTraits::ArgType; + + // Compute the resolve and reject types of the returned Promise. + using ReturnedPromiseTraits = + internal::PromiseCombiner<typename ThenCallbackTraits::ResolveType, + typename ThenCallbackTraits::RejectType, + typename CatchCallbackTraits::ResolveType, + typename CatchCallbackTraits::RejectType>; + using ReturnedPromiseResolveT = typename ReturnedPromiseTraits::ResolveType; + using ReturnedPromiseRejectT = typename ReturnedPromiseTraits::RejectType; + + static_assert(!std::is_same<NoReject, RejectType>::value, + "Can't catch a NoReject promise."); + + static_assert(ReturnedPromiseTraits::valid, + "|on_resolve| callback and |on_resolve| callback must return " + "compatible types."); + + static_assert( + internal::IsValidPromiseArg<ResolveType, ThenCallbackArgT>::value || + std::is_void<ThenCallbackArgT>::value, + "|on_resolve| callback must accept Promise::ResolveType or void."); + + static_assert( + internal::IsValidPromiseArg<RejectType, CatchCallbackArgT>::value || + std::is_void<CatchCallbackArgT>::value, + "|on_reject| callback must accept Promise::RejectType or void."); + + static_assert( + !std::is_reference<ThenCallbackArgT>::value || + std::is_const<std::remove_reference_t<ThenCallbackArgT>>::value, + "Google C++ Style: References in function parameters must be const."); + + static_assert( + !std::is_reference<CatchCallbackArgT>::value || + std::is_const<std::remove_reference_t<CatchCallbackArgT>>::value, + "Google C++ Style: References in function parameters must be const."); + + return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>( + ConstructHereAbstractPromiseWithSinglePrerequisite( + from_here, abstract_promise_.get(), + internal::PromiseExecutor::Data( + in_place_type_t<internal::ThenAndCatchExecutor< + OnceCallback<typename ThenCallbackTraits::SignatureType>, + OnceCallback<typename CatchCallbackTraits::SignatureType>, + ResolveType, RejectType, Resolved<ReturnedPromiseResolveT>, + Rejected<ReturnedPromiseRejectT>>>(), + internal::ToCallbackBase(std::move(on_resolve)), + internal::ToCallbackBase(std::move(on_reject))))); } // A task to execute |finally_callback| on |task_runner| is posted after the @@ -405,8 +541,24 @@ class Promise { template <typename FinallyCb> auto FinallyHere(const Location& from_here, FinallyCb finally_callback) noexcept { - return FinallyOn(internal::GetCurrentSequence(), from_here, - std::move(finally_callback)); + // Extract properties from |finally_callback| callback. + using CallbackTraits = internal::CallbackTraits<FinallyCb>; + using ReturnedPromiseResolveT = typename CallbackTraits::ResolveType; + using ReturnedPromiseRejectT = typename CallbackTraits::RejectType; + + using CallbackArgT = typename CallbackTraits::ArgType; + static_assert(std::is_void<CallbackArgT>::value, + "|finally_callback| callback must have no arguments"); + + return Promise<ReturnedPromiseResolveT, ReturnedPromiseRejectT>( + ConstructHereAbstractPromiseWithSinglePrerequisite( + from_here, abstract_promise_.get(), + internal::PromiseExecutor::Data( + in_place_type_t<internal::FinallyExecutor< + OnceCallback<typename CallbackTraits::ReturnType()>, + Resolved<ReturnedPromiseResolveT>, + Rejected<ReturnedPromiseRejectT>>>(), + internal::ToCallbackBase(std::move(finally_callback))))); } template <typename... Args> @@ -442,8 +594,6 @@ class Promise { nullptr, from_here, nullptr, RejectPolicy::kMustCatchRejection, internal::DependentList::ConstructResolved(), std::move(executor_data))); - promise->emplace(in_place_type_t<Rejected<RejectType>>(), - std::forward<Args>(args)...); return Promise<ResolveType, RejectType>(std::move(promise)); } @@ -454,6 +604,11 @@ class Promise { abstract_promise_->IgnoreUncaughtCatchForTesting(); } + const scoped_refptr<internal::AbstractPromise>& GetScopedRefptrForTesting() + const { + return abstract_promise_; + } + private: template <typename A, typename B> friend class Promise; @@ -471,8 +626,6 @@ class Promise { template <typename A, typename B> friend class ManualPromiseResolver; - - scoped_refptr<internal::AbstractPromise> abstract_promise_; }; // Used for manually resolving and rejecting a Promise. This is for @@ -624,8 +777,8 @@ class Promises { std::vector<internal::DependentList::Node> prerequisite_list( sizeof...(promises)); int i = 0; - for (auto&& p : {promises.abstract_promise_...}) { - prerequisite_list[i++].SetPrerequisite(p.get()); + for (auto&& p : {promises.abstract_promise_.get()...}) { + prerequisite_list[i++].SetPrerequisite(p); } internal::PromiseExecutor::Data executor_data( diff --git a/chromium/base/task/promise/promise_unittest.cc b/chromium/base/task/promise/promise_unittest.cc index bb3dda41cb3..d81adacb1a0 100644 --- a/chromium/base/task/promise/promise_unittest.cc +++ b/chromium/base/task/promise/promise_unittest.cc @@ -106,6 +106,86 @@ TEST(PromiseMemoryLeakTest, TargetTaskRunnerClearsTasks) { EXPECT_TRUE(delete_reply_flag); } +TEST(PromiseMemoryLeakTest, GetResolveCallbackNeverRun) { + test::TaskEnvironment task_environment_; + OnceCallback<void(int)> cb; + MockObject mock_object; + bool delete_task_flag = false; + + { + ManualPromiseResolver<int> p(FROM_HERE); + cb = p.GetResolveCallback(); + + p.promise().ThenHere( + FROM_HERE, BindOnce(&MockObject::Task, Unretained(&mock_object), + MakeRefCounted<ObjectToDelete>(&delete_task_flag))); + } + + EXPECT_FALSE(delete_task_flag); + cb = OnceCallback<void(int)>(); + EXPECT_TRUE(delete_task_flag); +} + +TEST(PromiseMemoryLeakTest, GetRepeatingResolveCallbackNeverRun) { + test::TaskEnvironment task_environment_; + RepeatingCallback<void(int)> cb; + MockObject mock_object; + bool delete_task_flag = false; + + { + ManualPromiseResolver<int> p(FROM_HERE); + cb = p.GetRepeatingResolveCallback(); + + p.promise().ThenHere( + FROM_HERE, BindOnce(&MockObject::Task, Unretained(&mock_object), + MakeRefCounted<ObjectToDelete>(&delete_task_flag))); + } + + EXPECT_FALSE(delete_task_flag); + cb = RepeatingCallback<void(int)>(); + EXPECT_TRUE(delete_task_flag); +} + +TEST(PromiseMemoryLeakTest, GetRejectCallbackNeverRun) { + test::TaskEnvironment task_environment_; + OnceCallback<void(int)> cb; + MockObject mock_object; + bool delete_task_flag = false; + + { + ManualPromiseResolver<void, int> p(FROM_HERE); + cb = p.GetRejectCallback(); + + p.promise().CatchHere( + FROM_HERE, BindOnce(&MockObject::Task, Unretained(&mock_object), + MakeRefCounted<ObjectToDelete>(&delete_task_flag))); + } + + EXPECT_FALSE(delete_task_flag); + cb = OnceCallback<void(int)>(); + EXPECT_TRUE(delete_task_flag); +} + +TEST(PromiseMemoryLeakTest, GetRepeatingRejectCallbackNeverRun) { + test::TaskEnvironment task_environment_; + RepeatingCallback<void(int)> cb; + MockObject mock_object; + bool delete_task_flag = false; + + { + ManualPromiseResolver<void, int> p(FROM_HERE); + cb = p.GetRepeatingRejectCallback(); + + p.promise().CatchHere( + FROM_HERE, BindOnce(&MockObject::Task, Unretained(&mock_object), + MakeRefCounted<ObjectToDelete>(&delete_task_flag))); + } + + EXPECT_FALSE(delete_task_flag); + cb = RepeatingCallback<void(int)>(); + EXPECT_TRUE(delete_task_flag); +} + TEST_F(PromiseTest, GetResolveCallbackThen) { ManualPromiseResolver<int> p(FROM_HERE); p.GetResolveCallback().Run(123); @@ -1602,13 +1682,17 @@ TEST_F(PromiseTest, MoveOnlyTypeMultipleCatchesNotAllowed) { auto p = Promise<void, std::unique_ptr<int>>::CreateRejected( FROM_HERE, std::make_unique<int>(123)); - p.CatchHere(FROM_HERE, - BindOnce([](std::unique_ptr<int> i) { EXPECT_EQ(123, *i); })); + auto r = p.CatchHere( + FROM_HERE, BindOnce([](std::unique_ptr<int> i) { EXPECT_EQ(123, *i); })); EXPECT_DCHECK_DEATH({ p.CatchHere(FROM_HERE, BindOnce([](std::unique_ptr<int> i) { EXPECT_EQ(123, *i); })); }); + + // TODO(alexclarke): Temporary, remove when SequenceManager handles promises + // natively. + r.GetScopedRefptrForTesting()->OnCanceled(); #endif } diff --git a/chromium/base/task/promise/promise_value.h b/chromium/base/task/promise/promise_value.h index 32ec758716d..3b1cfc43464 100644 --- a/chromium/base/task/promise/promise_value.h +++ b/chromium/base/task/promise/promise_value.h @@ -40,9 +40,6 @@ struct Resolved { "Can't have Resolved<NoResolve>"); } - Resolved(const Resolved& other) = default; - Resolved(Resolved&& other) = default; - // Conversion constructor accepts any arguments except Resolved<T>. template < typename... Args, @@ -75,9 +72,6 @@ struct Rejected { "Can't have Rejected<NoReject>"); } - Rejected(const Rejected& other) = default; - Rejected(Rejected&& other) = default; - // Conversion constructor accepts any arguments except Rejected<T>. template < typename... Args, diff --git a/chromium/base/task/promise/then_and_catch_executor.cc b/chromium/base/task/promise/then_and_catch_executor.cc index 5f7f8f09ad0..5eb38e99b78 100644 --- a/chromium/base/task/promise/then_and_catch_executor.cc +++ b/chromium/base/task/promise/then_and_catch_executor.cc @@ -8,14 +8,14 @@ namespace base { namespace internal { bool ThenAndCatchExecutorCommon::IsCancelled() const { - if (!resolve_callback_.is_null()) { + if (!then_callback_.is_null()) { // If there is both a resolve and a reject executor they must be canceled // at the same time. - DCHECK(reject_callback_.is_null() || - reject_callback_.IsCancelled() == resolve_callback_.IsCancelled()); - return resolve_callback_.IsCancelled(); + DCHECK(catch_callback_.is_null() || + catch_callback_.IsCancelled() == then_callback_.IsCancelled()); + return then_callback_.IsCancelled(); } - return reject_callback_.IsCancelled(); + return catch_callback_.IsCancelled(); } void ThenAndCatchExecutorCommon::Execute(AbstractPromise* promise, @@ -23,16 +23,16 @@ void ThenAndCatchExecutorCommon::Execute(AbstractPromise* promise, ExecuteCallback execute_catch) { AbstractPromise* prerequisite = promise->GetOnlyPrerequisite(); if (prerequisite->IsResolved()) { - if (ProcessNullCallback(resolve_callback_, prerequisite, promise)) + if (ProcessNullCallback(then_callback_, prerequisite, promise)) return; - execute_then(prerequisite, promise, &resolve_callback_); + execute_then(prerequisite, promise, &then_callback_); } else { DCHECK(prerequisite->IsRejected()); - if (ProcessNullCallback(reject_callback_, prerequisite, promise)) + if (ProcessNullCallback(catch_callback_, prerequisite, promise)) return; - execute_catch(prerequisite, promise, &reject_callback_); + execute_catch(prerequisite, promise, &catch_callback_); } } diff --git a/chromium/base/task/promise/then_and_catch_executor.h b/chromium/base/task/promise/then_and_catch_executor.h index 56d30ba2a11..7668fc1fbcb 100644 --- a/chromium/base/task/promise/then_and_catch_executor.h +++ b/chromium/base/task/promise/then_and_catch_executor.h @@ -17,11 +17,11 @@ namespace internal { // Exists to reduce template bloat. class BASE_EXPORT ThenAndCatchExecutorCommon { public: - ThenAndCatchExecutorCommon(internal::CallbackBase&& resolve_executor, - internal::CallbackBase&& reject_executor) noexcept - : resolve_callback_(std::move(resolve_executor)), - reject_callback_(std::move(reject_executor)) { - DCHECK(!resolve_callback_.is_null() || !reject_callback_.is_null()); + ThenAndCatchExecutorCommon(internal::CallbackBase&& then_callback, + internal::CallbackBase&& catch_callback) noexcept + : then_callback_(std::move(then_callback)), + catch_callback_(std::move(catch_callback)) { + DCHECK(!then_callback_.is_null() || !catch_callback_.is_null()); } ~ThenAndCatchExecutorCommon() = default; @@ -44,24 +44,23 @@ class BASE_EXPORT ThenAndCatchExecutorCommon { AbstractPromise* arg, AbstractPromise* result); - CallbackBase resolve_callback_; - CallbackBase reject_callback_; + CallbackBase then_callback_; + CallbackBase catch_callback_; }; // Tag signals no callback which is used to eliminate dead code. struct NoCallback {}; -template <typename ResolveOnceCallback, - typename RejectOnceCallback, +template <typename ThenOnceCallback, + typename CatchOnceCallback, typename ArgResolve, typename ArgReject, typename ResolveStorage, typename RejectStorage> class ThenAndCatchExecutor { public: - using ResolveReturnT = - typename CallbackTraits<ResolveOnceCallback>::ReturnType; - using RejectReturnT = typename CallbackTraits<RejectOnceCallback>::ReturnType; + using ThenReturnT = typename CallbackTraits<ThenOnceCallback>::ReturnType; + using CatchReturnT = typename CallbackTraits<CatchOnceCallback>::ReturnType; using PrerequisiteCouldResolve = std::integral_constant<bool, !std::is_same<ArgResolve, NoCallback>::value>; @@ -69,8 +68,8 @@ class ThenAndCatchExecutor { std::integral_constant<bool, !std::is_same<ArgReject, NoCallback>::value>; ThenAndCatchExecutor(CallbackBase&& resolve_callback, - CallbackBase&& reject_callback) noexcept - : common_(std::move(resolve_callback), std::move(reject_callback)) {} + CallbackBase&& catch_callback) noexcept + : common_(std::move(resolve_callback), std::move(catch_callback)) {} bool IsCancelled() const { return common_.IsCancelled(); } @@ -85,29 +84,29 @@ class ThenAndCatchExecutor { #if DCHECK_IS_ON() PromiseExecutor::ArgumentPassingType ResolveArgumentPassingType() const { - return common_.resolve_callback_.is_null() + return common_.then_callback_.is_null() ? PromiseExecutor::ArgumentPassingType::kNoCallback - : CallbackTraits<ResolveOnceCallback>::argument_passing_type; + : CallbackTraits<ThenOnceCallback>::argument_passing_type; } PromiseExecutor::ArgumentPassingType RejectArgumentPassingType() const { - return common_.reject_callback_.is_null() + return common_.catch_callback_.is_null() ? PromiseExecutor::ArgumentPassingType::kNoCallback - : CallbackTraits<RejectOnceCallback>::argument_passing_type; + : CallbackTraits<CatchOnceCallback>::argument_passing_type; } bool CanResolve() const { - return (!common_.resolve_callback_.is_null() && - PromiseCallbackTraits<ResolveReturnT>::could_resolve) || - (!common_.reject_callback_.is_null() && - PromiseCallbackTraits<RejectReturnT>::could_resolve); + return (!common_.then_callback_.is_null() && + PromiseCallbackTraits<ThenReturnT>::could_resolve) || + (!common_.catch_callback_.is_null() && + PromiseCallbackTraits<CatchReturnT>::could_resolve); } bool CanReject() const { - return (!common_.resolve_callback_.is_null() && - PromiseCallbackTraits<ResolveReturnT>::could_reject) || - (!common_.reject_callback_.is_null() && - PromiseCallbackTraits<RejectReturnT>::could_reject); + return (!common_.then_callback_.is_null() && + PromiseCallbackTraits<ThenReturnT>::could_reject) || + (!common_.catch_callback_.is_null() && + PromiseCallbackTraits<CatchReturnT>::could_reject); } #endif @@ -121,8 +120,8 @@ class ThenAndCatchExecutor { static void ExecuteCatch(AbstractPromise* prerequisite, AbstractPromise* promise, - CallbackBase* reject_callback) { - ExecuteCatchInternal(prerequisite, promise, reject_callback, + CallbackBase* catch_callback) { + ExecuteCatchInternal(prerequisite, promise, catch_callback, PrerequisiteCouldReject()); } @@ -130,10 +129,16 @@ class ThenAndCatchExecutor { AbstractPromise* promise, CallbackBase* resolve_callback, std::true_type can_resolve) { - RunHelper<ResolveOnceCallback, Resolved<ArgResolve>, ResolveStorage, - RejectStorage>:: - Run(std::move(*static_cast<ResolveOnceCallback*>(resolve_callback)), - prerequisite, promise); + // Internally RunHelper uses const RepeatingCallback<>& to avoid the + // binary size overhead of moving a scoped_refptr<> about. We respect + // the onceness of the callback and RunHelper will overwrite the callback + // with the result. + using RepeatingThenCB = + typename ToRepeatingCallback<ThenOnceCallback>::value; + RunHelper< + RepeatingThenCB, Resolved<ArgResolve>, ResolveStorage, + RejectStorage>::Run(*static_cast<RepeatingThenCB*>(resolve_callback), + prerequisite, promise); } static void ExecuteThenInternal(AbstractPromise* prerequisite, @@ -145,17 +150,23 @@ class ThenAndCatchExecutor { static void ExecuteCatchInternal(AbstractPromise* prerequisite, AbstractPromise* promise, - CallbackBase* reject_callback, + CallbackBase* catch_callback, std::true_type can_reject) { - RunHelper<RejectOnceCallback, Rejected<ArgReject>, ResolveStorage, - RejectStorage>:: - Run(std::move(*static_cast<RejectOnceCallback*>(reject_callback)), - prerequisite, promise); + // Internally RunHelper uses const RepeatingCallback<>& to avoid the + // binary size overhead of moving a scoped_refptr<> about. We respect + // the onceness of the callback and RunHelper will overwrite the callback + // with the result. + using RepeatingCatchCB = + typename ToRepeatingCallback<CatchOnceCallback>::value; + RunHelper< + RepeatingCatchCB, Rejected<ArgReject>, ResolveStorage, + RejectStorage>::Run(*static_cast<RepeatingCatchCB*>(catch_callback), + prerequisite, promise); } static void ExecuteCatchInternal(AbstractPromise* prerequisite, AbstractPromise* promise, - CallbackBase* reject_callback, + CallbackBase* catch_callback, std::false_type can_reject) { // |prerequisite| can't reject so don't generate dead code. } diff --git a/chromium/base/task/sequence_manager/sequence_manager.h b/chromium/base/task/sequence_manager/sequence_manager.h index cd2163d7793..6e1cb0b752b 100644 --- a/chromium/base/task/sequence_manager/sequence_manager.h +++ b/chromium/base/task/sequence_manager/sequence_manager.h @@ -11,6 +11,7 @@ #include "base/macros.h" #include "base/message_loop/message_pump_type.h" #include "base/message_loop/timer_slack.h" +#include "base/sequenced_task_runner.h" #include "base/single_thread_task_runner.h" #include "base/task/sequence_manager/task_queue_impl.h" #include "base/task/sequence_manager/task_time_observer.h" @@ -19,6 +20,7 @@ namespace base { class MessagePump; +class TaskObserver; namespace sequence_manager { @@ -102,6 +104,11 @@ class BASE_EXPORT SequenceManager { kNone, kEnabled, kEnabledWithBacktrace, + + // Logs high priority tasks and the lower priority tasks they skipped + // past. Useful for debugging test failures caused by scheduler policy + // changes. + kReorderedOnly, }; TaskLogging task_execution_logging = TaskLogging::kNone; @@ -141,6 +148,10 @@ class BASE_EXPORT SequenceManager { // performs this initialization automatically. virtual void BindToCurrentThread() = 0; + // Returns the task runner the current task was posted on. Returns null if no + // task is currently running. Must be called on the bound thread. + virtual scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() = 0; + // Finishes the initialization for a SequenceManager created via // CreateUnboundSequenceManagerWithPump(). Must not be called in any other // circumstances. The ownership of the pump is transferred to SequenceManager. @@ -238,6 +249,14 @@ class BASE_EXPORT SequenceManager { virtual std::unique_ptr<NativeWorkHandle> OnNativeWorkPending( TaskQueue::QueuePriority priority) = 0; + // Adds an observer which reports task execution. Can only be called on the + // same thread that |this| is running on. + virtual void AddTaskObserver(TaskObserver* task_observer) = 0; + + // Removes an observer which reports task execution. Can only be called on the + // same thread that |this| is running on. + virtual void RemoveTaskObserver(TaskObserver* task_observer) = 0; + protected: virtual std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl( const TaskQueue::Spec& spec) = 0; diff --git a/chromium/base/task/sequence_manager/sequence_manager_impl.cc b/chromium/base/task/sequence_manager/sequence_manager_impl.cc index 26977d6687b..7eea742aeb2 100644 --- a/chromium/base/task/sequence_manager/sequence_manager_impl.cc +++ b/chromium/base/task/sequence_manager/sequence_manager_impl.cc @@ -320,6 +320,16 @@ void SequenceManagerImpl::BindToCurrentThread( BindToMessagePump(std::move(pump)); } +scoped_refptr<SequencedTaskRunner> +SequenceManagerImpl::GetTaskRunnerForCurrentTask() { + DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); + if (main_thread_only().task_execution_stack.empty()) + return nullptr; + return main_thread_only() + .task_execution_stack.back() + .pending_task.task_runner; +} + void SequenceManagerImpl::CompleteInitializationOnBoundThread() { controller_->AddNestingObserver(this); main_thread_only().nesting_observer_registered_ = true; @@ -488,10 +498,10 @@ const char* RunTaskTraceNameForPriority(TaskQueue::QueuePriority priority) { } // namespace -Optional<Task> SequenceManagerImpl::TakeTask() { - Optional<Task> task = TakeTaskImpl(); +Task* SequenceManagerImpl::SelectNextTask() { + Task* task = SelectNextTaskImpl(); if (!task) - return base::nullopt; + return nullptr; ExecutingTask& executing_task = *main_thread_only().task_execution_stack.rbegin(); @@ -503,62 +513,70 @@ Optional<Task> SequenceManagerImpl::TakeTask() { "task_type", executing_task.task_type); TRACE_EVENT_BEGIN0("sequence_manager", executing_task.task_queue_name); -#if DCHECK_IS_ON() && !defined(OS_NACL) - LogTaskDebugInfo(executing_task); -#endif - return task; } #if DCHECK_IS_ON() && !defined(OS_NACL) void SequenceManagerImpl::LogTaskDebugInfo( - const ExecutingTask& executing_task) { + const WorkQueue* selected_work_queue) const { + const Task* task = selected_work_queue->GetFrontTask(); switch (settings_.task_execution_logging) { case Settings::TaskLogging::kNone: break; case Settings::TaskLogging::kEnabled: - LOG(INFO) << "#" - << static_cast<uint64_t>( - executing_task.pending_task.enqueue_order()) - << " " << executing_task.task_queue_name - << (executing_task.pending_task.cross_thread_ - ? " Run crossthread " - : " Run ") - << executing_task.pending_task.posted_from.ToString(); + LOG(INFO) << "#" << static_cast<uint64_t>(task->enqueue_order()) << " " + << selected_work_queue->task_queue()->GetName() + << (task->cross_thread_ ? " Run crossthread " : " Run ") + << task->posted_from.ToString(); break; case Settings::TaskLogging::kEnabledWithBacktrace: { std::array<const void*, PendingTask::kTaskBacktraceLength + 1> task_trace; - task_trace[0] = executing_task.pending_task.posted_from.program_counter(); - std::copy(executing_task.pending_task.task_backtrace.begin(), - executing_task.pending_task.task_backtrace.end(), + task_trace[0] = task->posted_from.program_counter(); + std::copy(task->task_backtrace.begin(), task->task_backtrace.end(), task_trace.begin() + 1); size_t length = 0; while (length < task_trace.size() && task_trace[length]) ++length; if (length == 0) break; - LOG(INFO) << "#" - << static_cast<uint64_t>( - executing_task.pending_task.enqueue_order()) - << " " << executing_task.task_queue_name - << (executing_task.pending_task.cross_thread_ - ? " Run crossthread " - : " Run ") + LOG(INFO) << "#" << static_cast<uint64_t>(task->enqueue_order()) << " " + << selected_work_queue->task_queue()->GetName() + << (task->cross_thread_ ? " Run crossthread " : " Run ") << debug::StackTrace(task_trace.data(), length); break; } + + case Settings::TaskLogging::kReorderedOnly: { + std::vector<const Task*> skipped_tasks; + main_thread_only().selector.CollectSkippedOverLowerPriorityTasks( + selected_work_queue, &skipped_tasks); + + if (skipped_tasks.empty()) + break; + + LOG(INFO) << "#" << static_cast<uint64_t>(task->enqueue_order()) << " " + << selected_work_queue->task_queue()->GetName() + << (task->cross_thread_ ? " Run crossthread " : " Run ") + << task->posted_from.ToString(); + + for (const Task* skipped_task : skipped_tasks) { + LOG(INFO) << "# (skipped over) " + << static_cast<uint64_t>(skipped_task->enqueue_order()) << " " + << skipped_task->posted_from.ToString(); + } + } } } #endif // DCHECK_IS_ON() && !defined(OS_NACL) -Optional<Task> SequenceManagerImpl::TakeTaskImpl() { +Task* SequenceManagerImpl::SelectNextTaskImpl() { CHECK(Validate()); DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), - "SequenceManagerImpl::TakeTask"); + "SequenceManagerImpl::SelectNextTask"); ReloadEmptyWorkQueues(); LazyNow lazy_now(controller_->GetClock()); @@ -579,7 +597,7 @@ Optional<Task> SequenceManagerImpl::TakeTaskImpl() { this, AsValueWithSelectorResult(work_queue, /* force_verbose */ false)); if (!work_queue) - return nullopt; + return nullptr; // If the head task was canceled, remove it and run the selector again. if (UNLIKELY(work_queue->RemoveAllCanceledTasksFromFront())) @@ -604,9 +622,13 @@ Optional<Task> SequenceManagerImpl::TakeTaskImpl() { work_queue->task_queue()->GetQueuePriority()))) { TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), "SequenceManager.YieldToNative"); - return nullopt; + return nullptr; } +#if DCHECK_IS_ON() && !defined(OS_NACL) + LogTaskDebugInfo(work_queue); +#endif // DCHECK_IS_ON() && !defined(OS_NACL) + main_thread_only().task_execution_stack.emplace_back( work_queue->TakeTaskFromWorkQueue(), work_queue->task_queue(), InitializeTaskTiming(work_queue->task_queue())); @@ -615,7 +637,7 @@ Optional<Task> SequenceManagerImpl::TakeTaskImpl() { *main_thread_only().task_execution_stack.rbegin(); NotifyWillProcessTask(&executing_task, &lazy_now); - return std::move(executing_task.pending_task); + return &executing_task.pending_task; } } diff --git a/chromium/base/task/sequence_manager/sequence_manager_impl.h b/chromium/base/task/sequence_manager/sequence_manager_impl.h index a8c1659f52a..1ee7944a06e 100644 --- a/chromium/base/task/sequence_manager/sequence_manager_impl.h +++ b/chromium/base/task/sequence_manager/sequence_manager_impl.h @@ -25,6 +25,7 @@ #include "base/message_loop/message_pump_type.h" #include "base/pending_task.h" #include "base/run_loop.h" +#include "base/sequenced_task_runner.h" #include "base/single_thread_task_runner.h" #include "base/synchronization/lock.h" #include "base/task/common/task_annotator.h" @@ -41,8 +42,6 @@ namespace base { -class TaskObserver; - namespace trace_event { class ConvertableToTraceFormat; } // namespace trace_event @@ -103,6 +102,7 @@ class BASE_EXPORT SequenceManagerImpl // SequenceManager implementation: void BindToCurrentThread() override; + scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() override; void BindToMessagePump(std::unique_ptr<MessagePump> message_pump) override; void SetObserver(Observer* observer) override; void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override; @@ -126,16 +126,16 @@ class BASE_EXPORT SequenceManagerImpl std::string DescribeAllPendingTasks() const override; std::unique_ptr<NativeWorkHandle> OnNativeWorkPending( TaskQueue::QueuePriority priority) override; + void AddTaskObserver(TaskObserver* task_observer) override; + void RemoveTaskObserver(TaskObserver* task_observer) override; // SequencedTaskSource implementation: - Optional<Task> TakeTask() override; + Task* SelectNextTask() override; void DidRunTask() override; TimeDelta DelayTillNextTask(LazyNow* lazy_now) const override; bool HasPendingHighResolutionTasks() override; bool OnSystemIdle() override; - void AddTaskObserver(TaskObserver* task_observer); - void RemoveTaskObserver(TaskObserver* task_observer); void AddDestructionObserver( MessageLoopCurrent::DestructionObserver* destruction_observer); void RemoveDestructionObserver( @@ -230,7 +230,8 @@ class BASE_EXPORT SequenceManagerImpl // We have to track rentrancy because we support nested runloops but the // selector interface is unaware of those. This struct keeps track off all - // task related state needed to make pairs of TakeTask() / DidRunTask() work. + // task related state needed to make pairs of SelectNextTask() / DidRunTask() + // work. struct ExecutingTask { ExecutingTask(Task&& task, internal::TaskQueueImpl* task_queue, @@ -377,8 +378,8 @@ class BASE_EXPORT SequenceManagerImpl void RecordCrashKeys(const PendingTask&); // Helper to terminate all scoped trace events to allow starting new ones - // in TakeTask(). - Optional<Task> TakeTaskImpl(); + // in SelectNextTask(). + Task* SelectNextTaskImpl(); // Check if a task of priority |priority| should run given the pending set of // native work. @@ -388,7 +389,7 @@ class BASE_EXPORT SequenceManagerImpl TimeDelta GetDelayTillNextDelayedTask(LazyNow* lazy_now) const; #if DCHECK_IS_ON() - void LogTaskDebugInfo(const ExecutingTask& executing_task); + void LogTaskDebugInfo(const internal::WorkQueue* work_queue) const; #endif // Determines if wall time or thread time should be recorded for the next diff --git a/chromium/base/task/sequence_manager/sequence_manager_impl_unittest.cc b/chromium/base/task/sequence_manager/sequence_manager_impl_unittest.cc index cb2f91b8176..3e1bb5dc3db 100644 --- a/chromium/base/task/sequence_manager/sequence_manager_impl_unittest.cc +++ b/chromium/base/task/sequence_manager/sequence_manager_impl_unittest.cc @@ -566,6 +566,18 @@ class TestCountUsesTimeSource : public TickClock { DISALLOW_COPY_AND_ASSIGN(TestCountUsesTimeSource); }; +TEST_P(SequenceManagerTest, GetCorrectTaskRunnerForCurrentTask) { + auto queue = CreateTaskQueue(); + + queue->task_runner()->PostTask( + FROM_HERE, BindLambdaForTesting([&]() { + EXPECT_EQ(queue->task_runner(), + sequence_manager()->GetTaskRunnerForCurrentTask()); + })); + + RunLoop().RunUntilIdle(); +} + TEST_P(SequenceManagerTest, NowNotCalledIfUnneeded) { sequence_manager()->SetWorkBatchSize(6); @@ -2356,9 +2368,9 @@ TEST_P(SequenceManagerTest, TaskQueueObserver_ImmediateTask) { Mock::VerifyAndClearExpectations(&observer); // Unless the immediate work queue is emptied. - sequence_manager()->TakeTask(); + sequence_manager()->SelectNextTask(); sequence_manager()->DidRunTask(); - sequence_manager()->TakeTask(); + sequence_manager()->SelectNextTask(); sequence_manager()->DidRunTask(); EXPECT_CALL(observer, OnPostTask(_, TimeDelta())); EXPECT_CALL(observer, OnQueueNextWakeUpChanged(_)); diff --git a/chromium/base/task/sequence_manager/sequenced_task_source.h b/chromium/base/task/sequence_manager/sequenced_task_source.h index b1153fb32e8..5ea8874ab5e 100644 --- a/chromium/base/task/sequence_manager/sequenced_task_source.h +++ b/chromium/base/task/sequence_manager/sequenced_task_source.h @@ -19,13 +19,13 @@ class SequencedTaskSource { public: virtual ~SequencedTaskSource() = default; - // Returns the next task to run from this source or nullopt if + // Returns the next task to run from this source or nullptr if // there're no more tasks ready to run. If a task is returned, - // DidRunTask() must be invoked before the next call to TakeTask(). - virtual Optional<Task> TakeTask() = 0; + // DidRunTask() must be invoked before the next call to SelectNextTask(). + virtual Task* SelectNextTask() = 0; // Notifies this source that the task previously obtained - // from TakeTask() has been completed. + // from SelectNextTask() has been completed. virtual void DidRunTask() = 0; // Returns the delay till the next task or TimeDelta::Max() diff --git a/chromium/base/task/sequence_manager/task_queue_impl.cc b/chromium/base/task/sequence_manager/task_queue_impl.cc index d109f915ded..7c22f9c1121 100644 --- a/chromium/base/task/sequence_manager/task_queue_impl.cc +++ b/chromium/base/task/sequence_manager/task_queue_impl.cc @@ -78,16 +78,18 @@ TaskQueueImpl::TaskRunner::~TaskRunner() {} bool TaskQueueImpl::TaskRunner::PostDelayedTask(const Location& location, OnceClosure callback, TimeDelta delay) { - return task_poster_->PostTask(PostedTask(std::move(callback), location, delay, - Nestable::kNestable, task_type_)); + return task_poster_->PostTask(PostedTask(this, std::move(callback), location, + delay, Nestable::kNestable, + task_type_)); } bool TaskQueueImpl::TaskRunner::PostNonNestableDelayedTask( const Location& location, OnceClosure callback, TimeDelta delay) { - return task_poster_->PostTask(PostedTask(std::move(callback), location, delay, - Nestable::kNonNestable, task_type_)); + return task_poster_->PostTask(PostedTask(this, std::move(callback), location, + delay, Nestable::kNonNestable, + task_type_)); } bool TaskQueueImpl::TaskRunner::RunsTasksInCurrentSequence() const { @@ -423,8 +425,10 @@ void TaskQueueImpl::PushOntoDelayedIncomingQueue(Task pending_task) { #endif // TODO(altimin): Add a copy method to Task to capture metadata here. + auto task_runner = pending_task.task_runner; PostImmediateTaskImpl( - PostedTask(BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask, + PostedTask(std::move(task_runner), + BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask, Unretained(this), std::move(pending_task)), FROM_HERE, TimeDelta(), Nestable::kNonNestable, pending_task.task_type), diff --git a/chromium/base/task/sequence_manager/task_queue_selector.cc b/chromium/base/task/sequence_manager/task_queue_selector.cc index 5e3a14a8e80..cb58d9b4f3a 100644 --- a/chromium/base/task/sequence_manager/task_queue_selector.cc +++ b/chromium/base/task/sequence_manager/task_queue_selector.cc @@ -159,6 +159,15 @@ void TaskQueueSelector::WorkQueueSetBecameNonEmpty(size_t set_index) { } } +void TaskQueueSelector::CollectSkippedOverLowerPriorityTasks( + const internal::WorkQueue* selected_work_queue, + std::vector<const Task*>* result) const { + delayed_work_queue_sets_.CollectSkippedOverLowerPriorityTasks( + selected_work_queue, result); + immediate_work_queue_sets_.CollectSkippedOverLowerPriorityTasks( + selected_work_queue, result); +} + #if DCHECK_IS_ON() || !defined(NDEBUG) bool TaskQueueSelector::CheckContainsQueueForTest( const internal::TaskQueueImpl* queue) const { diff --git a/chromium/base/task/sequence_manager/task_queue_selector.h b/chromium/base/task/sequence_manager/task_queue_selector.h index 2ae9b52ef05..8a79a5e52bd 100644 --- a/chromium/base/task/sequence_manager/task_queue_selector.h +++ b/chromium/base/task/sequence_manager/task_queue_selector.h @@ -76,6 +76,12 @@ class BASE_EXPORT TaskQueueSelector : public WorkQueueSets::Observer { void WorkQueueSetBecameEmpty(size_t set_index) override; void WorkQueueSetBecameNonEmpty(size_t set_index) override; + // Populates |result| with tasks with lower priority than the first task from + // |selected_work_queue| which could otherwise run now. + void CollectSkippedOverLowerPriorityTasks( + const internal::WorkQueue* selected_work_queue, + std::vector<const Task*>* result) const; + protected: WorkQueueSets* delayed_work_queue_sets() { return &delayed_work_queue_sets_; } diff --git a/chromium/base/task/sequence_manager/task_queue_selector_unittest.cc b/chromium/base/task/sequence_manager/task_queue_selector_unittest.cc index c99366471ba..504477048dd 100644 --- a/chromium/base/task/sequence_manager/task_queue_selector_unittest.cc +++ b/chromium/base/task/sequence_manager/task_queue_selector_unittest.cc @@ -74,7 +74,7 @@ class TaskQueueSelectorTestBase : public testing::Test { EnqueueOrderGenerator enqueue_order_generator; for (size_t i = 0; i < num_tasks; i++) { task_queues_[queue_indices[i]]->immediate_work_queue()->Push( - Task(PostedTask(test_closure_, FROM_HERE), TimeTicks(), + Task(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder(), enqueue_order_generator.GenerateNext())); } } @@ -84,15 +84,15 @@ class TaskQueueSelectorTestBase : public testing::Test { size_t num_tasks) { for (size_t i = 0; i < num_tasks; i++) { task_queues_[queue_indices[i]]->immediate_work_queue()->Push(Task( - PostedTask(test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder(), - EnqueueOrder::FromIntForTesting(enqueue_orders[i]))); + PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(), + EnqueueOrder(), EnqueueOrder::FromIntForTesting(enqueue_orders[i]))); } } void PushTask(const size_t queue_index, const size_t enqueue_order) { task_queues_[queue_index]->immediate_work_queue()->Push( - Task(PostedTask(test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder(), - EnqueueOrder::FromIntForTesting(enqueue_order))); + Task(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(), + EnqueueOrder(), EnqueueOrder::FromIntForTesting(enqueue_order))); } std::vector<size_t> PopTasksAndReturnQueueIndices() { @@ -666,8 +666,8 @@ TEST_F(TaskQueueSelectorTest, ChooseWithPriority_Empty) { TEST_F(TaskQueueSelectorTest, ChooseWithPriority_OnlyDelayed) { task_queues_[0]->delayed_work_queue()->Push( - Task(PostedTask(test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder(), - EnqueueOrder::FromIntForTesting(2))); + Task(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(), + EnqueueOrder(), EnqueueOrder::FromIntForTesting(2))); bool chose_delayed_over_immediate = false; EXPECT_EQ( @@ -680,8 +680,8 @@ TEST_F(TaskQueueSelectorTest, ChooseWithPriority_OnlyDelayed) { TEST_F(TaskQueueSelectorTest, ChooseWithPriority_OnlyImmediate) { task_queues_[0]->immediate_work_queue()->Push( - Task(PostedTask(test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder(), - EnqueueOrder::FromIntForTesting(2))); + Task(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(), + EnqueueOrder(), EnqueueOrder::FromIntForTesting(2))); bool chose_delayed_over_immediate = false; EXPECT_EQ( @@ -706,8 +706,8 @@ TEST_F(TaskQueueSelectorTest, TestObserverWithOneBlockedQueue) { task_queue->SetQueueEnabled(false); selector.DisableQueue(task_queue.get()); - Task task(PostedTask(test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder(), - EnqueueOrder::FromIntForTesting(2)); + Task task(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(), + EnqueueOrder(), EnqueueOrder::FromIntForTesting(2)); task_queue->immediate_work_queue()->Push(std::move(task)); EXPECT_EQ(nullptr, selector.SelectWorkQueueToService()); @@ -736,10 +736,10 @@ TEST_F(TaskQueueSelectorTest, TestObserverWithTwoBlockedQueues) { selector.SetQueuePriority(task_queue2.get(), TaskQueue::kControlPriority); - Task task1(PostedTask(test_closure_, FROM_HERE), TimeTicks(), + Task task1(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder::FromIntForTesting(2), EnqueueOrder::FromIntForTesting(2)); - Task task2(PostedTask(test_closure_, FROM_HERE), TimeTicks(), + Task task2(PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder::FromIntForTesting(3), EnqueueOrder::FromIntForTesting(3)); task_queue->immediate_work_queue()->Push(std::move(task1)); @@ -762,6 +762,21 @@ TEST_F(TaskQueueSelectorTest, TestObserverWithTwoBlockedQueues) { task_queue2->UnregisterTaskQueue(); } +TEST_F(TaskQueueSelectorTest, CollectSkippedOverLowerPriorityTasks) { + size_t queue_order[] = {0, 1, 2, 3, 2, 1, 0}; + PushTasks(queue_order, 7); + selector_.SetQueuePriority(task_queues_[3].get(), TaskQueue::kHighPriority); + + std::vector<const Task*> result; + selector_.CollectSkippedOverLowerPriorityTasks( + task_queues_[3]->immediate_work_queue(), &result); + + ASSERT_EQ(3u, result.size()); + EXPECT_EQ(2u, result[0]->enqueue_order()); // The order here isn't important. + EXPECT_EQ(3u, result[1]->enqueue_order()); + EXPECT_EQ(4u, result[2]->enqueue_order()); +} + class DisabledAntiStarvationLogicTaskQueueSelectorTest : public TaskQueueSelectorTestBase, public testing::WithParamInterface<TaskQueue::QueuePriority> { @@ -839,13 +854,13 @@ class ChooseWithPriorityTest TEST_P(ChooseWithPriorityTest, RoundRobinTest) { task_queues_[0]->immediate_work_queue()->Push(Task( - PostedTask(test_closure_, FROM_HERE), TimeTicks(), + PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder::FromIntForTesting(GetParam().immediate_task_enqueue_order), EnqueueOrder::FromIntForTesting( GetParam().immediate_task_enqueue_order))); task_queues_[0]->delayed_work_queue()->Push(Task( - PostedTask(test_closure_, FROM_HERE), TimeTicks(), + PostedTask(nullptr, test_closure_, FROM_HERE), TimeTicks(), EnqueueOrder::FromIntForTesting(GetParam().delayed_task_enqueue_order), EnqueueOrder::FromIntForTesting(GetParam().delayed_task_enqueue_order))); diff --git a/chromium/base/task/sequence_manager/tasks.cc b/chromium/base/task/sequence_manager/tasks.cc index 14bd306b947..a3bd5ce60f6 100644 --- a/chromium/base/task/sequence_manager/tasks.cc +++ b/chromium/base/task/sequence_manager/tasks.cc @@ -17,6 +17,7 @@ Task::Task(internal::PostedTask posted_task, desired_run_time, posted_task.nestable), task_type(posted_task.task_type), + task_runner(std::move(posted_task.task_runner)), enqueue_order_(enqueue_order) { // We use |sequence_num| in DelayedWakeUp for ordering purposes and it // may wrap around to a negative number during the static cast, hence, @@ -28,9 +29,15 @@ Task::Task(internal::PostedTask posted_task, queue_time = posted_task.queue_time; } -namespace internal { +Task::Task(Task&& move_from) = default; + +Task::~Task() = default; -PostedTask::PostedTask(OnceClosure callback, +Task& Task::operator=(Task&& other) = default; + +namespace internal { +PostedTask::PostedTask(scoped_refptr<SequencedTaskRunner> task_runner, + OnceClosure callback, Location location, TimeDelta delay, Nestable nestable, @@ -39,7 +46,8 @@ PostedTask::PostedTask(OnceClosure callback, location(location), delay(delay), nestable(nestable), - task_type(task_type) {} + task_type(task_type), + task_runner(std::move(task_runner)) {} PostedTask::PostedTask(PostedTask&& move_from) noexcept : callback(std::move(move_from.callback)), @@ -47,6 +55,7 @@ PostedTask::PostedTask(PostedTask&& move_from) noexcept delay(move_from.delay), nestable(move_from.nestable), task_type(move_from.task_type), + task_runner(std::move(move_from.task_runner)), queue_time(move_from.queue_time) {} PostedTask::~PostedTask() = default; diff --git a/chromium/base/task/sequence_manager/tasks.h b/chromium/base/task/sequence_manager/tasks.h index 0c886c4b9ca..d1e1912ead2 100644 --- a/chromium/base/task/sequence_manager/tasks.h +++ b/chromium/base/task/sequence_manager/tasks.h @@ -6,6 +6,7 @@ #define BASE_TASK_SEQUENCE_MANAGER_TASKS_H_ #include "base/pending_task.h" +#include "base/sequenced_task_runner.h" #include "base/task/sequence_manager/enqueue_order.h" namespace base { @@ -21,7 +22,8 @@ enum class WakeUpResolution { kLow, kHigh }; // Wrapper around PostTask method arguments and the assigned task type. // Eventually it becomes a PendingTask once accepted by a TaskQueueImpl. struct BASE_EXPORT PostedTask { - explicit PostedTask(OnceClosure callback = OnceClosure(), + explicit PostedTask(scoped_refptr<SequencedTaskRunner> task_runner, + OnceClosure callback = OnceClosure(), Location location = Location(), TimeDelta delay = TimeDelta(), Nestable nestable = Nestable::kNestable, @@ -34,6 +36,9 @@ struct BASE_EXPORT PostedTask { TimeDelta delay; Nestable nestable; TaskType task_type; + // The task runner this task is running on. Can be used by task runners that + // support posting back to the "current sequence". + scoped_refptr<SequencedTaskRunner> task_runner; // The time at which the task was queued. TimeTicks queue_time; @@ -76,6 +81,9 @@ struct BASE_EXPORT Task : public PendingTask { EnqueueOrder enqueue_order = EnqueueOrder(), internal::WakeUpResolution wake_up_resolution = internal::WakeUpResolution::kLow); + Task(Task&& move_from); + ~Task(); + Task& operator=(Task&& other); internal::DelayedWakeUp delayed_wake_up() const { return internal::DelayedWakeUp{delayed_run_time, sequence_num}; @@ -97,6 +105,10 @@ struct BASE_EXPORT Task : public PendingTask { TaskType task_type; + // The task runner this task is running on. Can be used by task runners that + // support posting back to the "current sequence". + scoped_refptr<SequencedTaskRunner> task_runner; + #if DCHECK_IS_ON() bool cross_thread_; #endif diff --git a/chromium/base/task/sequence_manager/thread_controller_impl.cc b/chromium/base/task/sequence_manager/thread_controller_impl.cc index 22bffdb8a5b..676b9a6708d 100644 --- a/chromium/base/task/sequence_manager/thread_controller_impl.cc +++ b/chromium/base/task/sequence_manager/thread_controller_impl.cc @@ -174,7 +174,7 @@ void ThreadControllerImpl::DoWork(WorkType work_type) { WeakPtr<ThreadControllerImpl> weak_ptr = weak_factory_.GetWeakPtr(); // TODO(scheduler-dev): Consider moving to a time based work batch instead. for (int i = 0; i < main_sequence_only().work_batch_size_; i++) { - Optional<PendingTask> task = sequence_->TakeTask(); + Task* task = sequence_->SelectNextTask(); if (!task) break; @@ -189,7 +189,7 @@ void ThreadControllerImpl::DoWork(WorkType work_type) { // Trace events should finish before we call DidRunTask to ensure that // SequenceManager trace events do not interfere with them. TRACE_TASK_EXECUTION("ThreadControllerImpl::RunTask", *task); - task_annotator_.RunTask("SequenceManager RunTask", &*task); + task_annotator_.RunTask("SequenceManager RunTask", task); } if (!weak_ptr) diff --git a/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl.cc b/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl.cc index deeda0e4260..9b0a058efbc 100644 --- a/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl.cc +++ b/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl.cc @@ -342,7 +342,7 @@ TimeDelta ThreadControllerWithMessagePumpImpl::DoWorkImpl( DCHECK(main_thread_only().task_source); for (int i = 0; i < main_thread_only().work_batch_size; i++) { - Optional<Task> task = main_thread_only().task_source->TakeTask(); + Task* task = main_thread_only().task_source->SelectNextTask(); if (!task) break; @@ -362,7 +362,7 @@ TimeDelta ThreadControllerWithMessagePumpImpl::DoWorkImpl( // Trace events should finish before we call DidRunTask to ensure that // SequenceManager trace events do not interfere with them. TRACE_TASK_EXECUTION("ThreadControllerImpl::RunTask", *task); - task_annotator_.RunTask("SequenceManager RunTask", &*task); + task_annotator_.RunTask("SequenceManager RunTask", task); } #if DCHECK_IS_ON() diff --git a/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl_unittest.cc b/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl_unittest.cc index 64eb8f7e74b..3c02becf16d 100644 --- a/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl_unittest.cc +++ b/chromium/base/task/sequence_manager/thread_controller_with_message_pump_impl_unittest.cc @@ -79,17 +79,17 @@ class FakeSequencedTaskSource : public internal::SequencedTaskSource { explicit FakeSequencedTaskSource(TickClock* clock) : clock_(clock) {} ~FakeSequencedTaskSource() override = default; - Optional<Task> TakeTask() override { + Task* SelectNextTask() override { if (tasks_.empty()) - return nullopt; + return nullptr; if (tasks_.front().delayed_run_time > clock_->NowTicks()) - return nullopt; - Task task = std::move(tasks_.front()); + return nullptr; + running_stack_.push_back(std::move(tasks_.front())); tasks_.pop(); - return task; + return &running_stack_.back(); } - void DidRunTask() override {} + void DidRunTask() override { running_stack_.pop_back(); } TimeDelta DelayTillNextTask(LazyNow* lazy_now) const override { if (tasks_.empty()) @@ -106,8 +106,9 @@ class FakeSequencedTaskSource : public internal::SequencedTaskSource { TimeTicks delayed_run_time) { DCHECK(tasks_.empty() || delayed_run_time.is_null() || tasks_.back().delayed_run_time < delayed_run_time); - tasks_.push(Task(internal::PostedTask(std::move(task), posted_from), - delayed_run_time, EnqueueOrder::FromIntForTesting(13))); + tasks_.push( + Task(internal::PostedTask(nullptr, std::move(task), posted_from), + delayed_run_time, EnqueueOrder::FromIntForTesting(13))); } bool HasPendingHighResolutionTasks() override { return false; } @@ -117,6 +118,7 @@ class FakeSequencedTaskSource : public internal::SequencedTaskSource { private: TickClock* clock_; std::queue<Task> tasks_; + std::vector<Task> running_stack_; }; TimeTicks Seconds(int seconds) { diff --git a/chromium/base/task/sequence_manager/work_queue.cc b/chromium/base/task/sequence_manager/work_queue.cc index 0d9df4a150a..2dfd04da230 100644 --- a/chromium/base/task/sequence_manager/work_queue.cc +++ b/chromium/base/task/sequence_manager/work_queue.cc @@ -305,6 +305,16 @@ void WorkQueue::PopTaskForTesting() { tasks_.pop_front(); } +void WorkQueue::CollectTasksOlderThan(EnqueueOrder reference, + std::vector<const Task*>* result) const { + for (const Task& task : tasks_) { + if (task.enqueue_order() >= reference) + break; + + result->push_back(&task); + } +} + } // namespace internal } // namespace sequence_manager } // namespace base diff --git a/chromium/base/task/sequence_manager/work_queue.h b/chromium/base/task/sequence_manager/work_queue.h index 849d48cac5d..65fdee4ca28 100644 --- a/chromium/base/task/sequence_manager/work_queue.h +++ b/chromium/base/task/sequence_manager/work_queue.h @@ -161,6 +161,11 @@ class BASE_EXPORT WorkQueue { // Test support function. This should not be used in production code. void PopTaskForTesting(); + // Iterates through |tasks_| adding any that are older than |reference| to + // |result|. + void CollectTasksOlderThan(EnqueueOrder reference, + std::vector<const Task*>* result) const; + private: bool InsertFenceImpl(EnqueueOrder fence); diff --git a/chromium/base/task/sequence_manager/work_queue_sets.cc b/chromium/base/task/sequence_manager/work_queue_sets.cc index c2f9886271d..68ec9613338 100644 --- a/chromium/base/task/sequence_manager/work_queue_sets.cc +++ b/chromium/base/task/sequence_manager/work_queue_sets.cc @@ -237,6 +237,19 @@ bool WorkQueueSets::ContainsWorkQueueForTest( } #endif +void WorkQueueSets::CollectSkippedOverLowerPriorityTasks( + const internal::WorkQueue* selected_work_queue, + std::vector<const Task*>* result) const { + EnqueueOrder selected_enqueue_order; + CHECK(selected_work_queue->GetFrontTaskEnqueueOrder(&selected_enqueue_order)); + for (size_t priority = selected_work_queue->work_queue_set_index() + 1; + priority < TaskQueue::kQueuePriorityCount; priority++) { + for (const OldestTaskEnqueueOrder& pair : work_queue_heaps_[priority]) { + pair.value->CollectTasksOlderThan(selected_enqueue_order, result); + } + } +} + } // namespace internal } // namespace sequence_manager } // namespace base diff --git a/chromium/base/task/sequence_manager/work_queue_sets.h b/chromium/base/task/sequence_manager/work_queue_sets.h index 90cc6d789c1..f128c62c369 100644 --- a/chromium/base/task/sequence_manager/work_queue_sets.h +++ b/chromium/base/task/sequence_manager/work_queue_sets.h @@ -95,6 +95,12 @@ class BASE_EXPORT WorkQueueSets { const char* GetName() const { return name_; } + // Collects ready tasks which where skipped over when |selected_work_queue| + // was selected. Note this is somewhat expensive. + void CollectSkippedOverLowerPriorityTasks( + const internal::WorkQueue* selected_work_queue, + std::vector<const Task*>* result) const; + private: struct OldestTaskEnqueueOrder { EnqueueOrder key; diff --git a/chromium/base/task/sequence_manager/work_queue_sets_unittest.cc b/chromium/base/task/sequence_manager/work_queue_sets_unittest.cc index 30f3bb8fa68..6450573d5b1 100644 --- a/chromium/base/task/sequence_manager/work_queue_sets_unittest.cc +++ b/chromium/base/task/sequence_manager/work_queue_sets_unittest.cc @@ -51,14 +51,14 @@ class WorkQueueSetsTest : public testing::Test { } Task FakeTaskWithEnqueueOrder(int enqueue_order) { - Task fake_task(PostedTask(BindOnce([] {}), FROM_HERE), TimeTicks(), + Task fake_task(PostedTask(nullptr, BindOnce([] {}), FROM_HERE), TimeTicks(), EnqueueOrder(), EnqueueOrder::FromIntForTesting(enqueue_order)); return fake_task; } Task FakeNonNestableTaskWithEnqueueOrder(int enqueue_order) { - Task fake_task(PostedTask(BindOnce([] {}), FROM_HERE), TimeTicks(), + Task fake_task(PostedTask(nullptr, BindOnce([] {}), FROM_HERE), TimeTicks(), EnqueueOrder(), EnqueueOrder::FromIntForTesting(enqueue_order)); fake_task.nestable = Nestable::kNonNestable; @@ -331,6 +331,32 @@ TEST_F(WorkQueueSetsTest, PushNonNestableTaskToFront) { EXPECT_EQ(queue1, work_queue_sets_->GetOldestQueueInSet(set)); } +TEST_F(WorkQueueSetsTest, CollectSkippedOverLowerPriorityTasks) { + WorkQueue* queue1 = NewTaskQueue("queue1"); + WorkQueue* queue2 = NewTaskQueue("queue2"); + WorkQueue* queue3 = NewTaskQueue("queue3"); + + work_queue_sets_->ChangeSetIndex(queue1, 3); + work_queue_sets_->ChangeSetIndex(queue2, 2); + work_queue_sets_->ChangeSetIndex(queue3, 1); + + queue1->Push(FakeTaskWithEnqueueOrder(1)); + queue1->Push(FakeTaskWithEnqueueOrder(2)); + queue2->Push(FakeTaskWithEnqueueOrder(3)); + queue3->Push(FakeTaskWithEnqueueOrder(4)); + queue3->Push(FakeTaskWithEnqueueOrder(5)); + queue2->Push(FakeTaskWithEnqueueOrder(6)); + queue1->Push(FakeTaskWithEnqueueOrder(7)); + + std::vector<const Task*> result; + work_queue_sets_->CollectSkippedOverLowerPriorityTasks(queue3, &result); + + ASSERT_EQ(3u, result.size()); + EXPECT_EQ(3u, result[0]->enqueue_order()); // The order here isn't important. + EXPECT_EQ(1u, result[1]->enqueue_order()); + EXPECT_EQ(2u, result[2]->enqueue_order()); +} + } // namespace internal } // namespace sequence_manager } // namespace base diff --git a/chromium/base/task/sequence_manager/work_queue_unittest.cc b/chromium/base/task/sequence_manager/work_queue_unittest.cc index 721f60fda7d..1ab88bf5065 100644 --- a/chromium/base/task/sequence_manager/work_queue_unittest.cc +++ b/chromium/base/task/sequence_manager/work_queue_unittest.cc @@ -73,23 +73,23 @@ class WorkQueueTest : public testing::Test { protected: Task FakeCancelableTaskWithEnqueueOrder(int enqueue_order, WeakPtr<Cancelable> weak_ptr) { - Task fake_task( - PostedTask(BindOnce(&Cancelable::NopTask, weak_ptr), FROM_HERE), - TimeTicks(), EnqueueOrder(), - EnqueueOrder::FromIntForTesting(enqueue_order)); + Task fake_task(PostedTask(nullptr, BindOnce(&Cancelable::NopTask, weak_ptr), + FROM_HERE), + TimeTicks(), EnqueueOrder(), + EnqueueOrder::FromIntForTesting(enqueue_order)); return fake_task; } Task FakeTaskWithEnqueueOrder(int enqueue_order) { - Task fake_task(PostedTask(BindOnce(&NopTask), FROM_HERE), TimeTicks(), - EnqueueOrder(), + Task fake_task(PostedTask(nullptr, BindOnce(&NopTask), FROM_HERE), + TimeTicks(), EnqueueOrder(), EnqueueOrder::FromIntForTesting(enqueue_order)); return fake_task; } Task FakeNonNestableTaskWithEnqueueOrder(int enqueue_order) { - Task fake_task(PostedTask(BindOnce(&NopTask), FROM_HERE), TimeTicks(), - EnqueueOrder(), + Task fake_task(PostedTask(nullptr, BindOnce(&NopTask), FROM_HERE), + TimeTicks(), EnqueueOrder(), EnqueueOrder::FromIntForTesting(enqueue_order)); fake_task.nestable = Nestable::kNonNestable; return fake_task; @@ -555,6 +555,20 @@ TEST_F(WorkQueueTest, RemoveAllCanceledTasksFromFrontQueueBlockedByFence) { EXPECT_FALSE(work_queue_->GetFrontTaskEnqueueOrder(&enqueue_order)); } +TEST_F(WorkQueueTest, CollectTasksOlderThan) { + work_queue_->Push(FakeTaskWithEnqueueOrder(2)); + work_queue_->Push(FakeTaskWithEnqueueOrder(3)); + work_queue_->Push(FakeTaskWithEnqueueOrder(4)); + + std::vector<const Task*> result; + work_queue_->CollectTasksOlderThan(EnqueueOrder::FromIntForTesting(4), + &result); + + ASSERT_EQ(2u, result.size()); + EXPECT_EQ(2u, result[0]->enqueue_order()); + EXPECT_EQ(3u, result[1]->enqueue_order()); +} + } // namespace internal } // namespace sequence_manager } // namespace base diff --git a/chromium/base/task/simple_task_executor.cc b/chromium/base/task/simple_task_executor.cc new file mode 100644 index 00000000000..ffccf52be86 --- /dev/null +++ b/chromium/base/task/simple_task_executor.cc @@ -0,0 +1,62 @@ +// Copyright 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/task/simple_task_executor.h" + +namespace base { + +SimpleTaskExecutor::SimpleTaskExecutor( + scoped_refptr<SingleThreadTaskRunner> task_queue) + : task_queue_(std::move(task_queue)), + previous_task_executor_(GetTaskExecutorForCurrentThread()) { + DCHECK(task_queue_); + // The TaskExecutor API does not expect nesting, but this can happen in tests + // so we have to work around it here. + if (previous_task_executor_) + SetTaskExecutorForCurrentThread(nullptr); + SetTaskExecutorForCurrentThread(this); +} + +SimpleTaskExecutor::~SimpleTaskExecutor() { + if (previous_task_executor_) + SetTaskExecutorForCurrentThread(nullptr); + SetTaskExecutorForCurrentThread(previous_task_executor_); +} + +bool SimpleTaskExecutor::PostDelayedTask(const Location& from_here, + const TaskTraits& traits, + OnceClosure task, + TimeDelta delay) { + return task_queue_->PostDelayedTask(from_here, std::move(task), delay); +} + +scoped_refptr<TaskRunner> SimpleTaskExecutor::CreateTaskRunner( + const TaskTraits& traits) { + return task_queue_; +} + +scoped_refptr<SequencedTaskRunner> +SimpleTaskExecutor::CreateSequencedTaskRunner(const TaskTraits& traits) { + return task_queue_; +} + +scoped_refptr<SingleThreadTaskRunner> +SimpleTaskExecutor::CreateSingleThreadTaskRunner( + const TaskTraits& traits, + SingleThreadTaskRunnerThreadMode thread_mode) { + return task_queue_; +} + +#if defined(OS_WIN) +scoped_refptr<SingleThreadTaskRunner> +SimpleTaskExecutor::CreateCOMSTATaskRunner( + const TaskTraits& traits, + SingleThreadTaskRunnerThreadMode thread_mode) { + // It seems pretty unlikely this will be used on a comsta task thread. + NOTREACHED(); + return task_queue_; +} +#endif // defined(OS_WIN) + +} // namespace base diff --git a/chromium/base/task/simple_task_executor.h b/chromium/base/task/simple_task_executor.h new file mode 100644 index 00000000000..7d9a74dc5d2 --- /dev/null +++ b/chromium/base/task/simple_task_executor.h @@ -0,0 +1,52 @@ +// Copyright 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef BASE_TASK_SIMPLE_TASK_EXECUTOR_H_ +#define BASE_TASK_SIMPLE_TASK_EXECUTOR_H_ + +#include "base/task/task_executor.h" +#include "build/build_config.h" + +namespace base { + +// A simple TaskExecutor with exactly one SingleThreadTaskRunner. +// Must be instantiated and destroyed on the thread that runs tasks for the +// SingleThreadTaskRunner. +class BASE_EXPORT SimpleTaskExecutor : public TaskExecutor { + public: + explicit SimpleTaskExecutor(scoped_refptr<SingleThreadTaskRunner> task_queue); + + ~SimpleTaskExecutor() override; + + bool PostDelayedTask(const Location& from_here, + const TaskTraits& traits, + OnceClosure task, + TimeDelta delay) override; + + scoped_refptr<TaskRunner> CreateTaskRunner(const TaskTraits& traits) override; + + scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunner( + const TaskTraits& traits) override; + + scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunner( + const TaskTraits& traits, + SingleThreadTaskRunnerThreadMode thread_mode) override; + +#if defined(OS_WIN) + scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunner( + const TaskTraits& traits, + SingleThreadTaskRunnerThreadMode thread_mode) override; +#endif // defined(OS_WIN) + + private: + const scoped_refptr<SingleThreadTaskRunner> task_queue_; + + // In tests there may already be a TaskExecutor registered for the thread, we + // keep tack of the previous TaskExecutor and restored it upon destruction. + TaskExecutor* const previous_task_executor_; +}; + +} // namespace base + +#endif // BASE_TASK_SIMPLE_TASK_EXECUTOR_H_ diff --git a/chromium/base/task/single_thread_task_executor.cc b/chromium/base/task/single_thread_task_executor.cc index 7e12b784faf..01d993be952 100644 --- a/chromium/base/task/single_thread_task_executor.cc +++ b/chromium/base/task/single_thread_task_executor.cc @@ -17,7 +17,8 @@ SingleThreadTaskExecutor::SingleThreadTaskExecutor(MessagePumpType type) .Build())), default_task_queue_(sequence_manager_->CreateTaskQueue( sequence_manager::TaskQueue::Spec("default_tq"))), - type_(type) { + type_(type), + simple_task_executor_(task_runner()) { sequence_manager_->SetDefaultTaskRunner(default_task_queue_->task_runner()); sequence_manager_->BindToMessagePump(MessagePump::Create(type)); diff --git a/chromium/base/task/single_thread_task_executor.h b/chromium/base/task/single_thread_task_executor.h index d350420d797..c048a830cf1 100644 --- a/chromium/base/task/single_thread_task_executor.h +++ b/chromium/base/task/single_thread_task_executor.h @@ -11,6 +11,7 @@ #include "base/memory/scoped_refptr.h" #include "base/message_loop/message_pump_type.h" #include "base/single_thread_task_runner.h" +#include "base/task/simple_task_executor.h" namespace base { @@ -40,6 +41,7 @@ class BASE_EXPORT SingleThreadTaskExecutor { std::unique_ptr<sequence_manager::SequenceManager> sequence_manager_; scoped_refptr<sequence_manager::TaskQueue> default_task_queue_; MessagePumpType type_; + SimpleTaskExecutor simple_task_executor_; DISALLOW_COPY_AND_ASSIGN(SingleThreadTaskExecutor); }; diff --git a/chromium/base/task/single_thread_task_executor_unittest.cc b/chromium/base/task/single_thread_task_executor_unittest.cc new file mode 100644 index 00000000000..b64294f404f --- /dev/null +++ b/chromium/base/task/single_thread_task_executor_unittest.cc @@ -0,0 +1,58 @@ +// Copyright 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/task/single_thread_task_executor.h" + +#include "base/run_loop.h" +#include "base/task/post_task.h" +#include "base/test/bind_test_util.h" +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" + +using ::testing::IsNull; +using ::testing::NotNull; + +namespace base { + +TEST(SingleThreadTaskExecutorTest, GetTaskExecutorForCurrentThread) { + EXPECT_THAT(GetTaskExecutorForCurrentThread(), IsNull()); + + { + SingleThreadTaskExecutor single_thread_task_executor; + EXPECT_THAT(GetTaskExecutorForCurrentThread(), NotNull()); + } + + EXPECT_THAT(GetTaskExecutorForCurrentThread(), IsNull()); +} + +TEST(SingleThreadTaskExecutorTest, + GetTaskExecutorForCurrentThreadInPostedTask) { + SingleThreadTaskExecutor single_thread_task_executor; + TaskExecutor* task_executor = GetTaskExecutorForCurrentThread(); + + EXPECT_THAT(task_executor, NotNull()); + + RunLoop run_loop; + single_thread_task_executor.task_runner()->PostTask( + FROM_HERE, BindLambdaForTesting([&]() { + EXPECT_EQ(GetTaskExecutorForCurrentThread(), task_executor); + run_loop.Quit(); + })); + + run_loop.Run(); +} + +TEST(SingleThreadTaskExecutorTest, CurrentThread) { + SingleThreadTaskExecutor single_thread_task_executor; + + EXPECT_EQ(single_thread_task_executor.task_runner(), + base::CreateSingleThreadTaskRunner({base::CurrentThread()})); + + // There's only one task queue so priority is ignored. + EXPECT_EQ(single_thread_task_executor.task_runner(), + base::CreateSingleThreadTaskRunner( + {base::CurrentThread(), base::TaskPriority::BEST_EFFORT})); +} + +} // namespace base diff --git a/chromium/base/task/task_executor.cc b/chromium/base/task/task_executor.cc index 8b527b0cd5b..355fa3f445f 100644 --- a/chromium/base/task/task_executor.cc +++ b/chromium/base/task/task_executor.cc @@ -6,8 +6,10 @@ #include <type_traits> +#include "base/no_destructor.h" #include "base/task/task_traits.h" #include "base/task/task_traits_extension.h" +#include "base/threading/thread_local.h" namespace base { @@ -30,6 +32,21 @@ static_assert( } // namespace +ThreadLocalPointer<TaskExecutor>* GetTLSForCurrentTaskExecutor() { + static NoDestructor<ThreadLocalPointer<TaskExecutor>> instance; + return instance.get(); +} + +void SetTaskExecutorForCurrentThread(TaskExecutor* task_executor) { + DCHECK(!task_executor || !GetTLSForCurrentTaskExecutor()->Get() || + GetTLSForCurrentTaskExecutor()->Get() == task_executor); + GetTLSForCurrentTaskExecutor()->Set(task_executor); +} + +TaskExecutor* GetTaskExecutorForCurrentThread() { + return GetTLSForCurrentTaskExecutor()->Get(); +} + void RegisterTaskExecutor(uint8_t extension_id, TaskExecutor* task_executor) { DCHECK_NE(extension_id, TaskTraitsExtensionStorage::kInvalidExtensionId); DCHECK_LE(extension_id, TaskTraitsExtensionStorage::kMaxExtensionId); @@ -57,4 +74,4 @@ TaskExecutor* GetRegisteredTaskExecutorForTraits(const TaskTraits& traits) { return nullptr; } -} // namespace base
\ No newline at end of file +} // namespace base diff --git a/chromium/base/task/task_executor.h b/chromium/base/task/task_executor.h index b4e79e14c9a..a062fdc4391 100644 --- a/chromium/base/task/task_executor.h +++ b/chromium/base/task/task_executor.h @@ -74,6 +74,13 @@ void BASE_EXPORT RegisterTaskExecutor(uint8_t extension_id, TaskExecutor* task_executor); void BASE_EXPORT UnregisterTaskExecutorForTesting(uint8_t extension_id); +// Stores the provided TaskExecutor in TLS for the current thread, to be used by +// tasks with the CurrentThread() trait. +void BASE_EXPORT SetTaskExecutorForCurrentThread(TaskExecutor* task_executor); + +// Returns the task executor registered for the current thread. +BASE_EXPORT TaskExecutor* GetTaskExecutorForCurrentThread(); + // Determines whether a registered TaskExecutor will handle tasks with the given // |traits| and, if so, returns a pointer to it. Otherwise, returns |nullptr|. TaskExecutor* GetRegisteredTaskExecutorForTraits(const TaskTraits& traits); diff --git a/chromium/base/task/task_traits.h b/chromium/base/task/task_traits.h index c1e52ed0f07..03abf17913f 100644 --- a/chromium/base/task/task_traits.h +++ b/chromium/base/task/task_traits.h @@ -185,6 +185,14 @@ struct WithBaseSyncPrimitives {}; // between tasks, see base::PostTask::CreateSequencedTaskRunner. struct ThreadPool {}; +// Tasks and task runners with this thread will run tasks on the virtual thread +// (sequence) they are posted/created from. Other traits may be specified +// alongside this one to refine properties for the associated tasks +// (e.g. base::TaskPriority or content::BrowserTaskType) as long as those traits +// are compatible with the current thread (e.g. cannot specify base::MayBlock() +// on a non-blocking thread or alter base::TaskShutdownBehavior). +struct CurrentThread {}; + // Describes metadata for a single task or a group of tasks. class BASE_EXPORT TaskTraits { public: @@ -196,6 +204,7 @@ class BASE_EXPORT TaskTraits { ValidTrait(MayBlock); ValidTrait(WithBaseSyncPrimitives); ValidTrait(ThreadPool); + ValidTrait(CurrentThread); }; // Invoking this constructor without arguments produces TaskTraits that are @@ -255,15 +264,22 @@ class BASE_EXPORT TaskTraits { may_block_(trait_helpers::HasTrait<MayBlock, ArgTypes...>()), with_base_sync_primitives_( trait_helpers::HasTrait<WithBaseSyncPrimitives, ArgTypes...>()), - use_thread_pool_(trait_helpers::HasTrait<ThreadPool, ArgTypes...>()) { + use_thread_pool_(trait_helpers::HasTrait<ThreadPool, ArgTypes...>()), + use_current_thread_( + trait_helpers::HasTrait<CurrentThread, ArgTypes...>()) { constexpr bool has_thread_pool = trait_helpers::HasTrait<ThreadPool, ArgTypes...>(); constexpr bool has_extension = !trait_helpers::AreValidTraits<ValidTrait, ArgTypes...>::value; + constexpr bool has_current_thread = + trait_helpers::HasTrait<CurrentThread, ArgTypes...>(); + static_assert( + !has_current_thread || !has_thread_pool, + "base::CurrentThread is mutually exclusive with base::ThreadPool"); static_assert( - has_thread_pool ^ has_extension, + has_thread_pool ^ has_extension || has_current_thread, "Traits must explicitly specify a destination (e.g. ThreadPool or a " - "named thread like BrowserThread)"); + "named thread like BrowserThread, or CurrentThread)"); } constexpr TaskTraits(const TaskTraits& other) = default; @@ -271,14 +287,15 @@ class BASE_EXPORT TaskTraits { // TODO(eseckler): Default the comparison operator once C++20 arrives. bool operator==(const TaskTraits& other) const { - static_assert(sizeof(TaskTraits) == 15, + static_assert(sizeof(TaskTraits) == 16, "Update comparison operator when TaskTraits change"); return extension_ == other.extension_ && priority_ == other.priority_ && shutdown_behavior_ == other.shutdown_behavior_ && thread_policy_ == other.thread_policy_ && may_block_ == other.may_block_ && with_base_sync_primitives_ == other.with_base_sync_primitives_ && - use_thread_pool_ == other.use_thread_pool_; + use_thread_pool_ == other.use_thread_pool_ && + use_current_thread_ == other.use_current_thread_; } // Sets the priority of tasks with these traits to |priority|. @@ -335,6 +352,10 @@ class BASE_EXPORT TaskTraits { // Returns true if tasks with these traits execute on the thread pool. constexpr bool use_thread_pool() const { return use_thread_pool_; } + // Returns true if tasks with these traits execute on the virtual thread + // (sequence) they are posted/created from. + constexpr bool use_current_thread() const { return use_current_thread_; } + uint8_t extension_id() const { return extension_.extension_id; } // Access the extension data by parsing it into the provided extension type. @@ -353,6 +374,7 @@ class BASE_EXPORT TaskTraits { TaskPriority priority, bool may_block, bool use_thread_pool, + bool use_current_thread, TaskTraitsExtensionStorage extension) : extension_(extension), priority_(static_cast<uint8_t>(priority) | @@ -362,8 +384,9 @@ class BASE_EXPORT TaskTraits { thread_policy_(static_cast<uint8_t>(ThreadPolicy::PREFER_BACKGROUND)), may_block_(may_block), with_base_sync_primitives_(false), - use_thread_pool_(use_thread_pool) { - static_assert(sizeof(TaskTraits) == 15, "Keep this constructor up to date"); + use_thread_pool_(use_thread_pool), + use_current_thread_(use_current_thread) { + static_assert(sizeof(TaskTraits) == 16, "Keep this constructor up to date"); } // This bit is set in |priority_|, |shutdown_behavior_| and |thread_policy_| @@ -378,6 +401,7 @@ class BASE_EXPORT TaskTraits { bool may_block_; bool with_base_sync_primitives_; bool use_thread_pool_; + bool use_current_thread_; }; // Returns string literals for the enums defined in this file. These methods diff --git a/chromium/base/task/task_traits_unittest.nc b/chromium/base/task/task_traits_unittest.nc index 8755ca028ff..73fc7be5506 100644 --- a/chromium/base/task/task_traits_unittest.nc +++ b/chromium/base/task/task_traits_unittest.nc @@ -26,6 +26,8 @@ constexpr TaskTraits traits = {TaskShutdownBehavior::BLOCK_SHUTDOWN, TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN}; #elif defined(NCTEST_TASK_TRAITS_INVALID_TYPE) // [r"no matching constructor for initialization of 'const base::TaskTraits'"] constexpr TaskTraits traits = {TaskShutdownBehavior::BLOCK_SHUTDOWN, true}; +#elif defined(NCTEST_TASK_TRAITS_CURRENT_THREAD_AND_THREADPOOL) // [r"base::CurrentThread is mutually exclusive with base::ThreadPool"] +constexpr TaskTraits traits = {ThreadPool(), CurrentThread()}; #endif } // namespace base diff --git a/chromium/base/task/thread_pool/historical_histogram_data.md b/chromium/base/task/thread_pool/historical_histogram_data.md new file mode 100644 index 00000000000..d253731b6fb --- /dev/null +++ b/chromium/base/task/thread_pool/historical_histogram_data.md @@ -0,0 +1,92 @@ +# Historical Histogram Data + +This page presents data captured from `base::ThreadPool` histograms at a given +point in time so it can be used in future design decisions. + +All data is 28-day aggregation on Stable channel. + +## Number of tasks between waits + +Number of tasks between two waits by a foreground worker thread in a +browser/renderer process. + +Histogram name: ThreadPool.NumTasksBetweenWaits.(Browser/Renderer).Foreground +Date: August 2019 +Values in tables below are percentiles. + +### Windows + +| Number of tasks | Browser process | Renderer process | +|-----------------|-----------------|------------------| +| 1 | 87 | 92 | +| 2 | 95 | 98 | +| 5 | 99 | 100 | + +### Mac + +| Number of tasks | Browser process | Renderer process | +|-----------------|-----------------|------------------| +| 1 | 81 | 90 | +| 2 | 92 | 97 | +| 5 | 98 | 100 | + +### Android + +| Number of tasks | Browser process | Renderer process | +|-----------------|-----------------|------------------| +| 1 | 92 | 96 | +| 2 | 97 | 98 | +| 5 | 99 | 100 | + + +## Number of tasks run while queueing + +Number of tasks run by ThreadPool while task was queuing (from time task was +posted until time it was run). Recorded for dummy heartbeat tasks in the +*browser* process. The heartbeat recording avoids dependencies between this +report and other work in the system. + +Histogram name: ThreadPool.NumTasksRunWhileQueuing.Browser.* +Date: September 2019 +Values in tables below are percentiles. + +Note: In *renderer* processes, on all platforms/priorities, 0 tasks are run +while queuing at 99.5th percentile. + +### Windows + +| Number of tasks | USER_BLOCKING | USER_VISIBLE | BEST_EFFORT | +|-----------------|---------------|--------------|-------------| +| 0 | 95 | 93 | 90 | +| 1 | 98 | 95 | 92 | +| 2 | 99 | 96 | 93 | +| 5 | 100 | 98 | 95 | + +### Mac + +| Number of tasks | USER_BLOCKING | USER_VISIBLE | BEST_EFFORT | +|-----------------|---------------|--------------|-------------| +| 0 | 100 | 100 | 99 | +| 1 | 100 | 100 | 99 | +| 2 | 100 | 100 | 99 | +| 5 | 100 | 100 | 100 | + +### Android + +| Number of tasks | USER_BLOCKING | USER_VISIBLE | BEST_EFFORT | +|-----------------|---------------|--------------|-------------| +| 0 | 99 | 98 | 97 | +| 1 | 100 | 99 | 99 | +| 2 | 100 | 99 | 99 | +| 5 | 100 | 100 | 100 | + +### Chrome OS + +For all priorities, 0 tasks are run while queueing at 99.5th percentile. + +### Analysis + +The number of tasks that run while a BEST_EFFORT task is queued is unexpectedly +low. We should explore creating threads less aggressively, at the expense of +keeping BEST_EFFORT tasks in the queue for a longer time. See +[Bug 906079](https://crbug.com/906079). diff --git a/chromium/base/task/thread_pool/initialization_util.cc b/chromium/base/task/thread_pool/initialization_util.cc index d70c4c3d27d..6fd197300c9 100644 --- a/chromium/base/task/thread_pool/initialization_util.cc +++ b/chromium/base/task/thread_pool/initialization_util.cc @@ -6,6 +6,7 @@ #include <algorithm> +#include "base/numerics/ranges.h" #include "base/system/sys_info.h" namespace base { @@ -16,7 +17,7 @@ int RecommendedMaxNumberOfThreadsInThreadGroup(int min, int offset) { const int num_of_cores = SysInfo::NumberOfProcessors(); const int threads = std::ceil<int>(num_of_cores * cores_multiplier) + offset; - return std::min(max, std::max(min, threads)); + return ClampToRange(threads, min, max); } } // namespace base diff --git a/chromium/base/task/thread_pool/job_task_source.cc b/chromium/base/task/thread_pool/job_task_source.cc index c6e5f9733c5..be52945d0d9 100644 --- a/chromium/base/task/thread_pool/job_task_source.cc +++ b/chromium/base/task/thread_pool/job_task_source.cc @@ -19,6 +19,120 @@ namespace base { namespace internal { +// Memory ordering on |state_| operations +// +// The write operation on |state_| in WillRunTask() uses +// std::memory_order_release, matched by std::memory_order_acquire on read +// operations (in DidProcessTask()) to establish a +// Release-Acquire ordering. When a call to WillRunTask() is caused by an +// increase of max concurrency followed by an associated +// NotifyConcurrencyIncrease(), the priority queue lock guarantees an +// happens-after relation with NotifyConcurrencyIncrease(). This ensures that an +// increase of max concurrency that happened-before NotifyConcurrencyIncrease() +// is visible to a read operation that happens-after WillRunTask(). +// +// In DidProcessTask(), this is necessary to +// ensure that the task source is always re-enqueued when it needs to. When the +// task source needs to be queued, either because the current task yielded or +// because of NotifyConcurrencyIncrease(), one of the following is true: +// A) DidProcessTask() happens-after WillRunTask(): +// T1: Current task returns (because it is done) or yields. +// T2: Increases the value returned by GetMaxConcurrency() +// NotifyConcurrencyIncrease() enqueues the task source +// T3: WillRunTask(), in response to the concurrency increase - Release +// Does not keep the TaskSource in PriorityQueue because it is at max +// concurrency +// T1: DidProcessTask() - Acquire - Because of memory barrier, sees the same +// (or newer) max concurrency as T2 +// Re-enqueues the TaskSource because no longer at max concurrency +// Without the memory barrier, T1 may see an outdated max concurrency that +// is lower than the actual max concurrency and won't re-enqueue the +// task source, because it thinks it's already saturated. +// The task source often needs to be re-enqueued if its task +// completed because it yielded and |max_concurrency| wasn't decreased. +// B) DidProcessTask() happens-before WillRunTask(): +// T1: Current task returns (because it is done) or yields +// T2: Increases the value returned by GetMaxConcurrency() +// NotifyConcurrencyIncrease() enqueues the task source +// T1: DidProcessTask() - Acquire (ineffective) +// Since the task source is already in the queue, it doesn't matter +// whether T1 re-enqueues the task source or not. +// Note that stale values the other way around can cause incorrectly +// re-enqueuing this task_source, which is not an issue because the queues +// support empty task sources. + +JobTaskSource::State::State() = default; +JobTaskSource::State::~State() = default; + +JobTaskSource::State::Value JobTaskSource::State::Cancel() { + return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)}; +} + +JobTaskSource::State::Value +JobTaskSource::State::TryIncrementWorkerCountFromWorkerRelease( + size_t max_concurrency) { + uint32_t value_before_add = value_.load(std::memory_order_relaxed); + + // std::memory_order_release on success to establish Release-Acquire ordering + // with DecrementWorkerCountAcquire() (see Memory Ordering comment at top of + // the file). + while (!(value_before_add & kCanceledMask) && + (value_before_add >> kWorkerCountBitOffset) < max_concurrency && + !value_.compare_exchange_weak( + value_before_add, value_before_add + kWorkerCountIncrement, + std::memory_order_release, std::memory_order_relaxed)) { + } + return {value_before_add}; +} + +JobTaskSource::State::Value +JobTaskSource::State::DecrementWorkerCountFromWorkerAcquire() { + const size_t value_before_sub = + value_.fetch_sub(kWorkerCountIncrement, std::memory_order_acquire); + DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0); + return {value_before_sub}; +} + +JobTaskSource::State::Value +JobTaskSource::State::IncrementWorkerCountFromJoiningThread() { + size_t value_before_add = + value_.fetch_add(kWorkerCountIncrement, std::memory_order_relaxed); + return {value_before_add}; +} + +JobTaskSource::State::Value +JobTaskSource::State::DecrementWorkerCountFromJoiningThread() { + const size_t value_before_sub = + value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed); + DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0); + return {value_before_sub}; +} + +JobTaskSource::State::Value JobTaskSource::State::Load() const { + return {value_.load(std::memory_order_relaxed)}; +} + +JobTaskSource::JoinFlag::JoinFlag() = default; +JobTaskSource::JoinFlag::~JoinFlag() = default; + +void JobTaskSource::JoinFlag::SetWaiting() { + const auto previous_value = + value_.exchange(kWaitingForWorkerToYield, std::memory_order_relaxed); + DCHECK(previous_value == kNotWaiting); +} + +bool JobTaskSource::JoinFlag::ShouldWorkerYield() { + // The fetch_and() sets the state to kWaitingForWorkerToSignal if it was + // previously kWaitingForWorkerToYield, otherwise it leaves it unchanged. + return value_.fetch_and(kWaitingForWorkerToSignal, + std::memory_order_relaxed) == + kWaitingForWorkerToYield; +} + +bool JobTaskSource::JoinFlag::ShouldWorkerSignal() { + return value_.exchange(kNotWaiting, std::memory_order_relaxed) != kNotWaiting; +} + JobTaskSource::JobTaskSource( const Location& from_here, const TaskTraits& traits, @@ -28,64 +142,122 @@ JobTaskSource::JobTaskSource( : TaskSource(traits, nullptr, TaskSourceExecutionMode::kJob), from_here_(from_here), max_concurrency_callback_(std::move(max_concurrency_callback)), - worker_task_(base::BindRepeating( - [](JobTaskSource* self, - const RepeatingCallback<void(experimental::JobDelegate*)>& - worker_task) { + worker_task_(std::move(worker_task)), + primary_task_(base::BindRepeating( + [](JobTaskSource* self) { // Each worker task has its own delegate with associated state. experimental::JobDelegate job_delegate{self, self->delegate_}; - worker_task.Run(&job_delegate); + self->worker_task_.Run(&job_delegate); }, - base::Unretained(this), - std::move(worker_task))), + base::Unretained(this))), queue_time_(TimeTicks::Now()), delegate_(delegate) { DCHECK(delegate_); } JobTaskSource::~JobTaskSource() { -#if DCHECK_IS_ON() - auto worker_count = worker_count_.load(std::memory_order_relaxed); // Make sure there's no outstanding active run operation left. - DCHECK(worker_count == 0U || worker_count == kInvalidWorkerCount) - << worker_count; -#endif + DCHECK_EQ(state_.Load().worker_count(), 0U); } ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() { return {SequenceToken::Create(), nullptr}; } +bool JobTaskSource::WillJoin() { + { + CheckedAutoLock auto_lock(lock_); + DCHECK(!worker_released_condition_); // This may only be called once. + worker_released_condition_ = lock_.CreateConditionVariable(); + } + // std::memory_order_relaxed on |worker_count_| is sufficient because call to + // GetMaxConcurrency() is used for a best effort early exit. Stale values will + // only cause WaitForParticipationOpportunity() to be called. + const auto state_before_add = state_.IncrementWorkerCountFromJoiningThread(); + + if (!state_before_add.is_canceled() && + state_before_add.worker_count() < GetMaxConcurrency()) { + return true; + } + return WaitForParticipationOpportunity(); +} + +bool JobTaskSource::RunJoinTask() { + experimental::JobDelegate job_delegate{this, nullptr}; + worker_task_.Run(&job_delegate); + + // std::memory_order_relaxed on |worker_count_| is sufficient because the call + // to GetMaxConcurrency() is used for a best effort early exit. Stale values + // will only cause WaitForParticipationOpportunity() to be called. + const auto state = state_.Load(); + if (!state.is_canceled() && state.worker_count() <= GetMaxConcurrency()) + return true; + + return WaitForParticipationOpportunity(); +} + +void JobTaskSource::Cancel(TaskSource::Transaction* transaction) { + // Sets the kCanceledMask bit on |state_| so that further calls to + // WillRunTask() never succeed. std::memory_order_relaxed is sufficient + // because this task source never needs to be re-enqueued after Cancel(). + state_.Cancel(); +} + +bool JobTaskSource::WaitForParticipationOpportunity() { + CheckedAutoLock auto_lock(lock_); + + // std::memory_order_relaxed is sufficient because no other state is + // synchronized with |state_| outside of |lock_|. + auto state = state_.Load(); + size_t max_concurrency = GetMaxConcurrency(); + + // Wait until either: + // A) |worker_count| is below or equal to max concurrency and state is not + // canceled. + // B) All other workers returned and |worker_count| is 1. + while (!((state.worker_count() <= max_concurrency && !state.is_canceled()) || + state.worker_count() == 1)) { + // std::memory_order_relaxed is sufficient because no other state is + // synchronized with |join_flag_| outside of |lock_|. + join_flag_.SetWaiting(); + + // To avoid unnecessarily waiting, if either condition A) or B) change + // |lock_| is taken and |worker_released_condition_| signaled if necessary: + // 1- In DidProcessTask(), after worker count is decremented. + // 2- In NotifyConcurrencyIncrease(), following a max_concurrency increase. + worker_released_condition_->Wait(); + state = state_.Load(); + max_concurrency = GetMaxConcurrency(); + } + // Case A: + if (state.worker_count() <= max_concurrency && !state.is_canceled()) + return true; + // Case B: + // Only the joining thread remains. + DCHECK_EQ(state.worker_count(), 1U); + DCHECK(state.is_canceled() || max_concurrency == 0U); + state_.DecrementWorkerCountFromJoiningThread(); + return false; +} + TaskSource::RunStatus JobTaskSource::WillRunTask() { - // When this call is caused by an increase of max concurrency followed by an - // associated NotifyConcurrencyIncrease(), the priority queue lock guarantees - // an happens-after relation with NotifyConcurrencyIncrease(). The memory - // operations on |worker_count| below and in DidProcessTask() use - // std::memory_order_release and std::memory_order_acquire respectively to - // establish a Release-Acquire ordering. This ensures that all memory - // side-effects made before this point, including an increase of max - // concurrency followed by NotifyConcurrencyIncrease() are visible to a - // DidProcessTask() call which is ordered after this one. const size_t max_concurrency = GetMaxConcurrency(); - size_t worker_count_before_add = - worker_count_.load(std::memory_order_relaxed); - - // std::memory_order_release on success to make the newest |max_concurrency| - // visible to a thread that calls DidProcessTask() containing a matching - // std::memory_order_acquire. - while (worker_count_before_add < max_concurrency && - !worker_count_.compare_exchange_weak( - worker_count_before_add, worker_count_before_add + 1, - std::memory_order_release, std::memory_order_relaxed)) { - } + // std::memory_order_release on success to establish Release-Acquire ordering + // with read operations (see Memory Ordering comment at top of the file). + const auto state_before_add = + state_.TryIncrementWorkerCountFromWorkerRelease(max_concurrency); + // Don't allow this worker to run the task if either: - // A) |worker_count_| is already at |max_concurrency|. - // B) |max_concurrency| was lowered below or to |worker_count_|. - // C) |worker_count_| was invalidated. - if (worker_count_before_add >= max_concurrency) { - // The caller is prevented from running a task from this TaskSource. + // A) |state_| was canceled. + // B) |worker_count| is already at |max_concurrency|. + // C) |max_concurrency| was lowered below or to |worker_count|. + // Case A: + if (state_before_add.is_canceled()) + return RunStatus::kDisallowed; + const size_t worker_count_before_add = state_before_add.worker_count(); + // Case B) or C): + if (worker_count_before_add >= max_concurrency) return RunStatus::kDisallowed; - } DCHECK_LT(worker_count_before_add, max_concurrency); return max_concurrency == worker_count_before_add + 1 @@ -96,15 +268,23 @@ TaskSource::RunStatus JobTaskSource::WillRunTask() { size_t JobTaskSource::GetRemainingConcurrency() const { // std::memory_order_relaxed is sufficient because no other state is // synchronized with GetRemainingConcurrency(). - const size_t worker_count = worker_count_.load(std::memory_order_relaxed); + const auto state = state_.Load(); const size_t max_concurrency = GetMaxConcurrency(); // Avoid underflows. - if (worker_count > max_concurrency) + if (state.is_canceled() || state.worker_count() > max_concurrency) return 0; - return max_concurrency - worker_count; + return max_concurrency - state.worker_count(); } void JobTaskSource::NotifyConcurrencyIncrease() { + { + // Lock is taken to access |join_flag_| below and signal + // |worker_released_condition_|. + CheckedAutoLock auto_lock(lock_); + if (join_flag_.ShouldWorkerSignal()) + worker_released_condition_->Signal(); + } + #if DCHECK_IS_ON() { AutoLock auto_lock(version_lock_); @@ -125,6 +305,14 @@ size_t JobTaskSource::GetMaxConcurrency() const { return max_concurrency_callback_.Run(); } +bool JobTaskSource::ShouldYield() { + // It is safe to read |join_flag_| without a lock since this + // variable is atomic, keeping in mind that threads may not immediately see + // the new value when it is updated. + return TS_UNCHECKED_READ(join_flag_).ShouldWorkerYield() || + state_.Load().is_canceled(); +} + #if DCHECK_IS_ON() size_t JobTaskSource::GetConcurrencyIncreaseVersion() const { @@ -151,45 +339,36 @@ bool JobTaskSource::WaitForConcurrencyIncreaseUpdate(size_t recorded_version) { #endif // DCHECK_IS_ON() Optional<Task> JobTaskSource::TakeTask(TaskSource::Transaction* transaction) { - DCHECK_GT(worker_count_.load(std::memory_order_relaxed), 0U); - DCHECK(worker_task_); - return base::make_optional<Task>(from_here_, worker_task_, TimeDelta()); + // JobTaskSource members are not lock-protected so no need to acquire a lock + // if |transaction| is nullptr. + DCHECK_GT(state_.Load().worker_count(), 0U); + DCHECK(primary_task_); + return base::make_optional<Task>(from_here_, primary_task_, TimeDelta()); } bool JobTaskSource::DidProcessTask(TaskSource::Transaction* transaction) { - size_t worker_count_before_sub = - worker_count_.load(std::memory_order_relaxed); - - // std::memory_order_acquire on |worker_count_| is necessary to establish - // Release-Acquire ordering (see WillRunTask()). - // When the task source needs to be queued, either because the current task - // yielded or because of NotifyConcurrencyIncrease(), one of the following is - // true: - // A) The JobTaskSource is already in the queue (no worker picked up the - // extra work yet): Incorrectly returning false is fine and the memory - // barrier may be ineffective. - // B) The JobTaskSource() is no longer in the queue: The Release-Acquire - // ordering with WillRunTask() established by |worker_count| ensures that - // the upcoming call for GetMaxConcurrency() happens-after any - // NotifyConcurrencyIncrease() that happened-before WillRunTask(). If - // this task completed because it yielded, this barrier guarantees that - // it sees an up-to-date concurrency value and correctly re-enqueues. - // - // Note that stale values the other way around (incorrectly re-enqueuing) are - // not an issue because the queues support empty task sources. - while (worker_count_before_sub != kInvalidWorkerCount && - !worker_count_.compare_exchange_weak( - worker_count_before_sub, worker_count_before_sub - 1, - std::memory_order_acquire, std::memory_order_relaxed)) { - } - if (worker_count_before_sub == kInvalidWorkerCount) + // Lock is needed to access |join_flag_| below and signal + // |worker_released_condition_|. If |transaction|, then |lock_| is already + // taken. + CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_); + AnnotateAcquiredLockAlias annotate(lock_, lock_); + + // std::memory_order_acquire to establish Release-Acquire ordering with + // WillRunTask() (see Memory Ordering comment at top of the file). + const auto state_before_sub = state_.DecrementWorkerCountFromWorkerAcquire(); + + if (join_flag_.ShouldWorkerSignal()) + worker_released_condition_->Signal(); + + // A canceled task source should never get re-enqueued. + if (state_before_sub.is_canceled()) return false; - DCHECK_GT(worker_count_before_sub, 0U); + DCHECK_GT(state_before_sub.worker_count(), 0U); // Re-enqueue the TaskSource if the task ran and the worker count is below the // max concurrency. - return worker_count_before_sub <= GetMaxConcurrency(); + return state_before_sub.worker_count() <= GetMaxConcurrency(); } SequenceSortKey JobTaskSource::GetSortKey() const { @@ -197,12 +376,7 @@ SequenceSortKey JobTaskSource::GetSortKey() const { } Optional<Task> JobTaskSource::Clear(TaskSource::Transaction* transaction) { - // Invalidate |worker_count_| so that further calls to WillRunTask() never - // succeed. std::memory_order_relaxed is sufficient because this task source - // never needs to be re-enqueued after Clear(). - size_t worker_count_before_store = - worker_count_.exchange(kInvalidWorkerCount, std::memory_order_relaxed); - DCHECK_GT(worker_count_before_store, 0U); + Cancel(); // Nothing is cleared since other workers might still racily run tasks. For // simplicity, the destructor will take care of it once all references are // released. diff --git a/chromium/base/task/thread_pool/job_task_source.h b/chromium/base/task/thread_pool/job_task_source.h index ddaca19b343..57838679e7a 100644 --- a/chromium/base/task/thread_pool/job_task_source.h +++ b/chromium/base/task/thread_pool/job_task_source.h @@ -38,10 +38,33 @@ class BASE_EXPORT JobTaskSource : public TaskSource { RepeatingCallback<size_t()> max_concurrency_callback, PooledTaskRunnerDelegate* delegate); + static experimental::JobHandle CreateJobHandle( + scoped_refptr<internal::JobTaskSource> task_source) { + return experimental::JobHandle(std::move(task_source)); + } + // Notifies this task source that max concurrency was increased, and the // number of worker should be adjusted. void NotifyConcurrencyIncrease(); + // Informs this JobTaskSource that the current thread would like to join and + // contribute to running |worker_task|. Returns true if the joining thread can + // contribute (RunJoinTask() can be called), or false if joining was completed + // and all other workers returned because either there's no work remaining or + // Job was cancelled. + bool WillJoin(); + + // Contributes to running |worker_task| and returns true if the joining thread + // can contribute again (RunJoinTask() can be called again), or false if + // joining was completed and all other workers returned because either there's + // no work remaining or Job was cancelled. This should be called only after + // WillJoin() or RunJoinTask() previously returned true. + bool RunJoinTask(); + + // Cancels this JobTaskSource, causing all workers to yield and WillRunTask() + // to return RunStatus::kDisallowed. + void Cancel(TaskSource::Transaction* transaction = nullptr); + // TaskSource: ExecutionEnvironment GetExecutionEnvironment() override; size_t GetRemainingConcurrency() const override; @@ -50,6 +73,12 @@ class BASE_EXPORT JobTaskSource : public TaskSource { // concurrently. size_t GetMaxConcurrency() const; + // Returns true if a worker should return from the worker task on the current + // thread ASAP. + bool ShouldYield(); + + PooledTaskRunnerDelegate* delegate() const { return delegate_; } + #if DCHECK_IS_ON() size_t GetConcurrencyIncreaseVersion() const; // Returns true if the concurrency version was updated above @@ -58,11 +87,98 @@ class BASE_EXPORT JobTaskSource : public TaskSource { #endif // DCHECK_IS_ON() private: - static constexpr size_t kInvalidWorkerCount = - std::numeric_limits<size_t>::max(); + // Atomic internal state to track the number of workers running a task from + // this JobTaskSource and whether this JobTaskSource is canceled. + class State { + public: + static constexpr size_t kCanceledMask = 1; + static constexpr size_t kWorkerCountBitOffset = 1; + static constexpr size_t kWorkerCountIncrement = 1 << kWorkerCountBitOffset; + + struct Value { + size_t worker_count() const { return value >> kWorkerCountBitOffset; } + // Returns true if canceled. + bool is_canceled() const { return value & kCanceledMask; } + + uint32_t value; + }; + + State(); + ~State(); + + // Sets as canceled using std::memory_order_relaxed. Returns the state + // before the operation. + Value Cancel(); + + // Increments the worker count by 1 if smaller than |max_concurrency| and if + // |!is_canceled()|, using std::memory_order_release, and returns the state + // before the operation. Equivalent to Load() otherwise. + Value TryIncrementWorkerCountFromWorkerRelease(size_t max_concurrency); + + // Decrements the worker count by 1 using std::memory_order_acquire. Returns + // the state before the operation. + Value DecrementWorkerCountFromWorkerAcquire(); + + // Increments the worker count by 1 using std::memory_order_relaxed. Returns + // the state before the operation. + Value IncrementWorkerCountFromJoiningThread(); + + // Decrements the worker count by 1 using std::memory_order_relaxed. Returns + // the state before the operation. + Value DecrementWorkerCountFromJoiningThread(); + + // Loads and returns the state, using std::memory_order_relaxed. + Value Load() const; + + private: + std::atomic<uint32_t> value_{0}; + }; + + // Atomic flag that indicates if the joining thread is currently waiting on + // another worker to yield or to signal. + class JoinFlag { + public: + static constexpr uint32_t kNotWaiting = 0; + static constexpr uint32_t kWaitingForWorkerToSignal = 1; + static constexpr uint32_t kWaitingForWorkerToYield = 3; + // kWaitingForWorkerToYield is 3 because the impl relies on the following + // property. + static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) == + kWaitingForWorkerToSignal, + ""); + + JoinFlag(); + ~JoinFlag(); + + // Sets the status as kWaitingForWorkerToYield using + // std::memory_order_relaxed. + void SetWaiting(); + + // If the flag is kWaitingForWorkerToYield, returns true indicating that the + // worker should yield, and atomically updates to kWaitingForWorkerToSignal + // (using std::memory_order_relaxed) to ensure that a single worker yields + // in response to SetWaiting(). + bool ShouldWorkerYield(); + + // If the flag is kWaiting*, returns true indicating that the worker should + // signal, and atomically updates to kNotWaiting (using + // std::memory_order_relaxed) to ensure that a single worker signals in + // response to SetWaiting(). + bool ShouldWorkerSignal(); + + private: + std::atomic<uint32_t> value_{kNotWaiting}; + }; ~JobTaskSource() override; + // Called from the joining thread. Waits for the worker count to be below or + // equal to max concurrency (will happen when a worker calls + // DidProcessTask()). Returns true if the joining thread should run a task, or + // false if joining was completed and all other workers returned because + // either there's no work remaining or Job was cancelled. + bool WaitForParticipationOpportunity(); + // TaskSource: RunStatus WillRunTask() override; Optional<Task> TakeTask(TaskSource::Transaction* transaction) override; @@ -70,13 +186,23 @@ class BASE_EXPORT JobTaskSource : public TaskSource { bool DidProcessTask(TaskSource::Transaction* transaction) override; SequenceSortKey GetSortKey() const override; - // The current number of workers concurrently running tasks from this - // TaskSource. - std::atomic_size_t worker_count_{0U}; + // Current atomic state. + State state_; + // Normally, |join_flag_| is protected by |lock_|, except in ShouldYield() + // hence the use of atomics. + JoinFlag join_flag_ GUARDED_BY(lock_); + // Signaled when |join_flag_| is kWaiting* and a worker returns. + std::unique_ptr<ConditionVariable> worker_released_condition_ + GUARDED_BY(lock_); const Location from_here_; - base::RepeatingCallback<size_t()> max_concurrency_callback_; - base::RepeatingClosure worker_task_; + RepeatingCallback<size_t()> max_concurrency_callback_; + + // Worker task set by the job owner. + RepeatingCallback<void(experimental::JobDelegate*)> worker_task_; + // Task returned from TakeTask(), that calls |worker_task_| internally. + RepeatingClosure primary_task_; + const TimeTicks queue_time_; PooledTaskRunnerDelegate* delegate_; diff --git a/chromium/base/task/thread_pool/job_task_source_unittest.cc b/chromium/base/task/thread_pool/job_task_source_unittest.cc index 789cf5ab7a0..413771f2235 100644 --- a/chromium/base/task/thread_pool/job_task_source_unittest.cc +++ b/chromium/base/task/thread_pool/job_task_source_unittest.cc @@ -29,6 +29,8 @@ class MockPooledTaskRunnerDelegate : public PooledTaskRunnerDelegate { MOCK_CONST_METHOD1(ShouldYield, bool(const TaskSource* task_source)); MOCK_METHOD1(EnqueueJobTaskSource, bool(scoped_refptr<JobTaskSource> task_source)); + MOCK_METHOD1(RemoveJobTaskSource, + void(scoped_refptr<JobTaskSource> task_source)); MOCK_CONST_METHOD1(IsRunningPoolWithTraits, bool(const TaskTraits& traits)); MOCK_METHOD2(UpdatePriority, void(scoped_refptr<TaskSource> task_source, @@ -108,13 +110,17 @@ TEST_F(ThreadPoolJobTaskSourceTest, Clear) { EXPECT_EQ(registered_task_source_d.WillRunTask(), TaskSource::RunStatus::kAllowedNotSaturated); + EXPECT_FALSE(task_source->ShouldYield()); + { EXPECT_EQ(1U, task_source->GetRemainingConcurrency()); auto task = registered_task_source_c.Clear(); std::move(task->task).Run(); + registered_task_source_c.DidProcessTask(); EXPECT_EQ(0U, task_source->GetRemainingConcurrency()); } // The task source shouldn't allow any further tasks after Clear. + EXPECT_TRUE(task_source->ShouldYield()); EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(), TaskSource::RunStatus::kDisallowed); @@ -122,6 +128,7 @@ TEST_F(ThreadPoolJobTaskSourceTest, Clear) { { auto task = registered_task_source_d.Clear(); std::move(task->task).Run(); + registered_task_source_d.DidProcessTask(); EXPECT_EQ(0U, task_source->GetRemainingConcurrency()); } @@ -129,15 +136,53 @@ TEST_F(ThreadPoolJobTaskSourceTest, Clear) { std::move(task_a->task).Run(); registered_task_source_a.DidProcessTask(); - // A valid outstanding RunStatus can also take & run a task. + // A valid outstanding RunStatus can also take and run a task. { auto task = registered_task_source_b.TakeTask(); std::move(task->task).Run(); registered_task_source_b.DidProcessTask(); } - // Sanity check. +} + +// Verifies that a job task source doesn't return an "allowed" RunStatus after +// Cancel() is called. +TEST_F(ThreadPoolJobTaskSourceTest, Cancel) { + auto job_task = base::MakeRefCounted<test::MockJobTask>( + DoNothing(), /* num_tasks_to_run */ 3); + scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource( + FROM_HERE, {ThreadPool(), TaskPriority::BEST_EFFORT}, + &pooled_task_runner_delegate_); + + auto registered_task_source_a = + RegisteredTaskSource::CreateForTesting(task_source); + EXPECT_EQ(registered_task_source_a.WillRunTask(), + TaskSource::RunStatus::kAllowedNotSaturated); + auto task_a = registered_task_source_a.TakeTask(); + + auto registered_task_source_b = + RegisteredTaskSource::CreateForTesting(task_source); + EXPECT_EQ(registered_task_source_b.WillRunTask(), + TaskSource::RunStatus::kAllowedNotSaturated); + + EXPECT_FALSE(task_source->ShouldYield()); + + task_source->Cancel(); + EXPECT_TRUE(task_source->ShouldYield()); + + // The task source shouldn't allow any further tasks after Cancel. EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(), TaskSource::RunStatus::kDisallowed); + + // A task that was already acquired can still run. + std::move(task_a->task).Run(); + registered_task_source_a.DidProcessTask(); + + // A RegisteredTaskSource that's ready can also take and run a task. + { + auto task = registered_task_source_b.TakeTask(); + std::move(task->task).Run(); + registered_task_source_b.DidProcessTask(); + } } // Verifies that multiple tasks can run in parallel up to |max_concurrency|. @@ -183,6 +228,68 @@ TEST_F(ThreadPoolJobTaskSourceTest, RunTasksInParallel) { EXPECT_FALSE(registered_task_source_c.DidProcessTask()); } +// Verifies the normal flow of running the join task until completion. +TEST_F(ThreadPoolJobTaskSourceTest, RunJoinTask) { + auto job_task = base::MakeRefCounted<test::MockJobTask>( + DoNothing(), /* num_tasks_to_run */ 2); + scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource( + FROM_HERE, ThreadPool(), &pooled_task_runner_delegate_); + + EXPECT_TRUE(task_source->WillJoin()); + // Intentionally run |worker_task| twice to make sure RunJoinTask() calls + // it again. This can happen in production if the joining thread spuriously + // return and needs to run again. + EXPECT_TRUE(task_source->RunJoinTask()); + EXPECT_FALSE(task_source->RunJoinTask()); +} + +// Verifies that WillJoin() doesn't allow a joining thread to contribute +// after Cancel() is called. +TEST_F(ThreadPoolJobTaskSourceTest, CancelJoinTask) { + auto job_task = base::MakeRefCounted<test::MockJobTask>( + DoNothing(), /* num_tasks_to_run */ 2); + scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource( + FROM_HERE, ThreadPool(), &pooled_task_runner_delegate_); + + task_source->Cancel(); + EXPECT_FALSE(task_source->WillJoin()); +} + +// Verifies that RunJoinTask() doesn't allow a joining thread to contribute +// after Cancel() is called. +TEST_F(ThreadPoolJobTaskSourceTest, JoinCancelTask) { + auto job_task = base::MakeRefCounted<test::MockJobTask>( + DoNothing(), /* num_tasks_to_run */ 2); + scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource( + FROM_HERE, ThreadPool(), &pooled_task_runner_delegate_); + + EXPECT_TRUE(task_source->WillJoin()); + task_source->Cancel(); + EXPECT_FALSE(task_source->RunJoinTask()); +} + +// Verifies that the join task can run in parallel with worker tasks up to +// |max_concurrency|. +TEST_F(ThreadPoolJobTaskSourceTest, RunJoinTaskInParallel) { + auto job_task = base::MakeRefCounted<test::MockJobTask>( + DoNothing(), /* num_tasks_to_run */ 2); + scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource( + FROM_HERE, ThreadPool(), &pooled_task_runner_delegate_); + + auto registered_task_source = + RegisteredTaskSource::CreateForTesting(task_source); + EXPECT_EQ(registered_task_source.WillRunTask(), + TaskSource::RunStatus::kAllowedNotSaturated); + auto worker_task = registered_task_source.TakeTask(); + + EXPECT_TRUE(task_source->WillJoin()); + + std::move(worker_task->task).Run(); + EXPECT_FALSE(registered_task_source.DidProcessTask()); + + EXPECT_FALSE(task_source->RunJoinTask()); +} + // Verifies that a call to NotifyConcurrencyIncrease() calls the delegate // and allows to run additional tasks. TEST_F(ThreadPoolJobTaskSourceTest, NotifyConcurrencyIncrease) { @@ -342,13 +449,9 @@ TEST_F(ThreadPoolJobTaskSourceTest, InvalidDidProcessTask) { auto registered_task_source = RegisteredTaskSource::CreateForTesting(task_source); - EXPECT_EQ(registered_task_source.WillRunTask(), - TaskSource::RunStatus::kAllowedSaturated); - // Can not be called before TakeTask(). - EXPECT_DCHECK_DEATH(registered_task_source.DidProcessTask()); - auto task = registered_task_source.TakeTask(); - registered_task_source.DidProcessTask(); + // Can not be called before WillRunTask(). + EXPECT_DCHECK_DEATH(registered_task_source.DidProcessTask()); } } // namespace internal diff --git a/chromium/base/task/thread_pool/pooled_task_runner_delegate.h b/chromium/base/task/thread_pool/pooled_task_runner_delegate.h index e00f481c6a5..0ec1b87c10b 100644 --- a/chromium/base/task/thread_pool/pooled_task_runner_delegate.h +++ b/chromium/base/task/thread_pool/pooled_task_runner_delegate.h @@ -46,6 +46,10 @@ class BASE_EXPORT PooledTaskRunnerDelegate { virtual bool EnqueueJobTaskSource( scoped_refptr<JobTaskSource> task_source) = 0; + // Removes |task_source| from the priority queue. + virtual void RemoveJobTaskSource( + scoped_refptr<JobTaskSource> task_source) = 0; + // Invoked when RunsTasksInCurrentSequence() is called on a // PooledParallelTaskRunner. Returns true if the current thread is part of the // ThreadGroup associated with |traits|. diff --git a/chromium/base/task/thread_pool/priority_queue.cc b/chromium/base/task/thread_pool/priority_queue.cc index 28077b8740e..85eae1fe469 100644 --- a/chromium/base/task/thread_pool/priority_queue.cc +++ b/chromium/base/task/thread_pool/priority_queue.cc @@ -139,20 +139,18 @@ RegisteredTaskSource PriorityQueue::PopTaskSource() { } RegisteredTaskSource PriorityQueue::RemoveTaskSource( - scoped_refptr<TaskSource> task_source) { - DCHECK(task_source); - + const TaskSource& task_source) { if (IsEmpty()) return nullptr; - const HeapHandle heap_handle = task_source->heap_handle(); + const HeapHandle heap_handle = task_source.heap_handle(); if (!heap_handle.IsValid()) return nullptr; TaskSourceAndSortKey& task_source_and_sort_key = const_cast<PriorityQueue::TaskSourceAndSortKey&>( container_.at(heap_handle)); - DCHECK_EQ(task_source_and_sort_key.task_source().get(), task_source); + DCHECK_EQ(task_source_and_sort_key.task_source().get(), &task_source); RegisteredTaskSource registered_task_source = task_source_and_sort_key.take_task_source(); diff --git a/chromium/base/task/thread_pool/priority_queue.h b/chromium/base/task/thread_pool/priority_queue.h index fbe391e9384..d4d88842126 100644 --- a/chromium/base/task/thread_pool/priority_queue.h +++ b/chromium/base/task/thread_pool/priority_queue.h @@ -49,7 +49,7 @@ class BASE_EXPORT PriorityQueue { // RegisteredTaskSource which evaluates to true if successful, or false if // |task_source| is not currently in the PriorityQueue or the PriorityQueue is // empty. - RegisteredTaskSource RemoveTaskSource(scoped_refptr<TaskSource> task_source); + RegisteredTaskSource RemoveTaskSource(const TaskSource& task_source); // Updates the sort key of the TaskSource in |transaction| to // match its current traits. No-ops if the TaskSource is not in the diff --git a/chromium/base/task/thread_pool/priority_queue_unittest.cc b/chromium/base/task/thread_pool/priority_queue_unittest.cc index d5af9167457..30ad206ac98 100644 --- a/chromium/base/task/thread_pool/priority_queue_unittest.cc +++ b/chromium/base/task/thread_pool/priority_queue_unittest.cc @@ -55,7 +55,7 @@ class PriorityQueueWithSequencesTest : public testing::Test { } test::TaskEnvironment task_environment{ - test::ScopedTaskEnvironment::TimeSource::MOCK_TIME}; + test::TaskEnvironment::TimeSource::MOCK_TIME}; scoped_refptr<TaskSource> sequence_a = MakeSequenceWithTraitsAndTask( TaskTraits(ThreadPool(), TaskPriority::USER_VISIBLE)); @@ -144,34 +144,34 @@ TEST_F(PriorityQueueWithSequencesTest, RemoveSequence) { // Remove |sequence_a| from the PriorityQueue. |sequence_b| is still the // sequence with the highest priority. - EXPECT_TRUE(pq.RemoveTaskSource(sequence_a).Unregister()); + EXPECT_TRUE(pq.RemoveTaskSource(*sequence_a).Unregister()); EXPECT_EQ(sort_key_b, pq.PeekSortKey()); ExpectNumSequences(1U, 0U, 2U); // RemoveTaskSource() should return false if called on a sequence not in the // PriorityQueue. - EXPECT_FALSE(pq.RemoveTaskSource(sequence_a).Unregister()); + EXPECT_FALSE(pq.RemoveTaskSource(*sequence_a).Unregister()); ExpectNumSequences(1U, 0U, 2U); // Remove |sequence_b| from the PriorityQueue. |sequence_c| becomes the // sequence with the highest priority. - EXPECT_TRUE(pq.RemoveTaskSource(sequence_b).Unregister()); + EXPECT_TRUE(pq.RemoveTaskSource(*sequence_b).Unregister()); EXPECT_EQ(sort_key_c, pq.PeekSortKey()); ExpectNumSequences(1U, 0U, 1U); // Remove |sequence_d| from the PriorityQueue. |sequence_c| is still the // sequence with the highest priority. - EXPECT_TRUE(pq.RemoveTaskSource(sequence_d).Unregister()); + EXPECT_TRUE(pq.RemoveTaskSource(*sequence_d).Unregister()); EXPECT_EQ(sort_key_c, pq.PeekSortKey()); ExpectNumSequences(0U, 0U, 1U); // Remove |sequence_c| from the PriorityQueue, making it empty. - EXPECT_TRUE(pq.RemoveTaskSource(sequence_c).Unregister()); + EXPECT_TRUE(pq.RemoveTaskSource(*sequence_c).Unregister()); EXPECT_TRUE(pq.IsEmpty()); ExpectNumSequences(0U, 0U, 0U); // Return false if RemoveTaskSource() is called on an empty PriorityQueue. - EXPECT_FALSE(pq.RemoveTaskSource(sequence_c).Unregister()); + EXPECT_FALSE(pq.RemoveTaskSource(*sequence_c).Unregister()); ExpectNumSequences(0U, 0U, 0U); } diff --git a/chromium/base/task/thread_pool/sequence.cc b/chromium/base/task/thread_pool/sequence.cc index 3a4139c551e..ea7ce3053de 100644 --- a/chromium/base/task/thread_pool/sequence.cc +++ b/chromium/base/task/thread_pool/sequence.cc @@ -91,6 +91,7 @@ bool Sequence::DidProcessTask(TaskSource::Transaction* transaction) { // WillRunTask(). DCHECK(has_worker_); has_worker_ = false; + // See comment on TaskSource::task_runner_ for lifetime management details. if (queue_.empty()) { ReleaseTaskRunner(); return false; @@ -108,20 +109,17 @@ SequenceSortKey Sequence::GetSortKey() const { Optional<Task> Sequence::Clear(TaskSource::Transaction* transaction) { CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_); - has_worker_ = false; - return base::make_optional<Task>( - FROM_HERE, - base::BindOnce( - [](scoped_refptr<Sequence> self, base::queue<Task> queue) { - bool queue_was_empty = queue.empty(); - while (!queue.empty()) - queue.pop(); - if (!queue_was_empty) { - self->ReleaseTaskRunner(); - } - }, - scoped_refptr<Sequence>(this), std::move(queue_)), - TimeDelta()); + // See comment on TaskSource::task_runner_ for lifetime management details. + if (!queue_.empty() && !has_worker_) + ReleaseTaskRunner(); + return base::make_optional<Task>(FROM_HERE, + base::BindOnce( + [](base::queue<Task> queue) { + while (!queue.empty()) + queue.pop(); + }, + std::move(queue_)), + TimeDelta()); } void Sequence::ReleaseTaskRunner() { diff --git a/chromium/base/task/thread_pool/sequence.h b/chromium/base/task/thread_pool/sequence.h index cbbbda42645..ca505ae84db 100644 --- a/chromium/base/task/thread_pool/sequence.h +++ b/chromium/base/task/thread_pool/sequence.h @@ -104,8 +104,7 @@ class BASE_EXPORT Sequence : public TaskSource { bool DidProcessTask(TaskSource::Transaction* transaction) override; SequenceSortKey GetSortKey() const override; - // Releases reference to TaskRunner. This might cause this object to be - // deleted; therefore, no member access should be made after this method. + // Releases reference to TaskRunner. void ReleaseTaskRunner(); const SequenceToken token_ = SequenceToken::Create(); diff --git a/chromium/base/task/thread_pool/sequence_unittest.cc b/chromium/base/task/thread_pool/sequence_unittest.cc index 7257a20d6f1..c5166239151 100644 --- a/chromium/base/task/thread_pool/sequence_unittest.cc +++ b/chromium/base/task/thread_pool/sequence_unittest.cc @@ -184,7 +184,7 @@ TEST(ThreadPoolSequenceTest, GetSortKeyForeground) { // Verify that a DCHECK fires if DidProcessTask() is called on a sequence which // didn't return a Task. -TEST(ThreadPoolSequenceTest, DidProcessTaskWithoutTakeTask) { +TEST(ThreadPoolSequenceTest, DidProcessTaskWithoutWillRunTask) { scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>( TaskTraits(ThreadPool()), nullptr, TaskSourceExecutionMode::kParallel); Sequence::Transaction sequence_transaction(sequence->BeginTransaction()); @@ -193,7 +193,6 @@ TEST(ThreadPoolSequenceTest, DidProcessTaskWithoutTakeTask) { auto registered_task_source = RegisteredTaskSource::CreateForTesting(sequence); EXPECT_DCHECK_DEATH({ - registered_task_source.WillRunTask(); registered_task_source.DidProcessTask(&sequence_transaction); }); } diff --git a/chromium/base/task/thread_pool/service_thread.cc b/chromium/base/task/thread_pool/service_thread.cc index 400fbb0590e..a7e1d21061a 100644 --- a/chromium/base/task/thread_pool/service_thread.cc +++ b/chromium/base/task/thread_pool/service_thread.cc @@ -71,39 +71,27 @@ void ServiceThread::PerformHeartbeatLatencyReport() const { if (!task_tracker_) return; - static constexpr TaskTraits kReportedTraits[] = { - {ThreadPool(), TaskPriority::BEST_EFFORT}, - {ThreadPool(), TaskPriority::BEST_EFFORT, MayBlock()}, - {ThreadPool(), TaskPriority::USER_VISIBLE}, - {ThreadPool(), TaskPriority::USER_VISIBLE, MayBlock()}, - {ThreadPool(), TaskPriority::USER_BLOCKING}, - {ThreadPool(), TaskPriority::USER_BLOCKING, MayBlock()}}; - - // Only record latency for one set of TaskTraits per report to avoid bias in - // the order in which tasks are posted (should we record all at once) as well - // as to avoid spinning up many worker threads to process this report if the + // Only record latency for one TaskPriority per report to avoid bias in the + // order in which tasks are posted (should we record all at once) as well as + // to avoid spinning up many worker threads to process this report if the // thread pool is currently idle (each thread group keeps at least one idle // thread so a single task isn't an issue). // Invoke RandInt() out-of-line to ensure it's obtained before // TimeTicks::Now(). - const TaskTraits& profiled_traits = - kReportedTraits[RandInt(0, base::size(kReportedTraits) - 1)]; + const TaskPriority profiled_priority = static_cast<TaskPriority>( + RandInt(static_cast<int>(TaskPriority::LOWEST), + static_cast<int>(TaskPriority::HIGHEST))); // Post through the static API to time the full stack. Use a new Now() for // every set of traits in case PostTask() itself is slow. // Bonus: this approach also includes the overhead of BindOnce() in the // reported latency. - // TODO(jessemckenna): pass |profiled_traits| directly to - // RecordHeartbeatLatencyAndTasksRunWhileQueuingHistograms() once compiler - // error on NaCl is fixed - TaskPriority task_priority = profiled_traits.priority(); - bool may_block = profiled_traits.may_block(); base::PostTask( - FROM_HERE, profiled_traits, + FROM_HERE, {base::ThreadPool(), profiled_priority}, BindOnce( &TaskTracker::RecordHeartbeatLatencyAndTasksRunWhileQueuingHistograms, - Unretained(task_tracker_), task_priority, may_block, TimeTicks::Now(), + Unretained(task_tracker_), profiled_priority, TimeTicks::Now(), task_tracker_->GetNumTasksRun())); } diff --git a/chromium/base/task/thread_pool/service_thread_unittest.cc b/chromium/base/task/thread_pool/service_thread_unittest.cc index ce098456877..7b3d3c21031 100644 --- a/chromium/base/task/thread_pool/service_thread_unittest.cc +++ b/chromium/base/task/thread_pool/service_thread_unittest.cc @@ -65,15 +65,9 @@ TEST(ThreadPoolServiceThreadIntegrationTest, HeartbeatLatencyReport) { "ThreadPool.HeartbeatLatencyMicroseconds.Test." "UserBlockingTaskPriority", "ThreadPool.HeartbeatLatencyMicroseconds.Test." - "UserBlockingTaskPriority_MayBlock", - "ThreadPool.HeartbeatLatencyMicroseconds.Test." "UserVisibleTaskPriority", "ThreadPool.HeartbeatLatencyMicroseconds.Test." - "UserVisibleTaskPriority_MayBlock", - "ThreadPool.HeartbeatLatencyMicroseconds.Test." - "BackgroundTaskPriority", - "ThreadPool.HeartbeatLatencyMicroseconds.Test." - "BackgroundTaskPriority_MayBlock"}; + "BackgroundTaskPriority"}; // Each report hits a single histogram above (randomly selected). But 1000 // reports should touch all histograms at least once the vast majority of the diff --git a/chromium/base/task/thread_pool/task_source.cc b/chromium/base/task/thread_pool/task_source.cc index 271988c2400..9674346b962 100644 --- a/chromium/base/task/thread_pool/task_source.cc +++ b/chromium/base/task/thread_pool/task_source.cc @@ -131,7 +131,6 @@ Optional<Task> RegisteredTaskSource::TakeTask( DCHECK(!transaction || transaction->task_source() == get()); #if DCHECK_IS_ON() DCHECK_EQ(State::kReady, run_step_); - run_step_ = State::kTaskAcquired; #endif // DCHECK_IS_ON() return task_source_->TakeTask(transaction); } @@ -139,9 +138,6 @@ Optional<Task> RegisteredTaskSource::TakeTask( Optional<Task> RegisteredTaskSource::Clear( TaskSource::Transaction* transaction) { DCHECK(!transaction || transaction->task_source() == get()); -#if DCHECK_IS_ON() - run_step_ = State::kInitial; -#endif // DCHECK_IS_ON() return task_source_->Clear(transaction); } @@ -149,7 +145,7 @@ bool RegisteredTaskSource::DidProcessTask( TaskSource::Transaction* transaction) { DCHECK(!transaction || transaction->task_source() == get()); #if DCHECK_IS_ON() - DCHECK_EQ(State::kTaskAcquired, run_step_); + DCHECK_EQ(State::kReady, run_step_); run_step_ = State::kInitial; #endif // DCHECK_IS_ON() return task_source_->DidProcessTask(transaction); diff --git a/chromium/base/task/thread_pool/task_source.h b/chromium/base/task/thread_pool/task_source.h index e56b42eda69..4dfa0875885 100644 --- a/chromium/base/task/thread_pool/task_source.h +++ b/chromium/base/task/thread_pool/task_source.h @@ -55,9 +55,9 @@ struct BASE_EXPORT ExecutionEnvironment { // RegisteredTaskSource after obtaining it from the queue: // 1- Check whether a task can run with WillRunTask() (and register/enqueue the // task source again if not saturated). -// 2- Iff (1) determined that a task can run, access the next task with -// TakeTask(). -// 3- Execute the task. +// 2- (optional) Iff (1) determined that a task can run, access the next task +// with TakeTask(). +// 3- (optional) Execute the task. // 4- Inform the task source that a task was processed with DidProcessTask(), // and re-enqueue the task source iff requested. // When a task source is registered multiple times, many overlapping chains of @@ -86,10 +86,10 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> { enum class RunStatus { // TakeTask() cannot be called. kDisallowed, - // TakeTask() must be called, and the TaskSource has not reached its maximum + // TakeTask() may called, and the TaskSource has not reached its maximum // concurrency (i.e. the TaskSource still needs to be queued). kAllowedNotSaturated, - // TakeTask() must be called, and the TaskSource has reached its maximum + // TakeTask() may called, and the TaskSource has reached its maximum // concurrency (i.e. the TaskSource no longer needs to be queued). kAllowedSaturated, }; @@ -218,7 +218,7 @@ class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> { // A pointer to the TaskRunner that posts to this TaskSource, if any. The // derived class is responsible for calling AddRef() when a TaskSource from // which no Task is executing becomes non-empty and Release() when - // DidProcessTask() returns false. + // it becomes empty again (e.g. when DidProcessTask() returns false). TaskRunner* task_runner_; TaskSourceExecutionMode execution_mode_; @@ -270,11 +270,11 @@ class BASE_EXPORT RegisteredTaskSource { Optional<Task> TakeTask(TaskSource::Transaction* transaction = nullptr) WARN_UNUSED_RESULT; - // Must be called once the task was run. This resets this RegisteredTaskSource - // to its initial state so that WillRunTask() may be called again. - // |transaction| is optional and should only be provided if this operation is - // already part of a transaction. Returns true if the TaskSource should be - // queued after this operation. + // Must be called after WillRunTask() or once the task was run if TakeTask() + // was called. This resets this RegisteredTaskSource to its initial state so + // that WillRunTask() may be called again. |transaction| is optional and + // should only be provided if this operation is already part of a transaction. + // Returns true if the TaskSource should be queued after this operation. bool DidProcessTask(TaskSource::Transaction* transaction = nullptr); // Returns a task that clears this TaskSource to make it empty. |transaction| @@ -293,7 +293,6 @@ class BASE_EXPORT RegisteredTaskSource { enum class State { kInitial, // WillRunTask() may be called. kReady, // After WillRunTask() returned a valid RunStatus. - kTaskAcquired, // After TakeTask(). }; State run_step_ = State::kInitial; diff --git a/chromium/base/task/thread_pool/task_tracker.cc b/chromium/base/task/thread_pool/task_tracker.cc index d21422bc9fa..a74f14003b3 100644 --- a/chromium/base/task/thread_pool/task_tracker.cc +++ b/chromium/base/task/thread_pool/task_tracker.cc @@ -20,6 +20,7 @@ #include "base/sequence_token.h" #include "base/synchronization/condition_variable.h" #include "base/task/scoped_set_task_priority_for_current_thread.h" +#include "base/task/task_executor.h" #include "base/threading/sequence_local_storage_map.h" #include "base/threading/sequenced_task_runner_handle.h" #include "base/threading/thread_restrictions.h" @@ -27,6 +28,7 @@ #include "base/time/time.h" #include "base/trace_event/trace_event.h" #include "base/values.h" +#include "build/build_config.h" namespace base { namespace internal { @@ -75,59 +77,34 @@ void TaskTracingInfo::AppendAsTraceFormat(std::string* out) const { out->append(tmp); } -// Constructs a histogram to track latency which is logging to -// "ThreadPool.{histogram_name}.{histogram_label}.{task_type_suffix}". -HistogramBase* GetLatencyHistogram(StringPiece histogram_name, - StringPiece histogram_label, - StringPiece task_type_suffix) { - DCHECK(!histogram_name.empty()); - DCHECK(!histogram_label.empty()); - DCHECK(!task_type_suffix.empty()); - // Mimics the UMA_HISTOGRAM_HIGH_RESOLUTION_CUSTOM_TIMES macro. The minimums - // and maximums were chosen to place the 1ms mark at around the 70% range - // coverage for buckets giving us good info for tasks that have a latency - // below 1ms (most of them) and enough info to assess how bad the latency is - // for tasks that exceed this threshold. - const std::string histogram = JoinString( - {"ThreadPool", histogram_name, histogram_label, task_type_suffix}, "."); - return Histogram::FactoryMicrosecondsTimeGet( - histogram, TimeDelta::FromMicroseconds(1), - TimeDelta::FromMilliseconds(20), 50, - HistogramBase::kUmaTargetedHistogramFlag); -} - -// Constructs a histogram to track task count which is logging to -// "ThreadPool.{histogram_name}.{histogram_label}.{task_type_suffix}". -HistogramBase* GetCountHistogram(StringPiece histogram_name, - StringPiece histogram_label, - StringPiece task_type_suffix) { - DCHECK(!histogram_name.empty()); - DCHECK(!histogram_label.empty()); - DCHECK(!task_type_suffix.empty()); - // Mimics the UMA_HISTOGRAM_CUSTOM_COUNTS macro. - const std::string histogram = JoinString( - {"ThreadPool", histogram_name, histogram_label, task_type_suffix}, "."); - // 500 was chosen as the maximum number of tasks run while queuing because - // values this high would likely indicate an error, beyond which knowing the - // actual number of tasks is not informative. - return Histogram::FactoryGet(histogram, 1, 500, 50, - HistogramBase::kUmaTargetedHistogramFlag); -} - -// Returns a histogram stored in a 2D array indexed by task priority and -// whether it may block. -// TODO(jessemckenna): use the STATIC_HISTOGRAM_POINTER_GROUP macro from -// histogram_macros.h instead. -HistogramBase* GetHistogramForTaskTraits( - TaskTraits task_traits, - HistogramBase* const (*histograms)[2]) { - return histograms[static_cast<int>(task_traits.priority())] - [task_traits.may_block() || - task_traits.with_base_sync_primitives() - ? 1 - : 0]; +const char* GetTaskPrioritySuffix(TaskPriority priority) { + switch (priority) { + case TaskPriority::BEST_EFFORT: + return "BackgroundTaskPriority"; + case TaskPriority::USER_VISIBLE: + return "UserVisibleTaskPriority"; + case TaskPriority::USER_BLOCKING: + return "UserBlockingTaskPriority"; + } } +// Records |time_sample| to the histogram |histogram_name|.|priority suffix|, +// where |priority_suffix| is derived from |priority|. +// +// The minimums and maximums were chosen to place the 1ms mark at around the 70% +// range coverage for buckets giving us good info for tasks that have a latency +// below 1ms (most of them) and enough info to assess how bad the latency is for +// tasks that exceed this threshold. +#define STATIC_LATENCY_HISTOGRAM_POINTER_GROUP(histogram_name, priority, \ + time_sample) \ + STATIC_HISTOGRAM_POINTER_GROUP( \ + histogram_name, static_cast<int>(priority), \ + static_cast<int>(TaskPriority::HIGHEST) + 1, AddTime(time_sample), \ + Histogram::FactoryMicrosecondsTimeGet( \ + histogram_name, TimeDelta::FromMicroseconds(1), \ + TimeDelta::FromMilliseconds(20), 50, \ + HistogramBase::kUmaTargetedHistogramFlag)); + bool HasLogBestEffortTasksSwitch() { // The CommandLine might not be initialized if ThreadPool is initialized in a // dynamic library which doesn't have access to argc/argv. @@ -136,6 +113,84 @@ bool HasLogBestEffortTasksSwitch() { switches::kLogBestEffortTasks); } +// Needed for PostTaskHere and CurrentThread. This executor lives for the +// duration of a threadpool task invocation. +class EphemeralTaskExecutor : public TaskExecutor { + public: + // |sequenced_task_runner| and |single_thread_task_runner| must outlive this + // EphemeralTaskExecutor. + EphemeralTaskExecutor(SequencedTaskRunner* sequenced_task_runner, + SingleThreadTaskRunner* single_thread_task_runner, + const TaskTraits* sequence_traits) + : sequenced_task_runner_(sequenced_task_runner), + single_thread_task_runner_(single_thread_task_runner), + sequence_traits_(sequence_traits) { + SetTaskExecutorForCurrentThread(this); + } + + ~EphemeralTaskExecutor() override { + SetTaskExecutorForCurrentThread(nullptr); + } + + // TaskExecutor: + bool PostDelayedTask(const Location& from_here, + const TaskTraits& traits, + OnceClosure task, + TimeDelta delay) override { + CheckTraitsCompatibleWithSequenceTraits(traits); + return sequenced_task_runner_->PostDelayedTask(from_here, std::move(task), + delay); + } + + scoped_refptr<TaskRunner> CreateTaskRunner( + const TaskTraits& traits) override { + CheckTraitsCompatibleWithSequenceTraits(traits); + return sequenced_task_runner_; + } + + scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunner( + const TaskTraits& traits) override { + CheckTraitsCompatibleWithSequenceTraits(traits); + return sequenced_task_runner_; + } + + scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunner( + const TaskTraits& traits, + SingleThreadTaskRunnerThreadMode thread_mode) override { + CheckTraitsCompatibleWithSequenceTraits(traits); + return single_thread_task_runner_; + } + +#if defined(OS_WIN) + scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunner( + const TaskTraits& traits, + SingleThreadTaskRunnerThreadMode thread_mode) override { + CheckTraitsCompatibleWithSequenceTraits(traits); + return single_thread_task_runner_; + } +#endif // defined(OS_WIN) + + private: + // Currently ignores |traits.priority()|. + void CheckTraitsCompatibleWithSequenceTraits(const TaskTraits& traits) { + if (traits.shutdown_behavior_set_explicitly()) { + DCHECK_EQ(traits.shutdown_behavior(), + sequence_traits_->shutdown_behavior()); + } + + DCHECK(!traits.may_block() || + traits.may_block() == sequence_traits_->may_block()); + + DCHECK(!traits.with_base_sync_primitives() || + traits.with_base_sync_primitives() == + sequence_traits_->with_base_sync_primitives()); + } + + SequencedTaskRunner* const sequenced_task_runner_; + SingleThreadTaskRunner* const single_thread_task_runner_; + const TaskTraits* const sequence_traits_; +}; + } // namespace // Atomic internal state used by TaskTracker to track items that are blocking @@ -228,78 +283,14 @@ class TaskTracker::State { DISALLOW_COPY_AND_ASSIGN(State); }; -// TODO(jessemckenna): Write a helper function to avoid code duplication below. TaskTracker::TaskTracker(StringPiece histogram_label) - : has_log_best_effort_tasks_switch_(HasLogBestEffortTasksSwitch()), + : histogram_label_(histogram_label), + has_log_best_effort_tasks_switch_(HasLogBestEffortTasksSwitch()), state_(new State), can_run_policy_(CanRunPolicy::kAll), flush_cv_(flush_lock_.CreateConditionVariable()), shutdown_lock_(&flush_lock_), - task_latency_histograms_{ - {GetLatencyHistogram("TaskLatencyMicroseconds", - histogram_label, - "BackgroundTaskPriority"), - GetLatencyHistogram("TaskLatencyMicroseconds", - histogram_label, - "BackgroundTaskPriority_MayBlock")}, - {GetLatencyHistogram("TaskLatencyMicroseconds", - histogram_label, - "UserVisibleTaskPriority"), - GetLatencyHistogram("TaskLatencyMicroseconds", - histogram_label, - "UserVisibleTaskPriority_MayBlock")}, - {GetLatencyHistogram("TaskLatencyMicroseconds", - histogram_label, - "UserBlockingTaskPriority"), - GetLatencyHistogram("TaskLatencyMicroseconds", - histogram_label, - "UserBlockingTaskPriority_MayBlock")}}, - heartbeat_latency_histograms_{ - {GetLatencyHistogram("HeartbeatLatencyMicroseconds", - histogram_label, - "BackgroundTaskPriority"), - GetLatencyHistogram("HeartbeatLatencyMicroseconds", - histogram_label, - "BackgroundTaskPriority_MayBlock")}, - {GetLatencyHistogram("HeartbeatLatencyMicroseconds", - histogram_label, - "UserVisibleTaskPriority"), - GetLatencyHistogram("HeartbeatLatencyMicroseconds", - histogram_label, - "UserVisibleTaskPriority_MayBlock")}, - {GetLatencyHistogram("HeartbeatLatencyMicroseconds", - histogram_label, - "UserBlockingTaskPriority"), - GetLatencyHistogram("HeartbeatLatencyMicroseconds", - histogram_label, - "UserBlockingTaskPriority_MayBlock")}}, - num_tasks_run_while_queuing_histograms_{ - {GetCountHistogram("NumTasksRunWhileQueuing", - histogram_label, - "BackgroundTaskPriority"), - GetCountHistogram("NumTasksRunWhileQueuing", - histogram_label, - "BackgroundTaskPriority_MayBlock")}, - {GetCountHistogram("NumTasksRunWhileQueuing", - histogram_label, - "UserVisibleTaskPriority"), - GetCountHistogram("NumTasksRunWhileQueuing", - histogram_label, - "UserVisibleTaskPriority_MayBlock")}, - {GetCountHistogram("NumTasksRunWhileQueuing", - histogram_label, - "UserBlockingTaskPriority"), - GetCountHistogram("NumTasksRunWhileQueuing", - histogram_label, - "UserBlockingTaskPriority_MayBlock")}}, - tracked_ref_factory_(this) { - // Confirm that all |task_latency_histograms_| have been initialized above. - for (TaskPriorityType i = 0; i < kNumTaskPriorities; ++i) { - for (uint8_t j = 0; j < kNumBlockingModes; ++j) { - DCHECK(task_latency_histograms_[i][j]); - } - } -} + tracked_ref_factory_(this) {} TaskTracker::~TaskTracker() = default; @@ -441,7 +432,7 @@ RegisteredTaskSource TaskTracker::RunAndPopNextTask( RegisteredTaskSource task_source) { DCHECK(task_source); - const bool can_run_worker_task = + const bool task_is_worker_task = BeforeRunTask(task_source->shutdown_behavior()); // Run the next task in |task_source|. @@ -449,7 +440,7 @@ RegisteredTaskSource TaskTracker::RunAndPopNextTask( TaskTraits traits{ThreadPool()}; { auto transaction = task_source->BeginTransaction(); - task = can_run_worker_task ? task_source.TakeTask(&transaction) + task = task_is_worker_task ? task_source.TakeTask(&transaction) : task_source.Clear(&transaction); traits = transaction.traits(); } @@ -458,13 +449,12 @@ RegisteredTaskSource TaskTracker::RunAndPopNextTask( // Run the |task| (whether it's a worker task or the Clear() closure). RunTask(std::move(task.value()), task_source.get(), traits); } - if (can_run_worker_task) { + if (task_is_worker_task) AfterRunTask(task_source->shutdown_behavior()); - const bool task_source_must_be_queued = task_source.DidProcessTask(); - // |task_source| should be reenqueued iff requested by DidProcessTask(). - if (task_source_must_be_queued) - return task_source; - } + const bool task_source_must_be_queued = task_source.DidProcessTask(); + // |task_source| should be reenqueued iff requested by DidProcessTask(). + if (task_source_must_be_queued) + return task_source; return nullptr; } @@ -477,37 +467,50 @@ bool TaskTracker::IsShutdownComplete() const { return shutdown_event_ && shutdown_event_->IsSignaled(); } -void TaskTracker::RecordLatencyHistogram( - LatencyHistogramType latency_histogram_type, - TaskTraits task_traits, - TimeTicks posted_time) const { - const TimeDelta task_latency = TimeTicks::Now() - posted_time; +void TaskTracker::RecordLatencyHistogram(TaskPriority priority, + TimeTicks posted_time) const { + if (histogram_label_.empty()) + return; - DCHECK(latency_histogram_type == LatencyHistogramType::TASK_LATENCY || - latency_histogram_type == LatencyHistogramType::HEARTBEAT_LATENCY); - const auto& histograms = - latency_histogram_type == LatencyHistogramType::TASK_LATENCY - ? task_latency_histograms_ - : heartbeat_latency_histograms_; - GetHistogramForTaskTraits(task_traits, histograms) - ->AddTimeMicrosecondsGranularity(task_latency); + auto get_latency_histogram_name = [this, priority]() { + return JoinString({"ThreadPool.TaskLatencyMicroseconds", histogram_label_, + GetTaskPrioritySuffix(priority)}, + "."); + }; + STATIC_LATENCY_HISTOGRAM_POINTER_GROUP(get_latency_histogram_name(), priority, + TimeTicks::Now() - posted_time); } void TaskTracker::RecordHeartbeatLatencyAndTasksRunWhileQueuingHistograms( - TaskPriority task_priority, - bool may_block, + TaskPriority priority, TimeTicks posted_time, int num_tasks_run_when_posted) const { - TaskTraits task_traits{ThreadPool()}; - if (may_block) - task_traits = TaskTraits(ThreadPool(), task_priority, MayBlock()); - else - task_traits = TaskTraits(ThreadPool(), task_priority); - RecordLatencyHistogram(LatencyHistogramType::HEARTBEAT_LATENCY, task_traits, - posted_time); - GetHistogramForTaskTraits(task_traits, - num_tasks_run_while_queuing_histograms_) - ->Add(GetNumTasksRun() - num_tasks_run_when_posted); + if (histogram_label_.empty()) + return; + + auto get_heartbeat_latency_histogram_name = [this, priority]() { + return JoinString({"ThreadPool.HeartbeatLatencyMicroseconds", + histogram_label_, GetTaskPrioritySuffix(priority)}, + "."); + }; + STATIC_LATENCY_HISTOGRAM_POINTER_GROUP(get_heartbeat_latency_histogram_name(), + priority, + TimeTicks::Now() - posted_time); + + auto get_num_tasks_run_while_queuing_histogram_name = [this, priority]() { + return JoinString({"ThreadPool.NumTasksRunWhileQueuing", histogram_label_, + GetTaskPrioritySuffix(priority)}, + "."); + }; + STATIC_HISTOGRAM_POINTER_GROUP( + get_num_tasks_run_while_queuing_histogram_name(), + static_cast<int>(priority), static_cast<int>(TaskPriority::HIGHEST) + 1, + Add(GetNumTasksRun() - num_tasks_run_when_posted), + // 500 was chosen as the maximum number of tasks run while queuing because + // values this high would likely indicate an error, beyond which knowing + // the actual number of tasks is not informative. + Histogram::FactoryGet(get_num_tasks_run_while_queuing_histogram_name(), 1, + 500, 50, HistogramBase::kUmaTargetedHistogramFlag)); } int TaskTracker::GetNumTasksRun() const { @@ -522,8 +525,7 @@ void TaskTracker::RunTask(Task task, TaskSource* task_source, const TaskTraits& traits) { DCHECK(task_source); - RecordLatencyHistogram(LatencyHistogramType::TASK_LATENCY, traits, - task.queue_time); + RecordLatencyHistogram(traits.priority(), task.queue_time); const auto environment = task_source->GetExecutionEnvironment(); @@ -557,6 +559,7 @@ void TaskTracker::RunTask(Task task, // Set up TaskRunnerHandle as expected for the scope of the task. Optional<SequencedTaskRunnerHandle> sequenced_task_runner_handle; Optional<ThreadTaskRunnerHandle> single_thread_task_runner_handle; + Optional<EphemeralTaskExecutor> ephemiral_task_executor; switch (task_source->execution_mode()) { case TaskSourceExecutionMode::kJob: case TaskSourceExecutionMode::kParallel: @@ -565,11 +568,18 @@ void TaskTracker::RunTask(Task task, DCHECK(task_source->task_runner()); sequenced_task_runner_handle.emplace( static_cast<SequencedTaskRunner*>(task_source->task_runner())); + ephemiral_task_executor.emplace( + static_cast<SequencedTaskRunner*>(task_source->task_runner()), + nullptr, &traits); break; case TaskSourceExecutionMode::kSingleThread: DCHECK(task_source->task_runner()); single_thread_task_runner_handle.emplace( static_cast<SingleThreadTaskRunner*>(task_source->task_runner())); + ephemiral_task_executor.emplace( + static_cast<SequencedTaskRunner*>(task_source->task_runner()), + static_cast<SingleThreadTaskRunner*>(task_source->task_runner()), + &traits); break; } diff --git a/chromium/base/task/thread_pool/task_tracker.h b/chromium/base/task/thread_pool/task_tracker.h index bb5296b7da1..59f0726b8eb 100644 --- a/chromium/base/task/thread_pool/task_tracker.h +++ b/chromium/base/task/thread_pool/task_tracker.h @@ -31,7 +31,6 @@ namespace base { class ConditionVariable; -class HistogramBase; namespace internal { @@ -54,7 +53,8 @@ enum class CanRunPolicy { // and records metrics and trace events. This class is thread-safe. class BASE_EXPORT TaskTracker { public: - // |histogram_label| is used as a suffix for histograms, it must not be empty. + // |histogram_label| is used to label histograms. No histograms are recorded + // if it is empty. TaskTracker(StringPiece histogram_label); virtual ~TaskTracker(); @@ -131,26 +131,15 @@ class BASE_EXPORT TaskTracker { // no tasks are blocking shutdown). bool IsShutdownComplete() const; - enum class LatencyHistogramType { - // Records the latency of each individual task posted through TaskTracker. - TASK_LATENCY, - // Records the latency of heartbeat tasks which are independent of current - // workload. These avoid a bias towards TASK_LATENCY reporting that high- - // priority tasks are "slower" than regular tasks because high-priority - // tasks tend to be correlated with heavy workloads. - HEARTBEAT_LATENCY, - }; - // Records two histograms // 1. ThreadPool.[label].HeartbeatLatencyMicroseconds.[suffix]: // Now() - posted_time // 2. ThreadPool.[label].NumTasksRunWhileQueuing.[suffix]: // GetNumTasksRun() - num_tasks_run_when_posted. // [label] is the histogram label provided to the constructor. - // [suffix] is derived from |task_priority| and |may_block|. + // [suffix] is derived from |task_priority|. void RecordHeartbeatLatencyAndTasksRunWhileQueuingHistograms( TaskPriority task_priority, - bool may_block, TimeTicks posted_time, int num_tasks_run_when_posted) const; @@ -213,10 +202,9 @@ class BASE_EXPORT TaskTracker { // manner. void CallFlushCallbackForTesting(); - // Records |Now() - posted_time| to the appropriate |latency_histogram_type| - // based on |task_traits|. - void RecordLatencyHistogram(LatencyHistogramType latency_histogram_type, - TaskTraits task_traits, + // Records |Now() - posted_time| to the + // ThreadPool.TaskLatencyMicroseconds.[label].[priority] histogram. + void RecordLatencyHistogram(TaskPriority priority, TimeTicks posted_time) const; void IncrementNumTasksRun(); @@ -230,6 +218,9 @@ class BASE_EXPORT TaskTracker { TaskAnnotator task_annotator_; + // Suffix for histograms recorded by this TaskTracker. + const std::string histogram_label_; + // Indicates whether logging information about TaskPriority::BEST_EFFORT tasks // was enabled with a command line switch. const bool has_log_best_effort_tasks_switch_; @@ -277,25 +268,6 @@ class BASE_EXPORT TaskTracker { // a task queued to histogram. std::atomic_int num_tasks_run_{0}; - // ThreadPool.TaskLatencyMicroseconds.*, - // ThreadPool.HeartbeatLatencyMicroseconds.*, and - // ThreadPool.NumTasksRunWhileQueuing.* histograms. The first index is - // a TaskPriority. The second index is 0 for non-blocking tasks, 1 for - // blocking tasks. Intentionally leaked. - // TODO(scheduler-dev): Consider using STATIC_HISTOGRAM_POINTER_GROUP for - // these. - using TaskPriorityType = std::underlying_type<TaskPriority>::type; - static constexpr TaskPriorityType kNumTaskPriorities = - static_cast<TaskPriorityType>(TaskPriority::HIGHEST) + 1; - static constexpr uint8_t kNumBlockingModes = 2; - HistogramBase* const task_latency_histograms_[kNumTaskPriorities] - [kNumBlockingModes]; - HistogramBase* const heartbeat_latency_histograms_[kNumTaskPriorities] - [kNumBlockingModes]; - HistogramBase* const - num_tasks_run_while_queuing_histograms_[kNumTaskPriorities] - [kNumBlockingModes]; - // Ensures all state (e.g. dangling cleaned up workers) is coalesced before // destroying the TaskTracker (e.g. in test environments). // Ref. https://crbug.com/827615. diff --git a/chromium/base/task/thread_pool/task_tracker_unittest.cc b/chromium/base/task/thread_pool/task_tracker_unittest.cc index fa231c9b7be..a3ba41cf559 100644 --- a/chromium/base/task/thread_pool/task_tracker_unittest.cc +++ b/chromium/base/task/thread_pool/task_tracker_unittest.cc @@ -19,7 +19,6 @@ #include "base/memory/ref_counted.h" #include "base/metrics/histogram_base.h" #include "base/metrics/histogram_samples.h" -#include "base/metrics/statistics_recorder.h" #include "base/sequence_token.h" #include "base/sequenced_task_runner.h" #include "base/single_thread_task_runner.h" @@ -37,6 +36,7 @@ #include "base/threading/scoped_blocking_call.h" #include "base/threading/sequenced_task_runner_handle.h" #include "base/threading/simple_thread.h" +#include "base/threading/thread_restrictions.h" #include "base/threading/thread_task_runner_handle.h" #include "testing/gmock/include/gmock/gmock.h" #include "testing/gtest/include/gtest/gtest.h" @@ -1244,8 +1244,6 @@ TEST(ThreadPoolTaskTrackerWaitAllowedTest, WaitAllowed) { // Verify that ThreadPool.TaskLatency.* histograms are correctly recorded // when a task runs. TEST(ThreadPoolTaskTrackerHistogramTest, TaskLatency) { - auto statistics_recorder = StatisticsRecorder::CreateTemporaryForTesting(); - TaskTracker tracker("Test"); struct { @@ -1257,28 +1255,28 @@ TEST(ThreadPoolTaskTrackerHistogramTest, TaskLatency) { "BackgroundTaskPriority"}, {{ThreadPool(), MayBlock(), TaskPriority::BEST_EFFORT}, "ThreadPool.TaskLatencyMicroseconds.Test." - "BackgroundTaskPriority_MayBlock"}, + "BackgroundTaskPriority"}, {{ThreadPool(), WithBaseSyncPrimitives(), TaskPriority::BEST_EFFORT}, "ThreadPool.TaskLatencyMicroseconds.Test." - "BackgroundTaskPriority_MayBlock"}, + "BackgroundTaskPriority"}, {{ThreadPool(), TaskPriority::USER_VISIBLE}, "ThreadPool.TaskLatencyMicroseconds.Test." "UserVisibleTaskPriority"}, {{ThreadPool(), MayBlock(), TaskPriority::USER_VISIBLE}, "ThreadPool.TaskLatencyMicroseconds.Test." - "UserVisibleTaskPriority_MayBlock"}, + "UserVisibleTaskPriority"}, {{ThreadPool(), WithBaseSyncPrimitives(), TaskPriority::USER_VISIBLE}, "ThreadPool.TaskLatencyMicroseconds.Test." - "UserVisibleTaskPriority_MayBlock"}, + "UserVisibleTaskPriority"}, {{ThreadPool(), TaskPriority::USER_BLOCKING}, "ThreadPool.TaskLatencyMicroseconds.Test." "UserBlockingTaskPriority"}, {{ThreadPool(), MayBlock(), TaskPriority::USER_BLOCKING}, "ThreadPool.TaskLatencyMicroseconds.Test." - "UserBlockingTaskPriority_MayBlock"}, + "UserBlockingTaskPriority"}, {{ThreadPool(), WithBaseSyncPrimitives(), TaskPriority::USER_BLOCKING}, "ThreadPool.TaskLatencyMicroseconds.Test." - "UserBlockingTaskPriority_MayBlock"}}; + "UserBlockingTaskPriority"}}; for (const auto& test : kTests) { Task task(FROM_HERE, DoNothing(), TimeDelta()); diff --git a/chromium/base/task/thread_pool/test_utils.cc b/chromium/base/task/thread_pool/test_utils.cc index a02e24bf3a6..d6236373f84 100644 --- a/chromium/base/task/thread_pool/test_utils.cc +++ b/chromium/base/task/thread_pool/test_utils.cc @@ -239,6 +239,11 @@ bool MockPooledTaskRunnerDelegate::EnqueueJobTaskSource( return true; } +void MockPooledTaskRunnerDelegate::RemoveJobTaskSource( + scoped_refptr<JobTaskSource> task_source) { + thread_group_->RemoveTaskSource(*task_source); +} + bool MockPooledTaskRunnerDelegate::IsRunningPoolWithTraits( const TaskTraits& traits) const { // |thread_group_| must be initialized with SetThreadGroup() before diff --git a/chromium/base/task/thread_pool/test_utils.h b/chromium/base/task/thread_pool/test_utils.h index 1d85ee8cea2..ebdfbe28467 100644 --- a/chromium/base/task/thread_pool/test_utils.h +++ b/chromium/base/task/thread_pool/test_utils.h @@ -62,6 +62,7 @@ class MockPooledTaskRunnerDelegate : public PooledTaskRunnerDelegate { bool PostTaskWithSequence(Task task, scoped_refptr<Sequence> sequence) override; bool EnqueueJobTaskSource(scoped_refptr<JobTaskSource> task_source) override; + void RemoveJobTaskSource(scoped_refptr<JobTaskSource> task_source) override; bool ShouldYield(const TaskSource* task_source) const override; bool IsRunningPoolWithTraits(const TaskTraits& traits) const override; void UpdatePriority(scoped_refptr<TaskSource> task_source, diff --git a/chromium/base/task/thread_pool/thread_group.cc b/chromium/base/task/thread_pool/thread_group.cc index 7f6dcef2630..8de5b518fde 100644 --- a/chromium/base/task/thread_pool/thread_group.cc +++ b/chromium/base/task/thread_pool/thread_group.cc @@ -139,9 +139,9 @@ ThreadGroup::GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() } RegisteredTaskSource ThreadGroup::RemoveTaskSource( - scoped_refptr<TaskSource> task_source) { + const TaskSource& task_source) { CheckedAutoLock auto_lock(lock_); - return priority_queue_.RemoveTaskSource(std::move(task_source)); + return priority_queue_.RemoveTaskSource(task_source); } void ThreadGroup::ReEnqueueTaskSourceLockRequired( diff --git a/chromium/base/task/thread_pool/thread_group.h b/chromium/base/task/thread_pool/thread_group.h index c00755413f5..4db7bd5f66d 100644 --- a/chromium/base/task/thread_pool/thread_group.h +++ b/chromium/base/task/thread_pool/thread_group.h @@ -65,7 +65,7 @@ class BASE_EXPORT ThreadGroup { // RegisteredTaskSource that evaluats to true if successful, or false if // |task_source| is not currently in |priority_queue_|, such as when a worker // is running a task from it. - RegisteredTaskSource RemoveTaskSource(scoped_refptr<TaskSource> task_source); + RegisteredTaskSource RemoveTaskSource(const TaskSource& task_source); // Updates the position of the TaskSource in |transaction| in this // ThreadGroup's PriorityQueue based on the TaskSource's current traits. diff --git a/chromium/base/task/thread_pool/thread_group_impl.cc b/chromium/base/task/thread_pool/thread_group_impl.cc index 378493afc78..0c17b47d31b 100644 --- a/chromium/base/task/thread_pool/thread_group_impl.cc +++ b/chromium/base/task/thread_pool/thread_group_impl.cc @@ -41,25 +41,6 @@ #include "base/win/windows_version.h" #endif // defined(OS_WIN) -// Data from deprecated UMA histograms: -// -// ThreadPool.NumTasksBetweenWaits.(Browser/Renderer).Foreground, August 2019 -// Number of tasks between two waits by a foreground worker thread in a -// browser/renderer process. -// -// Windows (browser/renderer) -// 1 at 87th percentile / 92th percentile -// 2 at 95th percentile / 98th percentile -// 5 at 99th percentile / 100th percentile -// Mac (browser/renderer) -// 1 at 81th percentile / 90th percentile -// 2 at 92th percentile / 97th percentile -// 5 at 98th percentile / 100th percentile -// Android (browser/renderer) -// 1 at 92th percentile / 96th percentile -// 2 at 97th percentile / 98th percentile -// 5 at 99th percentile / 100th percentile - namespace base { namespace internal { @@ -342,40 +323,57 @@ ThreadGroupImpl::ThreadGroupImpl(StringPiece histogram_label, priority_hint_(priority_hint), idle_workers_stack_cv_for_testing_(lock_.CreateConditionVariable()), // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. - detach_duration_histogram_(Histogram::FactoryTimeGet( - JoinString({kDetachDurationHistogramPrefix, histogram_label}, ""), - TimeDelta::FromMilliseconds(1), - TimeDelta::FromHours(1), - 50, - HistogramBase::kUmaTargetedHistogramFlag)), + detach_duration_histogram_( + histogram_label.empty() + ? nullptr + : Histogram::FactoryTimeGet( + JoinString( + {kDetachDurationHistogramPrefix, histogram_label}, + ""), + TimeDelta::FromMilliseconds(1), + TimeDelta::FromHours(1), + 50, + HistogramBase::kUmaTargetedHistogramFlag)), // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more // than 1000 tasks before detaching, there is no need to know the exact // number of tasks that ran. - num_tasks_before_detach_histogram_(Histogram::FactoryGet( - JoinString({kNumTasksBeforeDetachHistogramPrefix, histogram_label}, - ""), - 1, - 1000, - 50, - HistogramBase::kUmaTargetedHistogramFlag)), + num_tasks_before_detach_histogram_( + histogram_label.empty() + ? nullptr + : Histogram::FactoryGet( + JoinString( + {kNumTasksBeforeDetachHistogramPrefix, histogram_label}, + ""), + 1, + 1000, + 50, + HistogramBase::kUmaTargetedHistogramFlag)), // Mimics the UMA_HISTOGRAM_COUNTS_100 macro. A ThreadGroup is // expected to run between zero and a few tens of workers. // When it runs more than 100 worker, there is no need to know the exact // number of workers that ran. - num_workers_histogram_(Histogram::FactoryGet( - JoinString({kNumWorkersHistogramPrefix, histogram_label}, ""), - 1, - 100, - 50, - HistogramBase::kUmaTargetedHistogramFlag)), - num_active_workers_histogram_(Histogram::FactoryGet( - JoinString({kNumActiveWorkersHistogramPrefix, histogram_label}, ""), - 1, - 100, - 50, - HistogramBase::kUmaTargetedHistogramFlag)), + num_workers_histogram_( + histogram_label.empty() + ? nullptr + : Histogram::FactoryGet( + JoinString({kNumWorkersHistogramPrefix, histogram_label}, + ""), + 1, + 100, + 50, + HistogramBase::kUmaTargetedHistogramFlag)), + num_active_workers_histogram_( + histogram_label.empty() + ? nullptr + : Histogram::FactoryGet( + JoinString( + {kNumActiveWorkersHistogramPrefix, histogram_label}, + ""), + 1, + 100, + 50, + HistogramBase::kUmaTargetedHistogramFlag)), tracked_ref_factory_(this) { - DCHECK(!histogram_label.empty()); DCHECK(!thread_group_label_.empty()); } @@ -484,10 +482,6 @@ void ThreadGroupImpl::WaitForWorkersCleanedUpForTesting(size_t n) { } void ThreadGroupImpl::JoinForTesting() { -#if DCHECK_IS_ON() - join_for_testing_started_.Set(); -#endif - decltype(workers_) workers_copy; { CheckedAutoLock auto_lock(lock_); @@ -496,6 +490,8 @@ void ThreadGroupImpl::JoinForTesting() { DCHECK_GT(workers_.size(), size_t(0)) << "Joined an unstarted thread group."; + join_for_testing_started_ = true; + // Ensure WorkerThreads in |workers_| do not attempt to cleanup while // being joined. worker_cleanup_disallowed_for_testing_ = true; @@ -531,10 +527,13 @@ size_t ThreadGroupImpl::NumberOfIdleWorkersForTesting() const { void ThreadGroupImpl::ReportHeartbeatMetrics() const { CheckedAutoLock auto_lock(lock_); - num_workers_histogram_->Add(workers_.size()); - - num_active_workers_histogram_->Add(workers_.size() - - idle_workers_stack_.Size()); + if (num_workers_histogram_) { + num_workers_histogram_->Add(workers_.size()); + } + if (num_active_workers_histogram_) { + num_active_workers_histogram_->Add(workers_.size() - + idle_workers_stack_.Size()); + } } ThreadGroupImpl::WorkerThreadDelegateImpl::WorkerThreadDelegateImpl( @@ -710,10 +709,13 @@ bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanCleanupLockRequired( void ThreadGroupImpl::WorkerThreadDelegateImpl::CleanupLockRequired( WorkerThread* worker) { + DCHECK(!outer_->join_for_testing_started_); DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); - outer_->num_tasks_before_detach_histogram_->Add( - worker_only().num_tasks_since_last_detach); + if (outer_->num_tasks_before_detach_histogram_) { + outer_->num_tasks_before_detach_histogram_->Add( + worker_only().num_tasks_since_last_detach); + } outer_->cleanup_timestamps_.push(subtle::TimeTicksNowIgnoringOverride()); worker->Cleanup(); outer_->idle_workers_stack_.Remove(worker); @@ -756,7 +758,7 @@ void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainExit( // |workers_| by the time the thread is about to exit. (except in the cases // where the thread group is no longer going to be used - in which case, // it's fine for there to be invalid workers in the thread group. - if (!shutdown_complete && !outer_->join_for_testing_started_.IsSet()) { + if (!shutdown_complete && !outer_->join_for_testing_started_) { DCHECK(!outer_->idle_workers_stack_.Contains(worker)); DCHECK(!ContainsWorker(outer_->workers_, worker)); } @@ -944,6 +946,7 @@ void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired( scoped_refptr<WorkerThread> ThreadGroupImpl::CreateAndRegisterWorkerLockRequired( ScopedWorkersExecutor* executor) { + DCHECK(!join_for_testing_started_); DCHECK_LT(workers_.size(), max_tasks_); DCHECK_LT(workers_.size(), kMaxNumberOfWorkers); DCHECK(idle_workers_stack_.IsEmpty()); @@ -962,8 +965,10 @@ ThreadGroupImpl::CreateAndRegisterWorkerLockRequired( DCHECK_LE(workers_.size(), max_tasks_); if (!cleanup_timestamps_.empty()) { - detach_duration_histogram_->AddTime(subtle::TimeTicksNowIgnoringOverride() - - cleanup_timestamps_.top()); + if (detach_duration_histogram_) { + detach_duration_histogram_->AddTime( + subtle::TimeTicksNowIgnoringOverride() - cleanup_timestamps_.top()); + } cleanup_timestamps_.pop(); } @@ -1011,7 +1016,7 @@ void ThreadGroupImpl::DidUpdateCanRunPolicy() { void ThreadGroupImpl::EnsureEnoughWorkersLockRequired( BaseScopedWorkersExecutor* base_executor) { // Don't do anything if the thread group isn't started. - if (max_tasks_ == 0) + if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_)) return; ScopedWorkersExecutor* executor = diff --git a/chromium/base/task/thread_pool/thread_group_impl.h b/chromium/base/task/thread_pool/thread_group_impl.h index 3df96cfdef0..e0782bf2614 100644 --- a/chromium/base/task/thread_pool/thread_group_impl.h +++ b/chromium/base/task/thread_pool/thread_group_impl.h @@ -20,7 +20,6 @@ #include "base/memory/ref_counted.h" #include "base/optional.h" #include "base/strings/string_piece.h" -#include "base/synchronization/atomic_flag.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/waitable_event.h" #include "base/task/thread_pool/task.h" @@ -329,10 +328,8 @@ class BASE_EXPORT ThreadGroupImpl : public ThreadGroup { std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_ GUARDED_BY(lock_); -#if DCHECK_IS_ON() // Set at the start of JoinForTesting(). - AtomicFlag join_for_testing_started_; -#endif + bool join_for_testing_started_ GUARDED_BY(lock_) = false; // ThreadPool.DetachDuration.[thread group name] histogram. Intentionally // leaked. diff --git a/chromium/base/task/thread_pool/thread_group_unittest.cc b/chromium/base/task/thread_pool/thread_group_unittest.cc index cb4481c3cf1..3ed430a60db 100644 --- a/chromium/base/task/thread_pool/thread_group_unittest.cc +++ b/chromium/base/task/thread_pool/thread_group_unittest.cc @@ -53,6 +53,7 @@ using ThreadGroupNativeType = #endif constexpr size_t kMaxTasks = 4; +constexpr size_t kTooManyTasks = 1000; // By default, tests allow half of the thread group to be used by best-effort // tasks. constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2; @@ -654,6 +655,50 @@ TEST_P(ThreadGroupTest, ScheduleJobTaskSourceMultipleTime) { task_tracker_.FlushForTesting(); } +// Verify that Cancel() on a job stops running the worker task and causes +// current workers to yield. +TEST_P(ThreadGroupTest, CancelJobTaskSource) { + StartThreadGroup(); + + CheckedLock tasks_running_lock; + std::unique_ptr<ConditionVariable> tasks_running_cv = + tasks_running_lock.CreateConditionVariable(); + bool tasks_running = false; + + // Schedule a big number of tasks. + auto job_task = base::MakeRefCounted<test::MockJobTask>( + BindLambdaForTesting([&](experimental::JobDelegate* delegate) { + { + CheckedAutoLock auto_lock(tasks_running_lock); + tasks_running = true; + } + tasks_running_cv->Signal(); + + while (!delegate->ShouldYield()) { + } + }), + /* num_tasks_to_run */ kTooManyTasks); + scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource( + FROM_HERE, {ThreadPool()}, &mock_pooled_task_runner_delegate_); + + mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source); + experimental::JobHandle job_handle = + internal::JobTaskSource::CreateJobHandle(task_source); + + // Wait for at least 1 task to start running. + { + CheckedAutoLock auto_lock(tasks_running_lock); + while (!tasks_running) + tasks_running_cv->Wait(); + } + + // Cancels pending tasks and unblocks running ones. + job_handle.Cancel(); + + // This should not block since the job got cancelled. + task_tracker_.FlushForTesting(); +} + // Verify that calling JobTaskSource::NotifyConcurrencyIncrease() (re-)schedule // tasks with the intended concurrency. TEST_P(ThreadGroupTest, JobTaskSourceConcurrencyIncrease) { @@ -733,6 +778,37 @@ TEST_P(ThreadGroupTest, ScheduleEmptyJobTaskSource) { task_tracker_.FlushForTesting(); } +// Verify that Join() on a job contributes to max concurrency and waits for all +// workers to return. +TEST_P(ThreadGroupTest, JoinJobTaskSource) { + StartThreadGroup(); + + WaitableEvent threads_continue; + RepeatingClosure threads_continue_barrier = BarrierClosure( + kMaxTasks + 1, + BindOnce(&WaitableEvent::Signal, Unretained(&threads_continue))); + + auto job_task = base::MakeRefCounted<test::MockJobTask>( + BindLambdaForTesting([&](experimental::JobDelegate*) { + threads_continue_barrier.Run(); + test::WaitWithoutBlockingObserver(&threads_continue); + }), + /* num_tasks_to_run */ kMaxTasks + 1); + scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource( + FROM_HERE, {ThreadPool()}, &mock_pooled_task_runner_delegate_); + + mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source); + experimental::JobHandle job_handle = + internal::JobTaskSource::CreateJobHandle(task_source); + job_handle.Join(); + // All worker tasks should complete before Join() returns. + EXPECT_EQ(0U, job_task->GetMaxConcurrency()); + thread_group_->JoinForTesting(); + EXPECT_EQ(1U, task_source->HasOneRef()); + // Prevent TearDown() from calling JoinForTesting() again. + thread_group_ = nullptr; +} + // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently // in a thread group does not affect JobTaskSource with a priority that was // increased from BEST_EFFORT to USER_BLOCKING. diff --git a/chromium/base/task/thread_pool/thread_pool_impl.cc b/chromium/base/task/thread_pool/thread_pool_impl.cc index 64d580d08ad..ff1bfa78211 100644 --- a/chromium/base/task/thread_pool/thread_pool_impl.cc +++ b/chromium/base/task/thread_pool/thread_pool_impl.cc @@ -77,20 +77,23 @@ ThreadPoolImpl::ThreadPoolImpl(StringPiece histogram_label, &delayed_task_manager_), has_disable_best_effort_switch_(HasDisableBestEffortTasksSwitch()), tracked_ref_factory_(this) { - DCHECK(!histogram_label.empty()); - foreground_thread_group_ = std::make_unique<ThreadGroupImpl>( - JoinString( - {histogram_label, kForegroundPoolEnvironmentParams.name_suffix}, "."), + histogram_label.empty() + ? std::string() + : JoinString( + {histogram_label, kForegroundPoolEnvironmentParams.name_suffix}, + "."), kForegroundPoolEnvironmentParams.name_suffix, kForegroundPoolEnvironmentParams.priority_hint, task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef()); if (CanUseBackgroundPriorityForWorkerThread()) { background_thread_group_ = std::make_unique<ThreadGroupImpl>( - JoinString( - {histogram_label, kBackgroundPoolEnvironmentParams.name_suffix}, - "."), + histogram_label.empty() + ? std::string() + : JoinString({histogram_label, + kBackgroundPoolEnvironmentParams.name_suffix}, + "."), kBackgroundPoolEnvironmentParams.name_suffix, kBackgroundPoolEnvironmentParams.priority_hint, task_tracker_->GetTrackedRef(), tracked_ref_factory_.GetTrackedRef()); @@ -423,6 +426,14 @@ bool ThreadPoolImpl::EnqueueJobTaskSource( return true; } +void ThreadPoolImpl::RemoveJobTaskSource( + scoped_refptr<JobTaskSource> task_source) { + auto transaction = task_source->BeginTransaction(); + ThreadGroup* const current_thread_group = + GetThreadGroupForTraits(transaction.traits()); + current_thread_group->RemoveTaskSource(*task_source); +} + bool ThreadPoolImpl::IsRunningPoolWithTraits(const TaskTraits& traits) const { return GetThreadGroupForTraits(traits)->IsBoundToCurrentThread(); } @@ -455,7 +466,7 @@ void ThreadPoolImpl::UpdatePriority(scoped_refptr<TaskSource> task_source, // |task_source| is changing thread groups; remove it from its current // thread group and reenqueue it. auto registered_task_source = - current_thread_group->RemoveTaskSource(task_source); + current_thread_group->RemoveTaskSource(*task_source); if (registered_task_source) { DCHECK(task_source); new_thread_group->PushTaskSourceAndWakeUpWorkers( diff --git a/chromium/base/task/thread_pool/thread_pool_impl.h b/chromium/base/task/thread_pool/thread_pool_impl.h index 049ee6de692..8b3a6760ce9 100644 --- a/chromium/base/task/thread_pool/thread_pool_impl.h +++ b/chromium/base/task/thread_pool/thread_pool_impl.h @@ -60,8 +60,8 @@ class BASE_EXPORT ThreadPoolImpl : public ThreadPoolInstance, TaskTracker; #endif - // Creates a ThreadPoolImpl with a production TaskTracker. - //|histogram_label| is used to label histograms, it must not be empty. + // Creates a ThreadPoolImpl with a production TaskTracker. |histogram_label| + // is used to label histograms. No histograms are recorded if it is empty. explicit ThreadPoolImpl(StringPiece histogram_label); // For testing only. Creates a ThreadPoolImpl with a custom TaskTracker. @@ -103,6 +103,7 @@ class BASE_EXPORT ThreadPoolImpl : public ThreadPoolInstance, // PooledTaskRunnerDelegate: bool EnqueueJobTaskSource(scoped_refptr<JobTaskSource> task_source) override; + void RemoveJobTaskSource(scoped_refptr<JobTaskSource> task_source) override; void UpdatePriority(scoped_refptr<TaskSource> task_source, TaskPriority priority) override; |