diff options
Diffstat (limited to 'src/corelib/thread/qfutureinterface.cpp')
-rw-r--r-- | src/corelib/thread/qfutureinterface.cpp | 205 |
1 files changed, 148 insertions, 57 deletions
diff --git a/src/corelib/thread/qfutureinterface.cpp b/src/corelib/thread/qfutureinterface.cpp index a82b7af873..f83306af00 100644 --- a/src/corelib/thread/qfutureinterface.cpp +++ b/src/corelib/thread/qfutureinterface.cpp @@ -6,13 +6,11 @@ #include "qfutureinterface_p.h" #include <QtCore/qatomic.h> +#include <QtCore/qcoreapplication.h> #include <QtCore/qthread.h> -#include <QtCore/private/qsimd_p.h> // for qYieldCpu() +#include <QtCore/qvarlengtharray.h> #include <private/qthreadpool_p.h> - -#ifdef interface -# undef interface -#endif +#include <private/qobject_p.h> // GCC 12 gets confused about QFutureInterfaceBase::state, for some non-obvious // reason @@ -29,6 +27,7 @@ namespace { class ThreadPoolThreadReleaser { QThreadPool *m_pool; public: + Q_NODISCARD_CTOR explicit ThreadPoolThreadReleaser(QThreadPool *pool) : m_pool(pool) { if (pool) pool->releaseThread(); } @@ -41,6 +40,63 @@ const auto suspendingOrSuspended = } // unnamed namespace +class QObjectContinuationWrapper : public QObject +{ + Q_OBJECT +public: + explicit QObjectContinuationWrapper(QObject *parent = nullptr) + : QObject(parent) + { + } + +signals: + void run(); +}; + +void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *slotObj, + QFutureInterfaceBase &fi) +{ + Q_ASSERT(context); + Q_ASSERT(slotObj); + + auto slot = SlotObjUniquePtr(slotObj); + + auto *watcher = new QObjectContinuationWrapper; + watcher->moveToThread(context->thread()); + + // We need to protect acccess to the watcher. The context object (and in turn, the watcher) + // could be destroyed while the continuation that emits the signal is running. We have to + // prevent that. + // The mutex has to be recursive, because the continuation itself could delete the context + // object (and thus the watcher), which will try to lock the mutex from the same thread twice. + auto watcherMutex = std::make_shared<QRecursiveMutex>(); + const auto destroyWatcher = [watcherMutex, watcher]() mutable { + QMutexLocker lock(watcherMutex.get()); + delete watcher; + }; + + // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`... + QObject::connect(watcher, &QObjectContinuationWrapper::run, + // for the following, cf. QMetaObject::invokeMethodImpl(): + // we know `slot` is a lambda returning `void`, so we can just + // `call()` with `obj` and `args[0]` set to `nullptr`: + context, [slot = std::move(slot)] { + void *args[] = { nullptr }; // for `void` return value + slot->call(nullptr, args); + }); + QObject::connect(watcher, &QObjectContinuationWrapper::run, watcher, &QObject::deleteLater); + QObject::connect(context, &QObject::destroyed, watcher, destroyWatcher); + + fi.setContinuation([watcherMutex, watcher = QPointer(watcher)] + (const QFutureInterfaceBase &parentData) + { + Q_UNUSED(parentData); + QMutexLocker lock(watcherMutex.get()); + if (watcher) + emit watcher->run(); + }); +} + QFutureCallOutInterface::~QFutureCallOutInterface() = default; @@ -105,6 +161,13 @@ void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode) break; } + // Cancel the continuations chain + QFutureInterfaceBasePrivate *next = d->continuationData; + while (next) { + next->continuationState = QFutureInterfaceBasePrivate::Canceled; + next = next->continuationData; + } + d->waitCondition.wakeAll(); d->pausedWaitCondition.wakeAll(); @@ -611,7 +674,6 @@ void QFutureInterfaceBase::reset() { d->m_progressValue = 0; d->m_progress.reset(); - d->setState(QFutureInterfaceBase::NoState); d->progressTime.invalidate(); d->isValid = false; } @@ -719,7 +781,7 @@ void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOut if (outputConnections.isEmpty()) return; - for (int i = 0; i < outputConnections.count(); ++i) + for (int i = 0; i < outputConnections.size(); ++i) outputConnections.at(i)->postCallOutEvent(callOutEvent); } @@ -729,38 +791,37 @@ void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOu if (outputConnections.isEmpty()) return; - for (int i = 0; i < outputConnections.count(); ++i) { - QFutureCallOutInterface *interface = outputConnections.at(i); - interface->postCallOutEvent(callOutEvent1); - interface->postCallOutEvent(callOutEvent2); + for (int i = 0; i < outputConnections.size(); ++i) { + QFutureCallOutInterface *iface = outputConnections.at(i); + iface->postCallOutEvent(callOutEvent1); + iface->postCallOutEvent(callOutEvent2); } } // This function connects an output interface (for example a QFutureWatcher) // to this future. While holding the lock we check the state and ready results -// and add the appropriate callouts to the queue. In order to avoid deadlocks, -// the actual callouts are made at the end while not holding the lock. -void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *interface) +// and add the appropriate callouts to the queue. +void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *iface) { QMutexLocker locker(&m_mutex); const auto currentState = state.loadRelaxed(); if (currentState & QFutureInterfaceBase::Started) { - interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started)); + iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started)); if (m_progress) { - interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, - m_progress->minimum, - m_progress->maximum)); - interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress, - m_progressValue, - m_progress->text)); + iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, + m_progress->minimum, + m_progress->maximum)); + iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress, + m_progressValue, + m_progress->text)); } else { - interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, - 0, - 0)); - interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress, - m_progressValue, - QString())); + iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange, + 0, + 0)); + iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress, + m_progressValue, + QString())); } } @@ -769,36 +830,36 @@ void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface while (it != data.m_results.end()) { const int begin = it.resultIndex(); const int end = begin + it.batchSize(); - interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, - begin, - end)); + iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady, + begin, + end)); it.batchedAdvance(); } } if (currentState & QFutureInterfaceBase::Suspended) - interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended)); + iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended)); else if (currentState & QFutureInterfaceBase::Suspending) - interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending)); + iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending)); if (currentState & QFutureInterfaceBase::Canceled) - interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); + iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); if (currentState & QFutureInterfaceBase::Finished) - interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished)); + iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished)); - outputConnections.append(interface); + outputConnections.append(iface); } -void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *interface) +void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *iface) { QMutexLocker lock(&m_mutex); - const int index = outputConnections.indexOf(interface); + const qsizetype index = outputConnections.indexOf(iface); if (index == -1) return; outputConnections.removeAt(index); - interface->callOutInterfaceDisconnected(); + iface->callOutInterfaceDisconnected(); } void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState) @@ -816,45 +877,60 @@ void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInter { QMutexLocker lock(&d->continuationMutex); - if (continuationFutureData) - continuationFutureData->parentData = d; - // If the state is ready, run continuation immediately, // otherwise save it for later. if (isFinished()) { lock.unlock(); func(*this); - } else { + lock.relock(); + } + // Unless the continuation has been cleaned earlier, we have to + // store the move-only continuation, to guarantee that the associated + // future's data stays alive. + if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) { + if (d->continuation) { + qWarning() << "Adding a continuation to a future which already has a continuation. " + "The existing continuation is overwritten."; + } d->continuation = std::move(func); + d->continuationData = continuationFutureData; } } +void QFutureInterfaceBase::cleanContinuation() +{ + if (!d) + return; + + QMutexLocker lock(&d->continuationMutex); + d->continuation = nullptr; + d->continuationState = QFutureInterfaceBasePrivate::Cleaned; + d->continuationData = nullptr; +} + void QFutureInterfaceBase::runContinuation() const { QMutexLocker lock(&d->continuationMutex); if (d->continuation) { - auto fn = std::exchange(d->continuation, nullptr); + // Save the continuation in a local function, to avoid calling + // a null std::function below, in case cleanContinuation() is + // called from some other thread right after unlock() below. + auto fn = std::move(d->continuation); lock.unlock(); fn(*this); + + lock.relock(); + // Unless the continuation has been cleaned earlier, we have to + // store the move-only continuation, to guarantee that the associated + // future's data stays alive. + if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) + d->continuation = std::move(fn); } } bool QFutureInterfaceBase::isChainCanceled() const { - if (isCanceled()) - return true; - - auto parent = d->parentData; - while (parent) { - // If the future is in Canceled state because it had an exception, we want to - // continue checking the chain of parents for cancellation, otherwise if the exception - // is handled inside the chain, it won't be interrupted even though cancellation has - // been requested. - if ((parent->state.loadRelaxed() & Canceled) && !parent->hasException) - return true; - parent = parent->parentData; - } - return false; + return isCanceled() || d->continuationState == QFutureInterfaceBasePrivate::Canceled; } void QFutureInterfaceBase::setLaunchAsync(bool value) @@ -867,4 +943,19 @@ bool QFutureInterfaceBase::launchAsync() const return d->launchAsync; } +namespace QtFuture { + +QFuture<void> makeReadyVoidFuture() +{ + QFutureInterface<void> promise; + promise.reportStarted(); + promise.reportFinished(); + + return promise.future(); +} + +} // namespace QtFuture + QT_END_NAMESPACE + +#include "qfutureinterface.moc" |