From 96501b0a18f4f70048403dccc4cb42dd71db8f9d Mon Sep 17 00:00:00 2001 From: Lars Knoll Date: Fri, 3 Feb 2012 14:17:26 +0100 Subject: Move QtConcurrent into its own module Task-number: QTBUG-20892 Change-Id: I614500aafb6428915509983608bbb0ade4e4f016 Reviewed-by: Thiago Macieira --- src/concurrent/qfutureinterface.cpp | 565 ++++++++++++++++++++++++++++++++++++ 1 file changed, 565 insertions(+) create mode 100644 src/concurrent/qfutureinterface.cpp (limited to 'src/concurrent/qfutureinterface.cpp') diff --git a/src/concurrent/qfutureinterface.cpp b/src/concurrent/qfutureinterface.cpp new file mode 100644 index 0000000000..9a273a1076 --- /dev/null +++ b/src/concurrent/qfutureinterface.cpp @@ -0,0 +1,565 @@ +/**************************************************************************** +** +** Copyright (C) 2012 Nokia Corporation and/or its subsidiary(-ies). +** Contact: http://www.qt-project.org/ +** +** This file is part of the QtCore module of the Qt Toolkit. +** +** $QT_BEGIN_LICENSE:LGPL$ +** GNU Lesser General Public License Usage +** This file may be used under the terms of the GNU Lesser General Public +** License version 2.1 as published by the Free Software Foundation and +** appearing in the file LICENSE.LGPL included in the packaging of this +** file. Please review the following information to ensure the GNU Lesser +** General Public License version 2.1 requirements will be met: +** http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html. +** +** In addition, as a special exception, Nokia gives you certain additional +** rights. These rights are described in the Nokia Qt LGPL Exception +** version 1.1, included in the file LGPL_EXCEPTION.txt in this package. +** +** GNU General Public License Usage +** Alternatively, this file may be used under the terms of the GNU General +** Public License version 3.0 as published by the Free Software Foundation +** and appearing in the file LICENSE.GPL included in the packaging of this +** file. Please review the following information to ensure the GNU General +** Public License version 3.0 requirements will be met: +** http://www.gnu.org/copyleft/gpl.html. +** +** Other Usage +** Alternatively, this file may be used in accordance with the terms and +** conditions contained in a signed written agreement between you and Nokia. +** +** +** +** +** +** +** $QT_END_LICENSE$ +** +****************************************************************************/ + +// qfutureinterface.h included from qfuture.h +#include "qfuture.h" + +#ifndef QT_NO_QFUTURE + +#include +#include +#include +#include + +#include "qfutureinterface_p.h" + +QT_BEGIN_NAMESPACE + +enum { + MaxProgressEmitsPerSecond = 25 +}; + +QFutureInterfaceBase::QFutureInterfaceBase(State initialState) + : d(new QFutureInterfaceBasePrivate(initialState)) +{ } + +QFutureInterfaceBase::QFutureInterfaceBase(const QFutureInterfaceBase &other) + : d(other.d) +{ + d->refCount.ref(); +} + +QFutureInterfaceBase::~QFutureInterfaceBase() +{ + if (!d->refCount.deref()) + delete d; +} + +void QFutureInterfaceBase::cancel() +{ + QMutexLocker locker(&d->m_mutex); + if (d->state & Canceled) + return; + + d->state = State((d->state & ~Paused) | Canceled); + d->waitCondition.wakeAll(); + d->pausedWaitCondition.wakeAll(); + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); +} + +void QFutureInterfaceBase::setPaused(bool paused) +{ + QMutexLocker locker(&d->m_mutex); + if (paused) { + d->state = State(d->state | Paused); + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Paused)); + } else { + d->state = State(d->state & ~Paused); + d->pausedWaitCondition.wakeAll(); + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed)); + } +} + +void QFutureInterfaceBase::togglePaused() +{ + QMutexLocker locker(&d->m_mutex); + if (d->state & Paused) { + d->state = State(d->state & ~Paused); + d->pausedWaitCondition.wakeAll(); + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed)); + } else { + d->state = State(d->state | Paused); + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Paused)); + } +} + +void QFutureInterfaceBase::setThrottled(bool enable) +{ + // bail out if we are not changing the state + if ((enable && (d->state & Throttled)) || (!enable && !(d->state & Throttled))) + return; + + // lock and change the state + QMutexLocker lock(&d->m_mutex); + if (enable) { + d->state = State(d->state | Throttled); + } else { + d->state = State(d->state & ~Throttled); + if (!(d->state & Paused)) + d->pausedWaitCondition.wakeAll(); + } +} + + +bool QFutureInterfaceBase::isRunning() const +{ + return queryState(Running); +} + +bool QFutureInterfaceBase::isStarted() const +{ + return queryState(Started); +} + +bool QFutureInterfaceBase::isCanceled() const +{ + return queryState(Canceled); +} + +bool QFutureInterfaceBase::isFinished() const +{ + return queryState(Finished); +} + +bool QFutureInterfaceBase::isPaused() const +{ + return queryState(Paused); +} + +bool QFutureInterfaceBase::isThrottled() const +{ + return queryState(Throttled); +} + +bool QFutureInterfaceBase::isResultReadyAt(int index) const +{ + QMutexLocker lock(&d->m_mutex); + return d->internal_isResultReadyAt(index); +} + +bool QFutureInterfaceBase::waitForNextResult() +{ + QMutexLocker lock(&d->m_mutex); + return d->internal_waitForNextResult(); +} + +void QFutureInterfaceBase::waitForResume() +{ + // return early if possible to avoid taking the mutex lock. + if ((d->state & Paused) == false || (d->state & Canceled)) + return; + + QMutexLocker lock(&d->m_mutex); + if ((d->state & Paused) == false || (d->state & Canceled)) + return; + + // decrease active thread count since this thread will wait. + QThreadPool::globalInstance()->releaseThread(); + + d->pausedWaitCondition.wait(&d->m_mutex); + + QThreadPool::globalInstance()->reserveThread(); +} + +int QFutureInterfaceBase::progressValue() const +{ + return d->m_progressValue; +} + +int QFutureInterfaceBase::progressMinimum() const +{ + return d->m_progressMinimum; +} + +int QFutureInterfaceBase::progressMaximum() const +{ + return d->m_progressMaximum; +} + +int QFutureInterfaceBase::resultCount() const +{ + QMutexLocker lock(&d->m_mutex); + return d->internal_resultCount(); +} + +QString QFutureInterfaceBase::progressText() const +{ + QMutexLocker locker(&d->m_mutex); + return d->m_progressText; +} + +bool QFutureInterfaceBase::isProgressUpdateNeeded() const +{ + QMutexLocker locker(&d->m_mutex); + return !d->progressTime.isValid() || (d->progressTime.elapsed() > (1000 / MaxProgressEmitsPerSecond)); +} + +void QFutureInterfaceBase::reportStarted() +{ + QMutexLocker locker(&d->m_mutex); + if ((d->state & Started) || (d->state & Canceled) || (d->state & Finished)) + return; + + d->setState(State(Started | Running)); + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Started)); +} + +void QFutureInterfaceBase::reportCanceled() +{ + cancel(); +} + +#ifndef QT_NO_EXCEPTIONS +void QFutureInterfaceBase::reportException(const QtConcurrent::Exception &exception) +{ + QMutexLocker locker(&d->m_mutex); + if ((d->state & Canceled) || (d->state & Finished)) + return; + + d->m_exceptionStore.setException(exception); + d->state = State(d->state | Canceled); + d->waitCondition.wakeAll(); + d->pausedWaitCondition.wakeAll(); + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); +} +#endif + +void QFutureInterfaceBase::reportFinished() +{ + QMutexLocker locker(&d->m_mutex); + if (!(d->state & Finished)) { + d->state = State((d->state & ~Running) | Finished); + d->waitCondition.wakeAll(); + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished)); + } +} + +void QFutureInterfaceBase::setExpectedResultCount(int resultCount) +{ + if (d->manualProgress == false) + setProgressRange(0, resultCount); + d->m_expectedResultCount = resultCount; +} + +int QFutureInterfaceBase::expectedResultCount() +{ + return d->m_expectedResultCount; +} + +bool QFutureInterfaceBase::queryState(State state) const +{ + return (d->state & state); +} + +void QFutureInterfaceBase::waitForResult(int resultIndex) +{ + d->m_exceptionStore.throwPossibleException(); + + if (!(d->state & Running)) + return; + + // To avoid deadlocks and reduce the number of threads used, try to + // run the runnable in the current thread. + QThreadPool::globalInstance()->d_func()->stealRunnable(d->runnable); + + QMutexLocker lock(&d->m_mutex); + + if (!(d->state & Running)) + return; + + const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex; + while ((d->state & Running) && d->internal_isResultReadyAt(waitIndex) == false) + d->waitCondition.wait(&d->m_mutex); + + d->m_exceptionStore.throwPossibleException(); +} + +void QFutureInterfaceBase::waitForFinished() +{ + if (d->state & Running) { + QThreadPool::globalInstance()->d_func()->stealRunnable(d->runnable); + + QMutexLocker lock(&d->m_mutex); + + while (d->state & Running) + d->waitCondition.wait(&d->m_mutex); + } + + d->m_exceptionStore.throwPossibleException(); +} + +void QFutureInterfaceBase::reportResultsReady(int beginIndex, int endIndex) +{ + if ((d->state & Canceled) || (d->state & Finished) || beginIndex == endIndex) + return; + + d->waitCondition.wakeAll(); + + if (d->manualProgress == false) { + if (d->internal_updateProgress(d->m_progressValue + endIndex - beginIndex) == false) { + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, + beginIndex, + endIndex)); + return; + } + + d->sendCallOuts(QFutureCallOutEvent(QFutureCallOutEvent::Progress, + d->m_progressValue, + d->m_progressText), + QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, + beginIndex, + endIndex)); + return; + } + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, beginIndex, endIndex)); +} + +void QFutureInterfaceBase::setRunnable(QRunnable *runnable) +{ + d->runnable = runnable; +} + +void QFutureInterfaceBase::setFilterMode(bool enable) +{ + QMutexLocker locker(&d->m_mutex); + resultStoreBase().setFilterMode(enable); +} + +void QFutureInterfaceBase::setProgressRange(int minimum, int maximum) +{ + QMutexLocker locker(&d->m_mutex); + d->m_progressMinimum = minimum; + d->m_progressMaximum = maximum; + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, minimum, maximum)); +} + +void QFutureInterfaceBase::setProgressValue(int progressValue) +{ + setProgressValueAndText(progressValue, QString()); +} + +void QFutureInterfaceBase::setProgressValueAndText(int progressValue, + const QString &progressText) +{ + QMutexLocker locker(&d->m_mutex); + if (d->manualProgress == false) + d->manualProgress = true; + if (d->m_progressValue >= progressValue) + return; + + if ((d->state & Canceled) || (d->state & Finished)) + return; + + if (d->internal_updateProgress(progressValue, progressText)) { + d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Progress, + d->m_progressValue, + d->m_progressText)); + } +} + +QMutex *QFutureInterfaceBase::mutex() const +{ + return &d->m_mutex; +} + +QtConcurrent::internal::ExceptionStore &QFutureInterfaceBase::exceptionStore() +{ + return d->m_exceptionStore; +} + +QtConcurrent::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() +{ + return d->m_results; +} + +const QtConcurrent::ResultStoreBase &QFutureInterfaceBase::resultStoreBase() const +{ + return d->m_results; +} + +QFutureInterfaceBase &QFutureInterfaceBase::operator=(const QFutureInterfaceBase &other) +{ + other.d->refCount.ref(); + if (!d->refCount.deref()) + delete d; + d = other.d; + return *this; +} + +bool QFutureInterfaceBase::referenceCountIsOne() const +{ + return d->refCount.load() == 1; +} + +QFutureInterfaceBasePrivate::QFutureInterfaceBasePrivate(QFutureInterfaceBase::State initialState) + : refCount(1), m_progressValue(0), m_progressMinimum(0), m_progressMaximum(0), + state(initialState), pendingResults(0), + manualProgress(false), m_expectedResultCount(0), runnable(0) +{ + progressTime.invalidate(); +} + +int QFutureInterfaceBasePrivate::internal_resultCount() const +{ + return m_results.count(); // ### subtract canceled results. +} + +bool QFutureInterfaceBasePrivate::internal_isResultReadyAt(int index) const +{ + return (m_results.contains(index)); +} + +bool QFutureInterfaceBasePrivate::internal_waitForNextResult() +{ + if (m_results.hasNextResult()) + return true; + + while ((state & QFutureInterfaceBase::Running) && m_results.hasNextResult() == false) + waitCondition.wait(&m_mutex); + + return (!(state & QFutureInterfaceBase::Canceled) && m_results.hasNextResult()); +} + +bool QFutureInterfaceBasePrivate::internal_updateProgress(int progress, + const QString &progressText) +{ + if (m_progressValue >= progress) + return false; + + m_progressValue = progress; + m_progressText = progressText; + + if (progressTime.isValid() && m_progressValue != m_progressMaximum) // make sure the first and last steps are emitted. + if (progressTime.elapsed() < (1000 / MaxProgressEmitsPerSecond)) + return false; + + progressTime.start(); + return true; +} + +void QFutureInterfaceBasePrivate::internal_setThrottled(bool enable) +{ + // bail out if we are not changing the state + if ((enable && (state & QFutureInterfaceBase::Throttled)) + || (!enable && !(state & QFutureInterfaceBase::Throttled))) + return; + + // change the state + if (enable) { + state = QFutureInterfaceBase::State(state | QFutureInterfaceBase::Throttled); + } else { + state = QFutureInterfaceBase::State(state & ~QFutureInterfaceBase::Throttled); + if (!(state & QFutureInterfaceBase::Paused)) + pausedWaitCondition.wakeAll(); + } +} + +void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOutEvent) +{ + if (outputConnections.isEmpty()) + return; + + for (int i = 0; i < outputConnections.count(); ++i) + outputConnections.at(i)->postCallOutEvent(callOutEvent); +} + +void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOutEvent1, + const QFutureCallOutEvent &callOutEvent2) +{ + if (outputConnections.isEmpty()) + return; + + for (int i = 0; i < outputConnections.count(); ++i) { + QFutureCallOutInterface *interface = outputConnections.at(i); + interface->postCallOutEvent(callOutEvent1); + interface->postCallOutEvent(callOutEvent2); + } +} + +// This function connects an output interface (for example a QFutureWatcher) +// to this future. While holding the lock we check the state and ready results +// and add the appropriate callouts to the queue. In order to avoid deadlocks, +// the actual callouts are made at the end while not holding the lock. +void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *interface) +{ + QMutexLocker locker(&m_mutex); + + if (state & QFutureInterfaceBase::Started) { + interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started)); + interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, + m_progressMinimum, + m_progressMaximum)); + interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress, + m_progressValue, + m_progressText)); + } + + QtConcurrent::ResultIteratorBase it = m_results.begin(); + while (it != m_results.end()) { + const int begin = it.resultIndex(); + const int end = begin + it.batchSize(); + interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, + begin, + end)); + it.batchedAdvance(); + } + + if (state & QFutureInterfaceBase::Paused) + interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Paused)); + + if (state & QFutureInterfaceBase::Canceled) + interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); + + if (state & QFutureInterfaceBase::Finished) + interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished)); + + outputConnections.append(interface); +} + +void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *interface) +{ + QMutexLocker lock(&m_mutex); + const int index = outputConnections.indexOf(interface); + if (index == -1) + return; + outputConnections.removeAt(index); + + interface->callOutInterfaceDisconnected(); +} + +void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState) +{ + state = newState; +} + +QT_END_NAMESPACE + +#endif // QT_NO_CONCURRENT -- cgit v1.2.3