diff options
-rw-r--r-- | src/corelib/thread/qthreadpool.cpp | 93 | ||||
-rw-r--r-- | src/corelib/thread/qthreadpool_p.h | 1 | ||||
-rw-r--r-- | tests/auto/corelib/thread/qthreadpool/tst_qthreadpool.cpp | 63 |
3 files changed, 110 insertions, 47 deletions
diff --git a/src/corelib/thread/qthreadpool.cpp b/src/corelib/thread/qthreadpool.cpp index 4d7d7a2250..96eae6f3f3 100644 --- a/src/corelib/thread/qthreadpool.cpp +++ b/src/corelib/thread/qthreadpool.cpp @@ -112,14 +112,13 @@ void QThreadPoolThread::run() locker.relock(); } - // if too many threads are active, expire this thread + // if too many threads are active, stop working in this one if (manager->tooManyThreadsActive()) break; - if (manager->queue.isEmpty()) { - r = nullptr; + // all work is done, time to wait for more + if (manager->queue.isEmpty()) break; - } QueuePage *page = manager->queue.first(); r = page->pop(); @@ -130,26 +129,32 @@ void QThreadPoolThread::run() } } while (true); - // if too many threads are active, expire this thread - bool expired = manager->tooManyThreadsActive(); - if (!expired) { - manager->waitingThreads.enqueue(this); + // this thread is about to be deleted, do not wait or expire + if (!manager->allThreads.contains(this)) { registerThreadInactive(); - // wait for work, exiting after the expiry timeout is reached - runnableReady.wait(locker.mutex(), QDeadlineTimer(manager->expiryTimeout)); - ++manager->activeThreads; - if (manager->waitingThreads.removeOne(this)) - expired = true; - if (!manager->allThreads.contains(this)) { - registerThreadInactive(); - break; - } + return; } - if (expired) { + + // if too many threads are active, expire this thread + if (manager->tooManyThreadsActive()) { manager->expiredThreads.enqueue(this); registerThreadInactive(); - break; + return; + } + manager->waitingThreads.enqueue(this); + registerThreadInactive(); + // wait for work, exiting after the expiry timeout is reached + runnableReady.wait(locker.mutex(), QDeadlineTimer(manager->expiryTimeout)); + // this thread is about to be deleted, do not work or expire + if (!manager->allThreads.contains(this)) { + Q_ASSERT(manager->queue.isEmpty()); + return; + } + if (manager->waitingThreads.removeOne(this)) { + manager->expiredThreads.enqueue(this); + return; } + ++manager->activeThreads; } } @@ -176,10 +181,10 @@ bool QThreadPoolPrivate::tryStart(QRunnable *task) } // can't do anything if we're over the limit - if (activeThreadCount() >= maxThreadCount()) + if (areAllThreadsActive()) return false; - if (waitingThreads.count() > 0) { + if (!waitingThreads.isEmpty()) { // recycle an available thread enqueueTask(task); waitingThreads.takeFirst()->runnableReady.wakeOne(); @@ -251,6 +256,12 @@ void QThreadPoolPrivate::tryToStartMoreThreads() } } +bool QThreadPoolPrivate::areAllThreadsActive() const +{ + const int activeThreadCount = this->activeThreadCount(); + return activeThreadCount >= maxThreadCount() && (activeThreadCount - reservedThreads) >= 1; +} + bool QThreadPoolPrivate::tooManyThreadsActive() const { const int activeThreadCount = this->activeThreadCount(); @@ -283,18 +294,20 @@ void QThreadPoolPrivate::startThread(QRunnable *runnable) \internal Helper function only to be called from waitForDone(int) + + Deletes all current threads. */ void QThreadPoolPrivate::reset() { // move the contents of the set out so that we can iterate without the lock - QSet<QThreadPoolThread *> allThreadsCopy; - allThreadsCopy.swap(allThreads); + auto allThreadsCopy = std::exchange(allThreads, {}); expiredThreads.clear(); waitingThreads.clear(); + mutex.unlock(); for (QThreadPoolThread *thread : qAsConst(allThreadsCopy)) { - if (!thread->isFinished()) { + if (thread->isRunning()) { thread->runnableReady.wakeAll(); thread->wait(); } @@ -321,15 +334,13 @@ bool QThreadPoolPrivate::waitForDone(int msecs) { QMutexLocker locker(&mutex); QDeadlineTimer timer(msecs); - do { - if (!waitForDone(timer)) - return false; - reset(); - // More threads can be started during reset(), in that case continue - // waiting if we still have time left. - } while ((!queue.isEmpty() || activeThreads) && !timer.hasExpired()); - - return queue.isEmpty() && activeThreads == 0; + if (!waitForDone(timer)) + return false; + reset(); + // New jobs might have started during reset, but return anyway + // as the active thread and task count did reach 0 once, and + // race conditions are outside our scope. + return true; } void QThreadPoolPrivate::clear() @@ -473,7 +484,10 @@ QThreadPool::QThreadPool(QObject *parent) */ QThreadPool::~QThreadPool() { + Q_D(QThreadPool); waitForDone(); + Q_ASSERT(d->queue.isEmpty()); + Q_ASSERT(d->allThreads.isEmpty()); } /*! @@ -513,12 +527,8 @@ void QThreadPool::start(QRunnable *runnable, int priority) Q_D(QThreadPool); QMutexLocker locker(&d->mutex); - if (!d->tryStart(runnable)) { + if (!d->tryStart(runnable)) d->enqueueTask(runnable, priority); - - if (!d->waitingThreads.isEmpty()) - d->waitingThreads.takeFirst()->runnableReady.wakeOne(); - } } /*! @@ -582,7 +592,7 @@ bool QThreadPool::tryStart(std::function<void()> functionToRun) Q_D(QThreadPool); QMutexLocker locker(&d->mutex); - if (!d->allThreads.isEmpty() && d->activeThreadCount() >= d->maxThreadCount()) + if (!d->allThreads.isEmpty() && d->areAllThreadsActive()) return false; QRunnable *runnable = QRunnable::create(std::move(functionToRun)); @@ -674,7 +684,10 @@ int QThreadPool::activeThreadCount() const Once you are done with the thread, call releaseThread() to allow it to be reused. - \note This function will always increase the number of active threads. + \note Even if reserving maxThreadCount() threads or more, the thread pool + will still allow a minimum of one thread. + + \note This function will increase the reported number of active threads. This means that by using this function, it is possible for activeThreadCount() to return a value greater than maxThreadCount() . diff --git a/src/corelib/thread/qthreadpool_p.h b/src/corelib/thread/qthreadpool_p.h index b21c1b7d41..1b6a65f69d 100644 --- a/src/corelib/thread/qthreadpool_p.h +++ b/src/corelib/thread/qthreadpool_p.h @@ -157,6 +157,7 @@ public: int activeThreadCount() const; void tryToStartMoreThreads(); + bool areAllThreadsActive() const; bool tooManyThreadsActive() const; int maxThreadCount() const diff --git a/tests/auto/corelib/thread/qthreadpool/tst_qthreadpool.cpp b/tests/auto/corelib/thread/qthreadpool/tst_qthreadpool.cpp index 7f36db03e1..3ce4ee09b3 100644 --- a/tests/auto/corelib/thread/qthreadpool/tst_qthreadpool.cpp +++ b/tests/auto/corelib/thread/qthreadpool/tst_qthreadpool.cpp @@ -89,6 +89,7 @@ private slots: void releaseThread_data(); void releaseThread(); void reserveAndStart(); + void releaseAndBlock(); void start(); void tryStart(); void tryStartPeakThreadCount(); @@ -713,21 +714,19 @@ void tst_QThreadPool::reserveAndStart() // QTBUG-21051 threadpool->reserveThread(); QCOMPARE(threadpool->activeThreadCount(), 1); - // start a task, to get a running thread + // start a task, to get a running thread, works since one thread is always allowed WaitingTask task; threadpool->start(&task); QCOMPARE(threadpool->activeThreadCount(), 2); + // tryStart() will fail since activeThreadCount() >= maxThreadCount() and one thread is already running + QVERIFY(!threadpool->tryStart(&task)); + QTRY_COMPARE(threadpool->activeThreadCount(), 2); task.waitForStarted.acquire(); task.waitBeforeDone.release(); QTRY_COMPARE(task.count.loadRelaxed(), 1); QTRY_COMPARE(threadpool->activeThreadCount(), 1); - // now the thread is waiting, but tryStart() will fail since activeThreadCount() >= maxThreadCount() - QVERIFY(!threadpool->tryStart(&task)); - QTRY_COMPARE(threadpool->activeThreadCount(), 1); - - // start() will therefore do a failing tryStart(), followed by enqueueTask() - // which will actually wake up the waiting thread. + // start() will wake up the waiting thread. threadpool->start(&task); QTRY_COMPARE(threadpool->activeThreadCount(), 2); task.waitForStarted.acquire(); @@ -741,6 +740,54 @@ void tst_QThreadPool::reserveAndStart() // QTBUG-21051 threadpool->setMaxThreadCount(savedLimit); } +void tst_QThreadPool::releaseAndBlock() +{ + class WaitingTask : public QRunnable + { + public: + QSemaphore waitBeforeDone; + + WaitingTask() { setAutoDelete(false); } + + void run() override + { + waitBeforeDone.acquire(); + } + }; + + // Set up + QThreadPool *threadpool = QThreadPool::globalInstance(); + const int savedLimit = threadpool->maxThreadCount(); + threadpool->setMaxThreadCount(1); + QCOMPARE(threadpool->activeThreadCount(), 0); + + // start a task, to get a running thread, works since one thread is always allowed + WaitingTask task1, task2; + threadpool->start(&task1); + QCOMPARE(threadpool->activeThreadCount(), 1); + + // tryStart() will fail since activeThreadCount() >= maxThreadCount() and one thread is already running + QVERIFY(!threadpool->tryStart(&task2)); + QCOMPARE(threadpool->activeThreadCount(), 1); + + // Use release without reserve to account for the blocking thread. + threadpool->releaseThread(); + QTRY_COMPARE(threadpool->activeThreadCount(), 0); + + // Now we can start task2 + QVERIFY(threadpool->tryStart(&task2)); + QCOMPARE(threadpool->activeThreadCount(), 1); + task2.waitBeforeDone.release(); + QTRY_COMPARE(threadpool->activeThreadCount(), 0); + + threadpool->reserveThread(); + QCOMPARE(threadpool->activeThreadCount(), 1); + task1.waitBeforeDone.release(); + QTRY_COMPARE(threadpool->activeThreadCount(), 0); + + threadpool->setMaxThreadCount(savedLimit); +} + static QAtomicInt count; class CountingRunnable : public QRunnable { @@ -917,6 +964,7 @@ void tst_QThreadPool::waitForDone() { QElapsedTimer total, pass; total.start(); + pass.start(); QThreadPool threadPool; while (total.elapsed() < 10000) { @@ -1101,6 +1149,7 @@ void tst_QThreadPool::destroyingWaitsForTasksToFinish() { QElapsedTimer total, pass; total.start(); + pass.start(); while (total.elapsed() < 10000) { int runs; |