summaryrefslogtreecommitdiffstats
path: root/src/core/jobs/qthreadpooler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/jobs/qthreadpooler.cpp')
-rw-r--r--src/core/jobs/qthreadpooler.cpp230
1 files changed, 58 insertions, 172 deletions
diff --git a/src/core/jobs/qthreadpooler.cpp b/src/core/jobs/qthreadpooler.cpp
index b48cc5aad..890a8942a 100644
--- a/src/core/jobs/qthreadpooler.cpp
+++ b/src/core/jobs/qthreadpooler.cpp
@@ -35,9 +35,9 @@
****************************************************************************/
#include "qthreadpooler_p.h"
-#include "qthreadpooler_p_p.h"
-#include "jobrunner_p.h"
#include "dependencyhandler_p.h"
+
+#include <QtCore/QThreadPool>
#include <QDebug>
QT_BEGIN_NAMESPACE
@@ -48,218 +48,104 @@ namespace Qt3D {
\class Qt3D::QThreadPoolerPrivate
\internal
*/
-QThreadPoolerPrivate::QThreadPoolerPrivate(QThreadPooler *qq)
- : QObjectPrivate(),
+QThreadPooler::QThreadPooler(QObject *parent)
+ : QObject(parent),
+ m_futureInterface(Q_NULLPTR),
m_mutex(new QMutex(QMutex::NonRecursive)),
- m_runningThreads(0)
+ m_taskCount(0)
{
- q_ptr = qq;
}
-QThreadPoolerPrivate::~QThreadPoolerPrivate()
+QThreadPooler::~QThreadPooler()
{
- Q_FOREACH (QSharedPointer<TaskInterface> task, m_taskQueue)
- task->setDependencyHandler(Q_NULLPTR);
- delete m_dependencyHandler;
-
delete m_mutex;
}
-void QThreadPoolerPrivate::shutdown()
-{
- m_jobFinished.wakeAll();
-
- // When shutting down a signal is send for jobrunners to exit run() loop
- // on next round. Sometimes the jobrunner is busy doing still the clean up
- // tasks and isn't waiting the release of WaitCondition. Repeat the waking
- // process max tryOuts.
- const int tryOuts = 2;
-
- Q_FOREACH (JobRunner *jr, m_workers) {
- if (!jr->isFinished()) {
- for (int i = 0; i < tryOuts; i++) {
- m_jobAvailable.wakeAll();
- if (jr->wait(100))
- break;
- }
- }
- }
-}
-
-
-bool QThreadPoolerPrivate::isQueueEmpty()
-{
- return m_taskQueue.isEmpty();
-}
-
-void QThreadPoolerPrivate::incRunningThreads()
-{
- m_runningThreads += 1;
-}
-
-void QThreadPoolerPrivate::decRunningThreads()
+void QThreadPooler::setDependencyHandler(DependencyHandler *handler)
{
- m_runningThreads -= 1;
-
- // Sanity check
- if (m_runningThreads < 0)
- m_runningThreads = 0;
+ m_dependencyHandler = handler;
+ m_dependencyHandler->setMutex(m_mutex);
}
-void QThreadPoolerPrivate::createRunners(int threadCount)
+void QThreadPooler::enqueueTasks(QVector<RunnableInterface *> &tasks)
{
- Q_Q(QThreadPooler);
-
- for (int i = 0; i < threadCount; i++) {
- JobRunner *jr = new JobRunner(q);
+ // The caller have to set the mutex
- jr->setMutex(m_mutex);
- jr->setWaitConditions(&(m_jobAvailable));
- jr->start();
-
- m_workers.append(jr);
+ for (QVector<RunnableInterface *>::iterator it = tasks.begin();
+ it != tasks.end(); it++) {
+ if (!m_dependencyHandler->hasDependency((*it))) {
+ (*it)->setPooler(this);
+ QThreadPool::globalInstance()->start((*it));
+ }
}
}
-int QThreadPoolerPrivate::maxThreadCount() const
-{
- return m_maxThreadCount;
-}
-
-void QThreadPoolerPrivate::setMaxThreadCount(int threadCount)
+void QThreadPooler::taskFinished(QVector<RunnableInterface *> tasks)
{
+ const QMutexLocker locker(m_mutex);
- m_maxThreadCount = threadCount;
- createRunners(m_maxThreadCount);
-}
-
-/////////////////////////////////////////////////
-
-QThreadPooler::QThreadPooler(QObject *parent)
- : QObject(*new QThreadPoolerPrivate(this), parent)
-{
-}
-
-QThreadPooler::~QThreadPooler()
-{
- Q_D(QThreadPooler);
+ release();
- emit shuttingDown();
- d->m_jobAvailable.wakeAll();
+ if (tasks.size())
+ enqueueTasks(tasks);
- d->shutdown();
-}
-
-int QThreadPooler::maxThreadCount() const
-{
- Q_D(const QThreadPooler);
-
- const QMutexLocker locker(d->m_mutex);
-
- return d->maxThreadCount();
-}
-
-void QThreadPooler::setMaxThreadCount(int threadCount)
-{
- Q_D(QThreadPooler);
-
- const QMutexLocker locker(d->m_mutex);
-
- d->setMaxThreadCount(threadCount);
-}
-
-QSharedPointer<TaskInterface> QThreadPooler::nextTask()
-{
- Q_D(QThreadPooler);
-
- const QMutexLocker locker(d->m_mutex);
-
- QSharedPointer<TaskInterface> task;
- int queueSize = d->m_taskQueue.size();
- for (int i = 0; i < queueSize; i++) {
- const QSharedPointer<TaskInterface> &candidate = d->m_taskQueue.at(i);
- if (!hasDependencies(candidate)) {
- task = candidate;
- // Increment running thread counter before removing item from queue
- // so that isIdle test keeps up
- d->incRunningThreads();
- d->m_taskQueue.removeAt(i);
-
- break;
+ if (currentCount() == 0) {
+ if (m_futureInterface) {
+ m_futureInterface->reportFinished();
+ delete m_futureInterface;
}
+ m_futureInterface = Q_NULLPTR;
}
-
- return task;
}
-bool QThreadPooler::hasDependencies(const QSharedPointer<TaskInterface> &task)
+QFuture<void> QThreadPooler::mapDependables(QVector<RunnableInterface *> &taskQueue)
{
- DependencyHandler *handler = task->dependencyHandler();
- if (handler)
- return handler->hasDependency(task);
+ const QMutexLocker locker(m_mutex);
- return false;
-}
-
-void QThreadPooler::enqueueTask(const QSharedPointer<TaskInterface> &task)
-{
- Q_D(QThreadPooler);
+ if (!m_futureInterface)
+ m_futureInterface = new QFutureInterface<void>();
+ if (taskQueue.size())
+ m_futureInterface->reportStarted();
- const QMutexLocker locker(d->m_mutex);
+ acquire(taskQueue.size());
+ enqueueTasks(taskQueue);
- d->m_taskQueue.append(task);
- d->m_jobAvailable.wakeAll();
+ return QFuture<void>(m_futureInterface);
}
-void QThreadPooler::flush()
+QFuture<void> QThreadPooler::future()
{
- Q_D(QThreadPooler);
-
- const QMutexLocker locker(d->m_mutex);
-
-#ifdef QT_NO_DEBUG
- const int waitTime = 50;
-#else
- const int waitTime = 500;
-#endif
-
- while (!isIdling()) {
- if (d->m_jobFinished.wait(d->m_mutex, waitTime) == false)
- d->m_jobAvailable.wakeAll();
- }
+ if (!m_futureInterface)
+ return QFuture<void>();
+ else
+ return QFuture<void>(m_futureInterface);
}
-bool QThreadPooler::isIdling()
+void QThreadPooler::acquire(int add)
{
- Q_D(QThreadPooler);
-
- return d->isQueueEmpty() && d->m_runningThreads == 0;
-}
-
-void QThreadPooler::startRunning()
-{
- Q_D(QThreadPooler);
-
- const QMutexLocker locker(d->m_mutex);
-
- d->incRunningThreads();
+ forever {
+ int localCount = m_taskCount.load();
+ if (m_taskCount.testAndSetOrdered(localCount, localCount + add))
+ return;
+ }
}
-void QThreadPooler::stopRunning()
+void QThreadPooler::release()
{
- Q_D(QThreadPooler);
+ forever {
+ int localCount = m_taskCount.load();
- const QMutexLocker locker(d->m_mutex);
+ // Task counter going below zero means coding errors somewhere.
+ Q_ASSERT(localCount > 0);
- d->decRunningThreads();
- d->m_jobFinished.wakeAll();
+ if (m_taskCount.testAndSetOrdered(localCount, localCount - 1))
+ return;
+ }
}
-void QThreadPooler::setDependencyHandler(DependencyHandler *handler)
+int QThreadPooler::currentCount()
{
- Q_D(QThreadPooler);
-
- d->setDependencyHandler(handler);
+ return m_taskCount.load();
}
} // namespace Qt3D