summaryrefslogtreecommitdiffstats
path: root/chromium/base/task/post_job.cc
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/base/task/post_job.cc')
-rw-r--r--chromium/base/task/post_job.cc94
1 files changed, 92 insertions, 2 deletions
diff --git a/chromium/base/task/post_job.cc b/chromium/base/task/post_job.cc
index e57b32e4b40..3c83068995a 100644
--- a/chromium/base/task/post_job.cc
+++ b/chromium/base/task/post_job.cc
@@ -4,8 +4,11 @@
#include "base/task/post_job.h"
+#include "base/task/scoped_set_task_priority_for_current_thread.h"
#include "base/task/thread_pool/job_task_source.h"
#include "base/task/thread_pool/pooled_task_runner_delegate.h"
+#include "base/task/thread_pool/thread_pool_impl.h"
+#include "base/task/thread_pool/thread_pool_instance.h"
namespace base {
namespace experimental {
@@ -16,7 +19,6 @@ JobDelegate::JobDelegate(
: task_source_(task_source),
pooled_task_runner_delegate_(pooled_task_runner_delegate) {
DCHECK(task_source_);
- DCHECK(pooled_task_runner_delegate_);
#if DCHECK_IS_ON()
recorded_increase_version_ = task_source_->GetConcurrencyIncreaseVersion();
// Record max concurrency before running the worker task.
@@ -42,7 +44,9 @@ bool JobDelegate::ShouldYield() {
AssertExpectedConcurrency(recorded_max_concurrency_);
#endif // DCHECK_IS_ON()
const bool should_yield =
- pooled_task_runner_delegate_->ShouldYield(task_source_);
+ task_source_->ShouldYield() ||
+ (pooled_task_runner_delegate_ &&
+ pooled_task_runner_delegate_->ShouldYield(task_source_));
#if DCHECK_IS_ON()
last_should_yield_ = should_yield;
@@ -98,5 +102,91 @@ void JobDelegate::AssertExpectedConcurrency(size_t expected_max_concurrency) {
#endif // DCHECK_IS_ON()
}
+JobHandle::JobHandle() = default;
+
+JobHandle::JobHandle(scoped_refptr<internal::JobTaskSource> task_source)
+ : task_source_(std::move(task_source)) {}
+
+JobHandle::~JobHandle() {
+ DCHECK(!task_source_)
+ << "The Job must be cancelled, detached or joined before its "
+ "JobHandle is destroyed.";
+}
+
+JobHandle::JobHandle(JobHandle&&) = default;
+
+JobHandle& JobHandle::operator=(JobHandle&& other) {
+ DCHECK(!task_source_)
+ << "The Job must be cancelled, detached or joined before its "
+ "JobHandle is re-assigned.";
+ task_source_ = std::move(other.task_source_);
+ return *this;
+}
+
+void JobHandle::UpdatePriority(TaskPriority new_priority) {
+ task_source_->delegate()->UpdatePriority(task_source_, new_priority);
+}
+
+void JobHandle::NotifyConcurrencyIncrease() {
+ task_source_->NotifyConcurrencyIncrease();
+}
+
+void JobHandle::Join() {
+ DCHECK_GE(internal::GetTaskPriorityForCurrentThread(),
+ task_source_->priority_racy())
+ << "Join may not be called on Job with higher priority than the current "
+ "one.";
+ UpdatePriority(internal::GetTaskPriorityForCurrentThread());
+ bool must_run = task_source_->WillJoin();
+ while (must_run)
+ must_run = task_source_->RunJoinTask();
+ // Remove |task_source_| from the ThreadPool to prevent access to
+ // |max_concurrency_callback| after Join().
+ task_source_->delegate()->RemoveJobTaskSource(task_source_);
+ task_source_ = nullptr;
+}
+
+void JobHandle::Cancel() {
+ task_source_->Cancel();
+ Join();
+}
+
+void JobHandle::CancelAndDetach() {
+ task_source_->Cancel();
+ Detach();
+}
+
+void JobHandle::Detach() {
+ DCHECK(task_source_);
+ task_source_ = nullptr;
+}
+
+JobHandle PostJob(const Location& from_here,
+ const TaskTraits& traits,
+ RepeatingCallback<void(JobDelegate*)> worker_task,
+ RepeatingCallback<size_t()> max_concurrency_callback) {
+ DCHECK(ThreadPoolInstance::Get())
+ << "Ref. Prerequisite section of post_task.h.\n\n"
+ "Hint: if this is in a unit test, you're likely merely missing a "
+ "base::test::TaskEnvironment member in your fixture.\n";
+ DCHECK(traits.use_thread_pool())
+ << "The base::ThreadPool() trait is mandatory with PostJob().";
+ DCHECK_EQ(traits.extension_id(),
+ TaskTraitsExtensionStorage::kInvalidExtensionId)
+ << "Extension traits cannot be used with PostJob().";
+ TaskTraits adjusted_traits = traits;
+ adjusted_traits.InheritPriority(internal::GetTaskPriorityForCurrentThread());
+ auto task_source = base::MakeRefCounted<internal::JobTaskSource>(
+ from_here, adjusted_traits, std::move(worker_task),
+ std::move(max_concurrency_callback),
+ static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get()));
+ const bool queued =
+ static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get())
+ ->EnqueueJobTaskSource(task_source);
+ if (queued)
+ return internal::JobTaskSource::CreateJobHandle(std::move(task_source));
+ return JobHandle();
+}
+
} // namespace experimental
} // namespace base \ No newline at end of file