diff options
-rw-r--r-- | src/corelib/thread/qsemaphore.cpp | 131 | ||||
-rw-r--r-- | tests/auto/corelib/thread/qsemaphore/tst_qsemaphore.cpp | 69 |
2 files changed, 157 insertions, 43 deletions
diff --git a/src/corelib/thread/qsemaphore.cpp b/src/corelib/thread/qsemaphore.cpp index 012628ef63..f4674fcc1d 100644 --- a/src/corelib/thread/qsemaphore.cpp +++ b/src/corelib/thread/qsemaphore.cpp @@ -1,7 +1,7 @@ /**************************************************************************** ** ** Copyright (C) 2017 The Qt Company Ltd. -** Copyright (C) 2017 Intel Corporation. +** Copyright (C) 2018 Intel Corporation. ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -119,28 +119,37 @@ using namespace QtFutex; acquire tokens. Which ones get woken up is unspecified. If the system has the ability to wake up a precise number of threads, has - Linux's FUTEX_WAKE_OP functionality, and is 64-bit, we'll use the high word - as a copy of the low word, but the sign bit indicating the presence of a - thread waiting for multiple tokens. So when releasing n tokens on those - systems, we tell the kernel to wake up n single-token threads and all of - the multi-token ones, then clear that wait bit. Which threads get woken up - is unspecified, but it's likely single-token threads will get woken up - first. + Linux's FUTEX_WAKE_OP functionality, and is 64-bit, instead of using a + single bit indicating a contended semaphore, we'll store the total number + of waiters in the high word. Additionally, all multi-token waiters will be + waiting on that high word. So when releasing n tokens on those systems, we + tell the kernel to wake up n single-token threads and all of the + multi-token ones. Which threads get woken up is unspecified, but it's + likely single-token threads will get woken up first. */ -static const quint32 futexContendedBit = 1U << 31; + +#if defined(FUTEX_OP) && QT_POINTER_SIZE > 4 +static Q_CONSTEXPR bool futexHasWaiterCount = true; +#else +static Q_CONSTEXPR bool futexHasWaiterCount = false; +#endif + +static const quintptr futexNeedsWakeAllBit = + Q_UINT64_C(1) << (sizeof(quintptr) * CHAR_BIT - 1); static int futexAvailCounter(quintptr v) { // the low 31 bits - return int(v & (futexContendedBit - 1)); + return int(v & 0x7fffffffU); } -static quintptr futexCounterParcel(int n) +static bool futexNeedsWake(quintptr v) { - // replicate the 31 bits if we're on 64-bit - quint64 nn = quint32(n); - nn |= (nn << 32); - return quintptr(nn); + // If we're counting waiters, the number of waiters is stored in the low 31 + // bits of the high word (that is, bits 32-62). If we're not, then we use + // bit 31 to indicate anyone is waiting. Either way, if any bit 31 or above + // is set, there are waiters. + return v >> 31; } static QBasicAtomicInteger<quint32> *futexLow32(QBasicAtomicInteger<quintptr> *ptr) @@ -152,9 +161,6 @@ static QBasicAtomicInteger<quint32> *futexLow32(QBasicAtomicInteger<quintptr> *p return result; } -#ifdef FUTEX_OP -// quintptr might be 32bit, in which case we want this to be 0, without implicitly casting. -static const quintptr futexMultiWaiterBit = static_cast<quintptr>(Q_UINT64_C(1) << 63); static QBasicAtomicInteger<quint32> *futexHigh32(QBasicAtomicInteger<quintptr> *ptr) { auto result = reinterpret_cast<QBasicAtomicInteger<quint32> *>(ptr); @@ -163,18 +169,21 @@ static QBasicAtomicInteger<quint32> *futexHigh32(QBasicAtomicInteger<quintptr> * #endif return result; } -#endif -template <bool IsTimed> bool futexSemaphoreTryAcquire(QBasicAtomicInteger<quintptr> &u, int n, int timeout) +template <bool IsTimed> bool +futexSemaphoreTryAcquire_loop(QBasicAtomicInteger<quintptr> &u, quintptr curValue, quintptr nn, int timeout) { QDeadlineTimer timer(IsTimed ? QDeadlineTimer(timeout) : QDeadlineTimer()); - quintptr curValue = u.loadAcquire(); qint64 remainingTime = timeout * Q_INT64_C(1000) * 1000; + int n = int(unsigned(nn)); + + // we're called after one testAndSet, so start by waiting first + goto start_wait; + forever { - int available = futexAvailCounter(curValue); - if (available >= n) { + if (futexAvailCounter(curValue) >= n) { // try to acquire - quintptr newValue = curValue - futexCounterParcel(n); + quintptr newValue = curValue - nn; if (u.testAndSetOrdered(curValue, newValue, curValue)) return true; // succeeded! continue; @@ -184,19 +193,18 @@ template <bool IsTimed> bool futexSemaphoreTryAcquire(QBasicAtomicInteger<quintp if (remainingTime == 0) return false; - // set the contended and multi-wait bits - quintptr bitsToSet = futexContendedBit; + // indicate we're waiting +start_wait: auto ptr = futexLow32(&u); -#ifdef FUTEX_OP - if (n > 1 && sizeof(curValue) >= sizeof(int)) { - bitsToSet |= futexMultiWaiterBit; - ptr = futexHigh32(&u); + if (n > 1 || !futexHasWaiterCount) { + u.fetchAndOrRelaxed(futexNeedsWakeAllBit); + curValue |= futexNeedsWakeAllBit; + if (n > 1 && futexHasWaiterCount) { + ptr = futexHigh32(&u); + //curValue >>= 32; // but this is UB in 32-bit, so roundabout: + curValue = quint64(curValue) >> 32; + } } -#endif - - // the value is the same for either branch - u.fetchAndOrRelaxed(bitsToSet); - curValue |= bitsToSet; if (IsTimed && remainingTime > 0) { bool timedout = !futexWait(*ptr, curValue, remainingTime); @@ -212,6 +220,47 @@ template <bool IsTimed> bool futexSemaphoreTryAcquire(QBasicAtomicInteger<quintp } } +template <bool IsTimed> bool futexSemaphoreTryAcquire(QBasicAtomicInteger<quintptr> &u, int n, int timeout) +{ + // Try to acquire without waiting (we still loop because the testAndSet + // call can fail). + quintptr curValue = u.loadAcquire(); + while (futexAvailCounter(curValue) >= n) { + // try to acquire + quintptr newValue = curValue - n; + if (u.testAndSetOrdered(curValue, newValue, curValue)) + return true; // succeeded! + } + if (timeout == 0) + return false; + + // we need to wait + quintptr valueToSubtract = unsigned(n); + quintptr oneWaiter = quintptr(Q_UINT64_C(1) << 32); // zero on 32-bit + if (futexHasWaiterCount) { + // increase the waiter count + u.fetchAndAddRelaxed(oneWaiter); + + // We don't use the fetched value from above so futexWait() fails if + // it changed after the testAndSetOrdered above. + curValue += oneWaiter; + + // Also adjust valueToSubtract to subtract oneWaiter when we succeed in + // acquiring. + valueToSubtract += oneWaiter; + } + + if (futexSemaphoreTryAcquire_loop<IsTimed>(u, curValue, valueToSubtract, timeout)) + return true; + + if (futexHasWaiterCount) { + // decrement the number of threads waiting + Q_ASSERT(futexHigh32(&u)->load() & 0x7fffffffU); + u.fetchAndSubRelaxed(oneWaiter); + } + return false; +} + class QSemaphorePrivate { public: inline QSemaphorePrivate(int n) : avail(n) { } @@ -289,10 +338,10 @@ void QSemaphore::release(int n) Q_ASSERT_X(n >= 0, "QSemaphore::release", "parameter 'n' must be non-negative"); if (futexAvailable()) { - quintptr prevValue = u.fetchAndAddRelease(futexCounterParcel(n)); - if (prevValue & futexContendedBit) { + quintptr prevValue = u.fetchAndAddRelease(n); + if (futexNeedsWake(prevValue)) { #ifdef FUTEX_OP - if (sizeof(u) == sizeof(int)) { + if (!futexHasWaiterCount) { /* On 32-bit systems, all waiters are waiting on the same address, so we'll wake them all and ask the kernel to clear the high bit. @@ -317,9 +366,6 @@ void QSemaphore::release(int n) the kernel to wake up n single-token waiters and all multi-token waiters (if any), then clear the multi-token wait bit. - That means we must clear the contention bit ourselves. See - below for handling the race. - atomic { int oldval = *upper; *upper = oldval & ~(1 << 31); @@ -332,7 +378,6 @@ void QSemaphore::release(int n) quint32 oparg = 31; quint32 cmp = FUTEX_OP_CMP_LT; quint32 cmparg = 0; - futexLow32(&u)->fetchAndAndRelease(futexContendedBit - 1); futexWakeOp(*futexLow32(&u), n, INT_MAX, *futexHigh32(&u), FUTEX_OP(op, oparg, cmp, cmparg)); } #else @@ -343,7 +388,7 @@ void QSemaphore::release(int n) // its acquisition anyway, so it has to wait; // 2) it did not see the new counter value, in which case its // futexWait will fail. - u.fetchAndAndRelease(futexContendedBit - 1); + u.fetchAndAndRelease(futexNeedsWakeAllBit - 1); futexWakeAll(u); #endif } diff --git a/tests/auto/corelib/thread/qsemaphore/tst_qsemaphore.cpp b/tests/auto/corelib/thread/qsemaphore/tst_qsemaphore.cpp index a67ecc2471..b7134d0454 100644 --- a/tests/auto/corelib/thread/qsemaphore/tst_qsemaphore.cpp +++ b/tests/auto/corelib/thread/qsemaphore/tst_qsemaphore.cpp @@ -37,6 +37,8 @@ class tst_QSemaphore : public QObject Q_OBJECT private slots: void acquire(); + void multiRelease(); + void multiAcquireRelease(); void tryAcquire(); void tryAcquireWithTimeout_data(); void tryAcquireWithTimeout(); @@ -149,6 +151,73 @@ void tst_QSemaphore::acquire() QCOMPARE(semaphore.available(), 0); } +void tst_QSemaphore::multiRelease() +{ + class Thread : public QThread + { + public: + QSemaphore &sem; + Thread(QSemaphore &sem) : sem(sem) {} + + void run() override + { + sem.acquire(); + } + }; + + QSemaphore sem; + QVector<Thread *> threads; + threads.resize(4); + + for (Thread *&t : threads) + t = new Thread(sem); + for (Thread *&t : threads) + t->start(); + + // wait for all threads to reach the sem.acquire() and then + // release them all + QTest::qSleep(1); + sem.release(threads.size()); + + for (Thread *&t : threads) + t->wait(); + qDeleteAll(threads); +} + +void tst_QSemaphore::multiAcquireRelease() +{ + class Thread : public QThread + { + public: + QSemaphore &sem; + Thread(QSemaphore &sem) : sem(sem) {} + + void run() override + { + sem.acquire(); + sem.release(); + } + }; + + QSemaphore sem; + QVector<Thread *> threads; + threads.resize(4); + + for (Thread *&t : threads) + t = new Thread(sem); + for (Thread *&t : threads) + t->start(); + + // wait for all threads to reach the sem.acquire() and then + // release them all + QTest::qSleep(1); + sem.release(); + + for (Thread *&t : threads) + t->wait(); + qDeleteAll(threads); +} + void tst_QSemaphore::tryAcquire() { QSemaphore semaphore; |