summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSvenn-Arne Dragly <svenn-arne.dragly@qt.io>2017-09-05 15:02:20 +0200
committerSvenn-Arne Dragly <svenn-arne.dragly@qt.io>2017-10-04 12:00:33 +0000
commitba423261cd9abedda8b732c82371515003d385ce (patch)
tree78c7a4997deaf6ae6e5b20e99f72c2f03e89f6c3
parentd7c57fa68e7bfa1fcb1bca3bcc1ea3e3668167a9 (diff)
Improve performance in QThreadPool
When many runnables are executed, this improves the performance by not resizing the queue for each runnable, which was the case in the previous version, because of many calls to QVector::takeFirst(). Also add a test that makes sure tryTake() is safe to call and does not leave the queue in a bad state that tries to use nullptr entries. Change-Id: I608134ecfa9cfc03db4878dcbd6f9c1107e13e90 Reviewed-by: Lars Knoll <lars.knoll@qt.io>
-rw-r--r--src/corelib/thread/qthreadpool.cpp87
-rw-r--r--src/corelib/thread/qthreadpool_p.h84
-rw-r--r--tests/auto/corelib/thread/qthreadpool/tst_qthreadpool.cpp64
3 files changed, 204 insertions, 31 deletions
diff --git a/src/corelib/thread/qthreadpool.cpp b/src/corelib/thread/qthreadpool.cpp
index f3ce1f258f..ccd8194b35 100644
--- a/src/corelib/thread/qthreadpool.cpp
+++ b/src/corelib/thread/qthreadpool.cpp
@@ -73,7 +73,7 @@ public:
\internal
*/
QThreadPoolThread::QThreadPoolThread(QThreadPoolPrivate *manager)
- :manager(manager), runnable(0)
+ :manager(manager), runnable(nullptr)
{ }
/*
@@ -84,7 +84,7 @@ void QThreadPoolThread::run()
QMutexLocker locker(&manager->mutex);
for(;;) {
QRunnable *r = runnable;
- runnable = 0;
+ runnable = nullptr;
do {
if (r) {
@@ -116,8 +116,19 @@ void QThreadPoolThread::run()
if (manager->tooManyThreadsActive())
break;
- r = !manager->queue.isEmpty() ? manager->queue.takeFirst().first : 0;
- } while (r != 0);
+ if (manager->queue.isEmpty()) {
+ r = nullptr;
+ break;
+ }
+
+ QueuePage *page = manager->queue.first();
+ r = page->pop();
+
+ if (page->isFinished()) {
+ manager->queue.removeFirst();
+ delete page;
+ }
+ } while (true);
if (manager->isExiting) {
registerThreadInactive();
@@ -163,6 +174,7 @@ QThreadPoolPrivate:: QThreadPoolPrivate()
bool QThreadPoolPrivate::tryStart(QRunnable *task)
{
+ Q_ASSERT(task != nullptr);
if (allThreads.isEmpty()) {
// always create at least one thread
startThread(task);
@@ -183,7 +195,7 @@ bool QThreadPoolPrivate::tryStart(QRunnable *task)
if (!expiredThreads.isEmpty()) {
// restart an expired thread
QThreadPoolThread *thread = expiredThreads.dequeue();
- Q_ASSERT(thread->runnable == 0);
+ Q_ASSERT(thread->runnable == nullptr);
++activeThreads;
@@ -199,22 +211,25 @@ bool QThreadPoolPrivate::tryStart(QRunnable *task)
return true;
}
-inline bool operator<(int priority, const QPair<QRunnable *, int> &p)
-{ return p.second < priority; }
-inline bool operator<(const QPair<QRunnable *, int> &p, int priority)
-{ return priority < p.second; }
+inline bool comparePriority(int priority, const QueuePage *p)
+{
+ return p->priority() < priority;
+}
void QThreadPoolPrivate::enqueueTask(QRunnable *runnable, int priority)
{
+ Q_ASSERT(runnable != nullptr);
if (runnable->autoDelete())
++runnable->ref;
- // put it on the queue
- QVector<QPair<QRunnable *, int> >::const_iterator begin = queue.constBegin();
- QVector<QPair<QRunnable *, int> >::const_iterator it = queue.constEnd();
- if (it != begin && priority > (*(it - 1)).second)
- it = std::upper_bound(begin, --it, priority);
- queue.insert(it - begin, qMakePair(runnable, priority));
+ for (QueuePage *page : qAsConst(queue)) {
+ if (page->priority() == priority && !page->isFull()) {
+ page->push(runnable);
+ return;
+ }
+ }
+ auto it = std::upper_bound(queue.constBegin(), queue.constEnd(), priority, comparePriority);
+ queue.insert(std::distance(queue.constBegin(), it), new QueuePage(runnable, priority));
}
int QThreadPoolPrivate::activeThreadCount() const
@@ -228,8 +243,18 @@ int QThreadPoolPrivate::activeThreadCount() const
void QThreadPoolPrivate::tryToStartMoreThreads()
{
// try to push tasks on the queue to any available threads
- while (!queue.isEmpty() && tryStart(queue.constFirst().first))
- queue.removeFirst();
+ while (!queue.isEmpty()) {
+ QueuePage *page = queue.first();
+ if (!tryStart(page->first()))
+ break;
+
+ page->pop();
+
+ if (page->isFinished()) {
+ queue.removeFirst();
+ delete page;
+ }
+ }
}
bool QThreadPoolPrivate::tooManyThreadsActive() const
@@ -243,6 +268,7 @@ bool QThreadPoolPrivate::tooManyThreadsActive() const
*/
void QThreadPoolPrivate::startThread(QRunnable *runnable)
{
+ Q_ASSERT(runnable != nullptr);
QScopedPointer <QThreadPoolThread> thread(new QThreadPoolThread(this));
thread->setObjectName(QLatin1String("Thread (pooled)"));
Q_ASSERT(!allThreads.contains(thread.data())); // if this assert hits, we have an ABA problem (deleted threads don't get removed here)
@@ -306,12 +332,14 @@ bool QThreadPoolPrivate::waitForDone(int msecs)
void QThreadPoolPrivate::clear()
{
QMutexLocker locker(&mutex);
- for (QVector<QPair<QRunnable *, int> >::const_iterator it = queue.constBegin();
- it != queue.constEnd(); ++it) {
- QRunnable* r = it->first;
- if (r->autoDelete() && !--r->ref)
- delete r;
+ for (QueuePage *page : qAsConst(queue)) {
+ while (!page->isFinished()) {
+ QRunnable *r = page->pop();
+ if (r && r->autoDelete() && !--r->ref)
+ delete r;
+ }
}
+ qDeleteAll(queue);
queue.clear();
}
@@ -336,22 +364,21 @@ bool QThreadPool::tryTake(QRunnable *runnable)
{
Q_D(QThreadPool);
- if (runnable == 0)
+ if (runnable == nullptr)
return false;
{
QMutexLocker locker(&d->mutex);
- auto it = d->queue.begin();
- auto end = d->queue.end();
-
- while (it != end) {
- if (it->first == runnable) {
- d->queue.erase(it);
+ for (QueuePage *page : qAsConst(d->queue)) {
+ if (page->tryTake(runnable)) {
+ if (page->isFinished()) {
+ d->queue.removeOne(page);
+ delete page;
+ }
if (runnable->autoDelete())
--runnable->ref; // undo ++ref in start()
return true;
}
- ++it;
}
}
diff --git a/src/corelib/thread/qthreadpool_p.h b/src/corelib/thread/qthreadpool_p.h
index 4a9f9e5cfa..18b89bbba9 100644
--- a/src/corelib/thread/qthreadpool_p.h
+++ b/src/corelib/thread/qthreadpool_p.h
@@ -62,6 +62,87 @@
QT_BEGIN_NAMESPACE
+class QueuePage {
+public:
+ enum {
+ MaxPageSize = 256
+ };
+
+ QueuePage(QRunnable *runnable, int pri)
+ : m_priority(pri)
+ {
+ push(runnable);
+ }
+
+ bool isFull() {
+ return m_lastIndex >= MaxPageSize - 1;
+ }
+
+ bool isFinished() {
+ return m_firstIndex > m_lastIndex;
+ }
+
+ void push(QRunnable *runnable) {
+ Q_ASSERT(runnable != nullptr);
+ Q_ASSERT(!isFull());
+ m_lastIndex += 1;
+ m_entries[m_lastIndex] = runnable;
+ }
+
+ void skipToNextOrEnd() {
+ while (!isFinished() && m_entries[m_firstIndex] == nullptr) {
+ m_firstIndex += 1;
+ }
+ }
+
+ QRunnable *first() {
+ Q_ASSERT(!isFinished());
+ QRunnable *runnable = m_entries[m_firstIndex];
+ Q_ASSERT(runnable);
+ return runnable;
+ }
+
+ QRunnable *pop() {
+ Q_ASSERT(!isFinished());
+ QRunnable *runnable = first();
+ Q_ASSERT(runnable);
+
+ // clear the entry although this should not be necessary
+ m_entries[m_firstIndex] = nullptr;
+ m_firstIndex += 1;
+
+ // make sure the next runnable returned by first() is not a nullptr
+ skipToNextOrEnd();
+
+ return runnable;
+ }
+
+ bool tryTake(QRunnable *runnable) {
+ Q_ASSERT(!isFinished());
+ for (int i = m_firstIndex; i <= m_lastIndex; i++) {
+ if (m_entries[i] == runnable) {
+ m_entries[i] = nullptr;
+ if (i == m_firstIndex) {
+ // make sure first() does not return a nullptr
+ skipToNextOrEnd();
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ int priority() const {
+ return m_priority;
+ }
+
+private:
+ int m_priority = 0;
+ int m_firstIndex = 0;
+ int m_lastIndex = -1;
+ QRunnable *m_entries[MaxPageSize];
+};
+
class QThreadPoolThread;
class Q_CORE_EXPORT QThreadPoolPrivate : public QObjectPrivate
{
@@ -83,12 +164,13 @@ public:
bool waitForDone(int msecs);
void clear();
void stealAndRunRunnable(QRunnable *runnable);
+ void deletePageIfFinished(QueuePage *page);
mutable QMutex mutex;
QList<QThreadPoolThread *> allThreads;
QQueue<QThreadPoolThread *> waitingThreads;
QQueue<QThreadPoolThread *> expiredThreads;
- QVector<QPair<QRunnable *, int> > queue;
+ QVector<QueuePage*> queue;
QWaitCondition noActiveThreads;
bool isExiting;
diff --git a/tests/auto/corelib/thread/qthreadpool/tst_qthreadpool.cpp b/tests/auto/corelib/thread/qthreadpool/tst_qthreadpool.cpp
index fdc4ecb5c8..66853a88d8 100644
--- a/tests/auto/corelib/thread/qthreadpool/tst_qthreadpool.cpp
+++ b/tests/auto/corelib/thread/qthreadpool/tst_qthreadpool.cpp
@@ -93,6 +93,7 @@ private slots:
void waitForDoneTimeout();
void destroyingWaitsForTasksToFinish();
void stressTest();
+ void takeAllAndIncreaseMaxThreadCount();
private:
QMutex m_functionTestMutex;
@@ -1199,5 +1200,68 @@ void tst_QThreadPool::stressTest()
}
}
+void tst_QThreadPool::takeAllAndIncreaseMaxThreadCount() {
+ class Task : public QRunnable
+ {
+ public:
+ Task(QSemaphore *mainBarrier, QSemaphore *threadBarrier)
+ : m_mainBarrier(mainBarrier)
+ , m_threadBarrier(threadBarrier)
+ {
+ setAutoDelete(false);
+ }
+
+ void run() {
+ m_mainBarrier->release();
+ m_threadBarrier->acquire();
+ }
+ private:
+ QSemaphore *m_mainBarrier;
+ QSemaphore *m_threadBarrier;
+ };
+
+ QSemaphore mainBarrier;
+ QSemaphore taskBarrier;
+
+ QThreadPool threadPool;
+ threadPool.setMaxThreadCount(1);
+
+ Task *task1 = new Task(&mainBarrier, &taskBarrier);
+ Task *task2 = new Task(&mainBarrier, &taskBarrier);
+ Task *task3 = new Task(&mainBarrier, &taskBarrier);
+
+ threadPool.start(task1);
+ threadPool.start(task2);
+ threadPool.start(task3);
+
+ mainBarrier.acquire(1);
+
+ QCOMPARE(threadPool.activeThreadCount(), 1);
+
+ QVERIFY(!threadPool.tryTake(task1));
+ QVERIFY(threadPool.tryTake(task2));
+ QVERIFY(threadPool.tryTake(task3));
+
+ // A bad queue implementation can segfault here because two consecutive items in the queue
+ // have been taken
+ threadPool.setMaxThreadCount(4);
+
+ // Even though we increase the max thread count, there should only be one job to run
+ QCOMPARE(threadPool.activeThreadCount(), 1);
+
+ // Make sure jobs 2 and 3 never started
+ QCOMPARE(mainBarrier.available(), 0);
+
+ taskBarrier.release(1);
+
+ threadPool.waitForDone();
+
+ QCOMPARE(threadPool.activeThreadCount(), 0);
+
+ delete task1;
+ delete task2;
+ delete task3;
+}
+
QTEST_MAIN(tst_QThreadPool);
#include "tst_qthreadpool.moc"