From 1d1b84a579eff2e245e491012104b7c5b5d655d1 Mon Sep 17 00:00:00 2001 From: Mika Salmela Date: Fri, 24 Apr 2015 13:21:41 +0300 Subject: Implement ThreadPooler using QThreadPool Re-implemented ThreadPooler to use QRunnable and QThreadPool. Change-Id: I3da0182489b648dcb6b671c57b4474c183bddeec Reviewed-by: Miikka Heikkinen --- src/core/jobs/dependencyhandler.cpp | 39 +++--- src/core/jobs/dependencyhandler_p.h | 15 +-- src/core/jobs/jobrunner.cpp | 91 -------------- src/core/jobs/jobrunner_p.h | 88 -------------- src/core/jobs/jobs.pri | 3 - src/core/jobs/qaspectjobmanager.cpp | 48 +++++--- src/core/jobs/qaspectjobmanager_p.h | 3 +- src/core/jobs/qthreadpooler.cpp | 230 +++++++++--------------------------- src/core/jobs/qthreadpooler_p.h | 31 +++-- src/core/jobs/qthreadpooler_p_p.h | 93 --------------- src/core/jobs/task.cpp | 56 ++++----- src/core/jobs/task_p.h | 49 ++++---- 12 files changed, 184 insertions(+), 562 deletions(-) delete mode 100644 src/core/jobs/jobrunner.cpp delete mode 100644 src/core/jobs/jobrunner_p.h delete mode 100644 src/core/jobs/qthreadpooler_p_p.h (limited to 'src/core') diff --git a/src/core/jobs/dependencyhandler.cpp b/src/core/jobs/dependencyhandler.cpp index 9778ede58..118602a61 100644 --- a/src/core/jobs/dependencyhandler.cpp +++ b/src/core/jobs/dependencyhandler.cpp @@ -47,13 +47,13 @@ namespace { struct ByDepender { typedef bool result_type; - bool operator()(const QSharedPointer &lhs, const QSharedPointer &rhs) const Q_DECL_NOTHROW - { return Op >()(lhs, rhs); } + bool operator()(const RunnableInterface *lhs, const RunnableInterface *rhs) const Q_DECL_NOTHROW + { return Op()(lhs, rhs); } - bool operator()(const QSharedPointer &lhs, const Dependency &rhs) const Q_DECL_NOTHROW + bool operator()(const RunnableInterface *lhs, const Dependency &rhs) const Q_DECL_NOTHROW { return operator()(lhs, rhs.depender); } - bool operator()(const Dependency &lhs, const QSharedPointer &rhs) const Q_DECL_NOTHROW + bool operator()(const Dependency &lhs, const RunnableInterface *rhs) const Q_DECL_NOTHROW { return operator()(lhs.depender, rhs); } bool operator()(const Dependency &lhs, const Dependency &rhs) const Q_DECL_NOTHROW @@ -62,12 +62,17 @@ namespace { struct DependeeEquals : std::unary_function { - QSharedPointer dependee; - explicit DependeeEquals(QSharedPointer dependee) - : dependee(qMove(dependee)) {} + const RunnableInterface *dependee; + QVector *freedList; + explicit DependeeEquals(const RunnableInterface *dependee, QVector *freedList) + : dependee(qMove(dependee)), freedList(qMove(freedList)) {} bool operator()(const Dependency &candidate) const { - return dependee == candidate.dependee; + if (dependee == candidate.dependee) { + freedList->append(candidate.depender); + return true; + } + return false; } }; @@ -91,7 +96,7 @@ void DependencyHandler::addDependencies(QVector dependencies) { std::sort(dependencies.begin(), dependencies.end(), ByDependerThenDependee()); - const QMutexLocker locker(&m_mutex); + const QMutexLocker locker(m_mutex); QVector newDependencyMap; newDependencyMap.reserve(dependencies.size() + m_dependencyMap.size()); @@ -101,9 +106,9 @@ void DependencyHandler::addDependencies(QVector dependencies) m_dependencyMap.swap(newDependencyMap); // commit } -bool DependencyHandler::hasDependency(const QSharedPointer &depender) +bool DependencyHandler::hasDependency(const RunnableInterface *depender) { - const QMutexLocker locker(&m_mutex); + // The caller has to set the mutex, which is QThreadPooler::enqueueTasks return std::binary_search(m_dependencyMap.begin(), m_dependencyMap.end(), depender, ByDepender()); @@ -113,13 +118,17 @@ bool DependencyHandler::hasDependency(const QSharedPointer &depen * Removes all the entries on the m_dependencyMap that have given task as a dependee, * i.e. entries where the dependency is on the given task. */ -void DependencyHandler::freeDependencies(const QSharedPointer &dependee) +QVector DependencyHandler::freeDependencies(const RunnableInterface *dependee) { - const QMutexLocker locker(&m_mutex); + const QMutexLocker locker(m_mutex); - m_dependencyMap.erase(std::remove_if(m_dependencyMap.begin(), m_dependencyMap.end(), - DependeeEquals(dependee)), + QVector freedList; + m_dependencyMap.erase(std::remove_if(m_dependencyMap.begin(), + m_dependencyMap.end(), + DependeeEquals(dependee, &freedList)), m_dependencyMap.end()); + + return freedList; } } // namespace Qt3D diff --git a/src/core/jobs/dependencyhandler_p.h b/src/core/jobs/dependencyhandler_p.h index 0d2aafaa6..e48113b80 100644 --- a/src/core/jobs/dependencyhandler_p.h +++ b/src/core/jobs/dependencyhandler_p.h @@ -49,18 +49,18 @@ namespace Qt3D { struct Dependency { Dependency() {} - Dependency(QSharedPointer depender, QSharedPointer dependee) + Dependency(RunnableInterface *depender, RunnableInterface *dependee) : depender(qMove(depender)), dependee(qMove(dependee)) {} - QSharedPointer depender; - QSharedPointer dependee; + RunnableInterface *depender; + RunnableInterface *dependee; }; } // namespace Qt3D template <> -class QTypeInfo : public QTypeInfoMerger > {}; +class QTypeInfo : public QTypeInfoMerger {}; namespace Qt3D { @@ -80,14 +80,15 @@ public: DependencyHandler(); void addDependencies(QVector dependencies); - bool hasDependency(const QSharedPointer &depender); - void freeDependencies(const QSharedPointer &dependee); + bool hasDependency(const RunnableInterface *depender); + QVector freeDependencies(const RunnableInterface *dependee); + void setMutex(QMutex *mutex) { m_mutex = mutex; } private: Q_DISABLE_COPY(DependencyHandler) QVector m_dependencyMap; - mutable QMutex m_mutex; + QMutex *m_mutex; }; } // namespace Qt3D diff --git a/src/core/jobs/jobrunner.cpp b/src/core/jobs/jobrunner.cpp deleted file mode 100644 index ab5555585..000000000 --- a/src/core/jobs/jobrunner.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/**************************************************************************** -** -** Copyright (C) 2015 The Qt Company Ltd. -** Contact: http://www.qt.io/licensing/ -** -** This file is part of the Qt3D module of the Qt Toolkit. -** -** $QT_BEGIN_LICENSE:LGPL3$ -** Commercial License Usage -** Licensees holding valid commercial Qt licenses may use this file in -** accordance with the commercial license agreement provided with the -** Software or, alternatively, in accordance with the terms contained in -** a written agreement between you and The Qt Company. For licensing terms -** and conditions see http://www.qt.io/terms-conditions. For further -** information use the contact form at http://www.qt.io/contact-us. -** -** GNU Lesser General Public License Usage -** Alternatively, this file may be used under the terms of the GNU Lesser -** General Public License version 3 as published by the Free Software -** Foundation and appearing in the file LICENSE.LGPLv3 included in the -** packaging of this file. Please review the following information to -** ensure the GNU Lesser General Public License version 3 requirements -** will be met: https://www.gnu.org/licenses/lgpl.html. -** -** GNU General Public License Usage -** Alternatively, this file may be used under the terms of the GNU -** General Public License version 2.0 or later as published by the Free -** Software Foundation and appearing in the file LICENSE.GPL included in -** the packaging of this file. Please review the following information to -** ensure the GNU General Public License version 2.0 requirements will be -** met: http://www.gnu.org/licenses/gpl-2.0.html. -** -** $QT_END_LICENSE$ -** -****************************************************************************/ - -#include "jobrunner_p.h" -#include "qthreadpooler_p.h" - -#include -#include -#include - -QT_BEGIN_NAMESPACE - -namespace Qt3D { - -JobRunner::JobRunner(QThreadPooler *parent) - : QThread(parent), - m_abort(0), - m_pooler(parent), - m_jobAvailable(Q_NULLPTR), - m_mutex(Q_NULLPTR) -{ - QObject::connect(parent, SIGNAL(shuttingDown()), this, SLOT(shutDown())); -} - -JobRunner::~JobRunner() -{ - shutDown(); -} - -void JobRunner::run() -{ - Q_ASSERT(m_jobAvailable != Q_NULLPTR); - - while (!m_abort.load()) { - if (const QSharedPointer task = m_pooler->nextTask()) { - task->run(task, this); - m_pooler->stopRunning(); - } else { - suspend(); - } - } -} - -void JobRunner::suspend() -{ - const QMutexLocker locker(m_mutex); - - m_jobAvailable->wait(m_mutex); -} - -void JobRunner::shutDown() -{ - m_abort.store(1); -} - -} // namespace Qt3D - -QT_END_NAMESPACE diff --git a/src/core/jobs/jobrunner_p.h b/src/core/jobs/jobrunner_p.h deleted file mode 100644 index 14b5213e9..000000000 --- a/src/core/jobs/jobrunner_p.h +++ /dev/null @@ -1,88 +0,0 @@ -/**************************************************************************** -** -** Copyright (C) 2015 The Qt Company Ltd. -** Contact: http://www.qt.io/licensing/ -** -** This file is part of the Qt3D module of the Qt Toolkit. -** -** $QT_BEGIN_LICENSE:LGPL3$ -** Commercial License Usage -** Licensees holding valid commercial Qt licenses may use this file in -** accordance with the commercial license agreement provided with the -** Software or, alternatively, in accordance with the terms contained in -** a written agreement between you and The Qt Company. For licensing terms -** and conditions see http://www.qt.io/terms-conditions. For further -** information use the contact form at http://www.qt.io/contact-us. -** -** GNU Lesser General Public License Usage -** Alternatively, this file may be used under the terms of the GNU Lesser -** General Public License version 3 as published by the Free Software -** Foundation and appearing in the file LICENSE.LGPLv3 included in the -** packaging of this file. Please review the following information to -** ensure the GNU Lesser General Public License version 3 requirements -** will be met: https://www.gnu.org/licenses/lgpl.html. -** -** GNU General Public License Usage -** Alternatively, this file may be used under the terms of the GNU -** General Public License version 2.0 or later as published by the Free -** Software Foundation and appearing in the file LICENSE.GPL included in -** the packaging of this file. Please review the following information to -** ensure the GNU General Public License version 2.0 requirements will be -** met: http://www.gnu.org/licenses/gpl-2.0.html. -** -** $QT_END_LICENSE$ -** -****************************************************************************/ - -#ifndef QT3D_JOBRUNNER_P_H -#define QT3D_JOBRUNNER_P_H - -#include "qaspectjobmanager.h" -#include "task_p.h" - -#include -#include -#include - -QT_BEGIN_NAMESPACE - -namespace Qt3D { - -class QThreadPooler; - -class JobRunner : public QThread -{ - Q_OBJECT - -public: - explicit JobRunner(QThreadPooler *parent = 0); - ~JobRunner(); - - void run() Q_DECL_OVERRIDE; - - inline void setWaitConditions(QWaitCondition *jobAvailable) - { - m_jobAvailable = jobAvailable; - } - inline void setMutex(QMutex *mutex) { m_mutex = mutex; } - -private: - void suspend(); - -private: - QAtomicInt m_abort; - QThreadPooler *m_pooler; - - QWaitCondition *m_jobAvailable; - QMutex *m_mutex; // For waiting next available job - -private Q_SLOTS: - void shutDown(); -}; - -} // namespace Qt3D - -QT_END_NAMESPACE - -#endif // QT3D_JOBRUNNER_P_H - diff --git a/src/core/jobs/jobs.pri b/src/core/jobs/jobs.pri index b6def7aa8..d21d21975 100644 --- a/src/core/jobs/jobs.pri +++ b/src/core/jobs/jobs.pri @@ -3,7 +3,6 @@ SOURCES += \ $$PWD/qaspectjob.cpp \ $$PWD/qaspectjobmanager.cpp \ $$PWD/qabstractaspectjobmanager.cpp \ - $$PWD/jobrunner.cpp \ $$PWD/qthreadpooler.cpp \ $$PWD/task.cpp \ $$PWD/dependencyhandler.cpp @@ -26,9 +25,7 @@ HEADERS += \ $$PWD/qaspectjobmanager_p.h \ $$PWD/qabstractaspectjobmanager_p.h \ $$PWD/dependencyhandler_p.h \ - $$PWD/jobrunner_p.h \ $$PWD/task_p.h \ - $$PWD/qthreadpooler_p_p.h \ $$PWD/qthreadpooler_p.h INCLUDEPATH += $$PWD diff --git a/src/core/jobs/qaspectjobmanager.cpp b/src/core/jobs/qaspectjobmanager.cpp index f959bee18..c9c230d33 100644 --- a/src/core/jobs/qaspectjobmanager.cpp +++ b/src/core/jobs/qaspectjobmanager.cpp @@ -50,7 +50,9 @@ #include #include #include - +#include +#include +#include QT_BEGIN_NAMESPACE @@ -112,6 +114,11 @@ QAspectJobManagerPrivate::QAspectJobManagerPrivate(QAspectJobManager *qq) { } +QAspectJobManagerPrivate::~QAspectJobManagerPrivate() +{ + delete m_dependencyHandler; +} + QAspectJobManager::QAspectJobManager(QObject *parent) : QAbstractAspectJobManager(*new QAspectJobManagerPrivate(this), parent) { @@ -121,8 +128,6 @@ QAspectJobManager::QAspectJobManager(QObject *parent) d->m_weaver->setMaximumNumberOfThreads(QThread::idealThreadCount()); #else d->m_threadPooler = new QThreadPooler(this); - d->m_threadPooler->setMaxThreadCount(QThread::idealThreadCount()); - d->m_dependencyHandler = new DependencyHandler(); d->m_threadPooler->setDependencyHandler(d->m_dependencyHandler); #endif @@ -138,9 +143,8 @@ QAspectJobManager::QAspectJobManager(QAspectJobManagerPrivate &dd, QObject *pare d->m_weaver->setMaximumNumberOfThreads(QThread::idealThreadCount()); #else d->m_threadPooler = new QThreadPooler(this); - d->m_threadPooler->setMaxThreadCount(QThread::idealThreadCount()); - d->m_dependencyHandler = new DependencyHandler(); + d->m_threadPooler->setDependencyHandler(d->m_dependencyHandler); #endif } @@ -182,24 +186,27 @@ void QAspectJobManager::enqueueJobs(const QVector &jobQueue) } #else // Convert QJobs to Tasks - QHash> tasksMap; + QHash tasksMap; + QVector taskList; Q_FOREACH (const QAspectJobPtr &job, jobQueue) { - QSharedPointer task = QSharedPointer::create(); + AspectTaskRunnable *task = new AspectTaskRunnable(); task->m_job = job; tasksMap.insert(job.data(), task); + + taskList << task; } // Resolve dependencies QVector dependencyList; - Q_FOREACH (const QAspectJobPtr &job, jobQueue) { + Q_FOREACH (const QSharedPointer &job, jobQueue) { const QVector > &deps = job->dependencies(); Q_FOREACH (const QWeakPointer &dep, deps) { - QSharedPointer taskDependee = tasksMap.value(dep.data()); + AspectTaskRunnable *taskDependee = tasksMap.value(dep.data()); if (taskDependee) { - QSharedPointer taskDepender = tasksMap.value(job.data()); + AspectTaskRunnable *taskDepender = tasksMap.value(job.data()); dependencyList.append(Dependency(taskDepender, taskDependee)); taskDepender->setDependencyHandler(d->m_dependencyHandler); taskDependee->setDependencyHandler(d->m_dependencyHandler); @@ -208,10 +215,7 @@ void QAspectJobManager::enqueueJobs(const QVector &jobQueue) } d->m_dependencyHandler->addDependencies(qMove(dependencyList)); - Q_FOREACH (const QAspectJobPtr &job, jobQueue) { - QSharedPointer task = tasksMap.value(job.data()); - d->m_threadPooler->enqueueTask(task); - } + d->m_threadPooler->mapDependables(taskList); #endif } @@ -221,7 +225,9 @@ void QAspectJobManager::waitForAllJobs() #ifdef THREAD_WEAVER d->m_weaver->finish(); #else - d->m_threadPooler->flush(); + QFutureWatcher futureWatcher; + futureWatcher.setFuture(d->m_threadPooler->future()); + futureWatcher.waitForFinished(); #endif } @@ -240,15 +246,19 @@ void QAspectJobManager::waitForPerThreadFunction(JobFunction func, void *arg) d->m_weaver->finish(); #else - const int threadCount = d->m_threadPooler->maxThreadCount(); + const int threadCount = QThread::idealThreadCount(); QAtomicInt atomicCount(threadCount); + QVector taskList; for (int i = 0; i < threadCount; ++i) { - QSharedPointer syncTask(new SynchronizedTask(func, arg, &atomicCount)); - d->m_threadPooler->enqueueTask(syncTask); + SyncTaskRunnable *syncTask = new SyncTaskRunnable(func, arg, &atomicCount); + taskList << syncTask; } - d->m_threadPooler->flush(); + QFuture future = d->m_threadPooler->mapDependables(taskList); + QFutureWatcher futureWatcher; + futureWatcher.setFuture(future); + futureWatcher.waitForFinished(); #endif } diff --git a/src/core/jobs/qaspectjobmanager_p.h b/src/core/jobs/qaspectjobmanager_p.h index 5ea9cc54a..096c0470a 100644 --- a/src/core/jobs/qaspectjobmanager_p.h +++ b/src/core/jobs/qaspectjobmanager_p.h @@ -58,6 +58,7 @@ class QAspectJobManagerPrivate : public QAbstractAspectJobManagerPrivate { public: QAspectJobManagerPrivate(QAspectJobManager *qq); + ~QAspectJobManagerPrivate(); Q_DECLARE_PUBLIC(QAspectJobManager) QAspectJobManager *q_ptr; @@ -69,8 +70,6 @@ public: QThreadPooler *m_threadPooler; DependencyHandler *m_dependencyHandler; - QMutex *m_syncMutex; - QWaitCondition m_syncFinished; }; } // Qt3D diff --git a/src/core/jobs/qthreadpooler.cpp b/src/core/jobs/qthreadpooler.cpp index b48cc5aad..890a8942a 100644 --- a/src/core/jobs/qthreadpooler.cpp +++ b/src/core/jobs/qthreadpooler.cpp @@ -35,9 +35,9 @@ ****************************************************************************/ #include "qthreadpooler_p.h" -#include "qthreadpooler_p_p.h" -#include "jobrunner_p.h" #include "dependencyhandler_p.h" + +#include #include QT_BEGIN_NAMESPACE @@ -48,218 +48,104 @@ namespace Qt3D { \class Qt3D::QThreadPoolerPrivate \internal */ -QThreadPoolerPrivate::QThreadPoolerPrivate(QThreadPooler *qq) - : QObjectPrivate(), +QThreadPooler::QThreadPooler(QObject *parent) + : QObject(parent), + m_futureInterface(Q_NULLPTR), m_mutex(new QMutex(QMutex::NonRecursive)), - m_runningThreads(0) + m_taskCount(0) { - q_ptr = qq; } -QThreadPoolerPrivate::~QThreadPoolerPrivate() +QThreadPooler::~QThreadPooler() { - Q_FOREACH (QSharedPointer task, m_taskQueue) - task->setDependencyHandler(Q_NULLPTR); - delete m_dependencyHandler; - delete m_mutex; } -void QThreadPoolerPrivate::shutdown() -{ - m_jobFinished.wakeAll(); - - // When shutting down a signal is send for jobrunners to exit run() loop - // on next round. Sometimes the jobrunner is busy doing still the clean up - // tasks and isn't waiting the release of WaitCondition. Repeat the waking - // process max tryOuts. - const int tryOuts = 2; - - Q_FOREACH (JobRunner *jr, m_workers) { - if (!jr->isFinished()) { - for (int i = 0; i < tryOuts; i++) { - m_jobAvailable.wakeAll(); - if (jr->wait(100)) - break; - } - } - } -} - - -bool QThreadPoolerPrivate::isQueueEmpty() -{ - return m_taskQueue.isEmpty(); -} - -void QThreadPoolerPrivate::incRunningThreads() -{ - m_runningThreads += 1; -} - -void QThreadPoolerPrivate::decRunningThreads() +void QThreadPooler::setDependencyHandler(DependencyHandler *handler) { - m_runningThreads -= 1; - - // Sanity check - if (m_runningThreads < 0) - m_runningThreads = 0; + m_dependencyHandler = handler; + m_dependencyHandler->setMutex(m_mutex); } -void QThreadPoolerPrivate::createRunners(int threadCount) +void QThreadPooler::enqueueTasks(QVector &tasks) { - Q_Q(QThreadPooler); - - for (int i = 0; i < threadCount; i++) { - JobRunner *jr = new JobRunner(q); + // The caller have to set the mutex - jr->setMutex(m_mutex); - jr->setWaitConditions(&(m_jobAvailable)); - jr->start(); - - m_workers.append(jr); + for (QVector::iterator it = tasks.begin(); + it != tasks.end(); it++) { + if (!m_dependencyHandler->hasDependency((*it))) { + (*it)->setPooler(this); + QThreadPool::globalInstance()->start((*it)); + } } } -int QThreadPoolerPrivate::maxThreadCount() const -{ - return m_maxThreadCount; -} - -void QThreadPoolerPrivate::setMaxThreadCount(int threadCount) +void QThreadPooler::taskFinished(QVector tasks) { + const QMutexLocker locker(m_mutex); - m_maxThreadCount = threadCount; - createRunners(m_maxThreadCount); -} - -///////////////////////////////////////////////// - -QThreadPooler::QThreadPooler(QObject *parent) - : QObject(*new QThreadPoolerPrivate(this), parent) -{ -} - -QThreadPooler::~QThreadPooler() -{ - Q_D(QThreadPooler); + release(); - emit shuttingDown(); - d->m_jobAvailable.wakeAll(); + if (tasks.size()) + enqueueTasks(tasks); - d->shutdown(); -} - -int QThreadPooler::maxThreadCount() const -{ - Q_D(const QThreadPooler); - - const QMutexLocker locker(d->m_mutex); - - return d->maxThreadCount(); -} - -void QThreadPooler::setMaxThreadCount(int threadCount) -{ - Q_D(QThreadPooler); - - const QMutexLocker locker(d->m_mutex); - - d->setMaxThreadCount(threadCount); -} - -QSharedPointer QThreadPooler::nextTask() -{ - Q_D(QThreadPooler); - - const QMutexLocker locker(d->m_mutex); - - QSharedPointer task; - int queueSize = d->m_taskQueue.size(); - for (int i = 0; i < queueSize; i++) { - const QSharedPointer &candidate = d->m_taskQueue.at(i); - if (!hasDependencies(candidate)) { - task = candidate; - // Increment running thread counter before removing item from queue - // so that isIdle test keeps up - d->incRunningThreads(); - d->m_taskQueue.removeAt(i); - - break; + if (currentCount() == 0) { + if (m_futureInterface) { + m_futureInterface->reportFinished(); + delete m_futureInterface; } + m_futureInterface = Q_NULLPTR; } - - return task; } -bool QThreadPooler::hasDependencies(const QSharedPointer &task) +QFuture QThreadPooler::mapDependables(QVector &taskQueue) { - DependencyHandler *handler = task->dependencyHandler(); - if (handler) - return handler->hasDependency(task); + const QMutexLocker locker(m_mutex); - return false; -} - -void QThreadPooler::enqueueTask(const QSharedPointer &task) -{ - Q_D(QThreadPooler); + if (!m_futureInterface) + m_futureInterface = new QFutureInterface(); + if (taskQueue.size()) + m_futureInterface->reportStarted(); - const QMutexLocker locker(d->m_mutex); + acquire(taskQueue.size()); + enqueueTasks(taskQueue); - d->m_taskQueue.append(task); - d->m_jobAvailable.wakeAll(); + return QFuture(m_futureInterface); } -void QThreadPooler::flush() +QFuture QThreadPooler::future() { - Q_D(QThreadPooler); - - const QMutexLocker locker(d->m_mutex); - -#ifdef QT_NO_DEBUG - const int waitTime = 50; -#else - const int waitTime = 500; -#endif - - while (!isIdling()) { - if (d->m_jobFinished.wait(d->m_mutex, waitTime) == false) - d->m_jobAvailable.wakeAll(); - } + if (!m_futureInterface) + return QFuture(); + else + return QFuture(m_futureInterface); } -bool QThreadPooler::isIdling() +void QThreadPooler::acquire(int add) { - Q_D(QThreadPooler); - - return d->isQueueEmpty() && d->m_runningThreads == 0; -} - -void QThreadPooler::startRunning() -{ - Q_D(QThreadPooler); - - const QMutexLocker locker(d->m_mutex); - - d->incRunningThreads(); + forever { + int localCount = m_taskCount.load(); + if (m_taskCount.testAndSetOrdered(localCount, localCount + add)) + return; + } } -void QThreadPooler::stopRunning() +void QThreadPooler::release() { - Q_D(QThreadPooler); + forever { + int localCount = m_taskCount.load(); - const QMutexLocker locker(d->m_mutex); + // Task counter going below zero means coding errors somewhere. + Q_ASSERT(localCount > 0); - d->decRunningThreads(); - d->m_jobFinished.wakeAll(); + if (m_taskCount.testAndSetOrdered(localCount, localCount - 1)) + return; + } } -void QThreadPooler::setDependencyHandler(DependencyHandler *handler) +int QThreadPooler::currentCount() { - Q_D(QThreadPooler); - - d->setDependencyHandler(handler); + return m_taskCount.load(); } } // namespace Qt3D diff --git a/src/core/jobs/qthreadpooler_p.h b/src/core/jobs/qthreadpooler_p.h index 6579e6283..9abbdc8d0 100644 --- a/src/core/jobs/qthreadpooler_p.h +++ b/src/core/jobs/qthreadpooler_p.h @@ -41,15 +41,14 @@ #include "dependencyhandler_p.h" #include -#include #include +#include +#include QT_BEGIN_NAMESPACE namespace Qt3D { -class QThreadPoolerPrivate; - class QThreadPooler : public QObject { Q_OBJECT @@ -58,25 +57,23 @@ public: explicit QThreadPooler(QObject *parent = 0); ~QThreadPooler(); - int maxThreadCount() const; - void setMaxThreadCount(int threadCount); - QSharedPointer nextTask(); - void enqueueTask(const QSharedPointer &task); - void flush(); - void startRunning(); - void stopRunning(); - void setDependencyHandler(DependencyHandler *handler); + QFuture mapDependables(QVector &taskQueue); + void taskFinished(QVector tasks); + QFuture future(); -signals: - void shuttingDown(); + void setDependencyHandler(DependencyHandler *handler); private: - void manageThreads(); - bool hasDependencies(const QSharedPointer &task); - bool isIdling(); + void enqueueTasks(QVector &tasks); + void acquire(int add); + void release(); + int currentCount(); private: - Q_DECLARE_PRIVATE(QThreadPooler) + QFutureInterface *m_futureInterface; + QMutex *m_mutex; + DependencyHandler *m_dependencyHandler; + QAtomicInt m_taskCount; }; } // namespace Qt3D diff --git a/src/core/jobs/qthreadpooler_p_p.h b/src/core/jobs/qthreadpooler_p_p.h deleted file mode 100644 index 0057acc36..000000000 --- a/src/core/jobs/qthreadpooler_p_p.h +++ /dev/null @@ -1,93 +0,0 @@ -/**************************************************************************** -** -** Copyright (C) 2015 The Qt Company Ltd. -** Contact: http://www.qt.io/licensing/ -** -** This file is part of the Qt3D module of the Qt Toolkit. -** -** $QT_BEGIN_LICENSE:LGPL3$ -** Commercial License Usage -** Licensees holding valid commercial Qt licenses may use this file in -** accordance with the commercial license agreement provided with the -** Software or, alternatively, in accordance with the terms contained in -** a written agreement between you and The Qt Company. For licensing terms -** and conditions see http://www.qt.io/terms-conditions. For further -** information use the contact form at http://www.qt.io/contact-us. -** -** GNU Lesser General Public License Usage -** Alternatively, this file may be used under the terms of the GNU Lesser -** General Public License version 3 as published by the Free Software -** Foundation and appearing in the file LICENSE.LGPLv3 included in the -** packaging of this file. Please review the following information to -** ensure the GNU Lesser General Public License version 3 requirements -** will be met: https://www.gnu.org/licenses/lgpl.html. -** -** GNU General Public License Usage -** Alternatively, this file may be used under the terms of the GNU -** General Public License version 2.0 or later as published by the Free -** Software Foundation and appearing in the file LICENSE.GPL included in -** the packaging of this file. Please review the following information to -** ensure the GNU General Public License version 2.0 requirements will be -** met: http://www.gnu.org/licenses/gpl-2.0.html. -** -** $QT_END_LICENSE$ -** -****************************************************************************/ - -#ifndef QT3D_QTHREADPOOLER_P_H -#define QT3D_QTHREADPOOLER_P_H - -#include "jobrunner_p.h" -#include "dependencyhandler_p.h" - -#include -#include - -QT_BEGIN_NAMESPACE - -namespace Qt3D { - -class QThreadPooler; - -class QThreadPoolerPrivate : public QObjectPrivate -{ -public: - QThreadPoolerPrivate(QThreadPooler *qq); - ~QThreadPoolerPrivate(); - - bool isQueueEmpty(); - - void incRunningThreads(); - void decRunningThreads(); - - inline void setDependencyHandler(DependencyHandler *handler) - { - m_dependencyHandler = handler; - } - - void createRunners(int threadCount); - void shutdown(); - - int maxThreadCount() const; - void setMaxThreadCount(int threadCount); - - Q_DECLARE_PUBLIC(QThreadPooler) - -private: - QList m_workers; - QVector > m_taskQueue; - - QWaitCondition m_jobAvailable; - QWaitCondition m_jobFinished; - QMutex *m_mutex; - int m_runningThreads; - int m_maxThreadCount; - DependencyHandler *m_dependencyHandler; -}; - -} - -QT_END_NAMESPACE - -#endif // QT3D_QTHREADPOOLER_P_H - diff --git a/src/core/jobs/task.cpp b/src/core/jobs/task.cpp index 87ddd57a5..55f4bacb6 100644 --- a/src/core/jobs/task.cpp +++ b/src/core/jobs/task.cpp @@ -35,8 +35,8 @@ ****************************************************************************/ #include "task_p.h" -#include "jobrunner_p.h" #include "dependencyhandler_p.h" +#include "qthreadpooler_p.h" #include @@ -46,75 +46,61 @@ QT_BEGIN_NAMESPACE namespace Qt3D { - -TaskInterface::~TaskInterface() +RunnableInterface::~RunnableInterface() { } // Aspect task -AspectTask::AspectTask() +AspectTaskRunnable::AspectTaskRunnable() : m_dependencyHandler(0) { } -AspectTask::~AspectTask() +AspectTaskRunnable::~AspectTaskRunnable() { } -void AspectTask::run(QSharedPointer self, JobRunner *jr) +void AspectTaskRunnable::run() { - Q_UNUSED(self); - Q_UNUSED(jr); if (m_job) m_job->run(); - // Cleanup stuff - // For now at least dependecies. + QVector freedTasks; if (m_dependencyHandler) - m_dependencyHandler->freeDependencies(self); -} + freedTasks = m_dependencyHandler->freeDependencies(this); -void AspectTask::run() -{ + if (m_pooler) + m_pooler->taskFinished(freedTasks); } -void AspectTask::setDependencyHandler(DependencyHandler *handler) +void AspectTaskRunnable::setDependencyHandler(DependencyHandler *handler) { m_dependencyHandler = handler; } -DependencyHandler *AspectTask::dependencyHandler() +DependencyHandler *AspectTaskRunnable::dependencyHandler() { return m_dependencyHandler; } // Synchronized task -SynchronizedTask::SynchronizedTask(QAbstractAspectJobManager::JobFunction func, +SyncTaskRunnable::SyncTaskRunnable(QAbstractAspectJobManager::JobFunction func, void *arg, QAtomicInt *atomicCount) : m_func(func), m_arg(arg), - m_atomicCount(atomicCount) -{ -} - -SynchronizedTask::~SynchronizedTask() + m_atomicCount(atomicCount), + m_pooler(Q_NULLPTR) { } -void SynchronizedTask::run() +SyncTaskRunnable::~SyncTaskRunnable() { - m_func(m_arg); } -void SynchronizedTask::run(QSharedPointer self, JobRunner *jr) +void SyncTaskRunnable::run() { - Q_UNUSED(self); - Q_UNUSED(jr); - Q_ASSERT(m_func); - Q_ASSERT(jr); - // Call the function m_func(m_arg); @@ -123,20 +109,22 @@ void SynchronizedTask::run(QSharedPointer self, JobRunner *jr) // Wait for the other worker threads to be done while (m_atomicCount->load() > 0) - jr->yieldCurrentThread(); + QThread::currentThread()->yieldCurrentThread(); + + if (m_pooler) + m_pooler->taskFinished(QVector()); } -void SynchronizedTask::setDependencyHandler(DependencyHandler *handler) +void SyncTaskRunnable::setDependencyHandler(DependencyHandler *handler) { Q_UNUSED(handler); } -DependencyHandler *SynchronizedTask::dependencyHandler() +DependencyHandler *SyncTaskRunnable::dependencyHandler() { return Q_NULLPTR; } - } // namespace Qt3D { QT_END_NAMESPACE diff --git a/src/core/jobs/task_p.h b/src/core/jobs/task_p.h index cde8f90eb..9b5c6a0f2 100644 --- a/src/core/jobs/task_p.h +++ b/src/core/jobs/task_p.h @@ -42,7 +42,8 @@ #include #include #include -#include + +#include QT_BEGIN_NAMESPACE @@ -50,13 +51,13 @@ namespace Qt3D { class JobRunner; class DependencyHandler; +class QThreadPooler; -class TaskInterface +class RunnableInterface : public QRunnable { public: - virtual ~TaskInterface(); + virtual ~RunnableInterface(); - virtual void run(QSharedPointer self, JobRunner *jr) = 0; virtual void run() = 0; virtual void setDependencyHandler(DependencyHandler *) = 0; @@ -64,54 +65,60 @@ public: virtual int id() = 0; virtual void setId(int id) = 0; + + virtual void setPooler(QThreadPooler *pooler) = 0; }; -class AspectTask : public TaskInterface +class AspectTaskRunnable : public RunnableInterface { public: - AspectTask(); - ~AspectTask(); + AspectTaskRunnable(); + ~AspectTaskRunnable(); - int id() Q_DECL_OVERRIDE { return m_id; } - void setId(int id) Q_DECL_OVERRIDE { m_id = id; } + void run(); void setDependencyHandler(DependencyHandler *handler) Q_DECL_OVERRIDE; DependencyHandler *dependencyHandler() Q_DECL_OVERRIDE; + void setPooler(QThreadPooler *pooler) Q_DECL_OVERRIDE { m_pooler = pooler; } + + int id() Q_DECL_OVERRIDE { return m_id; } + void setId(int id) Q_DECL_OVERRIDE { m_id = id; } + public: QSharedPointer m_job; -protected: - void run(QSharedPointer self, JobRunner *jr) Q_DECL_OVERRIDE; - void run() Q_DECL_OVERRIDE; - private: DependencyHandler *m_dependencyHandler; + QThreadPooler *m_pooler; + int m_id; // For testing purposes for now }; -class SynchronizedTask : public TaskInterface +class SyncTaskRunnable : public RunnableInterface { public: - explicit SynchronizedTask(QAbstractAspectJobManager::JobFunction func, void *arg, + explicit SyncTaskRunnable(QAbstractAspectJobManager::JobFunction func, void *arg, QAtomicInt *atomicCount); - ~SynchronizedTask(); + ~SyncTaskRunnable(); - int id() Q_DECL_OVERRIDE { return m_id; } - void setId(int id) Q_DECL_OVERRIDE { m_id = id; } + void run(); void setDependencyHandler(DependencyHandler *handler) Q_DECL_OVERRIDE; DependencyHandler *dependencyHandler() Q_DECL_OVERRIDE; -protected: - void run(QSharedPointer self, JobRunner *jr) Q_DECL_OVERRIDE; - void run() Q_DECL_OVERRIDE; + void setPooler(QThreadPooler *pooler) Q_DECL_OVERRIDE { m_pooler = pooler; } + + int id() Q_DECL_OVERRIDE { return m_id; } + void setId(int id) Q_DECL_OVERRIDE { m_id = id; } private: QAbstractAspectJobManager::JobFunction m_func; void *m_arg; QAtomicInt *m_atomicCount; + QThreadPooler *m_pooler; + int m_id; }; -- cgit v1.2.3