summaryrefslogtreecommitdiffstats
path: root/src/core
diff options
context:
space:
mode:
authorMika Salmela <mika.salmela@theqtcompany.com>2015-04-24 13:21:41 +0300
committerMika Salmela <mika.salmela@theqtcompany.com>2015-04-24 10:48:27 +0000
commit1d1b84a579eff2e245e491012104b7c5b5d655d1 (patch)
tree46dc47c91145182ed96214c97a7ce18300a1b1cf /src/core
parent1a4f9e8953753172ffde2fe6389ad8c6e40c4d1e (diff)
Implement ThreadPooler using QThreadPool
Re-implemented ThreadPooler to use QRunnable and QThreadPool. Change-Id: I3da0182489b648dcb6b671c57b4474c183bddeec Reviewed-by: Miikka Heikkinen <miikka.heikkinen@theqtcompany.com>
Diffstat (limited to 'src/core')
-rw-r--r--src/core/jobs/dependencyhandler.cpp39
-rw-r--r--src/core/jobs/dependencyhandler_p.h15
-rw-r--r--src/core/jobs/jobrunner.cpp91
-rw-r--r--src/core/jobs/jobrunner_p.h88
-rw-r--r--src/core/jobs/jobs.pri3
-rw-r--r--src/core/jobs/qaspectjobmanager.cpp48
-rw-r--r--src/core/jobs/qaspectjobmanager_p.h3
-rw-r--r--src/core/jobs/qthreadpooler.cpp230
-rw-r--r--src/core/jobs/qthreadpooler_p.h31
-rw-r--r--src/core/jobs/qthreadpooler_p_p.h93
-rw-r--r--src/core/jobs/task.cpp56
-rw-r--r--src/core/jobs/task_p.h49
12 files changed, 184 insertions, 562 deletions
diff --git a/src/core/jobs/dependencyhandler.cpp b/src/core/jobs/dependencyhandler.cpp
index 9778ede58..118602a61 100644
--- a/src/core/jobs/dependencyhandler.cpp
+++ b/src/core/jobs/dependencyhandler.cpp
@@ -47,13 +47,13 @@ namespace {
struct ByDepender {
typedef bool result_type;
- bool operator()(const QSharedPointer<TaskInterface> &lhs, const QSharedPointer<TaskInterface> &rhs) const Q_DECL_NOTHROW
- { return Op<QSharedPointer<TaskInterface> >()(lhs, rhs); }
+ bool operator()(const RunnableInterface *lhs, const RunnableInterface *rhs) const Q_DECL_NOTHROW
+ { return Op<const RunnableInterface *>()(lhs, rhs); }
- bool operator()(const QSharedPointer<TaskInterface> &lhs, const Dependency &rhs) const Q_DECL_NOTHROW
+ bool operator()(const RunnableInterface *lhs, const Dependency &rhs) const Q_DECL_NOTHROW
{ return operator()(lhs, rhs.depender); }
- bool operator()(const Dependency &lhs, const QSharedPointer<TaskInterface> &rhs) const Q_DECL_NOTHROW
+ bool operator()(const Dependency &lhs, const RunnableInterface *rhs) const Q_DECL_NOTHROW
{ return operator()(lhs.depender, rhs); }
bool operator()(const Dependency &lhs, const Dependency &rhs) const Q_DECL_NOTHROW
@@ -62,12 +62,17 @@ namespace {
struct DependeeEquals : std::unary_function<Dependency, bool>
{
- QSharedPointer<TaskInterface> dependee;
- explicit DependeeEquals(QSharedPointer<TaskInterface> dependee)
- : dependee(qMove(dependee)) {}
+ const RunnableInterface *dependee;
+ QVector<RunnableInterface *> *freedList;
+ explicit DependeeEquals(const RunnableInterface *dependee, QVector<RunnableInterface *> *freedList)
+ : dependee(qMove(dependee)), freedList(qMove(freedList)) {}
bool operator()(const Dependency &candidate) const
{
- return dependee == candidate.dependee;
+ if (dependee == candidate.dependee) {
+ freedList->append(candidate.depender);
+ return true;
+ }
+ return false;
}
};
@@ -91,7 +96,7 @@ void DependencyHandler::addDependencies(QVector<Dependency> dependencies)
{
std::sort(dependencies.begin(), dependencies.end(), ByDependerThenDependee());
- const QMutexLocker locker(&m_mutex);
+ const QMutexLocker locker(m_mutex);
QVector<Dependency> newDependencyMap;
newDependencyMap.reserve(dependencies.size() + m_dependencyMap.size());
@@ -101,9 +106,9 @@ void DependencyHandler::addDependencies(QVector<Dependency> dependencies)
m_dependencyMap.swap(newDependencyMap); // commit
}
-bool DependencyHandler::hasDependency(const QSharedPointer<TaskInterface> &depender)
+bool DependencyHandler::hasDependency(const RunnableInterface *depender)
{
- const QMutexLocker locker(&m_mutex);
+ // The caller has to set the mutex, which is QThreadPooler::enqueueTasks
return std::binary_search(m_dependencyMap.begin(), m_dependencyMap.end(),
depender, ByDepender<std::less>());
@@ -113,13 +118,17 @@ bool DependencyHandler::hasDependency(const QSharedPointer<TaskInterface> &depen
* Removes all the entries on the m_dependencyMap that have given task as a dependee,
* i.e. entries where the dependency is on the given task.
*/
-void DependencyHandler::freeDependencies(const QSharedPointer<TaskInterface> &dependee)
+QVector<RunnableInterface *> DependencyHandler::freeDependencies(const RunnableInterface *dependee)
{
- const QMutexLocker locker(&m_mutex);
+ const QMutexLocker locker(m_mutex);
- m_dependencyMap.erase(std::remove_if(m_dependencyMap.begin(), m_dependencyMap.end(),
- DependeeEquals(dependee)),
+ QVector<RunnableInterface *> freedList;
+ m_dependencyMap.erase(std::remove_if(m_dependencyMap.begin(),
+ m_dependencyMap.end(),
+ DependeeEquals(dependee, &freedList)),
m_dependencyMap.end());
+
+ return freedList;
}
} // namespace Qt3D
diff --git a/src/core/jobs/dependencyhandler_p.h b/src/core/jobs/dependencyhandler_p.h
index 0d2aafaa6..e48113b80 100644
--- a/src/core/jobs/dependencyhandler_p.h
+++ b/src/core/jobs/dependencyhandler_p.h
@@ -49,18 +49,18 @@ namespace Qt3D {
struct Dependency
{
Dependency() {}
- Dependency(QSharedPointer<TaskInterface> depender, QSharedPointer<TaskInterface> dependee)
+ Dependency(RunnableInterface *depender, RunnableInterface *dependee)
: depender(qMove(depender)),
dependee(qMove(dependee)) {}
- QSharedPointer<TaskInterface> depender;
- QSharedPointer<TaskInterface> dependee;
+ RunnableInterface *depender;
+ RunnableInterface *dependee;
};
} // namespace Qt3D
template <>
-class QTypeInfo<Qt3D::Dependency> : public QTypeInfoMerger<Qt3D::Dependency, QSharedPointer<Qt3D::TaskInterface> > {};
+class QTypeInfo<Qt3D::Dependency> : public QTypeInfoMerger<Qt3D::Dependency, Qt3D::RunnableInterface *> {};
namespace Qt3D {
@@ -80,14 +80,15 @@ public:
DependencyHandler();
void addDependencies(QVector<Dependency> dependencies);
- bool hasDependency(const QSharedPointer<TaskInterface> &depender);
- void freeDependencies(const QSharedPointer<TaskInterface> &dependee);
+ bool hasDependency(const RunnableInterface *depender);
+ QVector<RunnableInterface *> freeDependencies(const RunnableInterface *dependee);
+ void setMutex(QMutex *mutex) { m_mutex = mutex; }
private:
Q_DISABLE_COPY(DependencyHandler)
QVector<Dependency> m_dependencyMap;
- mutable QMutex m_mutex;
+ QMutex *m_mutex;
};
} // namespace Qt3D
diff --git a/src/core/jobs/jobrunner.cpp b/src/core/jobs/jobrunner.cpp
deleted file mode 100644
index ab5555585..000000000
--- a/src/core/jobs/jobrunner.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-/****************************************************************************
-**
-** Copyright (C) 2015 The Qt Company Ltd.
-** Contact: http://www.qt.io/licensing/
-**
-** This file is part of the Qt3D module of the Qt Toolkit.
-**
-** $QT_BEGIN_LICENSE:LGPL3$
-** Commercial License Usage
-** Licensees holding valid commercial Qt licenses may use this file in
-** accordance with the commercial license agreement provided with the
-** Software or, alternatively, in accordance with the terms contained in
-** a written agreement between you and The Qt Company. For licensing terms
-** and conditions see http://www.qt.io/terms-conditions. For further
-** information use the contact form at http://www.qt.io/contact-us.
-**
-** GNU Lesser General Public License Usage
-** Alternatively, this file may be used under the terms of the GNU Lesser
-** General Public License version 3 as published by the Free Software
-** Foundation and appearing in the file LICENSE.LGPLv3 included in the
-** packaging of this file. Please review the following information to
-** ensure the GNU Lesser General Public License version 3 requirements
-** will be met: https://www.gnu.org/licenses/lgpl.html.
-**
-** GNU General Public License Usage
-** Alternatively, this file may be used under the terms of the GNU
-** General Public License version 2.0 or later as published by the Free
-** Software Foundation and appearing in the file LICENSE.GPL included in
-** the packaging of this file. Please review the following information to
-** ensure the GNU General Public License version 2.0 requirements will be
-** met: http://www.gnu.org/licenses/gpl-2.0.html.
-**
-** $QT_END_LICENSE$
-**
-****************************************************************************/
-
-#include "jobrunner_p.h"
-#include "qthreadpooler_p.h"
-
-#include <QtCore/QThread>
-#include <QtCore/QMutexLocker>
-#include <QtCore/QAtomicInt>
-
-QT_BEGIN_NAMESPACE
-
-namespace Qt3D {
-
-JobRunner::JobRunner(QThreadPooler *parent)
- : QThread(parent),
- m_abort(0),
- m_pooler(parent),
- m_jobAvailable(Q_NULLPTR),
- m_mutex(Q_NULLPTR)
-{
- QObject::connect(parent, SIGNAL(shuttingDown()), this, SLOT(shutDown()));
-}
-
-JobRunner::~JobRunner()
-{
- shutDown();
-}
-
-void JobRunner::run()
-{
- Q_ASSERT(m_jobAvailable != Q_NULLPTR);
-
- while (!m_abort.load()) {
- if (const QSharedPointer<TaskInterface> task = m_pooler->nextTask()) {
- task->run(task, this);
- m_pooler->stopRunning();
- } else {
- suspend();
- }
- }
-}
-
-void JobRunner::suspend()
-{
- const QMutexLocker locker(m_mutex);
-
- m_jobAvailable->wait(m_mutex);
-}
-
-void JobRunner::shutDown()
-{
- m_abort.store(1);
-}
-
-} // namespace Qt3D
-
-QT_END_NAMESPACE
diff --git a/src/core/jobs/jobrunner_p.h b/src/core/jobs/jobrunner_p.h
deleted file mode 100644
index 14b5213e9..000000000
--- a/src/core/jobs/jobrunner_p.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/****************************************************************************
-**
-** Copyright (C) 2015 The Qt Company Ltd.
-** Contact: http://www.qt.io/licensing/
-**
-** This file is part of the Qt3D module of the Qt Toolkit.
-**
-** $QT_BEGIN_LICENSE:LGPL3$
-** Commercial License Usage
-** Licensees holding valid commercial Qt licenses may use this file in
-** accordance with the commercial license agreement provided with the
-** Software or, alternatively, in accordance with the terms contained in
-** a written agreement between you and The Qt Company. For licensing terms
-** and conditions see http://www.qt.io/terms-conditions. For further
-** information use the contact form at http://www.qt.io/contact-us.
-**
-** GNU Lesser General Public License Usage
-** Alternatively, this file may be used under the terms of the GNU Lesser
-** General Public License version 3 as published by the Free Software
-** Foundation and appearing in the file LICENSE.LGPLv3 included in the
-** packaging of this file. Please review the following information to
-** ensure the GNU Lesser General Public License version 3 requirements
-** will be met: https://www.gnu.org/licenses/lgpl.html.
-**
-** GNU General Public License Usage
-** Alternatively, this file may be used under the terms of the GNU
-** General Public License version 2.0 or later as published by the Free
-** Software Foundation and appearing in the file LICENSE.GPL included in
-** the packaging of this file. Please review the following information to
-** ensure the GNU General Public License version 2.0 requirements will be
-** met: http://www.gnu.org/licenses/gpl-2.0.html.
-**
-** $QT_END_LICENSE$
-**
-****************************************************************************/
-
-#ifndef QT3D_JOBRUNNER_P_H
-#define QT3D_JOBRUNNER_P_H
-
-#include "qaspectjobmanager.h"
-#include "task_p.h"
-
-#include <QtCore/QThread>
-#include <QtCore/QWaitCondition>
-#include <QtCore/QAtomicInt>
-
-QT_BEGIN_NAMESPACE
-
-namespace Qt3D {
-
-class QThreadPooler;
-
-class JobRunner : public QThread
-{
- Q_OBJECT
-
-public:
- explicit JobRunner(QThreadPooler *parent = 0);
- ~JobRunner();
-
- void run() Q_DECL_OVERRIDE;
-
- inline void setWaitConditions(QWaitCondition *jobAvailable)
- {
- m_jobAvailable = jobAvailable;
- }
- inline void setMutex(QMutex *mutex) { m_mutex = mutex; }
-
-private:
- void suspend();
-
-private:
- QAtomicInt m_abort;
- QThreadPooler *m_pooler;
-
- QWaitCondition *m_jobAvailable;
- QMutex *m_mutex; // For waiting next available job
-
-private Q_SLOTS:
- void shutDown();
-};
-
-} // namespace Qt3D
-
-QT_END_NAMESPACE
-
-#endif // QT3D_JOBRUNNER_P_H
-
diff --git a/src/core/jobs/jobs.pri b/src/core/jobs/jobs.pri
index b6def7aa8..d21d21975 100644
--- a/src/core/jobs/jobs.pri
+++ b/src/core/jobs/jobs.pri
@@ -3,7 +3,6 @@ SOURCES += \
$$PWD/qaspectjob.cpp \
$$PWD/qaspectjobmanager.cpp \
$$PWD/qabstractaspectjobmanager.cpp \
- $$PWD/jobrunner.cpp \
$$PWD/qthreadpooler.cpp \
$$PWD/task.cpp \
$$PWD/dependencyhandler.cpp
@@ -26,9 +25,7 @@ HEADERS += \
$$PWD/qaspectjobmanager_p.h \
$$PWD/qabstractaspectjobmanager_p.h \
$$PWD/dependencyhandler_p.h \
- $$PWD/jobrunner_p.h \
$$PWD/task_p.h \
- $$PWD/qthreadpooler_p_p.h \
$$PWD/qthreadpooler_p.h
INCLUDEPATH += $$PWD
diff --git a/src/core/jobs/qaspectjobmanager.cpp b/src/core/jobs/qaspectjobmanager.cpp
index f959bee18..c9c230d33 100644
--- a/src/core/jobs/qaspectjobmanager.cpp
+++ b/src/core/jobs/qaspectjobmanager.cpp
@@ -50,7 +50,9 @@
#include <QAtomicInt>
#include <QDebug>
#include <QThread>
-
+#include <QCoreApplication>
+#include <QtCore/QFuture>
+#include <QtCore/QFutureWatcher>
QT_BEGIN_NAMESPACE
@@ -112,6 +114,11 @@ QAspectJobManagerPrivate::QAspectJobManagerPrivate(QAspectJobManager *qq)
{
}
+QAspectJobManagerPrivate::~QAspectJobManagerPrivate()
+{
+ delete m_dependencyHandler;
+}
+
QAspectJobManager::QAspectJobManager(QObject *parent)
: QAbstractAspectJobManager(*new QAspectJobManagerPrivate(this), parent)
{
@@ -121,8 +128,6 @@ QAspectJobManager::QAspectJobManager(QObject *parent)
d->m_weaver->setMaximumNumberOfThreads(QThread::idealThreadCount());
#else
d->m_threadPooler = new QThreadPooler(this);
- d->m_threadPooler->setMaxThreadCount(QThread::idealThreadCount());
-
d->m_dependencyHandler = new DependencyHandler();
d->m_threadPooler->setDependencyHandler(d->m_dependencyHandler);
#endif
@@ -138,9 +143,8 @@ QAspectJobManager::QAspectJobManager(QAspectJobManagerPrivate &dd, QObject *pare
d->m_weaver->setMaximumNumberOfThreads(QThread::idealThreadCount());
#else
d->m_threadPooler = new QThreadPooler(this);
- d->m_threadPooler->setMaxThreadCount(QThread::idealThreadCount());
-
d->m_dependencyHandler = new DependencyHandler();
+ d->m_threadPooler->setDependencyHandler(d->m_dependencyHandler);
#endif
}
@@ -182,24 +186,27 @@ void QAspectJobManager::enqueueJobs(const QVector<QAspectJobPtr> &jobQueue)
}
#else
// Convert QJobs to Tasks
- QHash<QAspectJob *, QSharedPointer<AspectTask>> tasksMap;
+ QHash<QAspectJob *, AspectTaskRunnable *> tasksMap;
+ QVector<RunnableInterface *> taskList;
Q_FOREACH (const QAspectJobPtr &job, jobQueue) {
- QSharedPointer<AspectTask> task = QSharedPointer<AspectTask>::create();
+ AspectTaskRunnable *task = new AspectTaskRunnable();
task->m_job = job;
tasksMap.insert(job.data(), task);
+
+ taskList << task;
}
// Resolve dependencies
QVector<Dependency> dependencyList;
- Q_FOREACH (const QAspectJobPtr &job, jobQueue) {
+ Q_FOREACH (const QSharedPointer<QAspectJob> &job, jobQueue) {
const QVector<QWeakPointer<QAspectJob> > &deps = job->dependencies();
Q_FOREACH (const QWeakPointer<QAspectJob> &dep, deps) {
- QSharedPointer<AspectTask> taskDependee = tasksMap.value(dep.data());
+ AspectTaskRunnable *taskDependee = tasksMap.value(dep.data());
if (taskDependee) {
- QSharedPointer<AspectTask> taskDepender = tasksMap.value(job.data());
+ AspectTaskRunnable *taskDepender = tasksMap.value(job.data());
dependencyList.append(Dependency(taskDepender, taskDependee));
taskDepender->setDependencyHandler(d->m_dependencyHandler);
taskDependee->setDependencyHandler(d->m_dependencyHandler);
@@ -208,10 +215,7 @@ void QAspectJobManager::enqueueJobs(const QVector<QAspectJobPtr> &jobQueue)
}
d->m_dependencyHandler->addDependencies(qMove(dependencyList));
- Q_FOREACH (const QAspectJobPtr &job, jobQueue) {
- QSharedPointer<AspectTask> task = tasksMap.value(job.data());
- d->m_threadPooler->enqueueTask(task);
- }
+ d->m_threadPooler->mapDependables(taskList);
#endif
}
@@ -221,7 +225,9 @@ void QAspectJobManager::waitForAllJobs()
#ifdef THREAD_WEAVER
d->m_weaver->finish();
#else
- d->m_threadPooler->flush();
+ QFutureWatcher<void> futureWatcher;
+ futureWatcher.setFuture(d->m_threadPooler->future());
+ futureWatcher.waitForFinished();
#endif
}
@@ -240,15 +246,19 @@ void QAspectJobManager::waitForPerThreadFunction(JobFunction func, void *arg)
d->m_weaver->finish();
#else
- const int threadCount = d->m_threadPooler->maxThreadCount();
+ const int threadCount = QThread::idealThreadCount();
QAtomicInt atomicCount(threadCount);
+ QVector<RunnableInterface *> taskList;
for (int i = 0; i < threadCount; ++i) {
- QSharedPointer<SynchronizedTask> syncTask(new SynchronizedTask(func, arg, &atomicCount));
- d->m_threadPooler->enqueueTask(syncTask);
+ SyncTaskRunnable *syncTask = new SyncTaskRunnable(func, arg, &atomicCount);
+ taskList << syncTask;
}
- d->m_threadPooler->flush();
+ QFuture<void> future = d->m_threadPooler->mapDependables(taskList);
+ QFutureWatcher<void> futureWatcher;
+ futureWatcher.setFuture(future);
+ futureWatcher.waitForFinished();
#endif
}
diff --git a/src/core/jobs/qaspectjobmanager_p.h b/src/core/jobs/qaspectjobmanager_p.h
index 5ea9cc54a..096c0470a 100644
--- a/src/core/jobs/qaspectjobmanager_p.h
+++ b/src/core/jobs/qaspectjobmanager_p.h
@@ -58,6 +58,7 @@ class QAspectJobManagerPrivate : public QAbstractAspectJobManagerPrivate
{
public:
QAspectJobManagerPrivate(QAspectJobManager *qq);
+ ~QAspectJobManagerPrivate();
Q_DECLARE_PUBLIC(QAspectJobManager)
QAspectJobManager *q_ptr;
@@ -69,8 +70,6 @@ public:
QThreadPooler *m_threadPooler;
DependencyHandler *m_dependencyHandler;
- QMutex *m_syncMutex;
- QWaitCondition m_syncFinished;
};
} // Qt3D
diff --git a/src/core/jobs/qthreadpooler.cpp b/src/core/jobs/qthreadpooler.cpp
index b48cc5aad..890a8942a 100644
--- a/src/core/jobs/qthreadpooler.cpp
+++ b/src/core/jobs/qthreadpooler.cpp
@@ -35,9 +35,9 @@
****************************************************************************/
#include "qthreadpooler_p.h"
-#include "qthreadpooler_p_p.h"
-#include "jobrunner_p.h"
#include "dependencyhandler_p.h"
+
+#include <QtCore/QThreadPool>
#include <QDebug>
QT_BEGIN_NAMESPACE
@@ -48,218 +48,104 @@ namespace Qt3D {
\class Qt3D::QThreadPoolerPrivate
\internal
*/
-QThreadPoolerPrivate::QThreadPoolerPrivate(QThreadPooler *qq)
- : QObjectPrivate(),
+QThreadPooler::QThreadPooler(QObject *parent)
+ : QObject(parent),
+ m_futureInterface(Q_NULLPTR),
m_mutex(new QMutex(QMutex::NonRecursive)),
- m_runningThreads(0)
+ m_taskCount(0)
{
- q_ptr = qq;
}
-QThreadPoolerPrivate::~QThreadPoolerPrivate()
+QThreadPooler::~QThreadPooler()
{
- Q_FOREACH (QSharedPointer<TaskInterface> task, m_taskQueue)
- task->setDependencyHandler(Q_NULLPTR);
- delete m_dependencyHandler;
-
delete m_mutex;
}
-void QThreadPoolerPrivate::shutdown()
-{
- m_jobFinished.wakeAll();
-
- // When shutting down a signal is send for jobrunners to exit run() loop
- // on next round. Sometimes the jobrunner is busy doing still the clean up
- // tasks and isn't waiting the release of WaitCondition. Repeat the waking
- // process max tryOuts.
- const int tryOuts = 2;
-
- Q_FOREACH (JobRunner *jr, m_workers) {
- if (!jr->isFinished()) {
- for (int i = 0; i < tryOuts; i++) {
- m_jobAvailable.wakeAll();
- if (jr->wait(100))
- break;
- }
- }
- }
-}
-
-
-bool QThreadPoolerPrivate::isQueueEmpty()
-{
- return m_taskQueue.isEmpty();
-}
-
-void QThreadPoolerPrivate::incRunningThreads()
-{
- m_runningThreads += 1;
-}
-
-void QThreadPoolerPrivate::decRunningThreads()
+void QThreadPooler::setDependencyHandler(DependencyHandler *handler)
{
- m_runningThreads -= 1;
-
- // Sanity check
- if (m_runningThreads < 0)
- m_runningThreads = 0;
+ m_dependencyHandler = handler;
+ m_dependencyHandler->setMutex(m_mutex);
}
-void QThreadPoolerPrivate::createRunners(int threadCount)
+void QThreadPooler::enqueueTasks(QVector<RunnableInterface *> &tasks)
{
- Q_Q(QThreadPooler);
-
- for (int i = 0; i < threadCount; i++) {
- JobRunner *jr = new JobRunner(q);
+ // The caller have to set the mutex
- jr->setMutex(m_mutex);
- jr->setWaitConditions(&(m_jobAvailable));
- jr->start();
-
- m_workers.append(jr);
+ for (QVector<RunnableInterface *>::iterator it = tasks.begin();
+ it != tasks.end(); it++) {
+ if (!m_dependencyHandler->hasDependency((*it))) {
+ (*it)->setPooler(this);
+ QThreadPool::globalInstance()->start((*it));
+ }
}
}
-int QThreadPoolerPrivate::maxThreadCount() const
-{
- return m_maxThreadCount;
-}
-
-void QThreadPoolerPrivate::setMaxThreadCount(int threadCount)
+void QThreadPooler::taskFinished(QVector<RunnableInterface *> tasks)
{
+ const QMutexLocker locker(m_mutex);
- m_maxThreadCount = threadCount;
- createRunners(m_maxThreadCount);
-}
-
-/////////////////////////////////////////////////
-
-QThreadPooler::QThreadPooler(QObject *parent)
- : QObject(*new QThreadPoolerPrivate(this), parent)
-{
-}
-
-QThreadPooler::~QThreadPooler()
-{
- Q_D(QThreadPooler);
+ release();
- emit shuttingDown();
- d->m_jobAvailable.wakeAll();
+ if (tasks.size())
+ enqueueTasks(tasks);
- d->shutdown();
-}
-
-int QThreadPooler::maxThreadCount() const
-{
- Q_D(const QThreadPooler);
-
- const QMutexLocker locker(d->m_mutex);
-
- return d->maxThreadCount();
-}
-
-void QThreadPooler::setMaxThreadCount(int threadCount)
-{
- Q_D(QThreadPooler);
-
- const QMutexLocker locker(d->m_mutex);
-
- d->setMaxThreadCount(threadCount);
-}
-
-QSharedPointer<TaskInterface> QThreadPooler::nextTask()
-{
- Q_D(QThreadPooler);
-
- const QMutexLocker locker(d->m_mutex);
-
- QSharedPointer<TaskInterface> task;
- int queueSize = d->m_taskQueue.size();
- for (int i = 0; i < queueSize; i++) {
- const QSharedPointer<TaskInterface> &candidate = d->m_taskQueue.at(i);
- if (!hasDependencies(candidate)) {
- task = candidate;
- // Increment running thread counter before removing item from queue
- // so that isIdle test keeps up
- d->incRunningThreads();
- d->m_taskQueue.removeAt(i);
-
- break;
+ if (currentCount() == 0) {
+ if (m_futureInterface) {
+ m_futureInterface->reportFinished();
+ delete m_futureInterface;
}
+ m_futureInterface = Q_NULLPTR;
}
-
- return task;
}
-bool QThreadPooler::hasDependencies(const QSharedPointer<TaskInterface> &task)
+QFuture<void> QThreadPooler::mapDependables(QVector<RunnableInterface *> &taskQueue)
{
- DependencyHandler *handler = task->dependencyHandler();
- if (handler)
- return handler->hasDependency(task);
+ const QMutexLocker locker(m_mutex);
- return false;
-}
-
-void QThreadPooler::enqueueTask(const QSharedPointer<TaskInterface> &task)
-{
- Q_D(QThreadPooler);
+ if (!m_futureInterface)
+ m_futureInterface = new QFutureInterface<void>();
+ if (taskQueue.size())
+ m_futureInterface->reportStarted();
- const QMutexLocker locker(d->m_mutex);
+ acquire(taskQueue.size());
+ enqueueTasks(taskQueue);
- d->m_taskQueue.append(task);
- d->m_jobAvailable.wakeAll();
+ return QFuture<void>(m_futureInterface);
}
-void QThreadPooler::flush()
+QFuture<void> QThreadPooler::future()
{
- Q_D(QThreadPooler);
-
- const QMutexLocker locker(d->m_mutex);
-
-#ifdef QT_NO_DEBUG
- const int waitTime = 50;
-#else
- const int waitTime = 500;
-#endif
-
- while (!isIdling()) {
- if (d->m_jobFinished.wait(d->m_mutex, waitTime) == false)
- d->m_jobAvailable.wakeAll();
- }
+ if (!m_futureInterface)
+ return QFuture<void>();
+ else
+ return QFuture<void>(m_futureInterface);
}
-bool QThreadPooler::isIdling()
+void QThreadPooler::acquire(int add)
{
- Q_D(QThreadPooler);
-
- return d->isQueueEmpty() && d->m_runningThreads == 0;
-}
-
-void QThreadPooler::startRunning()
-{
- Q_D(QThreadPooler);
-
- const QMutexLocker locker(d->m_mutex);
-
- d->incRunningThreads();
+ forever {
+ int localCount = m_taskCount.load();
+ if (m_taskCount.testAndSetOrdered(localCount, localCount + add))
+ return;
+ }
}
-void QThreadPooler::stopRunning()
+void QThreadPooler::release()
{
- Q_D(QThreadPooler);
+ forever {
+ int localCount = m_taskCount.load();
- const QMutexLocker locker(d->m_mutex);
+ // Task counter going below zero means coding errors somewhere.
+ Q_ASSERT(localCount > 0);
- d->decRunningThreads();
- d->m_jobFinished.wakeAll();
+ if (m_taskCount.testAndSetOrdered(localCount, localCount - 1))
+ return;
+ }
}
-void QThreadPooler::setDependencyHandler(DependencyHandler *handler)
+int QThreadPooler::currentCount()
{
- Q_D(QThreadPooler);
-
- d->setDependencyHandler(handler);
+ return m_taskCount.load();
}
} // namespace Qt3D
diff --git a/src/core/jobs/qthreadpooler_p.h b/src/core/jobs/qthreadpooler_p.h
index 6579e6283..9abbdc8d0 100644
--- a/src/core/jobs/qthreadpooler_p.h
+++ b/src/core/jobs/qthreadpooler_p.h
@@ -41,15 +41,14 @@
#include "dependencyhandler_p.h"
#include <QtCore/QObject>
-#include <QtCore/QWaitCondition>
#include <QtCore/QSharedPointer>
+#include <QtCore/QFutureInterface>
+#include <QtCore/QFuture>
QT_BEGIN_NAMESPACE
namespace Qt3D {
-class QThreadPoolerPrivate;
-
class QThreadPooler : public QObject
{
Q_OBJECT
@@ -58,25 +57,23 @@ public:
explicit QThreadPooler(QObject *parent = 0);
~QThreadPooler();
- int maxThreadCount() const;
- void setMaxThreadCount(int threadCount);
- QSharedPointer<TaskInterface> nextTask();
- void enqueueTask(const QSharedPointer<TaskInterface> &task);
- void flush();
- void startRunning();
- void stopRunning();
- void setDependencyHandler(DependencyHandler *handler);
+ QFuture<void> mapDependables(QVector<RunnableInterface *> &taskQueue);
+ void taskFinished(QVector<RunnableInterface *> tasks);
+ QFuture<void> future();
-signals:
- void shuttingDown();
+ void setDependencyHandler(DependencyHandler *handler);
private:
- void manageThreads();
- bool hasDependencies(const QSharedPointer<TaskInterface> &task);
- bool isIdling();
+ void enqueueTasks(QVector<RunnableInterface *> &tasks);
+ void acquire(int add);
+ void release();
+ int currentCount();
private:
- Q_DECLARE_PRIVATE(QThreadPooler)
+ QFutureInterface<void> *m_futureInterface;
+ QMutex *m_mutex;
+ DependencyHandler *m_dependencyHandler;
+ QAtomicInt m_taskCount;
};
} // namespace Qt3D
diff --git a/src/core/jobs/qthreadpooler_p_p.h b/src/core/jobs/qthreadpooler_p_p.h
deleted file mode 100644
index 0057acc36..000000000
--- a/src/core/jobs/qthreadpooler_p_p.h
+++ /dev/null
@@ -1,93 +0,0 @@
-/****************************************************************************
-**
-** Copyright (C) 2015 The Qt Company Ltd.
-** Contact: http://www.qt.io/licensing/
-**
-** This file is part of the Qt3D module of the Qt Toolkit.
-**
-** $QT_BEGIN_LICENSE:LGPL3$
-** Commercial License Usage
-** Licensees holding valid commercial Qt licenses may use this file in
-** accordance with the commercial license agreement provided with the
-** Software or, alternatively, in accordance with the terms contained in
-** a written agreement between you and The Qt Company. For licensing terms
-** and conditions see http://www.qt.io/terms-conditions. For further
-** information use the contact form at http://www.qt.io/contact-us.
-**
-** GNU Lesser General Public License Usage
-** Alternatively, this file may be used under the terms of the GNU Lesser
-** General Public License version 3 as published by the Free Software
-** Foundation and appearing in the file LICENSE.LGPLv3 included in the
-** packaging of this file. Please review the following information to
-** ensure the GNU Lesser General Public License version 3 requirements
-** will be met: https://www.gnu.org/licenses/lgpl.html.
-**
-** GNU General Public License Usage
-** Alternatively, this file may be used under the terms of the GNU
-** General Public License version 2.0 or later as published by the Free
-** Software Foundation and appearing in the file LICENSE.GPL included in
-** the packaging of this file. Please review the following information to
-** ensure the GNU General Public License version 2.0 requirements will be
-** met: http://www.gnu.org/licenses/gpl-2.0.html.
-**
-** $QT_END_LICENSE$
-**
-****************************************************************************/
-
-#ifndef QT3D_QTHREADPOOLER_P_H
-#define QT3D_QTHREADPOOLER_P_H
-
-#include "jobrunner_p.h"
-#include "dependencyhandler_p.h"
-
-#include <QtCore/QtGlobal>
-#include <private/qobject_p.h>
-
-QT_BEGIN_NAMESPACE
-
-namespace Qt3D {
-
-class QThreadPooler;
-
-class QThreadPoolerPrivate : public QObjectPrivate
-{
-public:
- QThreadPoolerPrivate(QThreadPooler *qq);
- ~QThreadPoolerPrivate();
-
- bool isQueueEmpty();
-
- void incRunningThreads();
- void decRunningThreads();
-
- inline void setDependencyHandler(DependencyHandler *handler)
- {
- m_dependencyHandler = handler;
- }
-
- void createRunners(int threadCount);
- void shutdown();
-
- int maxThreadCount() const;
- void setMaxThreadCount(int threadCount);
-
- Q_DECLARE_PUBLIC(QThreadPooler)
-
-private:
- QList<JobRunner *> m_workers;
- QVector<QSharedPointer<TaskInterface> > m_taskQueue;
-
- QWaitCondition m_jobAvailable;
- QWaitCondition m_jobFinished;
- QMutex *m_mutex;
- int m_runningThreads;
- int m_maxThreadCount;
- DependencyHandler *m_dependencyHandler;
-};
-
-}
-
-QT_END_NAMESPACE
-
-#endif // QT3D_QTHREADPOOLER_P_H
-
diff --git a/src/core/jobs/task.cpp b/src/core/jobs/task.cpp
index 87ddd57a5..55f4bacb6 100644
--- a/src/core/jobs/task.cpp
+++ b/src/core/jobs/task.cpp
@@ -35,8 +35,8 @@
****************************************************************************/
#include "task_p.h"
-#include "jobrunner_p.h"
#include "dependencyhandler_p.h"
+#include "qthreadpooler_p.h"
#include <QMutexLocker>
@@ -46,75 +46,61 @@ QT_BEGIN_NAMESPACE
namespace Qt3D {
-
-TaskInterface::~TaskInterface()
+RunnableInterface::~RunnableInterface()
{
}
// Aspect task
-AspectTask::AspectTask()
+AspectTaskRunnable::AspectTaskRunnable()
: m_dependencyHandler(0)
{
}
-AspectTask::~AspectTask()
+AspectTaskRunnable::~AspectTaskRunnable()
{
}
-void AspectTask::run(QSharedPointer<TaskInterface> self, JobRunner *jr)
+void AspectTaskRunnable::run()
{
- Q_UNUSED(self);
- Q_UNUSED(jr);
if (m_job)
m_job->run();
- // Cleanup stuff
- // For now at least dependecies.
+ QVector<RunnableInterface *> freedTasks;
if (m_dependencyHandler)
- m_dependencyHandler->freeDependencies(self);
-}
+ freedTasks = m_dependencyHandler->freeDependencies(this);
-void AspectTask::run()
-{
+ if (m_pooler)
+ m_pooler->taskFinished(freedTasks);
}
-void AspectTask::setDependencyHandler(DependencyHandler *handler)
+void AspectTaskRunnable::setDependencyHandler(DependencyHandler *handler)
{
m_dependencyHandler = handler;
}
-DependencyHandler *AspectTask::dependencyHandler()
+DependencyHandler *AspectTaskRunnable::dependencyHandler()
{
return m_dependencyHandler;
}
// Synchronized task
-SynchronizedTask::SynchronizedTask(QAbstractAspectJobManager::JobFunction func,
+SyncTaskRunnable::SyncTaskRunnable(QAbstractAspectJobManager::JobFunction func,
void *arg, QAtomicInt *atomicCount)
: m_func(func),
m_arg(arg),
- m_atomicCount(atomicCount)
-{
-}
-
-SynchronizedTask::~SynchronizedTask()
+ m_atomicCount(atomicCount),
+ m_pooler(Q_NULLPTR)
{
}
-void SynchronizedTask::run()
+SyncTaskRunnable::~SyncTaskRunnable()
{
- m_func(m_arg);
}
-void SynchronizedTask::run(QSharedPointer<TaskInterface> self, JobRunner *jr)
+void SyncTaskRunnable::run()
{
- Q_UNUSED(self);
- Q_UNUSED(jr);
- Q_ASSERT(m_func);
- Q_ASSERT(jr);
-
// Call the function
m_func(m_arg);
@@ -123,20 +109,22 @@ void SynchronizedTask::run(QSharedPointer<TaskInterface> self, JobRunner *jr)
// Wait for the other worker threads to be done
while (m_atomicCount->load() > 0)
- jr->yieldCurrentThread();
+ QThread::currentThread()->yieldCurrentThread();
+
+ if (m_pooler)
+ m_pooler->taskFinished(QVector<RunnableInterface *>());
}
-void SynchronizedTask::setDependencyHandler(DependencyHandler *handler)
+void SyncTaskRunnable::setDependencyHandler(DependencyHandler *handler)
{
Q_UNUSED(handler);
}
-DependencyHandler *SynchronizedTask::dependencyHandler()
+DependencyHandler *SyncTaskRunnable::dependencyHandler()
{
return Q_NULLPTR;
}
-
} // namespace Qt3D {
QT_END_NAMESPACE
diff --git a/src/core/jobs/task_p.h b/src/core/jobs/task_p.h
index cde8f90eb..9b5c6a0f2 100644
--- a/src/core/jobs/task_p.h
+++ b/src/core/jobs/task_p.h
@@ -42,7 +42,8 @@
#include <QtCore/QtGlobal>
#include <QtCore/QThread>
#include <QtCore/QSharedPointer>
-#include <QtCore/QWaitCondition>
+
+#include <QtCore/QRunnable>
QT_BEGIN_NAMESPACE
@@ -50,13 +51,13 @@ namespace Qt3D {
class JobRunner;
class DependencyHandler;
+class QThreadPooler;
-class TaskInterface
+class RunnableInterface : public QRunnable
{
public:
- virtual ~TaskInterface();
+ virtual ~RunnableInterface();
- virtual void run(QSharedPointer<TaskInterface> self, JobRunner *jr) = 0;
virtual void run() = 0;
virtual void setDependencyHandler(DependencyHandler *) = 0;
@@ -64,54 +65,60 @@ public:
virtual int id() = 0;
virtual void setId(int id) = 0;
+
+ virtual void setPooler(QThreadPooler *pooler) = 0;
};
-class AspectTask : public TaskInterface
+class AspectTaskRunnable : public RunnableInterface
{
public:
- AspectTask();
- ~AspectTask();
+ AspectTaskRunnable();
+ ~AspectTaskRunnable();
- int id() Q_DECL_OVERRIDE { return m_id; }
- void setId(int id) Q_DECL_OVERRIDE { m_id = id; }
+ void run();
void setDependencyHandler(DependencyHandler *handler) Q_DECL_OVERRIDE;
DependencyHandler *dependencyHandler() Q_DECL_OVERRIDE;
+ void setPooler(QThreadPooler *pooler) Q_DECL_OVERRIDE { m_pooler = pooler; }
+
+ int id() Q_DECL_OVERRIDE { return m_id; }
+ void setId(int id) Q_DECL_OVERRIDE { m_id = id; }
+
public:
QSharedPointer<QAspectJob> m_job;
-protected:
- void run(QSharedPointer<TaskInterface> self, JobRunner *jr) Q_DECL_OVERRIDE;
- void run() Q_DECL_OVERRIDE;
-
private:
DependencyHandler *m_dependencyHandler;
+ QThreadPooler *m_pooler;
+
int m_id; // For testing purposes for now
};
-class SynchronizedTask : public TaskInterface
+class SyncTaskRunnable : public RunnableInterface
{
public:
- explicit SynchronizedTask(QAbstractAspectJobManager::JobFunction func, void *arg,
+ explicit SyncTaskRunnable(QAbstractAspectJobManager::JobFunction func, void *arg,
QAtomicInt *atomicCount);
- ~SynchronizedTask();
+ ~SyncTaskRunnable();
- int id() Q_DECL_OVERRIDE { return m_id; }
- void setId(int id) Q_DECL_OVERRIDE { m_id = id; }
+ void run();
void setDependencyHandler(DependencyHandler *handler) Q_DECL_OVERRIDE;
DependencyHandler *dependencyHandler() Q_DECL_OVERRIDE;
-protected:
- void run(QSharedPointer<TaskInterface> self, JobRunner *jr) Q_DECL_OVERRIDE;
- void run() Q_DECL_OVERRIDE;
+ void setPooler(QThreadPooler *pooler) Q_DECL_OVERRIDE { m_pooler = pooler; }
+
+ int id() Q_DECL_OVERRIDE { return m_id; }
+ void setId(int id) Q_DECL_OVERRIDE { m_id = id; }
private:
QAbstractAspectJobManager::JobFunction m_func;
void *m_arg;
QAtomicInt *m_atomicCount;
+ QThreadPooler *m_pooler;
+
int m_id;
};