diff options
Diffstat (limited to 'src/corelib/thread/qsemaphore.cpp')
-rw-r--r-- | src/corelib/thread/qsemaphore.cpp | 207 |
1 files changed, 202 insertions, 5 deletions
diff --git a/src/corelib/thread/qsemaphore.cpp b/src/corelib/thread/qsemaphore.cpp index 96c031eec6..21acc533a1 100644 --- a/src/corelib/thread/qsemaphore.cpp +++ b/src/corelib/thread/qsemaphore.cpp @@ -1,6 +1,7 @@ /**************************************************************************** ** -** Copyright (C) 2016 The Qt Company Ltd. +** Copyright (C) 2017 The Qt Company Ltd. +** Copyright (C) 2017 Intel Corporation. ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -41,12 +42,15 @@ #ifndef QT_NO_THREAD #include "qmutex.h" +#include "qfutex_p.h" #include "qwaitcondition.h" #include "qdeadlinetimer.h" #include "qdatetime.h" QT_BEGIN_NAMESPACE +using namespace QtFutex; + /*! \class QSemaphore \inmodule QtCore @@ -97,6 +101,117 @@ QT_BEGIN_NAMESPACE \sa QSemaphoreReleaser, QMutex, QWaitCondition, QThread, {Semaphores Example} */ +/* + QSemaphore futex operation + + QSemaphore stores a 32-bit integer with the counter of currently available + tokens (value between 0 and INT_MAX). When a thread attempts to acquire n + tokens and the counter is larger than that, we perform a compare-and-swap + with the new count. If that succeeds, the acquisition worked; if not, we + loop again because the counter changed. If there were not enough tokens, + we'll perform a futex-wait. + + Before we do, we set the high bit in the futex to indicate that semaphore + is contended: that is, there's a thread waiting for more tokens. On + release() for n tokens, we perform a fetch-and-add of n and then check if + that high bit was set. If it was, then we clear that bit and perform a + futex-wake on the semaphore to indicate the waiting threads can wake up and + acquire tokens. Which ones get woken up is unspecified. + + If the system has the ability to wake up a precise number of threads, has + Linux's FUTEX_WAKE_OP functionality, and is 64-bit, we'll use the high word + as a copy of the low word, but the sign bit indicating the presence of a + thread waiting for multiple tokens. So when releasing n tokens on those + systems, we tell the kernel to wake up n single-token threads and all of + the multi-token ones, then clear that wait bit. Which threads get woken up + is unspecified, but it's likely single-token threads will get woken up + first. + */ +static const quint32 futexContendedBit = 1U << 31; + +static int futexAvailCounter(quintptr v) +{ + // the low 31 bits + return int(v & (futexContendedBit - 1)); +} + +static quintptr futexCounterParcel(int n) +{ + // replicate the 31 bits if we're on 64-bit + quint64 nn = quint32(n); + nn |= (nn << 32); + return quintptr(nn); +} + +static QBasicAtomicInteger<quint32> *futexLow32(QBasicAtomicInteger<quintptr> *ptr) +{ + auto result = reinterpret_cast<QBasicAtomicInteger<quint32> *>(ptr); +#if Q_BYTE_ORDER == Q_BIG_ENDIAN && QT_POINTER_SIZE > 4 + ++result; +#endif + return result; +} + +#ifdef FUTEX_OP +// quintptr might be 32bit, in which case we want this to be 0, without implicitly casting. +static const quintptr futexMultiWaiterBit = static_cast<quintptr>(Q_UINT64_C(1) << 63); +static QBasicAtomicInteger<quint32> *futexHigh32(QBasicAtomicInteger<quintptr> *ptr) +{ + auto result = reinterpret_cast<QBasicAtomicInteger<quint32> *>(ptr); +#if Q_BYTE_ORDER == Q_LITTLE_ENDIAN && QT_POINTER_SIZE > 4 + ++result; +#endif + return result; +} +#endif + +template <bool IsTimed> bool futexSemaphoreTryAcquire(QBasicAtomicInteger<quintptr> &u, int n, int timeout) +{ + QDeadlineTimer timer(IsTimed ? QDeadlineTimer(timeout) : QDeadlineTimer()); + quintptr curValue = u.loadAcquire(); + qint64 remainingTime = timeout * Q_INT64_C(1000) * 1000; + forever { + int available = futexAvailCounter(curValue); + if (available >= n) { + // try to acquire + quintptr newValue = curValue - futexCounterParcel(n); + if (u.testAndSetOrdered(curValue, newValue, curValue)) + return true; // succeeded! + continue; + } + + // not enough tokens available, put us to wait + if (remainingTime == 0) + return false; + + // set the contended and multi-wait bits + quintptr bitsToSet = futexContendedBit; + auto ptr = futexLow32(&u); +#ifdef FUTEX_OP + if (n > 1 && sizeof(curValue) >= sizeof(int)) { + bitsToSet |= futexMultiWaiterBit; + ptr = futexHigh32(&u); + } +#endif + + // the value is the same for either branch + u.fetchAndOrRelaxed(bitsToSet); + curValue |= bitsToSet; + + if (IsTimed && remainingTime > 0) { + bool timedout = !futexWait(*ptr, curValue, remainingTime); + if (timedout) + return false; + } else { + futexWait(*ptr, curValue); + } + + curValue = u.loadAcquire(); + if (IsTimed) + remainingTime = timer.remainingTimeNSecs(); + } +} + class QSemaphorePrivate { public: inline QSemaphorePrivate(int n) : avail(n) { } @@ -116,7 +231,10 @@ public: QSemaphore::QSemaphore(int n) { Q_ASSERT_X(n >= 0, "QSemaphore", "parameter 'n' must be non-negative"); - d = new QSemaphorePrivate(n); + if (futexAvailable()) + u.store(n); + else + d = new QSemaphorePrivate(n); } /*! @@ -126,7 +244,10 @@ QSemaphore::QSemaphore(int n) undefined behavior. */ QSemaphore::~QSemaphore() -{ delete d; } +{ + if (!futexAvailable()) + delete d; +} /*! Tries to acquire \c n resources guarded by the semaphore. If \a n @@ -138,6 +259,12 @@ QSemaphore::~QSemaphore() void QSemaphore::acquire(int n) { Q_ASSERT_X(n >= 0, "QSemaphore::acquire", "parameter 'n' must be non-negative"); + + if (futexAvailable()) { + futexSemaphoreTryAcquire<false>(u, n, -1); + return; + } + QMutexLocker locker(&d->mutex); while (n > d->avail) d->cond.wait(locker.mutex()); @@ -160,6 +287,69 @@ void QSemaphore::acquire(int n) void QSemaphore::release(int n) { Q_ASSERT_X(n >= 0, "QSemaphore::release", "parameter 'n' must be non-negative"); + + if (futexAvailable()) { + quintptr prevValue = u.fetchAndAddRelease(futexCounterParcel(n)); + if (prevValue & futexContendedBit) { +#ifdef FUTEX_OP + if (sizeof(u) == sizeof(int)) { + /* + On 32-bit systems, all waiters are waiting on the same address, + so we'll wake them all and ask the kernel to clear the high bit. + + atomic { + int oldval = u; + u = oldval & ~(1 << 31); + futexWake(u, INT_MAX); + if (oldval == 0) // impossible condition + futexWake(u, INT_MAX); + } + */ + quint32 op = FUTEX_OP_ANDN | FUTEX_OP_OPARG_SHIFT; + quint32 oparg = 31; + quint32 cmp = FUTEX_OP_CMP_EQ; + quint32 cmparg = 0; + futexWakeOp(u, INT_MAX, INT_MAX, u, FUTEX_OP(op, oparg, cmp, cmparg)); + } else { + /* + On 64-bit systems, the single-token waiters wait on the low half + and the multi-token waiters wait on the upper half. So we ask + the kernel to wake up n single-token waiters and all multi-token + waiters (if any), then clear the multi-token wait bit. + + That means we must clear the contention bit ourselves. See + below for handling the race. + + atomic { + int oldval = *upper; + *upper = oldval & ~(1 << 31); + futexWake(lower, n); + if (oldval < 0) // sign bit set + futexWake(upper, INT_MAX); + } + */ + quint32 op = FUTEX_OP_ANDN | FUTEX_OP_OPARG_SHIFT; + quint32 oparg = 31; + quint32 cmp = FUTEX_OP_CMP_LT; + quint32 cmparg = 0; + futexLow32(&u)->fetchAndAndRelease(futexContendedBit - 1); + futexWakeOp(*futexLow32(&u), n, INT_MAX, *futexHigh32(&u), FUTEX_OP(op, oparg, cmp, cmparg)); + } +#else + // Unset the bit and wake everyone. There are two possibibilies + // under which a thread can set the bit between the AND and the + // futexWake: + // 1) it did see the new counter value, but it wasn't enough for + // its acquisition anyway, so it has to wait; + // 2) it did not see the new counter value, in which case its + // futexWait will fail. + u.fetchAndAndRelease(futexContendedBit - 1); + futexWakeAll(u); +#endif + } + return; + } + QMutexLocker locker(&d->mutex); d->avail += n; d->cond.wakeAll(); @@ -173,6 +363,9 @@ void QSemaphore::release(int n) */ int QSemaphore::available() const { + if (futexAvailable()) + return futexAvailCounter(u.load()); + QMutexLocker locker(&d->mutex); return d->avail; } @@ -191,6 +384,10 @@ int QSemaphore::available() const bool QSemaphore::tryAcquire(int n) { Q_ASSERT_X(n >= 0, "QSemaphore::tryAcquire", "parameter 'n' must be non-negative"); + + if (futexAvailable()) + return futexSemaphoreTryAcquire<false>(u, n, 0); + QMutexLocker locker(&d->mutex); if (n > d->avail) return false; @@ -217,8 +414,8 @@ bool QSemaphore::tryAcquire(int n) bool QSemaphore::tryAcquire(int n, int timeout) { Q_ASSERT_X(n >= 0, "QSemaphore::tryAcquire", "parameter 'n' must be non-negative"); - if (timeout < 0) - return tryAcquire(n); + if (futexAvailable()) + return futexSemaphoreTryAcquire<true>(u, n, timeout < 0 ? -1 : timeout); QDeadlineTimer timer(timeout); QMutexLocker locker(&d->mutex); |