diff options
Diffstat (limited to 'src/core/jobs/qthreadpooler.cpp')
-rw-r--r-- | src/core/jobs/qthreadpooler.cpp | 230 |
1 files changed, 58 insertions, 172 deletions
diff --git a/src/core/jobs/qthreadpooler.cpp b/src/core/jobs/qthreadpooler.cpp index b48cc5aad..890a8942a 100644 --- a/src/core/jobs/qthreadpooler.cpp +++ b/src/core/jobs/qthreadpooler.cpp @@ -35,9 +35,9 @@ ****************************************************************************/ #include "qthreadpooler_p.h" -#include "qthreadpooler_p_p.h" -#include "jobrunner_p.h" #include "dependencyhandler_p.h" + +#include <QtCore/QThreadPool> #include <QDebug> QT_BEGIN_NAMESPACE @@ -48,218 +48,104 @@ namespace Qt3D { \class Qt3D::QThreadPoolerPrivate \internal */ -QThreadPoolerPrivate::QThreadPoolerPrivate(QThreadPooler *qq) - : QObjectPrivate(), +QThreadPooler::QThreadPooler(QObject *parent) + : QObject(parent), + m_futureInterface(Q_NULLPTR), m_mutex(new QMutex(QMutex::NonRecursive)), - m_runningThreads(0) + m_taskCount(0) { - q_ptr = qq; } -QThreadPoolerPrivate::~QThreadPoolerPrivate() +QThreadPooler::~QThreadPooler() { - Q_FOREACH (QSharedPointer<TaskInterface> task, m_taskQueue) - task->setDependencyHandler(Q_NULLPTR); - delete m_dependencyHandler; - delete m_mutex; } -void QThreadPoolerPrivate::shutdown() -{ - m_jobFinished.wakeAll(); - - // When shutting down a signal is send for jobrunners to exit run() loop - // on next round. Sometimes the jobrunner is busy doing still the clean up - // tasks and isn't waiting the release of WaitCondition. Repeat the waking - // process max tryOuts. - const int tryOuts = 2; - - Q_FOREACH (JobRunner *jr, m_workers) { - if (!jr->isFinished()) { - for (int i = 0; i < tryOuts; i++) { - m_jobAvailable.wakeAll(); - if (jr->wait(100)) - break; - } - } - } -} - - -bool QThreadPoolerPrivate::isQueueEmpty() -{ - return m_taskQueue.isEmpty(); -} - -void QThreadPoolerPrivate::incRunningThreads() -{ - m_runningThreads += 1; -} - -void QThreadPoolerPrivate::decRunningThreads() +void QThreadPooler::setDependencyHandler(DependencyHandler *handler) { - m_runningThreads -= 1; - - // Sanity check - if (m_runningThreads < 0) - m_runningThreads = 0; + m_dependencyHandler = handler; + m_dependencyHandler->setMutex(m_mutex); } -void QThreadPoolerPrivate::createRunners(int threadCount) +void QThreadPooler::enqueueTasks(QVector<RunnableInterface *> &tasks) { - Q_Q(QThreadPooler); - - for (int i = 0; i < threadCount; i++) { - JobRunner *jr = new JobRunner(q); + // The caller have to set the mutex - jr->setMutex(m_mutex); - jr->setWaitConditions(&(m_jobAvailable)); - jr->start(); - - m_workers.append(jr); + for (QVector<RunnableInterface *>::iterator it = tasks.begin(); + it != tasks.end(); it++) { + if (!m_dependencyHandler->hasDependency((*it))) { + (*it)->setPooler(this); + QThreadPool::globalInstance()->start((*it)); + } } } -int QThreadPoolerPrivate::maxThreadCount() const -{ - return m_maxThreadCount; -} - -void QThreadPoolerPrivate::setMaxThreadCount(int threadCount) +void QThreadPooler::taskFinished(QVector<RunnableInterface *> tasks) { + const QMutexLocker locker(m_mutex); - m_maxThreadCount = threadCount; - createRunners(m_maxThreadCount); -} - -///////////////////////////////////////////////// - -QThreadPooler::QThreadPooler(QObject *parent) - : QObject(*new QThreadPoolerPrivate(this), parent) -{ -} - -QThreadPooler::~QThreadPooler() -{ - Q_D(QThreadPooler); + release(); - emit shuttingDown(); - d->m_jobAvailable.wakeAll(); + if (tasks.size()) + enqueueTasks(tasks); - d->shutdown(); -} - -int QThreadPooler::maxThreadCount() const -{ - Q_D(const QThreadPooler); - - const QMutexLocker locker(d->m_mutex); - - return d->maxThreadCount(); -} - -void QThreadPooler::setMaxThreadCount(int threadCount) -{ - Q_D(QThreadPooler); - - const QMutexLocker locker(d->m_mutex); - - d->setMaxThreadCount(threadCount); -} - -QSharedPointer<TaskInterface> QThreadPooler::nextTask() -{ - Q_D(QThreadPooler); - - const QMutexLocker locker(d->m_mutex); - - QSharedPointer<TaskInterface> task; - int queueSize = d->m_taskQueue.size(); - for (int i = 0; i < queueSize; i++) { - const QSharedPointer<TaskInterface> &candidate = d->m_taskQueue.at(i); - if (!hasDependencies(candidate)) { - task = candidate; - // Increment running thread counter before removing item from queue - // so that isIdle test keeps up - d->incRunningThreads(); - d->m_taskQueue.removeAt(i); - - break; + if (currentCount() == 0) { + if (m_futureInterface) { + m_futureInterface->reportFinished(); + delete m_futureInterface; } + m_futureInterface = Q_NULLPTR; } - - return task; } -bool QThreadPooler::hasDependencies(const QSharedPointer<TaskInterface> &task) +QFuture<void> QThreadPooler::mapDependables(QVector<RunnableInterface *> &taskQueue) { - DependencyHandler *handler = task->dependencyHandler(); - if (handler) - return handler->hasDependency(task); + const QMutexLocker locker(m_mutex); - return false; -} - -void QThreadPooler::enqueueTask(const QSharedPointer<TaskInterface> &task) -{ - Q_D(QThreadPooler); + if (!m_futureInterface) + m_futureInterface = new QFutureInterface<void>(); + if (taskQueue.size()) + m_futureInterface->reportStarted(); - const QMutexLocker locker(d->m_mutex); + acquire(taskQueue.size()); + enqueueTasks(taskQueue); - d->m_taskQueue.append(task); - d->m_jobAvailable.wakeAll(); + return QFuture<void>(m_futureInterface); } -void QThreadPooler::flush() +QFuture<void> QThreadPooler::future() { - Q_D(QThreadPooler); - - const QMutexLocker locker(d->m_mutex); - -#ifdef QT_NO_DEBUG - const int waitTime = 50; -#else - const int waitTime = 500; -#endif - - while (!isIdling()) { - if (d->m_jobFinished.wait(d->m_mutex, waitTime) == false) - d->m_jobAvailable.wakeAll(); - } + if (!m_futureInterface) + return QFuture<void>(); + else + return QFuture<void>(m_futureInterface); } -bool QThreadPooler::isIdling() +void QThreadPooler::acquire(int add) { - Q_D(QThreadPooler); - - return d->isQueueEmpty() && d->m_runningThreads == 0; -} - -void QThreadPooler::startRunning() -{ - Q_D(QThreadPooler); - - const QMutexLocker locker(d->m_mutex); - - d->incRunningThreads(); + forever { + int localCount = m_taskCount.load(); + if (m_taskCount.testAndSetOrdered(localCount, localCount + add)) + return; + } } -void QThreadPooler::stopRunning() +void QThreadPooler::release() { - Q_D(QThreadPooler); + forever { + int localCount = m_taskCount.load(); - const QMutexLocker locker(d->m_mutex); + // Task counter going below zero means coding errors somewhere. + Q_ASSERT(localCount > 0); - d->decRunningThreads(); - d->m_jobFinished.wakeAll(); + if (m_taskCount.testAndSetOrdered(localCount, localCount - 1)) + return; + } } -void QThreadPooler::setDependencyHandler(DependencyHandler *handler) +int QThreadPooler::currentCount() { - Q_D(QThreadPooler); - - d->setDependencyHandler(handler); + return m_taskCount.load(); } } // namespace Qt3D |