diff options
Diffstat (limited to 'src/3rdparty/resonance-audio/resonance_audio/utils/task_thread_pool.cc')
-rw-r--r-- | src/3rdparty/resonance-audio/resonance_audio/utils/task_thread_pool.cc | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/src/3rdparty/resonance-audio/resonance_audio/utils/task_thread_pool.cc b/src/3rdparty/resonance-audio/resonance_audio/utils/task_thread_pool.cc new file mode 100644 index 000000000..485e834df --- /dev/null +++ b/src/3rdparty/resonance-audio/resonance_audio/utils/task_thread_pool.cc @@ -0,0 +1,249 @@ +/* +Copyright 2018 Google Inc. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS-IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "utils/task_thread_pool.h" + +#include <thread> + +#include "base/integral_types.h" +#include "base/logging.h" + +namespace vraudio { + +// A simple worker thread wrapper, to be used by TaskThreadPool. +class TaskThreadPool::WorkerThread { + public: + // Constructor. + // + // @param parent_pool The |TaskThreadPool| which owns and manages this + // |WorkerThread| instance. + WorkerThread() + : parent_pool_(), task_closure_(), task_loop_triggered_(false) {} + + // Copy constructor. Necessary for allowing this class to be storied in + // std::vector<WorkerThread> container class. + // + // @param other |WorkerThread| instance this instance will be copied from. + WorkerThread(const TaskThreadPool::WorkerThread& other) + : parent_pool_(), task_closure_(), task_loop_triggered_(false) {} + + // Destructor. + ~WorkerThread(); + + // Sets the parent pool for this |WorkerThread|. + // + // @param parent_pool The |TaskThreadPool| which owns this |WorkerThread|. + // @return True indicates that the thread could be started. + bool SetParentAndStart(TaskThreadPool* parent_pool); + + // Indicates that the worker thread should run the |TaskClosure| task. + // + // @param task_closure The task function which |TaskThreadPool| assigns to + // this |WorkerThread|. + void Run(TaskClosure task_closure); + + // Waits for the |WorkerThread| task loop to exit, for shutdown. + void Join(); + + // Checks whether this |WorkerThread| is available for a task assignment. + bool IsAvailable() const; + + private: + // The task loop for this |WorkerThread|. This function will wait for an + // assigned task, execute the task, and then reset itself to wait for the next + // task. + void TaskLoop(); + + // This |WorkerThread|'s parent |TaskThreadPool|. + TaskThreadPool* parent_pool_; + + // Condition allowing the |TaskThreadPool| to trigger this worker thread to + // continue once it has been given a work assignment. + std::condition_variable execute_task_condition_; + + // Mutex for receiving |execute_task_condition_| notification. + std::mutex execute_task_mutex_; + + // The current task binding which the Worker Thread has been asked to + // execute. + TaskClosure task_closure_; + + // An atomic boolean to indicate that a task loop trigger has occurred. This + // may happen even when a task has not been assigned during shutdown. + std::atomic<bool> task_loop_triggered_; + + // The worker thread. + std::thread task_thread_; +}; + +TaskThreadPool::TaskThreadPool() + : num_worker_threads_available_(0), + is_pool_running_(false) {} + +TaskThreadPool::~TaskThreadPool() { StopThreadPool(); } + +bool TaskThreadPool::StartThreadPool(size_t num_worker_threads) { + if (is_pool_running_) { + return true; + } + is_pool_running_ = true; + worker_threads_.resize(num_worker_threads); + + // Start all worker threads. + for (auto& worker_thread : worker_threads_) { + bool thread_started = worker_thread.SetParentAndStart(this); + if (!thread_started) { + StopThreadPool(); + return false; + } + } + + // Wait for all worker threads to be launched. + std::unique_lock<std::mutex> worker_lock(worker_available_mutex_); + worker_available_condition_.wait(worker_lock, [this, num_worker_threads]() { + return !is_pool_running_.load() || num_worker_threads_available_.load() == + static_cast<int>(num_worker_threads); + }); + + return true; +} + +void TaskThreadPool::StopThreadPool() { + if (!is_pool_running_) { + return; + } + // Shut down all active worker threads. + { + std::lock_guard<std::mutex> worker_lock(worker_available_mutex_); + is_pool_running_ = false; + } + worker_available_condition_.notify_one(); + + // Join and destruct workers. + worker_threads_.resize(0); +} + +bool TaskThreadPool::WaitUntilWorkerBecomesAvailable() { + if (!is_pool_running_.load()) { + return false; + } + if (num_worker_threads_available_.load() > 0) { + return true; + } + std::unique_lock<std::mutex> worker_lock(worker_available_mutex_); + worker_available_condition_.wait(worker_lock, [this]() { + return (num_worker_threads_available_.load() > 0) || + !is_pool_running_.load(); + }); + return num_worker_threads_available_.load() > 0 && is_pool_running_.load(); +} + +bool TaskThreadPool::RunOnWorkerThread(TaskThreadPool::TaskClosure closure) { + if (!is_pool_running_.load() || num_worker_threads_available_.load() == 0) { + return false; + } + // Find the first available worker thread. + WorkerThread* available_worker_thread = nullptr; + for (auto& worker_thread : worker_threads_) { + if (worker_thread.IsAvailable()) { + available_worker_thread = &worker_thread; + break; + } + } + DCHECK(available_worker_thread); + { + std::lock_guard<std::mutex> lock(worker_available_mutex_); + --num_worker_threads_available_; + } + available_worker_thread->Run(std::move(closure)); + return true; +} + +size_t TaskThreadPool::GetAvailableTaskThreadCount() const { + return num_worker_threads_available_.load(); +} + +bool TaskThreadPool::IsPoolRunning() { return is_pool_running_.load(); } + +void TaskThreadPool::SignalWorkerAvailable() { + { + std::lock_guard<std::mutex> lock(worker_available_mutex_); + ++num_worker_threads_available_; + } + worker_available_condition_.notify_one(); +} + +TaskThreadPool::WorkerThread::~WorkerThread() { Join(); } + +bool TaskThreadPool::WorkerThread::SetParentAndStart( + TaskThreadPool* parent_pool) { + parent_pool_ = parent_pool; + + // Start the worker thread. + task_thread_ = std::thread(std::bind(&WorkerThread::TaskLoop, this)); + return true; +} + +void TaskThreadPool::WorkerThread::Run(TaskThreadPool::TaskClosure closure) { + if (closure) { + task_closure_ = std::move(closure); + { + std::lock_guard<std::mutex> lock(execute_task_mutex_); + task_loop_triggered_ = true; + } + execute_task_condition_.notify_one(); + } +} + +void TaskThreadPool::WorkerThread::Join() { + DCHECK(!parent_pool_->IsPoolRunning()); + // Aquire and release lock to assure that the notify cannot happen between the + // check of the predicate and wait of the |push_conditional_|. + { std::lock_guard<std::mutex> lock(execute_task_mutex_); } + execute_task_condition_.notify_one(); + if (task_thread_.joinable()) { + task_thread_.join(); + } +} + +bool TaskThreadPool::WorkerThread::IsAvailable() const { + return !task_loop_triggered_.load(); +} + +void TaskThreadPool::WorkerThread::TaskLoop() { + // Signal back to the parent thread pool that this thread has started and is + // ready for use. + task_loop_triggered_ = false; + + while (parent_pool_->IsPoolRunning() || task_closure_ != nullptr) { + parent_pool_->SignalWorkerAvailable(); + std::unique_lock<std::mutex> task_lock(execute_task_mutex_); + execute_task_condition_.wait(task_lock, [this]() { + return task_loop_triggered_.load() || !parent_pool_->IsPoolRunning(); + }); + + // Execute the assigned task. + if (task_closure_ != nullptr) { + task_closure_(); + + // Clear the assigned task and return to ready state. + task_closure_ = nullptr; + } + task_loop_triggered_ = false; + } +} + +} // namespace vraudio |