summaryrefslogtreecommitdiffstats
path: root/chromium/base/task_scheduler/scheduler_worker.cc
diff options
context:
space:
mode:
Diffstat (limited to 'chromium/base/task_scheduler/scheduler_worker.cc')
-rw-r--r--chromium/base/task_scheduler/scheduler_worker.cc222
1 files changed, 222 insertions, 0 deletions
diff --git a/chromium/base/task_scheduler/scheduler_worker.cc b/chromium/base/task_scheduler/scheduler_worker.cc
new file mode 100644
index 00000000000..fcbc28382fa
--- /dev/null
+++ b/chromium/base/task_scheduler/scheduler_worker.cc
@@ -0,0 +1,222 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task_scheduler/scheduler_worker.h"
+
+#include <stddef.h>
+
+#include <utility>
+
+#include "base/logging.h"
+#include "base/task_scheduler/task_tracker.h"
+
+namespace base {
+namespace internal {
+
+class SchedulerWorker::Thread : public PlatformThread::Delegate {
+ public:
+ ~Thread() override = default;
+
+ static std::unique_ptr<Thread> Create(SchedulerWorker* outer) {
+ std::unique_ptr<Thread> thread(new Thread(outer));
+ thread->Initialize();
+ if (thread->thread_handle_.is_null())
+ return nullptr;
+ return thread;
+ }
+
+ // PlatformThread::Delegate.
+ void ThreadMain() override {
+ // Set if this thread was detached.
+ std::unique_ptr<Thread> detached_thread;
+
+ outer_->delegate_->OnMainEntry(outer_);
+
+ // A SchedulerWorker starts out waiting for work.
+ WaitForWork();
+
+ while (!outer_->task_tracker_->shutdown_completed() &&
+ !outer_->ShouldExitForTesting()) {
+ DCHECK(outer_);
+ // Get the sequence containing the next task to execute.
+ scoped_refptr<Sequence> sequence = outer_->delegate_->GetWork(outer_);
+ if (!sequence) {
+ if (outer_->delegate_->CanDetach(outer_)) {
+ detached_thread = outer_->Detach();
+ if (detached_thread) {
+ DCHECK_EQ(detached_thread.get(), this);
+ PlatformThread::Detach(thread_handle_);
+ outer_ = nullptr;
+ break;
+ }
+ }
+ WaitForWork();
+ continue;
+ }
+
+ outer_->task_tracker_->RunTask(sequence->PeekTask());
+
+ const bool sequence_became_empty = sequence->PopTask();
+
+ // If |sequence| isn't empty immediately after the pop, re-enqueue it to
+ // maintain the invariant that a non-empty Sequence is always referenced
+ // by either a PriorityQueue or a SchedulerWorker. If it is empty
+ // and there are live references to it, it will be enqueued when a Task is
+ // added to it. Otherwise, it will be destroyed at the end of this scope.
+ if (!sequence_became_empty)
+ outer_->delegate_->ReEnqueueSequence(std::move(sequence));
+
+ // Calling WakeUp() guarantees that this SchedulerWorker will run
+ // Tasks from Sequences returned by the GetWork() method of |delegate_|
+ // until it returns nullptr. Resetting |wake_up_event_| here doesn't break
+ // this invariant and avoids a useless loop iteration before going to
+ // sleep if WakeUp() is called while this SchedulerWorker is awake.
+ wake_up_event_.Reset();
+ }
+
+ // If a wake up is pending and we successfully detached, somehow |outer_|
+ // was able to signal us which means it probably thinks we're still alive.
+ // This is bad as it will cause the WakeUp to no-op and |outer_| will be
+ // stuck forever.
+ DCHECK(!detached_thread || !IsWakeUpPending()) <<
+ "This thread was detached and woken up at the same time.";
+ }
+
+ void Join() { PlatformThread::Join(thread_handle_); }
+
+ void WakeUp() { wake_up_event_.Signal(); }
+
+ bool IsWakeUpPending() { return wake_up_event_.IsSignaled(); }
+
+ private:
+ Thread(SchedulerWorker* outer)
+ : outer_(outer),
+ wake_up_event_(WaitableEvent::ResetPolicy::MANUAL,
+ WaitableEvent::InitialState::NOT_SIGNALED) {
+ DCHECK(outer_);
+ }
+
+ void Initialize() {
+ constexpr size_t kDefaultStackSize = 0;
+ PlatformThread::CreateWithPriority(kDefaultStackSize, this,
+ &thread_handle_,
+ outer_->thread_priority_);
+ }
+
+ void WaitForWork() {
+ DCHECK(outer_);
+ const TimeDelta sleep_time = outer_->delegate_->GetSleepTimeout();
+ if (sleep_time.is_max()) {
+ // Calling TimedWait with TimeDelta::Max is not recommended per
+ // http://crbug.com/465948.
+ wake_up_event_.Wait();
+ } else {
+ wake_up_event_.TimedWait(sleep_time);
+ }
+ wake_up_event_.Reset();
+ }
+
+ PlatformThreadHandle thread_handle_;
+
+ SchedulerWorker* outer_;
+
+ // Event signaled to wake up this thread.
+ WaitableEvent wake_up_event_;
+
+ DISALLOW_COPY_AND_ASSIGN(Thread);
+};
+
+std::unique_ptr<SchedulerWorker> SchedulerWorker::Create(
+ ThreadPriority thread_priority,
+ std::unique_ptr<Delegate> delegate,
+ TaskTracker* task_tracker,
+ InitialState initial_state) {
+ std::unique_ptr<SchedulerWorker> worker(
+ new SchedulerWorker(thread_priority, std::move(delegate), task_tracker));
+ // Creation happens before any other thread can reference this one, so no
+ // synchronization is necessary.
+ if (initial_state == SchedulerWorker::InitialState::ALIVE) {
+ worker->CreateThread();
+ if (!worker->thread_) {
+ return nullptr;
+ }
+ }
+
+ return worker;
+}
+
+SchedulerWorker::~SchedulerWorker() {
+ // It is unexpected for |thread_| to be alive and for SchedulerWorker to
+ // destroy since SchedulerWorker owns the delegate needed by |thread_|.
+ // For testing, this generally means JoinForTesting was not called.
+ DCHECK(!thread_);
+}
+
+void SchedulerWorker::WakeUp() {
+ AutoSchedulerLock auto_lock(thread_lock_);
+ if (!thread_)
+ CreateThreadAssertSynchronized();
+
+ if (thread_)
+ thread_->WakeUp();
+}
+
+void SchedulerWorker::JoinForTesting() {
+ {
+ AutoSchedulerLock auto_lock(should_exit_for_testing_lock_);
+ should_exit_for_testing_ = true;
+ }
+ WakeUp();
+
+ // Normally holding a lock and joining is dangerous. However, since this is
+ // only for testing, we're okay since the only scenario that could impact this
+ // is a call to Detach, which is disallowed by having the delegate always
+ // return false for the CanDetach call.
+ AutoSchedulerLock auto_lock(thread_lock_);
+ if (thread_)
+ thread_->Join();
+
+ thread_.reset();
+}
+
+bool SchedulerWorker::ThreadAliveForTesting() const {
+ AutoSchedulerLock auto_lock(thread_lock_);
+ return !!thread_;
+}
+
+SchedulerWorker::SchedulerWorker(ThreadPriority thread_priority,
+ std::unique_ptr<Delegate> delegate,
+ TaskTracker* task_tracker)
+ : thread_priority_(thread_priority),
+ delegate_(std::move(delegate)),
+ task_tracker_(task_tracker) {
+ DCHECK(delegate_);
+ DCHECK(task_tracker_);
+}
+
+std::unique_ptr<SchedulerWorker::Thread> SchedulerWorker::Detach() {
+ DCHECK(!ShouldExitForTesting()) << "Worker was already joined";
+ AutoSchedulerLock auto_lock(thread_lock_);
+ // If a wakeup is pending, then a WakeUp() came in while we were deciding to
+ // detach. This means we can't go away anymore since we would break the
+ // guarantee that we call GetWork() after a successful wakeup.
+ return thread_->IsWakeUpPending() ? nullptr : std::move(thread_);
+}
+
+void SchedulerWorker::CreateThread() {
+ thread_ = Thread::Create(this);
+}
+
+void SchedulerWorker::CreateThreadAssertSynchronized() {
+ thread_lock_.AssertAcquired();
+ CreateThread();
+}
+
+bool SchedulerWorker::ShouldExitForTesting() const {
+ AutoSchedulerLock auto_lock(should_exit_for_testing_lock_);
+ return should_exit_for_testing_;
+}
+
+} // namespace internal
+} // namespace base