From dfaca09e85a49d2983bb89893bfbe1ba4c19eab4 Mon Sep 17 00:00:00 2001 From: Sona Kurazyan Date: Thu, 27 Feb 2020 17:30:07 +0100 Subject: Add support for attaching continuations to QFuture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added QFuture::then() methods to allow chaining multiple asynchronous computations. Continuations can use the following execution policies: * QtFuture::Launch::Sync - the continuation will be launched in the same thread in which the parent has been executing. * QtFuture::Launch::Async - the continuation will be launched in a new thread. * QtFuture::Launch::Inherit - the continuation will inherit the launch policy of the parent, or its thread pool (if it was using a custom one). * Additionally then() also accepts a custom QThreadPool* instance. Note, that if the parent future gets canceled, its continuation(s) will be also canceled. If the parent throws an exception, it will be propagated to the continuation's future, unless it is caught inside the continuation (if it has a QFuture arg). Some example usages: QFuture future = ...; future.then([](int res1){ ... }).then([](int res2){ ... })... QFuture future = ...; future.then([](QFuture fut1){ /* do something with fut1 */ })... In the examples above all continuations will run in the same thread as future. QFuture future = ...; future.then(QtFuture::Launch::Async, [](int res1){ ... }) .then([](int res2){ ... }).. In this example the continuations will run in a new thread (but on the same one). QThreadPool pool; QFuture future = ...; future.then(&pool, [](int res1){ ... }) .then([](int res2){ ... }).. In this example the continuations will run in the given thread pool. [ChangeLog][QtCore] Added support for attaching continuations to QFuture. Task-number: QTBUG-81587 Change-Id: I5b2e176694f7ae8ce00404aca725e9a170818955 Reviewed-by: Leena Miettinen Reviewed-by: Timur Pocheptsov Reviewed-by: MÃ¥rten Nordheim --- src/corelib/.prev_CMakeLists.txt | 1 + src/corelib/CMakeLists.txt | 1 + src/corelib/thread/qfuture.h | 86 +++- src/corelib/thread/qfuture.qdoc | 141 ++++++ src/corelib/thread/qfuture_impl.h | 339 +++++++++++++++ src/corelib/thread/qfutureinterface.cpp | 50 ++- src/corelib/thread/qfutureinterface.h | 29 +- src/corelib/thread/qfutureinterface_p.h | 6 + src/corelib/thread/thread.pri | 1 + tests/auto/corelib/thread/qfuture/tst_qfuture.cpp | 504 ++++++++++++++++++++++ 10 files changed, 1151 insertions(+), 7 deletions(-) create mode 100644 src/corelib/thread/qfuture_impl.h diff --git a/src/corelib/.prev_CMakeLists.txt b/src/corelib/.prev_CMakeLists.txt index 51885497bf..a9fb15f505 100644 --- a/src/corelib/.prev_CMakeLists.txt +++ b/src/corelib/.prev_CMakeLists.txt @@ -574,6 +574,7 @@ qt_extend_target(Core CONDITION QT_FEATURE_future SOURCES thread/qexception.cpp thread/qexception.h thread/qfuture.h + thread/qfuture_impl.h thread/qfutureinterface.cpp thread/qfutureinterface.h thread/qfutureinterface_p.h thread/qfuturesynchronizer.h thread/qfuturewatcher.cpp thread/qfuturewatcher.h thread/qfuturewatcher_p.h diff --git a/src/corelib/CMakeLists.txt b/src/corelib/CMakeLists.txt index b62ce89f4d..49bc440ea9 100644 --- a/src/corelib/CMakeLists.txt +++ b/src/corelib/CMakeLists.txt @@ -677,6 +677,7 @@ qt_extend_target(Core CONDITION QT_FEATURE_future SOURCES thread/qexception.cpp thread/qexception.h thread/qfuture.h + thread/qfuture_impl.h thread/qfutureinterface.cpp thread/qfutureinterface.h thread/qfutureinterface_p.h thread/qfuturesynchronizer.h thread/qfuturewatcher.cpp thread/qfuturewatcher.h thread/qfuturewatcher_p.h diff --git a/src/corelib/thread/qfuture.h b/src/corelib/thread/qfuture.h index d3135510b3..e103cfb552 100644 --- a/src/corelib/thread/qfuture.h +++ b/src/corelib/thread/qfuture.h @@ -45,11 +45,12 @@ #include #include +#include + QT_REQUIRE_CONFIG(future); QT_BEGIN_NAMESPACE - template class QFutureWatcher; template <> @@ -101,6 +102,18 @@ public: operator T() const { return result(); } QList results() const { return d.results(); } + template + using ResultType = typename QtPrivate::ResultTypeHelper::ResultType; + + template + QFuture> then(Function &&function); + + template + QFuture> then(QtFuture::Launch policy, Function &&function); + + template + QFuture> then(QThreadPool *pool, Function &&function); + class const_iterator { public: @@ -199,6 +212,7 @@ private: friend class QFutureWatcher; public: // Warning: the d pointer is not documented and is considered private. + // TODO: make this private mutable QFutureInterface d; }; @@ -222,6 +236,35 @@ inline QFuture QFutureInterface::future() return QFuture(this); } +template +template +QFuture::template ResultType> QFuture::then(Function &&function) +{ + return then(QtFuture::Launch::Sync, std::forward(function)); +} + +template +template +QFuture::template ResultType> +QFuture::then(QtFuture::Launch policy, Function &&function) +{ + QFutureInterface> promise(QFutureInterfaceBase::State::Pending); + QtPrivate::Continuation, T>::create( + std::forward(function), this, promise, policy); + return promise.future(); +} + +template +template +QFuture::template ResultType> QFuture::then(QThreadPool *pool, + Function &&function) +{ + QFutureInterface> promise(QFutureInterfaceBase::State::Pending); + QtPrivate::Continuation, T>::create( + std::forward(function), this, promise, pool); + return promise.future(); +} + Q_DECLARE_SEQUENTIAL_ITERATOR(Future) template <> @@ -272,6 +315,18 @@ public: QString progressText() const { return d.progressText(); } void waitForFinished() { d.waitForFinished(); } + template + using ResultType = typename QtPrivate::ResultTypeHelper::ResultType; + + template + QFuture> then(Function &&function); + + template + QFuture> then(QtFuture::Launch policy, Function &&function); + + template + QFuture> then(QThreadPool *pool, Function &&function); + private: friend class QFutureWatcher; @@ -279,6 +334,9 @@ private: public: #endif mutable QFutureInterfaceBase d; + + template + friend class QtPrivate::Continuation; }; inline QFuture QFutureInterface::future() @@ -292,6 +350,32 @@ QFuture qToVoidFuture(const QFuture &future) return QFuture(future.d); } +template +QFuture::ResultType> QFuture::then(Function &&function) +{ + return then(QtFuture::Launch::Sync, std::forward(function)); +} + +template +QFuture::ResultType> QFuture::then(QtFuture::Launch policy, + Function &&function) +{ + QFutureInterface> promise(QFutureInterfaceBase::State::Pending); + QtPrivate::Continuation, void>::create( + std::forward(function), this, promise, policy); + return promise.future(); +} + +template +QFuture::ResultType> QFuture::then(QThreadPool *pool, + Function &&function) +{ + QFutureInterface> promise(QFutureInterfaceBase::State::Pending); + QtPrivate::Continuation, void>::create( + std::forward(function), this, promise, pool); + return promise.future(); +} + QT_END_NAMESPACE #endif // QFUTURE_H diff --git a/src/corelib/thread/qfuture.qdoc b/src/corelib/thread/qfuture.qdoc index 076725e19c..989ffa36c8 100644 --- a/src/corelib/thread/qfuture.qdoc +++ b/src/corelib/thread/qfuture.qdoc @@ -682,3 +682,144 @@ \sa findNext() */ + +/*! + \namespace QtFuture + + \inmodule QtCore + \brief Contains miscellaneous identifiers used by the QFuture class. +*/ + + +/*! + \enum QtFuture::Launch + + \since 6.0 + + Represents execution policies for running a QFuture continuation. + + \value Sync The continuation will be launched in the same thread in + which the parent has been executing. + + \value Async The continuation will be launched in in a separate thread taken from + the global QThreadPool. + + \value Inherit The continuation will inherit the launch policy of the parent or its + thread pool, if it was using a custom one. + + \sa QFuture::then(), QThreadPool::globalInstance() + +*/ + +/*! \fn template template QFuture::ResultType> QFuture::then(Function &&function) + + \since 6.0 + \overload + + Attaches a continuation to this future, allowing to chain multiple asynchronous + computations if desired. When the asynchronous computation represented by this + future finishes, \a function will be invoked in the same thread in which this + future has been running. A new QFuture representing the result of the continuation + is returned. + + \note Use other overloads of this method if you need to launch the continuation in + a separate thread. + + If this future has a result (is not a QFuture), \a function takes the result + of this future as its argument. + + You can chain multiple operations like this: + + \code + QFuture future = ...; + future.then([](int res1){ ... }).then([](int res2){ ... })... + \endcode + + Or: + \code + QFuture future = ...; + future.then([](){ ... }).then([](){ ... })... + \endcode + + The continuation can also take a QFuture argument (instead of its value), representing + the previous future. This can be useful if, for example, QFuture has multiple results, + and the user wants to access them inside the continuation. Or the user needs to handle + the exception of the previous future inside the continuation, to not interrupt the chain + of multiple continuations. For example: + + \code + QFuture future = ...; + future.then([](QFuture f) { + try { + ... + auto result = f.result(); + ... + } catch (QException &e) { + // handle the exception + } + }).then(...); + \endcode + + If the previous future throws an exception and it is not handled inside the + continuation, the exception will be propagated to the continuation future, to + allow the caller to handle it: + + \code + QFuture parentFuture = ...; + auto continuation = parentFuture.then([](int res1){ ... }).then([](int res2){ ... })... + ... + // parentFuture throws an exception + try { + auto result = continuation.result(); + } catch (QException &e) { + // handle the exception + } + \endcode + + In this case the whole chain of continuations will be interrupted. + + \note If the parent future gets canceled, its continuations will + also be canceled. +*/ + +/*! \fn template template QFuture::ResultType> QFuture::then(QtFuture::Launch policy, Function &&function) + + \since 6.0 + \overload + + Attaches a continuation to this future, allowing to chain multiple asynchronous + computations. When the asynchronous computation represented by this future + finishes, \a function will be invoked according to the given launch \a policy. + A new QFuture representing the result of the continuation is returned. + + Depending on the \a policy, continuation will run in the same thread as the parent, + run in a new thread, or inherit the launch policy and thread pool of the parent. + + In the following example both continuations will run in a new thread (but in + the same one). + + \code + QFuture future = ...; + future.then(QtFuture::Launch::Async, [](int res){ ... }).then([](int res2){ ... }); + \endcode + + In the following example both continuations will run in new threads using the same + thread pool. + + \code + QFuture future = ...; + future.then(QtFuture::Launch::Async, [](int res){ ... }) + .then(QtFuture::Launch::Inherit, [](int res2){ ... }); + \endcode +*/ + +/*! \fn template template QFuture::ResultType> QFuture::then(QThreadPool *pool, Function &&function) + + \since 6.0 + \overload + + Attaches a continuation to this future, allowing to chain multiple asynchronous + computations if desired. When the asynchronous computation represented by this + future finishes, \a function will be invoked in a separate thread taken from the + QThreadPool \a pool. +*/ diff --git a/src/corelib/thread/qfuture_impl.h b/src/corelib/thread/qfuture_impl.h new file mode 100644 index 0000000000..eed7828c35 --- /dev/null +++ b/src/corelib/thread/qfuture_impl.h @@ -0,0 +1,339 @@ +/**************************************************************************** +** +** Copyright (C) 2020 The Qt Company Ltd. +** Contact: https://www.qt.io/licensing/ +** +** This file is part of the QtCore module of the Qt Toolkit. +** +** $QT_BEGIN_LICENSE:LGPL$ +** Commercial License Usage +** Licensees holding valid commercial Qt licenses may use this file in +** accordance with the commercial license agreement provided with the +** Software or, alternatively, in accordance with the terms contained in +** a written agreement between you and The Qt Company. For licensing terms +** and conditions see https://www.qt.io/terms-conditions. For further +** information use the contact form at https://www.qt.io/contact-us. +** +** GNU Lesser General Public License Usage +** Alternatively, this file may be used under the terms of the GNU Lesser +** General Public License version 3 as published by the Free Software +** Foundation and appearing in the file LICENSE.LGPL3 included in the +** packaging of this file. Please review the following information to +** ensure the GNU Lesser General Public License version 3 requirements +** will be met: https://www.gnu.org/licenses/lgpl-3.0.html. +** +** GNU General Public License Usage +** Alternatively, this file may be used under the terms of the GNU +** General Public License version 2.0 or (at your option) the GNU General +** Public license version 3 or any later version approved by the KDE Free +** Qt Foundation. The licenses are as published by the Free Software +** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3 +** included in the packaging of this file. Please review the following +** information to ensure the GNU General Public License requirements will +** be met: https://www.gnu.org/licenses/gpl-2.0.html and +** https://www.gnu.org/licenses/gpl-3.0.html. +** +** $QT_END_LICENSE$ +** +****************************************************************************/ + +#ifndef QFUTURE_H +#error Do not include qfuture_impl.h directly +#endif + +#if 0 +#pragma qt_sync_skip_header_check +#pragma qt_sync_stop_processing +#endif + +#include +#include +#include + +QT_BEGIN_NAMESPACE + +// +// forward declarations +// +template +class QFuture; +template +class QFutureInterface; + +namespace QtFuture { +enum class Launch { Sync, Async, Inherit }; +} + +namespace QtPrivate { + +template +struct ResultTypeHelper +{ +}; + +// The callable takes an argument of type Arg +template +struct ResultTypeHelper< + F, Arg, typename std::enable_if_t, QFuture>>> +{ + using ResultType = std::invoke_result_t, std::decay_t>; +}; + +// The callable takes an argument of type QFuture +template +struct ResultTypeHelper< + F, Arg, typename std::enable_if_t, QFuture>>> +{ + using ResultType = std::invoke_result_t, QFuture>; +}; + +// The callable takes an argument of type QFuture +template +struct ResultTypeHelper< + F, void, typename std::enable_if_t, QFuture>>> +{ + using ResultType = std::invoke_result_t, QFuture>; +}; + +// The callable doesn't take argument +template +struct ResultTypeHelper< + F, void, typename std::enable_if_t, QFuture>>> +{ + using ResultType = std::invoke_result_t>; +}; + +template +class Continuation +{ +public: + Continuation(Function &&func, const QFuture &f, + const QFutureInterface &p) + : promise(p), parentFuture(f), function(std::forward(func)) + { + } + virtual ~Continuation() = default; + + bool execute(); + + static void create(Function &&func, QFuture *f, + QFutureInterface &p, QtFuture::Launch policy); + + static void create(Function &&func, QFuture *f, + QFutureInterface &p, QThreadPool *pool); + +protected: + virtual void runImpl() = 0; + + void runFunction(); + +protected: + QFutureInterface promise; + const QFuture parentFuture; + Function function; +}; + +template +class SyncContinuation final : public Continuation +{ +public: + SyncContinuation(Function &&func, const QFuture &f, + const QFutureInterface &p) + : Continuation(std::forward(func), f, p) + { + } + + ~SyncContinuation() override = default; + +private: + void runImpl() override { this->runFunction(); } +}; + +template +class AsyncContinuation final : public QRunnable, + public Continuation +{ +public: + AsyncContinuation(Function &&func, const QFuture &f, + const QFutureInterface &p, QThreadPool *pool = nullptr) + : Continuation(std::forward(func), f, p), + threadPool(pool) + { + this->promise.setRunnable(this); + } + + ~AsyncContinuation() override = default; + +private: + void runImpl() override // from Continuation + { + QThreadPool *pool = threadPool ? threadPool : QThreadPool::globalInstance(); + pool->start(this); + } + + void run() override // from QRunnable + { + this->runFunction(); + } + +private: + QThreadPool *threadPool; +}; + +template +void Continuation::runFunction() +{ + promise.reportStarted(); + + Q_ASSERT(parentFuture.isFinished()); + +#ifndef QT_NO_EXCEPTIONS + try { +#endif + if constexpr (!std::is_void_v) { + if constexpr (std::is_invocable_v, QFuture>) { + promise.reportResult(function(parentFuture)); + } else if constexpr (std::is_void_v) { + promise.reportResult(function()); + } else { + // This assert normally should never fail, this is to make sure + // that nothing unexpected happend. + static_assert( + std::is_invocable_v, std::decay_t>, + "The continuation is not invocable with the provided arguments"); + + promise.reportResult(function(parentFuture.result())); + } + } else { + if constexpr (std::is_invocable_v, QFuture>) { + function(parentFuture); + } else if constexpr (std::is_void_v) { + function(); + } else { + // This assert normally should never fail, this is to make sure + // that nothing unexpected happend. + static_assert( + std::is_invocable_v, std::decay_t>, + "The continuation is not invocable with the provided arguments"); + + function(parentFuture.result()); + } + } +#ifndef QT_NO_EXCEPTIONS + } catch (QException &e) { + promise.reportException(e); + } catch (...) { + promise.reportException(QUnhandledException()); + } +#endif + promise.reportFinished(); +} + +template +bool Continuation::execute() +{ + Q_ASSERT(parentFuture.isFinished()); + + if (parentFuture.isCanceled()) { +#ifndef QT_NO_EXCEPTIONS + if (parentFuture.d.exceptionStore().hasException()) { + // If the continuation doesn't take a QFuture argument, propagate the exception + // to the caller, by reporting it. If the continuation takes a QFuture argument, + // the user may want to catch the exception inside the continuation, to not + // interrupt the continuation chain, so don't report anything yet. + if constexpr (!std::is_invocable_v, QFuture>) { + promise.reportStarted(); + const QException *e = parentFuture.d.exceptionStore().exception().exception(); + promise.reportException(*e); + promise.reportFinished(); + return false; + } + } else +#endif + { + promise.reportStarted(); + promise.reportCanceled(); + promise.reportFinished(); + return false; + } + } + + runImpl(); + return true; +} + +template +void Continuation::create(Function &&func, + QFuture *f, + QFutureInterface &p, + QtFuture::Launch policy) +{ + Q_ASSERT(f); + + QThreadPool *pool = nullptr; + + bool launchAsync = (policy == QtFuture::Launch::Async); + if (policy == QtFuture::Launch::Inherit) { + launchAsync = f->d.launchAsync(); + + // If the parent future was using a custom thread pool, inherit it as well. + if (launchAsync && f->d.threadPool()) { + pool = f->d.threadPool(); + p.setThreadPool(pool); + } + } + + Continuation *continuationJob = nullptr; + if (launchAsync) { + continuationJob = new AsyncContinuation( + std::forward(func), *f, p, pool); + } else { + continuationJob = new SyncContinuation( + std::forward(func), *f, p); + } + + p.setLaunchAsync(launchAsync); + + auto continuation = [continuationJob, policy, launchAsync]() mutable { + bool isLaunched = continuationJob->execute(); + // If continuation is successfully launched, AsyncContinuation will be deleted + // by the QThreadPool which has started it. Synchronous continuation will be + // executed immediately, so it's safe to always delete it here. + if (!(launchAsync && isLaunched)) { + delete continuationJob; + continuationJob = nullptr; + } + }; + + f->d.setContinuation(std::move(continuation)); +} + +template +void Continuation::create(Function &&func, + QFuture *f, + QFutureInterface &p, + QThreadPool *pool) +{ + Q_ASSERT(f); + + auto continuationJob = new AsyncContinuation( + std::forward(func), *f, p, pool); + p.setLaunchAsync(true); + p.setThreadPool(pool); + + auto continuation = [continuationJob]() mutable { + bool isLaunched = continuationJob->execute(); + // If continuation is successfully launched, AsyncContinuation will be deleted + // by the QThreadPool which has started it. + if (!isLaunched) { + delete continuationJob; + continuationJob = nullptr; + } + }; + + f->d.setContinuation(continuation); +} + +} // namespace QtPrivate + +QT_END_NAMESPACE diff --git a/src/corelib/thread/qfutureinterface.cpp b/src/corelib/thread/qfutureinterface.cpp index e6380a8732..074e28d8df 100644 --- a/src/corelib/thread/qfutureinterface.cpp +++ b/src/corelib/thread/qfutureinterface.cpp @@ -191,6 +191,11 @@ bool QFutureInterfaceBase::isResultReadyAt(int index) const return d->internal_isResultReadyAt(index); } +bool QFutureInterfaceBase::isRunningOrPending() const +{ + return queryState(static_cast(Running | Pending)); +} + bool QFutureInterfaceBase::waitForNextResult() { QMutexLocker lock(&d->m_mutex); @@ -315,7 +320,7 @@ void QFutureInterfaceBase::waitForResult(int resultIndex) d->m_exceptionStore.throwPossibleException(); QMutexLocker lock(&d->m_mutex); - if (!isRunning()) + if (!isRunningOrPending()) return; lock.unlock(); @@ -326,7 +331,7 @@ void QFutureInterfaceBase::waitForResult(int resultIndex) lock.relock(); const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex; - while (isRunning() && !d->internal_isResultReadyAt(waitIndex)) + while (isRunningOrPending() && !d->internal_isResultReadyAt(waitIndex)) d->waitCondition.wait(&d->m_mutex); d->m_exceptionStore.throwPossibleException(); @@ -335,7 +340,7 @@ void QFutureInterfaceBase::waitForResult(int resultIndex) void QFutureInterfaceBase::waitForFinished() { QMutexLocker lock(&d->m_mutex); - const bool alreadyFinished = !isRunning(); + const bool alreadyFinished = !isRunningOrPending(); lock.unlock(); if (!alreadyFinished) { @@ -343,7 +348,7 @@ void QFutureInterfaceBase::waitForFinished() lock.relock(); - while (isRunning()) + while (isRunningOrPending()) d->waitCondition.wait(&d->m_mutex); } @@ -386,6 +391,11 @@ void QFutureInterfaceBase::setThreadPool(QThreadPool *pool) d->m_pool = pool; } +QThreadPool *QFutureInterfaceBase::threadPool() const +{ + return d->m_pool; +} + void QFutureInterfaceBase::setFilterMode(bool enable) { QMutexLocker locker(&d->m_mutex); @@ -604,4 +614,36 @@ void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState) state.storeRelaxed(newState); } +void QFutureInterfaceBase::setContinuation(std::function func) +{ + QMutexLocker lock(&d->continuationMutex); + // If the state is ready, run continuation immediately, + // otherwise save it for later. + if (isFinished()) { + lock.unlock(); + func(); + } else { + d->continuation = std::move(func); + } +} + +void QFutureInterfaceBase::runContinuation() const +{ + QMutexLocker lock(&d->continuationMutex); + if (d->continuation) { + lock.unlock(); + d->continuation(); + } +} + +void QFutureInterfaceBase::setLaunchAsync(bool value) +{ + d->launchAsync = value; +} + +bool QFutureInterfaceBase::launchAsync() const +{ + return d->launchAsync; +} + QT_END_NAMESPACE diff --git a/src/corelib/thread/qfutureinterface.h b/src/corelib/thread/qfutureinterface.h index 43dfd6bac4..c2e884911f 100644 --- a/src/corelib/thread/qfutureinterface.h +++ b/src/corelib/thread/qfutureinterface.h @@ -58,6 +58,11 @@ class QFutureInterfaceBasePrivate; class QFutureWatcherBase; class QFutureWatcherBasePrivate; +namespace QtPrivate { +template +class Continuation; +} + class Q_CORE_EXPORT QFutureInterfaceBase { public: @@ -68,7 +73,9 @@ public: Finished = 0x04, Canceled = 0x08, Paused = 0x10, - Throttled = 0x20 + Throttled = 0x20, + // Pending means that the future depends on another one, which is not finished yet + Pending = 0x40 }; QFutureInterfaceBase(State initialState = NoState); @@ -86,6 +93,7 @@ public: void setRunnable(QRunnable *runnable); void setThreadPool(QThreadPool *pool); + QThreadPool *threadPool() const; void setFilterMode(bool enable); void setProgressRange(int minimum, int maximum); int progressMinimum() const; @@ -141,6 +149,18 @@ private: private: friend class QFutureWatcherBase; friend class QFutureWatcherBasePrivate; + + template + friend class QtPrivate::Continuation; + +protected: + void setContinuation(std::function func); + void runContinuation() const; + + void setLaunchAsync(bool value); + bool launchAsync() const; + + bool isRunningOrPending() const; }; template @@ -239,6 +259,7 @@ inline void QFutureInterface::reportFinished(const T *result) if (result) reportResult(result); QFutureInterfaceBase::reportFinished(); + QFutureInterfaceBase::runContinuation(); } template @@ -292,7 +313,11 @@ public: void reportResult(const void *, int) { } void reportResults(const QVector &, int) { } - void reportFinished(const void * = nullptr) { QFutureInterfaceBase::reportFinished(); } + void reportFinished(const void * = nullptr) + { + QFutureInterfaceBase::reportFinished(); + QFutureInterfaceBase::runContinuation(); + } }; QT_END_NAMESPACE diff --git a/src/corelib/thread/qfutureinterface_p.h b/src/corelib/thread/qfutureinterface_p.h index b297dff633..306b02e269 100644 --- a/src/corelib/thread/qfutureinterface_p.h +++ b/src/corelib/thread/qfutureinterface_p.h @@ -191,6 +191,12 @@ public: void disconnectOutputInterface(QFutureCallOutInterface *iface); void setState(QFutureInterfaceBase::State state); + + // Wrapper for continuation + std::function continuation; + QBasicMutex continuationMutex; + + bool launchAsync = false; }; QT_END_NAMESPACE diff --git a/src/corelib/thread/thread.pri b/src/corelib/thread/thread.pri index 25cf68a324..e3d791fee7 100644 --- a/src/corelib/thread/thread.pri +++ b/src/corelib/thread/thread.pri @@ -66,6 +66,7 @@ qtConfig(future) { HEADERS += \ thread/qexception.h \ thread/qfuture.h \ + thread/qfuture_impl.h \ thread/qfutureinterface.h \ thread/qfutureinterface_p.h \ thread/qfuturesynchronizer.h \ diff --git a/tests/auto/corelib/thread/qfuture/tst_qfuture.cpp b/tests/auto/corelib/thread/qfuture/tst_qfuture.cpp index a42454124e..0e747cbd9b 100644 --- a/tests/auto/corelib/thread/qfuture/tst_qfuture.cpp +++ b/tests/auto/corelib/thread/qfuture/tst_qfuture.cpp @@ -94,6 +94,13 @@ private slots: void nestedExceptions(); #endif void nonGlobalThreadPool(); + + void then(); + void thenOnCanceledFuture(); +#ifndef QT_NO_EXCEPTIONS + void thenOnExceptionFuture(); + void thenThrows(); +#endif }; void tst_QFuture::resultStore() @@ -1557,5 +1564,502 @@ void tst_QFuture::nonGlobalThreadPool() } } +void tst_QFuture::then() +{ + { + struct Add + { + + static int addTwo(int arg) { return arg + 2; } + + int operator()(int arg) const { return arg + 3; } + }; + + QFutureInterface promise; + QFuture then = promise.future() + .then([](int res) { return res + 1; }) // lambda + .then(Add::addTwo) // function + .then(Add()); // functor + + promise.reportStarted(); + QVERIFY(!then.isStarted()); + QVERIFY(!then.isFinished()); + + const int result = 0; + promise.reportResult(result); + promise.reportFinished(); + + then.waitForFinished(); + + QVERIFY(then.isStarted()); + QVERIFY(then.isFinished()); + QCOMPARE(then.result(), result + 6); + } + + // then() on a ready future + { + QFutureInterface promise; + promise.reportStarted(); + + const int result = 0; + promise.reportResult(result); + promise.reportFinished(); + + QFuture then = promise.future() + .then([](int res1) { return res1 + 1; }) + .then([](int res2) { return res2 + 2; }) + .then([](int res3) { return res3 + 3; }); + + then.waitForFinished(); + + QVERIFY(then.isStarted()); + QVERIFY(then.isFinished()); + QCOMPARE(then.result(), result + 6); + } + + // Continuation of QFuture + { + int result = 0; + QFutureInterface promise; + QFuture then = promise.future() + .then([&]() { result += 1; }) + .then([&]() { result += 2; }) + .then([&]() { result += 3; }); + + promise.reportStarted(); + QVERIFY(!then.isStarted()); + QVERIFY(!then.isFinished()); + promise.reportFinished(); + + then.waitForFinished(); + + QVERIFY(then.isStarted()); + QVERIFY(then.isFinished()); + QCOMPARE(result, 6); + } + + // Continuation returns QFuture + { + QFutureInterface promise; + int value; + QFuture then = + promise.future().then([](int res) { return res * 2; }).then([&](int prevResult) { + value = prevResult; + }); + + promise.reportStarted(); + QVERIFY(!then.isStarted()); + QVERIFY(!then.isFinished()); + + const int result = 5; + promise.reportResult(result); + promise.reportFinished(); + + then.waitForFinished(); + + QVERIFY(then.isStarted()); + QVERIFY(then.isFinished()); + QCOMPARE(value, result * 2); + } + + // Continuations taking a QFuture argument. + { + int value = 0; + QFutureInterface promise; + QFuture then = promise.future() + .then([](QFuture f1) { return f1.result() + 1; }) + .then([&](QFuture f2) { value = f2.result() + 2; }) + .then([&](QFuture f3) { + QVERIFY(f3.isFinished()); + value += 3; + }); + + promise.reportStarted(); + QVERIFY(!then.isStarted()); + QVERIFY(!then.isFinished()); + + const int result = 0; + promise.reportResult(result); + promise.reportFinished(); + + then.waitForFinished(); + + QVERIFY(then.isStarted()); + QVERIFY(then.isFinished()); + QCOMPARE(value, 6); + } + + // Continuations use a new thread + { + Qt::HANDLE threadId1 = nullptr; + Qt::HANDLE threadId2 = nullptr; + QFutureInterface promise; + QFuture then = promise.future() + .then(QtFuture::Launch::Async, + [&]() { threadId1 = QThread::currentThreadId(); }) + .then([&]() { threadId2 = QThread::currentThreadId(); }); + + promise.reportStarted(); + QVERIFY(!then.isStarted()); + QVERIFY(!then.isFinished()); + + promise.reportFinished(); + + then.waitForFinished(); + + QVERIFY(then.isStarted()); + QVERIFY(then.isFinished()); + QVERIFY(threadId1 != QThread::currentThreadId()); + QVERIFY(threadId2 != QThread::currentThreadId()); + QVERIFY(threadId1 == threadId2); + } + + // Continuation inherits the launch policy of its parent (QtFuture::Launch::Sync) + { + Qt::HANDLE threadId1 = nullptr; + Qt::HANDLE threadId2 = nullptr; + QFutureInterface promise; + QFuture then = promise.future() + .then(QtFuture::Launch::Sync, + [&]() { threadId1 = QThread::currentThreadId(); }) + .then(QtFuture::Launch::Inherit, + [&]() { threadId2 = QThread::currentThreadId(); }); + + promise.reportStarted(); + QVERIFY(!then.isStarted()); + QVERIFY(!then.isFinished()); + + promise.reportFinished(); + + then.waitForFinished(); + + QVERIFY(then.isStarted()); + QVERIFY(then.isFinished()); + QVERIFY(threadId1 == QThread::currentThreadId()); + QVERIFY(threadId2 == QThread::currentThreadId()); + QVERIFY(threadId1 == threadId2); + } + + // Continuation inherits the launch policy of its parent (QtFuture::Launch::Async) + { + Qt::HANDLE threadId1 = nullptr; + Qt::HANDLE threadId2 = nullptr; + QFutureInterface promise; + QFuture then = promise.future() + .then(QtFuture::Launch::Async, + [&]() { threadId1 = QThread::currentThreadId(); }) + .then(QtFuture::Launch::Inherit, + [&]() { threadId2 = QThread::currentThreadId(); }); + + promise.reportStarted(); + QVERIFY(!then.isStarted()); + QVERIFY(!then.isFinished()); + + promise.reportFinished(); + + then.waitForFinished(); + + QVERIFY(then.isStarted()); + QVERIFY(then.isFinished()); + QVERIFY(threadId1 != QThread::currentThreadId()); + QVERIFY(threadId2 != QThread::currentThreadId()); + } + + // Continuations use a custom thread pool + { + QFutureInterface promise; + QThreadPool pool; + QVERIFY(pool.waitForDone(0)); // pool is not busy yet + QSemaphore semaphore; + QFuture then = promise.future().then(&pool, [&]() { semaphore.acquire(); }); + + promise.reportStarted(); + promise.reportFinished(); + + // Make sure the custom thread pool is busy on running the continuation + QVERIFY(!pool.waitForDone(0)); + semaphore.release(); + then.waitForFinished(); + + QVERIFY(then.isStarted()); + QVERIFY(then.isFinished()); + QCOMPARE(then.d.threadPool(), &pool); + } + + // Continuation inherits parent's thread pool + { + Qt::HANDLE threadId1 = nullptr; + Qt::HANDLE threadId2 = nullptr; + QFutureInterface promise; + + QThreadPool pool; + QFuture then1 = promise.future().then(&pool, [&]() { + threadId1 = QThread::currentThreadId(); + }); + + promise.reportStarted(); + promise.reportFinished(); + + then1.waitForFinished(); + QVERIFY(pool.waitForDone()); // The pool is not busy after the first continuation is done + + QSemaphore semaphore; + QFuture then2 = then1.then(QtFuture::Launch::Inherit, [&]() { + semaphore.acquire(); + threadId2 = QThread::currentThreadId(); + }); + + QVERIFY(!pool.waitForDone(0)); // The pool is busy running the 2nd continuation + + semaphore.release(); + then2.waitForFinished(); + + QVERIFY(then2.isStarted()); + QVERIFY(then2.isFinished()); + QCOMPARE(then1.d.threadPool(), then2.d.threadPool()); + QCOMPARE(then2.d.threadPool(), &pool); + QVERIFY(threadId1 != QThread::currentThreadId()); + QVERIFY(threadId2 != QThread::currentThreadId()); + } +} + +void tst_QFuture::thenOnCanceledFuture() +{ + // Continuations on a canceled future + { + QFutureInterface promise; + promise.reportStarted(); + promise.reportCanceled(); + promise.reportFinished(); + + int thenResult = 0; + QFuture then = + promise.future().then([&]() { ++thenResult; }).then([&]() { ++thenResult; }); + + QVERIFY(then.isCanceled()); + QCOMPARE(thenResult, 0); + } + + // QFuture gets canceled after continuations are set + { + QFutureInterface promise; + + int thenResult = 0; + QFuture then = + promise.future().then([&]() { ++thenResult; }).then([&]() { ++thenResult; }); + + promise.reportStarted(); + promise.reportCanceled(); + promise.reportFinished(); + + QVERIFY(then.isCanceled()); + QCOMPARE(thenResult, 0); + } + + // Same with QtFuture::Launch::Async + + // Continuations on a canceled future + { + QFutureInterface promise; + promise.reportStarted(); + promise.reportCanceled(); + promise.reportFinished(); + + int thenResult = 0; + QFuture then = + promise.future().then(QtFuture::Launch::Async, [&]() { ++thenResult; }).then([&]() { + ++thenResult; + }); + + QVERIFY(then.isCanceled()); + QCOMPARE(thenResult, 0); + } + + // QFuture gets canceled after continuations are set + { + QFutureInterface promise; + + int thenResult = 0; + QFuture then = + promise.future().then(QtFuture::Launch::Async, [&]() { ++thenResult; }).then([&]() { + ++thenResult; + }); + + promise.reportStarted(); + promise.reportCanceled(); + promise.reportFinished(); + + QVERIFY(then.isCanceled()); + QCOMPARE(thenResult, 0); + } +} + +#ifndef QT_NO_EXCEPTIONS +void tst_QFuture::thenOnExceptionFuture() +{ + { + QFutureInterface promise; + + int thenResult = 0; + QFuture then = promise.future().then([&](int res) { thenResult = res; }); + + promise.reportStarted(); + QException e; + promise.reportException(e); + promise.reportFinished(); + + bool caught = false; + try { + then.waitForFinished(); + } catch (QException &) { + caught = true; + } + QVERIFY(caught); + QCOMPARE(thenResult, 0); + } + + // Exception handled inside the continuation + { + QFutureInterface promise; + + bool caught = false; + bool caughtByContinuation = false; + bool success = false; + int thenResult = 0; + QFuture then = promise.future() + .then([&](QFuture res) { + try { + thenResult = res.result(); + } catch (QException &) { + caughtByContinuation = true; + } + }) + .then([&]() { success = true; }); + + promise.reportStarted(); + QException e; + promise.reportException(e); + promise.reportFinished(); + + try { + then.waitForFinished(); + } catch (QException &) { + caught = true; + } + + QCOMPARE(thenResult, 0); + QVERIFY(!caught); + QVERIFY(caughtByContinuation); + QVERIFY(success); + } + + // Exception future + { + QFutureInterface promise; + promise.reportStarted(); + QException e; + promise.reportException(e); + promise.reportFinished(); + + int thenResult = 0; + QFuture then = promise.future().then([&](int res) { thenResult = res; }); + + bool caught = false; + try { + then.waitForFinished(); + } catch (QException &) { + caught = true; + } + QVERIFY(caught); + QCOMPARE(thenResult, 0); + } + + // Same with QtFuture::Launch::Async + { + QFutureInterface promise; + + int thenResult = 0; + QFuture then = + promise.future().then(QtFuture::Launch::Async, [&](int res) { thenResult = res; }); + + promise.reportStarted(); + QException e; + promise.reportException(e); + promise.reportFinished(); + + bool caught = false; + try { + then.waitForFinished(); + } catch (QException &) { + caught = true; + } + QVERIFY(caught); + QCOMPARE(thenResult, 0); + } + + // Exception future + { + QFutureInterface promise; + promise.reportStarted(); + QException e; + promise.reportException(e); + promise.reportFinished(); + + int thenResult = 0; + QFuture then = + promise.future().then(QtFuture::Launch::Async, [&](int res) { thenResult = res; }); + + bool caught = false; + try { + then.waitForFinished(); + } catch (QException &) { + caught = true; + } + QVERIFY(caught); + QCOMPARE(thenResult, 0); + } +} + +void tst_QFuture::thenThrows() +{ + // Continuation throws an exception + { + QFutureInterface promise; + + QFuture then = promise.future().then([]() { throw QException(); }); + + promise.reportStarted(); + promise.reportFinished(); + + bool caught = false; + try { + then.waitForFinished(); + } catch (QException &) { + caught = true; + } + QVERIFY(caught); + } + + // Same with QtFuture::Launch::Async + { + QFutureInterface promise; + + QFuture then = + promise.future().then(QtFuture::Launch::Async, []() { throw QException(); }); + + promise.reportStarted(); + promise.reportFinished(); + + bool caught = false; + try { + then.waitForFinished(); + } catch (QException &) { + caught = true; + } + QVERIFY(caught); + } +} +#endif + QTEST_MAIN(tst_QFuture) #include "tst_qfuture.moc" -- cgit v1.2.3