summaryrefslogtreecommitdiffstats
path: root/src/corelib/thread/qfutureinterface.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/corelib/thread/qfutureinterface.cpp')
-rw-r--r--src/corelib/thread/qfutureinterface.cpp205
1 files changed, 148 insertions, 57 deletions
diff --git a/src/corelib/thread/qfutureinterface.cpp b/src/corelib/thread/qfutureinterface.cpp
index a82b7af873..f83306af00 100644
--- a/src/corelib/thread/qfutureinterface.cpp
+++ b/src/corelib/thread/qfutureinterface.cpp
@@ -6,13 +6,11 @@
#include "qfutureinterface_p.h"
#include <QtCore/qatomic.h>
+#include <QtCore/qcoreapplication.h>
#include <QtCore/qthread.h>
-#include <QtCore/private/qsimd_p.h> // for qYieldCpu()
+#include <QtCore/qvarlengtharray.h>
#include <private/qthreadpool_p.h>
-
-#ifdef interface
-# undef interface
-#endif
+#include <private/qobject_p.h>
// GCC 12 gets confused about QFutureInterfaceBase::state, for some non-obvious
// reason
@@ -29,6 +27,7 @@ namespace {
class ThreadPoolThreadReleaser {
QThreadPool *m_pool;
public:
+ Q_NODISCARD_CTOR
explicit ThreadPoolThreadReleaser(QThreadPool *pool)
: m_pool(pool)
{ if (pool) pool->releaseThread(); }
@@ -41,6 +40,63 @@ const auto suspendingOrSuspended =
} // unnamed namespace
+class QObjectContinuationWrapper : public QObject
+{
+ Q_OBJECT
+public:
+ explicit QObjectContinuationWrapper(QObject *parent = nullptr)
+ : QObject(parent)
+ {
+ }
+
+signals:
+ void run();
+};
+
+void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *slotObj,
+ QFutureInterfaceBase &fi)
+{
+ Q_ASSERT(context);
+ Q_ASSERT(slotObj);
+
+ auto slot = SlotObjUniquePtr(slotObj);
+
+ auto *watcher = new QObjectContinuationWrapper;
+ watcher->moveToThread(context->thread());
+
+ // We need to protect acccess to the watcher. The context object (and in turn, the watcher)
+ // could be destroyed while the continuation that emits the signal is running. We have to
+ // prevent that.
+ // The mutex has to be recursive, because the continuation itself could delete the context
+ // object (and thus the watcher), which will try to lock the mutex from the same thread twice.
+ auto watcherMutex = std::make_shared<QRecursiveMutex>();
+ const auto destroyWatcher = [watcherMutex, watcher]() mutable {
+ QMutexLocker lock(watcherMutex.get());
+ delete watcher;
+ };
+
+ // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`...
+ QObject::connect(watcher, &QObjectContinuationWrapper::run,
+ // for the following, cf. QMetaObject::invokeMethodImpl():
+ // we know `slot` is a lambda returning `void`, so we can just
+ // `call()` with `obj` and `args[0]` set to `nullptr`:
+ context, [slot = std::move(slot)] {
+ void *args[] = { nullptr }; // for `void` return value
+ slot->call(nullptr, args);
+ });
+ QObject::connect(watcher, &QObjectContinuationWrapper::run, watcher, &QObject::deleteLater);
+ QObject::connect(context, &QObject::destroyed, watcher, destroyWatcher);
+
+ fi.setContinuation([watcherMutex, watcher = QPointer(watcher)]
+ (const QFutureInterfaceBase &parentData)
+ {
+ Q_UNUSED(parentData);
+ QMutexLocker lock(watcherMutex.get());
+ if (watcher)
+ emit watcher->run();
+ });
+}
+
QFutureCallOutInterface::~QFutureCallOutInterface()
= default;
@@ -105,6 +161,13 @@ void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode)
break;
}
+ // Cancel the continuations chain
+ QFutureInterfaceBasePrivate *next = d->continuationData;
+ while (next) {
+ next->continuationState = QFutureInterfaceBasePrivate::Canceled;
+ next = next->continuationData;
+ }
+
d->waitCondition.wakeAll();
d->pausedWaitCondition.wakeAll();
@@ -611,7 +674,6 @@ void QFutureInterfaceBase::reset()
{
d->m_progressValue = 0;
d->m_progress.reset();
- d->setState(QFutureInterfaceBase::NoState);
d->progressTime.invalidate();
d->isValid = false;
}
@@ -719,7 +781,7 @@ void QFutureInterfaceBasePrivate::sendCallOut(const QFutureCallOutEvent &callOut
if (outputConnections.isEmpty())
return;
- for (int i = 0; i < outputConnections.count(); ++i)
+ for (int i = 0; i < outputConnections.size(); ++i)
outputConnections.at(i)->postCallOutEvent(callOutEvent);
}
@@ -729,38 +791,37 @@ void QFutureInterfaceBasePrivate::sendCallOuts(const QFutureCallOutEvent &callOu
if (outputConnections.isEmpty())
return;
- for (int i = 0; i < outputConnections.count(); ++i) {
- QFutureCallOutInterface *interface = outputConnections.at(i);
- interface->postCallOutEvent(callOutEvent1);
- interface->postCallOutEvent(callOutEvent2);
+ for (int i = 0; i < outputConnections.size(); ++i) {
+ QFutureCallOutInterface *iface = outputConnections.at(i);
+ iface->postCallOutEvent(callOutEvent1);
+ iface->postCallOutEvent(callOutEvent2);
}
}
// This function connects an output interface (for example a QFutureWatcher)
// to this future. While holding the lock we check the state and ready results
-// and add the appropriate callouts to the queue. In order to avoid deadlocks,
-// the actual callouts are made at the end while not holding the lock.
-void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *interface)
+// and add the appropriate callouts to the queue.
+void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface *iface)
{
QMutexLocker locker(&m_mutex);
const auto currentState = state.loadRelaxed();
if (currentState & QFutureInterfaceBase::Started) {
- interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
+ iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Started));
if (m_progress) {
- interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
- m_progress->minimum,
- m_progress->maximum));
- interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
- m_progressValue,
- m_progress->text));
+ iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
+ m_progress->minimum,
+ m_progress->maximum));
+ iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
+ m_progressValue,
+ m_progress->text));
} else {
- interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
- 0,
- 0));
- interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
- m_progressValue,
- QString()));
+ iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ProgressRange,
+ 0,
+ 0));
+ iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Progress,
+ m_progressValue,
+ QString()));
}
}
@@ -769,36 +830,36 @@ void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface
while (it != data.m_results.end()) {
const int begin = it.resultIndex();
const int end = begin + it.batchSize();
- interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
- begin,
- end));
+ iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::ResultsReady,
+ begin,
+ end));
it.batchedAdvance();
}
}
if (currentState & QFutureInterfaceBase::Suspended)
- interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
+ iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
else if (currentState & QFutureInterfaceBase::Suspending)
- interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
+ iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspending));
if (currentState & QFutureInterfaceBase::Canceled)
- interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
+ iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
if (currentState & QFutureInterfaceBase::Finished)
- interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
+ iface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
- outputConnections.append(interface);
+ outputConnections.append(iface);
}
-void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *interface)
+void QFutureInterfaceBasePrivate::disconnectOutputInterface(QFutureCallOutInterface *iface)
{
QMutexLocker lock(&m_mutex);
- const int index = outputConnections.indexOf(interface);
+ const qsizetype index = outputConnections.indexOf(iface);
if (index == -1)
return;
outputConnections.removeAt(index);
- interface->callOutInterfaceDisconnected();
+ iface->callOutInterfaceDisconnected();
}
void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
@@ -816,45 +877,60 @@ void QFutureInterfaceBase::setContinuation(std::function<void(const QFutureInter
{
QMutexLocker lock(&d->continuationMutex);
- if (continuationFutureData)
- continuationFutureData->parentData = d;
-
// If the state is ready, run continuation immediately,
// otherwise save it for later.
if (isFinished()) {
lock.unlock();
func(*this);
- } else {
+ lock.relock();
+ }
+ // Unless the continuation has been cleaned earlier, we have to
+ // store the move-only continuation, to guarantee that the associated
+ // future's data stays alive.
+ if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned) {
+ if (d->continuation) {
+ qWarning() << "Adding a continuation to a future which already has a continuation. "
+ "The existing continuation is overwritten.";
+ }
d->continuation = std::move(func);
+ d->continuationData = continuationFutureData;
}
}
+void QFutureInterfaceBase::cleanContinuation()
+{
+ if (!d)
+ return;
+
+ QMutexLocker lock(&d->continuationMutex);
+ d->continuation = nullptr;
+ d->continuationState = QFutureInterfaceBasePrivate::Cleaned;
+ d->continuationData = nullptr;
+}
+
void QFutureInterfaceBase::runContinuation() const
{
QMutexLocker lock(&d->continuationMutex);
if (d->continuation) {
- auto fn = std::exchange(d->continuation, nullptr);
+ // Save the continuation in a local function, to avoid calling
+ // a null std::function below, in case cleanContinuation() is
+ // called from some other thread right after unlock() below.
+ auto fn = std::move(d->continuation);
lock.unlock();
fn(*this);
+
+ lock.relock();
+ // Unless the continuation has been cleaned earlier, we have to
+ // store the move-only continuation, to guarantee that the associated
+ // future's data stays alive.
+ if (d->continuationState != QFutureInterfaceBasePrivate::Cleaned)
+ d->continuation = std::move(fn);
}
}
bool QFutureInterfaceBase::isChainCanceled() const
{
- if (isCanceled())
- return true;
-
- auto parent = d->parentData;
- while (parent) {
- // If the future is in Canceled state because it had an exception, we want to
- // continue checking the chain of parents for cancellation, otherwise if the exception
- // is handled inside the chain, it won't be interrupted even though cancellation has
- // been requested.
- if ((parent->state.loadRelaxed() & Canceled) && !parent->hasException)
- return true;
- parent = parent->parentData;
- }
- return false;
+ return isCanceled() || d->continuationState == QFutureInterfaceBasePrivate::Canceled;
}
void QFutureInterfaceBase::setLaunchAsync(bool value)
@@ -867,4 +943,19 @@ bool QFutureInterfaceBase::launchAsync() const
return d->launchAsync;
}
+namespace QtFuture {
+
+QFuture<void> makeReadyVoidFuture()
+{
+ QFutureInterface<void> promise;
+ promise.reportStarted();
+ promise.reportFinished();
+
+ return promise.future();
+}
+
+} // namespace QtFuture
+
QT_END_NAMESPACE
+
+#include "qfutureinterface.moc"