summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAlex Trotsenko <alex1973tr@gmail.com>2021-01-23 13:00:22 +0200
committerAlex Trotsenko <alex1973tr@gmail.com>2021-03-02 22:53:06 +0200
commitf265c87e015c26225b80297f5c9f430ea7030214 (patch)
tree29e7fedb3e6e51e0cc634852079e998c8705a921 /src
parentd316ae8e830b8b86f81307d35583f20773da6a55 (diff)
Allow QWindowsPipe{Reader|Writer} to work with foreign event loops, take 2
When a foreign event loop that does not enter an alertable wait state is running (which is also the case when a native dialog window is modal), pipe handlers would freeze temporarily due to their APC callbacks not being invoked. We address this problem by moving the I/O callbacks to the Windows thread pool, and only posting completion events to the main loop from there. That makes the actual I/O completely independent from any main loop, while the signal delivery works also with foreign loops (because Qt event delivery uses Windows messages, which foreign loops typically handle correctly). As a nice side effect, performance (and in particular scalability) is improved. Several other approaches have been tried: 1) Using QWinEventNotifier was about a quarter slower and scaled much worse. Additionally, it also required a rather egregious hack to handle the (pathological) case of a single thread talking to both ends of a QLocalSocket synchronously. 2) Queuing APCs from the thread pool to the main thread and also posting wake-up events to its event loop, and handling I/O on the main thread; this performed roughly like this solution, but scaled half as well, and the separate wake-up path was still deemed hacky. 3) Only posting wake-up events to the main thread from the thread pool, and still handling I/O on the main thread; this still performed comparably to 2), and the pathological case was not handled at all. 4) Using this approach for reads and that of 3) for writes was slightly faster with big amounts of data, but scaled slightly worse, and the diverging implementations were deemed not desirable. Fixes: QTBUG-64443 Change-Id: I66443c3021d6ba98639a214c3e768be97d2cf14b Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@gmx.de>
Diffstat (limited to 'src')
-rw-r--r--src/corelib/io/qwindowspipereader.cpp468
-rw-r--r--src/corelib/io/qwindowspipereader_p.h51
-rw-r--r--src/corelib/io/qwindowspipewriter.cpp348
-rw-r--r--src/corelib/io/qwindowspipewriter_p.h44
4 files changed, 570 insertions, 341 deletions
diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp
index b525e88282..bf03737c39 100644
--- a/src/corelib/io/qwindowspipereader.cpp
+++ b/src/corelib/io/qwindowspipereader.cpp
@@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -38,58 +39,63 @@
****************************************************************************/
#include "qwindowspipereader_p.h"
-#include "qiodevice_p.h"
-#include <qelapsedtimer.h>
#include <qscopedvaluerollback.h>
+#include <qcoreapplication.h>
+#include <QMutexLocker>
QT_BEGIN_NAMESPACE
static const DWORD minReadBufferSize = 4096;
-QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader)
- : pipeReader(reader)
-{
-}
-
-void QWindowsPipeReader::Overlapped::clear()
-{
- ZeroMemory(this, sizeof(OVERLAPPED));
-}
-
-
QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
: QObject(parent),
handle(INVALID_HANDLE_VALUE),
- overlapped(this),
+ eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ syncHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ waitObject(NULL),
readBufferMaxSize(0),
actualReadBufferSize(0),
- bytesPending(0),
+ pendingReadBytes(0),
+ lastError(ERROR_SUCCESS),
state(Stopped),
readSequenceStarted(false),
- notifiedCalled(false),
pipeBroken(false),
readyReadPending(false),
+ winEventActPosted(false),
inReadyRead(false)
{
- connect(this, &QWindowsPipeReader::_q_queueReadyRead,
- this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection);
+ ZeroMemory(&overlapped, sizeof(OVERLAPPED));
+ overlapped.hEvent = eventHandle;
+ waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
+ if (waitObject == NULL)
+ qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");
}
QWindowsPipeReader::~QWindowsPipeReader()
{
stop();
+
+ // Wait for thread pool callback to complete, as it can be still
+ // executing some completion code.
+ WaitForThreadpoolWaitCallbacks(waitObject, FALSE);
+ CloseThreadpoolWait(waitObject);
+ CloseHandle(eventHandle);
+ CloseHandle(syncHandle);
}
/*!
Sets the handle to read from. The handle must be valid.
+ Do not call this function while the pipe is running.
*/
void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
{
readBuffer.clear();
actualReadBufferSize = 0;
- bytesPending = 0;
+ readyReadPending = false;
+ pendingReadBytes = 0;
handle = hPipeReadEnd;
pipeBroken = false;
+ lastError = ERROR_SUCCESS;
}
/*!
@@ -98,8 +104,7 @@ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
*/
void QWindowsPipeReader::stop()
{
- state = Stopped;
- cancelAsyncRead();
+ cancelAsyncRead(Stopped);
}
/*!
@@ -108,16 +113,27 @@ void QWindowsPipeReader::stop()
*/
void QWindowsPipeReader::drainAndStop()
{
- state = Draining;
- cancelAsyncRead();
+ cancelAsyncRead(Draining);
+
+ // Note that signals are not emitted in the call below, as the caller
+ // is expected to do that synchronously.
+ consumePending();
}
/*!
Stops the asynchronous read sequence.
*/
-void QWindowsPipeReader::cancelAsyncRead()
+void QWindowsPipeReader::cancelAsyncRead(State newState)
{
+ if (state != Running)
+ return;
+
+ QMutexLocker locker(&mutex);
+ state = newState;
if (readSequenceStarted) {
+ // This can legitimately fail due to the GetOverlappedResult()
+ // in the callback not being locked. We ignore ERROR_NOT_FOUND
+ // in this case.
if (!CancelIoEx(handle, &overlapped)) {
const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) {
@@ -125,11 +141,37 @@ void QWindowsPipeReader::cancelAsyncRead()
handle);
}
}
- waitForNotification(-1);
+
+ // Wait for callback to complete.
+ do {
+ locker.unlock();
+ waitForNotification(QDeadlineTimer(-1));
+ locker.relock();
+ } while (readSequenceStarted);
}
}
/*!
+ Sets the size of internal read buffer.
+ */
+void QWindowsPipeReader::setMaxReadBufferSize(qint64 size)
+{
+ QMutexLocker locker(&mutex);
+ readBufferMaxSize = size;
+}
+
+/*!
+ Returns \c true if async operation is in progress, there is
+ pending data to read, or a read error is pending.
+*/
+bool QWindowsPipeReader::isReadOperationActive() const
+{
+ QMutexLocker locker(&mutex);
+ return readSequenceStarted || readyReadPending
+ || (lastError != ERROR_SUCCESS && !pipeBroken);
+}
+
+/*!
Returns the number of bytes we've read so far.
*/
qint64 QWindowsPipeReader::bytesAvailable() const
@@ -145,6 +187,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
if (pipeBroken && actualReadBufferSize == 0)
return 0; // signal EOF
+ mutex.lock();
qint64 readSoFar;
// If startAsyncRead() has read data, copy it to its destination.
if (maxlen == 1 && actualReadBufferSize > 0) {
@@ -155,6 +198,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
actualReadBufferSize -= readSoFar;
}
+ mutex.unlock();
if (!pipeBroken) {
if (state == Running)
@@ -166,197 +210,268 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
return readSoFar;
}
+/*!
+ Returns \c true if a complete line of data can be read from the buffer.
+ */
bool QWindowsPipeReader::canReadLine() const
{
+ QMutexLocker locker(&mutex);
return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
}
/*!
- \internal
- Will be called whenever the read operation completes.
+ Starts an asynchronous read sequence on the pipe.
*/
-void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead)
+void QWindowsPipeReader::startAsyncRead()
{
- notifiedCalled = true;
- readSequenceStarted = false;
-
- switch (errorCode) {
- case ERROR_SUCCESS:
- break;
- case ERROR_MORE_DATA:
- // This is not an error. We're connected to a message mode
- // pipe and the message didn't fit into the pipe's system
- // buffer. We will read the remaining data in the next call.
- break;
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- pipeBroken = true;
- break;
- case ERROR_OPERATION_ABORTED:
- if (state != Running)
- break;
- Q_FALLTHROUGH();
- default:
- emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified"));
- pipeBroken = true;
- break;
- }
+ QMutexLocker locker(&mutex);
- // After the reader was stopped, the only reason why this function can be called is the
- // completion of a cancellation. No signals should be emitted, and no new read sequence should
- // be started in this case.
- if (state == Stopped)
+ if (readSequenceStarted || lastError != ERROR_SUCCESS)
return;
- if (pipeBroken) {
- emitPipeClosed();
+ state = Running;
+ startAsyncReadLocked();
+
+ // Do not post the event, if the read operation will be completed asynchronously.
+ if (!readyReadPending && lastError == ERROR_SUCCESS)
return;
- }
- actualReadBufferSize += numberOfBytesRead;
- readBuffer.truncate(actualReadBufferSize);
+ if (!winEventActPosted) {
+ winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
+ }
- // Read all pending data from the pipe's buffer in 'Draining' state.
- if (state == Draining) {
- // Determine the number of pending bytes on the first iteration.
- if (bytesPending == 0)
- bytesPending = checkPipeState();
- else
- bytesPending -= numberOfBytesRead;
+ SetEvent(syncHandle);
+}
- if (bytesPending == 0) // all data received
- return; // unblock waitForNotification() in cancelAsyncRead()
+/*!
+ Starts a new read sequence. Thread-safety should be ensured
+ by the caller.
+ */
+void QWindowsPipeReader::startAsyncReadLocked()
+{
+ // Determine the number of bytes to read.
+ qint64 bytesToRead = qMax(checkPipeState(), state == Running ? minReadBufferSize : 0);
- startAsyncReadHelper(bytesPending);
- if (readSequenceStarted)
- notifiedCalled = false; // wait for more data
+ // This can happen only while draining; just do nothing in this case.
+ if (bytesToRead == 0)
return;
+
+ while (lastError == ERROR_SUCCESS) {
+ if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
+ bytesToRead = readBufferMaxSize - readBuffer.size();
+ if (bytesToRead <= 0) {
+ // Buffer is full. User must read data from the buffer
+ // before we can read more from the pipe.
+ return;
+ }
+ }
+
+ char *ptr = readBuffer.reserve(bytesToRead);
+
+ // ReadFile() returns true, if the read operation completes synchronously.
+ // We don't need to call GetOverlappedResult() additionally, because
+ // 'numberOfBytesRead' is valid in this case.
+ DWORD numberOfBytesRead;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) {
+ errorCode = GetLastError();
+ if (errorCode == ERROR_IO_PENDING) {
+ Q_ASSERT(state == Running);
+ // Operation has been queued and will complete in the future.
+ readSequenceStarted = true;
+ SetThreadpoolWait(waitObject, eventHandle, NULL);
+ return;
+ }
+ }
+
+ if (!readCompleted(errorCode, numberOfBytesRead))
+ return;
+
+ // In the 'Draining' state, we have to get all the data with one call
+ // to ReadFile(). Note that message mode pipes are not supported here.
+ if (state == Draining) {
+ Q_ASSERT(bytesToRead == qint64(numberOfBytesRead));
+ return;
+ }
+
+ // We need to loop until all pending data has been read and an
+ // operation is queued for asynchronous completion.
+ // If the pipe is configured to work in message mode, we read
+ // the data in chunks.
+ bytesToRead = qMax(checkPipeState(), minReadBufferSize);
}
+}
- startAsyncRead();
- if (!readyReadPending) {
- readyReadPending = true;
- emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal());
+/*!
+ Thread pool callback procedure.
+ */
+void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult)
+{
+ Q_UNUSED(instance);
+ Q_UNUSED(wait);
+ Q_UNUSED(waitResult);
+ QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context);
+
+ // Get the result of the asynchronous operation.
+ DWORD numberOfBytesTransfered = 0;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
+ &numberOfBytesTransfered, FALSE))
+ errorCode = GetLastError();
+
+ pipeReader->mutex.lock();
+
+ pipeReader->readSequenceStarted = false;
+
+ // Do not overwrite error code, if error has been detected by
+ // checkPipeState() in waitForPipeClosed(). Also, if the reader was
+ // stopped, the only reason why this function can be called is the
+ // completion of a cancellation. No signals should be emitted, and
+ // no new read sequence should be started in this case.
+ if (pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) {
+ // Ignore ERROR_OPERATION_ABORTED. We have canceled the I/O operation
+ // specifically for flushing the pipe.
+ if (pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED)
+ errorCode = ERROR_SUCCESS;
+
+ if (pipeReader->readCompleted(errorCode, numberOfBytesTransfered))
+ pipeReader->startAsyncReadLocked();
+
+ if (pipeReader->state == Running && !pipeReader->winEventActPosted) {
+ pipeReader->winEventActPosted = true;
+ pipeReader->mutex.unlock();
+ QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct));
+ } else {
+ pipeReader->mutex.unlock();
+ }
+ } else {
+ pipeReader->mutex.unlock();
}
+
+ // We set the event only after unlocking to avoid additional context
+ // switches due to the released thread immediately running into the lock.
+ SetEvent(pipeReader->syncHandle);
}
/*!
- \internal
- Starts an asynchronous read sequence on the pipe.
+ Will be called whenever the read operation completes. Returns \c true if
+ no error occurred; otherwise returns \c false.
*/
-void QWindowsPipeReader::startAsyncRead()
+bool QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
{
- if (readSequenceStarted)
- return;
+ // ERROR_MORE_DATA is not an error. We're connected to a message mode
+ // pipe and the message didn't fit into the pipe's system
+ // buffer. We will read the remaining data in the next call.
+ if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
+ readyReadPending = true;
+ pendingReadBytes += numberOfBytesRead;
+ readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
+ return true;
+ }
- state = Running;
- startAsyncReadHelper(qMax(checkPipeState(), minReadBufferSize));
+ lastError = errorCode;
+ return false;
}
/*!
- \internal
- Starts a new read sequence.
+ Receives notification that the read operation has completed.
*/
-void QWindowsPipeReader::startAsyncReadHelper(qint64 bytesToRead)
+bool QWindowsPipeReader::event(QEvent *e)
{
- Q_ASSERT(bytesToRead != 0);
+ if (e->type() == QEvent::WinEventAct) {
+ consumePendingAndEmit(true);
+ return true;
+ }
+ return QObject::event(e);
+}
- if (pipeBroken)
- return;
+/*!
+ Updates the read buffer size and emits pending signals in the main thread.
+ Returns \c true, if readyRead() was emitted.
+ */
+bool QWindowsPipeReader::consumePendingAndEmit(bool allowWinActPosting)
+{
+ mutex.lock();
- if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
- bytesToRead = readBufferMaxSize - readBuffer.size();
- if (bytesToRead <= 0) {
- // Buffer is full. User must read data from the buffer
- // before we can read more from the pipe.
- return;
- }
+ // Enable QEvent::WinEventAct posting.
+ if (allowWinActPosting)
+ winEventActPosted = false;
+
+ const bool emitReadyRead = consumePending();
+ const DWORD dwError = lastError;
+
+ mutex.unlock();
+
+ // Disable any further processing, if the pipe was stopped.
+ // We are not allowed to emit signals in either 'Stopped'
+ // or 'Draining' state.
+ if (state != Running)
+ return false;
+
+ if (emitReadyRead && !inReadyRead) {
+ QScopedValueRollback<bool> guard(inReadyRead, true);
+ emit readyRead();
}
- char *ptr = readBuffer.reserve(bytesToRead);
-
- readSequenceStarted = true;
- overlapped.clear();
- if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) {
- readSequenceStarted = false;
-
- const DWORD dwError = GetLastError();
- switch (dwError) {
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- // It may happen, that the other side closes the connection directly
- // after writing data. Then we must set the appropriate socket state.
- pipeBroken = true;
- emit pipeClosed();
- break;
- default:
- emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead"));
- break;
- }
+ // Trigger 'pipeBroken' only once.
+ if (dwError != ERROR_SUCCESS && !pipeBroken) {
+ pipeBroken = true;
+ if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
+ emit winError(dwError, QLatin1String("QWindowsPipeReader::consumePendingAndEmit"));
+ emit pipeClosed();
}
+
+ return emitReadyRead;
}
/*!
- \internal
- Called when ReadFileEx finished the read operation.
+ Updates the read buffer size. Returns \c true, if readyRead()
+ should be emitted. Thread-safety should be ensured by the caller.
*/
-void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase)
+bool QWindowsPipeReader::consumePending()
{
- Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
- overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered);
+ if (readyReadPending) {
+ readyReadPending = false;
+ actualReadBufferSize += pendingReadBytes;
+ pendingReadBytes = 0;
+ return true;
+ }
+
+ return false;
}
/*!
- \internal
Returns the number of available bytes in the pipe.
- Sets QWindowsPipeReader::pipeBroken to true if the connection is broken.
*/
DWORD QWindowsPipeReader::checkPipeState()
{
DWORD bytes;
if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr))
return bytes;
- if (!pipeBroken) {
- pipeBroken = true;
- emitPipeClosed();
- }
+
+ lastError = GetLastError();
return 0;
}
-bool QWindowsPipeReader::waitForNotification(int timeout)
+bool QWindowsPipeReader::waitForNotification(const QDeadlineTimer &deadline)
{
- QElapsedTimer t;
- t.start();
- notifiedCalled = false;
- int msecs = timeout;
- while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
- if (notifiedCalled)
+ do {
+ DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE);
+ if (waitRet == WAIT_OBJECT_0)
return true;
- // Some other I/O completion routine was called. Wait some more.
- msecs = qt_subtract_from_timeout(timeout, t.elapsed());
- if (!msecs)
- break;
- }
- return notifiedCalled;
-}
+ if (waitRet != WAIT_IO_COMPLETION)
+ return false;
-void QWindowsPipeReader::emitPendingReadyRead()
-{
- if (readyReadPending) {
- readyReadPending = false;
- QScopedValueRollback<bool> guard(inReadyRead, true);
- emit readyRead();
- }
-}
+ // Some I/O completion routine was called. Wait some more.
+ } while (!deadline.hasExpired());
-void QWindowsPipeReader::emitPipeClosed()
-{
- // We are not allowed to emit signals in either 'Stopped'
- // or 'Draining' state.
- if (state == Running)
- emit pipeClosed();
+ return false;
}
/*!
@@ -366,22 +481,12 @@ void QWindowsPipeReader::emitPipeClosed()
*/
bool QWindowsPipeReader::waitForReadyRead(int msecs)
{
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
- return true;
- }
-
- if (!readSequenceStarted)
- return false;
-
- if (!waitForNotification(msecs))
- return false;
+ QDeadlineTimer timer(msecs);
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
- return true;
+ // Make sure that 'syncHandle' was triggered by the thread pool callback.
+ while (isReadOperationActive() && waitForNotification(timer)) {
+ if (consumePendingAndEmit(false))
+ return true;
}
return false;
@@ -393,15 +498,26 @@ bool QWindowsPipeReader::waitForReadyRead(int msecs)
bool QWindowsPipeReader::waitForPipeClosed(int msecs)
{
const int sleepTime = 10;
- QElapsedTimer stopWatch;
- stopWatch.start();
+ QDeadlineTimer timer(msecs);
+
+ while (waitForReadyRead(timer.remainingTime())) {}
+ if (pipeBroken)
+ return true;
+
+ if (timer.hasExpired())
+ return false;
+
+ // When the read buffer is full, the read sequence is not running,
+ // so we need to peek the pipe to detect disconnection.
forever {
- waitForReadyRead(0);
checkPipeState();
+ consumePendingAndEmit(false);
if (pipeBroken)
return true;
- if (stopWatch.hasExpired(msecs - sleepTime))
+
+ if (timer.hasExpired())
return false;
+
Sleep(sleepTime);
}
}
diff --git a/src/corelib/io/qwindowspipereader_p.h b/src/corelib/io/qwindowspipereader_p.h
index c61018d87d..a284f55b3b 100644
--- a/src/corelib/io/qwindowspipereader_p.h
+++ b/src/corelib/io/qwindowspipereader_p.h
@@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -52,6 +53,8 @@
//
#include <qobject.h>
+#include <qdeadlinetimer.h>
+#include <qmutex.h>
#include <private/qringbuffer_p.h>
#include <qt_windows.h>
@@ -70,7 +73,7 @@ public:
void stop();
void drainAndStop();
- void setMaxReadBufferSize(qint64 size) { readBufferMaxSize = size; }
+ void setMaxReadBufferSize(qint64 size);
qint64 maxReadBufferSize() const { return readBufferMaxSize; }
bool isPipeClosed() const { return pipeBroken; }
@@ -80,46 +83,46 @@ public:
bool waitForReadyRead(int msecs);
bool waitForPipeClosed(int msecs);
- bool isReadOperationActive() const { return readSequenceStarted; }
+ bool isReadOperationActive() const;
Q_SIGNALS:
void winError(ulong, const QString &);
void readyRead();
void pipeClosed();
- void _q_queueReadyRead(QPrivateSignal);
+
+protected:
+ bool event(QEvent *e) override;
private:
- void startAsyncReadHelper(qint64 bytesToRead);
- void cancelAsyncRead();
- static void CALLBACK readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase);
- void notified(DWORD errorCode, DWORD numberOfBytesRead);
+ enum State { Stopped, Running, Draining };
+
+ void startAsyncReadLocked();
+ void cancelAsyncRead(State newState);
+ static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult);
+ bool readCompleted(DWORD errorCode, DWORD numberOfBytesRead);
DWORD checkPipeState();
- bool waitForNotification(int timeout);
- void emitPendingReadyRead();
- void emitPipeClosed();
-
- class Overlapped : public OVERLAPPED
- {
- Q_DISABLE_COPY_MOVE(Overlapped)
- public:
- explicit Overlapped(QWindowsPipeReader *reader);
- void clear();
- QWindowsPipeReader *pipeReader;
- };
+ bool waitForNotification(const QDeadlineTimer &deadline);
+ bool consumePendingAndEmit(bool allowWinActPosting);
+ bool consumePending();
HANDLE handle;
- Overlapped overlapped;
+ HANDLE eventHandle;
+ HANDLE syncHandle;
+ PTP_WAIT waitObject;
+ OVERLAPPED overlapped;
qint64 readBufferMaxSize;
QRingBuffer readBuffer;
qint64 actualReadBufferSize;
- qint64 bytesPending;
+ qint64 pendingReadBytes;
+ mutable QMutex mutex;
+ DWORD lastError;
- enum State { Stopped, Running, Draining } state;
+ State state;
bool readSequenceStarted;
- bool notifiedCalled;
bool pipeBroken;
bool readyReadPending;
+ bool winEventActPosted;
bool inReadyRead;
};
diff --git a/src/corelib/io/qwindowspipewriter.cpp b/src/corelib/io/qwindowspipewriter.cpp
index e374034a06..5ed584c6e3 100644
--- a/src/corelib/io/qwindowspipewriter.cpp
+++ b/src/corelib/io/qwindowspipewriter.cpp
@@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -38,189 +39,296 @@
****************************************************************************/
#include "qwindowspipewriter_p.h"
-#include "qiodevice_p.h"
#include <qscopedvaluerollback.h>
+#include <qcoreapplication.h>
+#include <QMutexLocker>
QT_BEGIN_NAMESPACE
-QWindowsPipeWriter::Overlapped::Overlapped(QWindowsPipeWriter *pipeWriter)
- : pipeWriter(pipeWriter)
-{
-}
-
-void QWindowsPipeWriter::Overlapped::clear()
-{
- ZeroMemory(this, sizeof(OVERLAPPED));
-}
-
-
QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent)
: QObject(parent),
handle(pipeWriteEnd),
- overlapped(this),
+ eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ syncHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ waitObject(NULL),
pendingBytesWrittenValue(0),
+ lastError(ERROR_SUCCESS),
stopped(true),
writeSequenceStarted(false),
- notifiedCalled(false),
bytesWrittenPending(false),
+ winEventActPosted(false),
inBytesWritten(false)
{
- connect(this, &QWindowsPipeWriter::_q_queueBytesWritten,
- this, &QWindowsPipeWriter::emitPendingBytesWrittenValue, Qt::QueuedConnection);
+ ZeroMemory(&overlapped, sizeof(OVERLAPPED));
+ overlapped.hEvent = eventHandle;
+ waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
+ if (waitObject == NULL)
+ qErrnoWarning("QWindowsPipeWriter: CreateThreadpollWait failed.");
}
QWindowsPipeWriter::~QWindowsPipeWriter()
{
stop();
+ CloseThreadpoolWait(waitObject);
+ CloseHandle(eventHandle);
+ CloseHandle(syncHandle);
}
-bool QWindowsPipeWriter::waitForWrite(int msecs)
+/*!
+ Stops the asynchronous write sequence.
+ If the write sequence is running then the I/O operation is canceled.
+ */
+void QWindowsPipeWriter::stop()
{
- if (bytesWrittenPending) {
- emitPendingBytesWrittenValue();
- return true;
- }
-
- if (!writeSequenceStarted)
- return false;
-
- if (!waitForNotification(msecs))
- return false;
+ if (stopped)
+ return;
- if (bytesWrittenPending) {
- emitPendingBytesWrittenValue();
- return true;
+ mutex.lock();
+ stopped = true;
+ if (writeSequenceStarted) {
+ // Trying to disable callback before canceling the operation.
+ // Callback invocation is unnecessary here.
+ SetThreadpoolWait(waitObject, NULL, NULL);
+ if (!CancelIoEx(handle, &overlapped)) {
+ const DWORD dwError = GetLastError();
+ if (dwError != ERROR_NOT_FOUND) {
+ qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.",
+ handle);
+ }
+ }
+ writeSequenceStarted = false;
}
+ mutex.unlock();
- return false;
+ WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
}
+/*!
+ Returns \c true if async operation is in progress or a bytesWritten
+ signal is pending.
+ */
+bool QWindowsPipeWriter::isWriteOperationActive() const
+{
+ QMutexLocker locker(&mutex);
+ return writeSequenceStarted || bytesWrittenPending;
+}
+
+/*!
+ Returns the number of bytes that are waiting to be written.
+ */
qint64 QWindowsPipeWriter::bytesToWrite() const
{
- return buffer.size() + pendingBytesWrittenValue;
+ QMutexLocker locker(&mutex);
+ return writeBuffer.size() + pendingBytesWrittenValue;
}
-void QWindowsPipeWriter::emitPendingBytesWrittenValue()
+/*!
+ Writes data to the pipe.
+ */
+bool QWindowsPipeWriter::write(const QByteArray &ba)
{
- if (bytesWrittenPending) {
- // Reset the state even if we don't emit bytesWritten().
- // It's a defined behavior to not re-emit this signal recursively.
- bytesWrittenPending = false;
- const qint64 bytes = pendingBytesWrittenValue;
- pendingBytesWrittenValue = 0;
-
- emit canWrite();
- if (!inBytesWritten) {
- QScopedValueRollback<bool> guard(inBytesWritten, true);
- emit bytesWritten(bytes);
- }
+ QMutexLocker locker(&mutex);
+
+ if (lastError != ERROR_SUCCESS)
+ return false;
+
+ writeBuffer.append(ba);
+ if (writeSequenceStarted)
+ return true;
+
+ stopped = false;
+ startAsyncWriteLocked();
+
+ // Do not post the event, if the write operation will be completed asynchronously.
+ if (!bytesWrittenPending)
+ return true;
+
+ if (!winEventActPosted) {
+ winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
}
+
+ SetEvent(syncHandle);
+ return true;
}
-void QWindowsPipeWriter::writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase)
+/*!
+ Starts a new write sequence. Thread-safety should be ensured by the caller.
+ */
+void QWindowsPipeWriter::startAsyncWriteLocked()
{
- Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
- overlapped->pipeWriter->notified(errorCode, numberOfBytesTransfered);
+ while (!writeBuffer.isEmpty()) {
+ // WriteFile() returns true, if the write operation completes synchronously.
+ // We don't need to call GetOverlappedResult() additionally, because
+ // 'numberOfBytesWritten' is valid in this case.
+ DWORD numberOfBytesWritten;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(),
+ &numberOfBytesWritten, &overlapped)) {
+ errorCode = GetLastError();
+ if (errorCode == ERROR_IO_PENDING) {
+ // Operation has been queued and will complete in the future.
+ writeSequenceStarted = true;
+ SetThreadpoolWait(waitObject, eventHandle, NULL);
+ return;
+ }
+ }
+
+ if (!writeCompleted(errorCode, numberOfBytesWritten))
+ return;
+ }
}
/*!
- \internal
- Will be called whenever the write operation completes.
+ Thread pool callback procedure.
*/
-void QWindowsPipeWriter::notified(DWORD errorCode, DWORD numberOfBytesWritten)
+void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult)
{
- notifiedCalled = true;
- writeSequenceStarted = false;
- Q_ASSERT(errorCode != ERROR_SUCCESS || numberOfBytesWritten == DWORD(buffer.size()));
- buffer.clear();
-
- switch (errorCode) {
- case ERROR_SUCCESS:
- break;
- case ERROR_OPERATION_ABORTED:
- if (stopped)
- break;
- Q_FALLTHROUGH();
- default:
- qErrnoWarning(errorCode, "QWindowsPipeWriter: asynchronous write failed.");
- break;
- }
+ Q_UNUSED(instance);
+ Q_UNUSED(wait);
+ Q_UNUSED(waitResult);
+ QWindowsPipeWriter *pipeWriter = reinterpret_cast<QWindowsPipeWriter *>(context);
+
+ // Get the result of the asynchronous operation.
+ DWORD numberOfBytesTransfered = 0;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped,
+ &numberOfBytesTransfered, FALSE))
+ errorCode = GetLastError();
+
+ QMutexLocker locker(&pipeWriter->mutex);
// After the writer was stopped, the only reason why this function can be called is the
- // completion of a cancellation. No signals should be emitted, and no new write sequence should
- // be started in this case.
- if (stopped)
+ // completion of a cancellation. No signals should be emitted, and no new write sequence
+ // should be started in this case.
+ if (pipeWriter->stopped)
return;
- pendingBytesWrittenValue += qint64(numberOfBytesWritten);
- if (!bytesWrittenPending) {
- bytesWrittenPending = true;
- emit _q_queueBytesWritten(QWindowsPipeWriter::QPrivateSignal());
+ pipeWriter->writeSequenceStarted = false;
+
+ if (pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered))
+ pipeWriter->startAsyncWriteLocked();
+
+ if (pipeWriter->lastError == ERROR_SUCCESS && !pipeWriter->winEventActPosted) {
+ pipeWriter->winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(pipeWriter, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
}
+
+ // We set the event only after unlocking to avoid additional context
+ // switches due to the released thread immediately running into the lock.
+ SetEvent(pipeWriter->syncHandle);
}
-bool QWindowsPipeWriter::waitForNotification(int timeout)
+/*!
+ Will be called whenever the write operation completes. Returns \c true if
+ no error occurred; otherwise returns \c false.
+ */
+bool QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten)
{
- QElapsedTimer t;
- t.start();
- notifiedCalled = false;
- int msecs = timeout;
- while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
- if (notifiedCalled)
- return true;
+ if (errorCode == ERROR_SUCCESS) {
+ Q_ASSERT(numberOfBytesWritten == DWORD(writeBuffer.nextDataBlockSize()));
- // Some other I/O completion routine was called. Wait some more.
- msecs = qt_subtract_from_timeout(timeout, t.elapsed());
- if (!msecs)
- break;
+ bytesWrittenPending = true;
+ pendingBytesWrittenValue += numberOfBytesWritten;
+ writeBuffer.free(numberOfBytesWritten);
+ return true;
}
- return notifiedCalled;
+
+ lastError = errorCode;
+ writeBuffer.clear();
+ // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
+ if (errorCode != ERROR_OPERATION_ABORTED && errorCode != ERROR_NO_DATA)
+ qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed.");
+ return false;
}
-bool QWindowsPipeWriter::write(const QByteArray &ba)
+/*!
+ Receives notification that the write operation has completed.
+ */
+bool QWindowsPipeWriter::event(QEvent *e)
{
- if (writeSequenceStarted)
+ if (e->type() == QEvent::WinEventAct) {
+ consumePendingAndEmit(true);
+ return true;
+ }
+ return QObject::event(e);
+}
+
+/*!
+ Updates the state and emits pending signals in the main thread.
+ Returns \c true, if bytesWritten() was emitted.
+ */
+bool QWindowsPipeWriter::consumePendingAndEmit(bool allowWinActPosting)
+{
+ QMutexLocker locker(&mutex);
+
+ // Enable QEvent::WinEventAct posting.
+ if (allowWinActPosting)
+ winEventActPosted = false;
+
+ if (!bytesWrittenPending)
return false;
- overlapped.clear();
- buffer = ba;
- stopped = false;
- writeSequenceStarted = true;
- if (!WriteFileEx(handle, buffer.constData(), buffer.size(),
- &overlapped, &writeFileCompleted)) {
- writeSequenceStarted = false;
- buffer.clear();
-
- const DWORD errorCode = GetLastError();
- switch (errorCode) {
- case ERROR_NO_DATA: // "The pipe is being closed."
- // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
- break;
- default:
- qErrnoWarning(errorCode, "QWindowsPipeWriter::write failed.");
- }
+ // Reset the state even if we don't emit bytesWritten().
+ // It's a defined behavior to not re-emit this signal recursively.
+ bytesWrittenPending = false;
+ qint64 numberOfBytesWritten = pendingBytesWrittenValue;
+ pendingBytesWrittenValue = 0;
+
+ locker.unlock();
+
+ // Disable any further processing, if the pipe was stopped.
+ if (stopped)
return false;
+
+ emit canWrite();
+ if (!inBytesWritten) {
+ QScopedValueRollback<bool> guard(inBytesWritten, true);
+ emit bytesWritten(numberOfBytesWritten);
}
return true;
}
-void QWindowsPipeWriter::stop()
+bool QWindowsPipeWriter::waitForNotification(const QDeadlineTimer &deadline)
{
- stopped = true;
- bytesWrittenPending = false;
- pendingBytesWrittenValue = 0;
- if (writeSequenceStarted) {
- if (!CancelIoEx(handle, &overlapped)) {
- const DWORD dwError = GetLastError();
- if (dwError != ERROR_NOT_FOUND) {
- qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.",
- handle);
- }
- }
- waitForNotification(-1);
+ do {
+ DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE);
+ if (waitRet == WAIT_OBJECT_0)
+ return true;
+
+ if (waitRet != WAIT_IO_COMPLETION)
+ return false;
+
+ // Some I/O completion routine was called. Wait some more.
+ } while (!deadline.hasExpired());
+
+ return false;
+}
+
+/*!
+ Waits for the completion of the asynchronous write operation.
+ Returns \c true, if we've emitted the bytesWritten signal (non-recursive case)
+ or bytesWritten will be emitted by the event loop (recursive case).
+ */
+bool QWindowsPipeWriter::waitForWrite(int msecs)
+{
+ QDeadlineTimer timer(msecs);
+
+ // Make sure that 'syncHandle' was triggered by the thread pool callback.
+ while (isWriteOperationActive() && waitForNotification(timer)) {
+ if (consumePendingAndEmit(false))
+ return true;
}
+
+ return false;
}
QT_END_NAMESPACE
diff --git a/src/corelib/io/qwindowspipewriter_p.h b/src/corelib/io/qwindowspipewriter_p.h
index 39e8ffe40a..b648d7b846 100644
--- a/src/corelib/io/qwindowspipewriter_p.h
+++ b/src/corelib/io/qwindowspipewriter_p.h
@@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -54,7 +55,10 @@
#include <QtCore/private/qglobal_p.h>
#include <qelapsedtimer.h>
#include <qobject.h>
-#include <qbytearray.h>
+#include <qdeadlinetimer.h>
+#include <qmutex.h>
+#include <private/qringbuffer_p.h>
+
#include <qt_windows.h>
QT_BEGIN_NAMESPACE
@@ -117,39 +121,37 @@ public:
bool write(const QByteArray &ba);
void stop();
bool waitForWrite(int msecs);
- bool isWriteOperationActive() const { return writeSequenceStarted; }
+ bool isWriteOperationActive() const;
qint64 bytesToWrite() const;
Q_SIGNALS:
void canWrite();
void bytesWritten(qint64 bytes);
- void _q_queueBytesWritten(QPrivateSignal);
-private:
- static void CALLBACK writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase);
- void notified(DWORD errorCode, DWORD numberOfBytesWritten);
- bool waitForNotification(int timeout);
- void emitPendingBytesWrittenValue();
+protected:
+ bool event(QEvent *e) override;
- class Overlapped : public OVERLAPPED
- {
- Q_DISABLE_COPY_MOVE(Overlapped)
- public:
- explicit Overlapped(QWindowsPipeWriter *pipeWriter);
- void clear();
-
- QWindowsPipeWriter *pipeWriter;
- };
+private:
+ void startAsyncWriteLocked();
+ static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult);
+ bool writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten);
+ bool waitForNotification(const QDeadlineTimer &deadline);
+ bool consumePendingAndEmit(bool allowWinActPosting);
HANDLE handle;
- Overlapped overlapped;
- QByteArray buffer;
+ HANDLE eventHandle;
+ HANDLE syncHandle;
+ PTP_WAIT waitObject;
+ OVERLAPPED overlapped;
+ QRingBuffer writeBuffer;
qint64 pendingBytesWrittenValue;
+ mutable QMutex mutex;
+ DWORD lastError;
bool stopped;
bool writeSequenceStarted;
- bool notifiedCalled;
bool bytesWrittenPending;
+ bool winEventActPosted;
bool inBytesWritten;
};