summaryrefslogtreecommitdiffstats
path: root/src/corelib/io/qwindowspipereader.cpp
diff options
context:
space:
mode:
authorAlex Trotsenko <alex1973tr@gmail.com>2021-01-23 13:00:22 +0200
committerAlex Trotsenko <alex1973tr@gmail.com>2021-03-02 22:53:06 +0200
commitf265c87e015c26225b80297f5c9f430ea7030214 (patch)
tree29e7fedb3e6e51e0cc634852079e998c8705a921 /src/corelib/io/qwindowspipereader.cpp
parentd316ae8e830b8b86f81307d35583f20773da6a55 (diff)
Allow QWindowsPipe{Reader|Writer} to work with foreign event loops, take 2
When a foreign event loop that does not enter an alertable wait state is running (which is also the case when a native dialog window is modal), pipe handlers would freeze temporarily due to their APC callbacks not being invoked. We address this problem by moving the I/O callbacks to the Windows thread pool, and only posting completion events to the main loop from there. That makes the actual I/O completely independent from any main loop, while the signal delivery works also with foreign loops (because Qt event delivery uses Windows messages, which foreign loops typically handle correctly). As a nice side effect, performance (and in particular scalability) is improved. Several other approaches have been tried: 1) Using QWinEventNotifier was about a quarter slower and scaled much worse. Additionally, it also required a rather egregious hack to handle the (pathological) case of a single thread talking to both ends of a QLocalSocket synchronously. 2) Queuing APCs from the thread pool to the main thread and also posting wake-up events to its event loop, and handling I/O on the main thread; this performed roughly like this solution, but scaled half as well, and the separate wake-up path was still deemed hacky. 3) Only posting wake-up events to the main thread from the thread pool, and still handling I/O on the main thread; this still performed comparably to 2), and the pathological case was not handled at all. 4) Using this approach for reads and that of 3) for writes was slightly faster with big amounts of data, but scaled slightly worse, and the diverging implementations were deemed not desirable. Fixes: QTBUG-64443 Change-Id: I66443c3021d6ba98639a214c3e768be97d2cf14b Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@gmx.de>
Diffstat (limited to 'src/corelib/io/qwindowspipereader.cpp')
-rw-r--r--src/corelib/io/qwindowspipereader.cpp468
1 files changed, 292 insertions, 176 deletions
diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp
index b525e88282..bf03737c39 100644
--- a/src/corelib/io/qwindowspipereader.cpp
+++ b/src/corelib/io/qwindowspipereader.cpp
@@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -38,58 +39,63 @@
****************************************************************************/
#include "qwindowspipereader_p.h"
-#include "qiodevice_p.h"
-#include <qelapsedtimer.h>
#include <qscopedvaluerollback.h>
+#include <qcoreapplication.h>
+#include <QMutexLocker>
QT_BEGIN_NAMESPACE
static const DWORD minReadBufferSize = 4096;
-QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader)
- : pipeReader(reader)
-{
-}
-
-void QWindowsPipeReader::Overlapped::clear()
-{
- ZeroMemory(this, sizeof(OVERLAPPED));
-}
-
-
QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
: QObject(parent),
handle(INVALID_HANDLE_VALUE),
- overlapped(this),
+ eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ syncHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ waitObject(NULL),
readBufferMaxSize(0),
actualReadBufferSize(0),
- bytesPending(0),
+ pendingReadBytes(0),
+ lastError(ERROR_SUCCESS),
state(Stopped),
readSequenceStarted(false),
- notifiedCalled(false),
pipeBroken(false),
readyReadPending(false),
+ winEventActPosted(false),
inReadyRead(false)
{
- connect(this, &QWindowsPipeReader::_q_queueReadyRead,
- this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection);
+ ZeroMemory(&overlapped, sizeof(OVERLAPPED));
+ overlapped.hEvent = eventHandle;
+ waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
+ if (waitObject == NULL)
+ qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");
}
QWindowsPipeReader::~QWindowsPipeReader()
{
stop();
+
+ // Wait for thread pool callback to complete, as it can be still
+ // executing some completion code.
+ WaitForThreadpoolWaitCallbacks(waitObject, FALSE);
+ CloseThreadpoolWait(waitObject);
+ CloseHandle(eventHandle);
+ CloseHandle(syncHandle);
}
/*!
Sets the handle to read from. The handle must be valid.
+ Do not call this function while the pipe is running.
*/
void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
{
readBuffer.clear();
actualReadBufferSize = 0;
- bytesPending = 0;
+ readyReadPending = false;
+ pendingReadBytes = 0;
handle = hPipeReadEnd;
pipeBroken = false;
+ lastError = ERROR_SUCCESS;
}
/*!
@@ -98,8 +104,7 @@ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
*/
void QWindowsPipeReader::stop()
{
- state = Stopped;
- cancelAsyncRead();
+ cancelAsyncRead(Stopped);
}
/*!
@@ -108,16 +113,27 @@ void QWindowsPipeReader::stop()
*/
void QWindowsPipeReader::drainAndStop()
{
- state = Draining;
- cancelAsyncRead();
+ cancelAsyncRead(Draining);
+
+ // Note that signals are not emitted in the call below, as the caller
+ // is expected to do that synchronously.
+ consumePending();
}
/*!
Stops the asynchronous read sequence.
*/
-void QWindowsPipeReader::cancelAsyncRead()
+void QWindowsPipeReader::cancelAsyncRead(State newState)
{
+ if (state != Running)
+ return;
+
+ QMutexLocker locker(&mutex);
+ state = newState;
if (readSequenceStarted) {
+ // This can legitimately fail due to the GetOverlappedResult()
+ // in the callback not being locked. We ignore ERROR_NOT_FOUND
+ // in this case.
if (!CancelIoEx(handle, &overlapped)) {
const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) {
@@ -125,11 +141,37 @@ void QWindowsPipeReader::cancelAsyncRead()
handle);
}
}
- waitForNotification(-1);
+
+ // Wait for callback to complete.
+ do {
+ locker.unlock();
+ waitForNotification(QDeadlineTimer(-1));
+ locker.relock();
+ } while (readSequenceStarted);
}
}
/*!
+ Sets the size of internal read buffer.
+ */
+void QWindowsPipeReader::setMaxReadBufferSize(qint64 size)
+{
+ QMutexLocker locker(&mutex);
+ readBufferMaxSize = size;
+}
+
+/*!
+ Returns \c true if async operation is in progress, there is
+ pending data to read, or a read error is pending.
+*/
+bool QWindowsPipeReader::isReadOperationActive() const
+{
+ QMutexLocker locker(&mutex);
+ return readSequenceStarted || readyReadPending
+ || (lastError != ERROR_SUCCESS && !pipeBroken);
+}
+
+/*!
Returns the number of bytes we've read so far.
*/
qint64 QWindowsPipeReader::bytesAvailable() const
@@ -145,6 +187,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
if (pipeBroken && actualReadBufferSize == 0)
return 0; // signal EOF
+ mutex.lock();
qint64 readSoFar;
// If startAsyncRead() has read data, copy it to its destination.
if (maxlen == 1 && actualReadBufferSize > 0) {
@@ -155,6 +198,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
actualReadBufferSize -= readSoFar;
}
+ mutex.unlock();
if (!pipeBroken) {
if (state == Running)
@@ -166,197 +210,268 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
return readSoFar;
}
+/*!
+ Returns \c true if a complete line of data can be read from the buffer.
+ */
bool QWindowsPipeReader::canReadLine() const
{
+ QMutexLocker locker(&mutex);
return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
}
/*!
- \internal
- Will be called whenever the read operation completes.
+ Starts an asynchronous read sequence on the pipe.
*/
-void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead)
+void QWindowsPipeReader::startAsyncRead()
{
- notifiedCalled = true;
- readSequenceStarted = false;
-
- switch (errorCode) {
- case ERROR_SUCCESS:
- break;
- case ERROR_MORE_DATA:
- // This is not an error. We're connected to a message mode
- // pipe and the message didn't fit into the pipe's system
- // buffer. We will read the remaining data in the next call.
- break;
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- pipeBroken = true;
- break;
- case ERROR_OPERATION_ABORTED:
- if (state != Running)
- break;
- Q_FALLTHROUGH();
- default:
- emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified"));
- pipeBroken = true;
- break;
- }
+ QMutexLocker locker(&mutex);
- // After the reader was stopped, the only reason why this function can be called is the
- // completion of a cancellation. No signals should be emitted, and no new read sequence should
- // be started in this case.
- if (state == Stopped)
+ if (readSequenceStarted || lastError != ERROR_SUCCESS)
return;
- if (pipeBroken) {
- emitPipeClosed();
+ state = Running;
+ startAsyncReadLocked();
+
+ // Do not post the event, if the read operation will be completed asynchronously.
+ if (!readyReadPending && lastError == ERROR_SUCCESS)
return;
- }
- actualReadBufferSize += numberOfBytesRead;
- readBuffer.truncate(actualReadBufferSize);
+ if (!winEventActPosted) {
+ winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
+ }
- // Read all pending data from the pipe's buffer in 'Draining' state.
- if (state == Draining) {
- // Determine the number of pending bytes on the first iteration.
- if (bytesPending == 0)
- bytesPending = checkPipeState();
- else
- bytesPending -= numberOfBytesRead;
+ SetEvent(syncHandle);
+}
- if (bytesPending == 0) // all data received
- return; // unblock waitForNotification() in cancelAsyncRead()
+/*!
+ Starts a new read sequence. Thread-safety should be ensured
+ by the caller.
+ */
+void QWindowsPipeReader::startAsyncReadLocked()
+{
+ // Determine the number of bytes to read.
+ qint64 bytesToRead = qMax(checkPipeState(), state == Running ? minReadBufferSize : 0);
- startAsyncReadHelper(bytesPending);
- if (readSequenceStarted)
- notifiedCalled = false; // wait for more data
+ // This can happen only while draining; just do nothing in this case.
+ if (bytesToRead == 0)
return;
+
+ while (lastError == ERROR_SUCCESS) {
+ if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
+ bytesToRead = readBufferMaxSize - readBuffer.size();
+ if (bytesToRead <= 0) {
+ // Buffer is full. User must read data from the buffer
+ // before we can read more from the pipe.
+ return;
+ }
+ }
+
+ char *ptr = readBuffer.reserve(bytesToRead);
+
+ // ReadFile() returns true, if the read operation completes synchronously.
+ // We don't need to call GetOverlappedResult() additionally, because
+ // 'numberOfBytesRead' is valid in this case.
+ DWORD numberOfBytesRead;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) {
+ errorCode = GetLastError();
+ if (errorCode == ERROR_IO_PENDING) {
+ Q_ASSERT(state == Running);
+ // Operation has been queued and will complete in the future.
+ readSequenceStarted = true;
+ SetThreadpoolWait(waitObject, eventHandle, NULL);
+ return;
+ }
+ }
+
+ if (!readCompleted(errorCode, numberOfBytesRead))
+ return;
+
+ // In the 'Draining' state, we have to get all the data with one call
+ // to ReadFile(). Note that message mode pipes are not supported here.
+ if (state == Draining) {
+ Q_ASSERT(bytesToRead == qint64(numberOfBytesRead));
+ return;
+ }
+
+ // We need to loop until all pending data has been read and an
+ // operation is queued for asynchronous completion.
+ // If the pipe is configured to work in message mode, we read
+ // the data in chunks.
+ bytesToRead = qMax(checkPipeState(), minReadBufferSize);
}
+}
- startAsyncRead();
- if (!readyReadPending) {
- readyReadPending = true;
- emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal());
+/*!
+ Thread pool callback procedure.
+ */
+void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult)
+{
+ Q_UNUSED(instance);
+ Q_UNUSED(wait);
+ Q_UNUSED(waitResult);
+ QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context);
+
+ // Get the result of the asynchronous operation.
+ DWORD numberOfBytesTransfered = 0;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
+ &numberOfBytesTransfered, FALSE))
+ errorCode = GetLastError();
+
+ pipeReader->mutex.lock();
+
+ pipeReader->readSequenceStarted = false;
+
+ // Do not overwrite error code, if error has been detected by
+ // checkPipeState() in waitForPipeClosed(). Also, if the reader was
+ // stopped, the only reason why this function can be called is the
+ // completion of a cancellation. No signals should be emitted, and
+ // no new read sequence should be started in this case.
+ if (pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) {
+ // Ignore ERROR_OPERATION_ABORTED. We have canceled the I/O operation
+ // specifically for flushing the pipe.
+ if (pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED)
+ errorCode = ERROR_SUCCESS;
+
+ if (pipeReader->readCompleted(errorCode, numberOfBytesTransfered))
+ pipeReader->startAsyncReadLocked();
+
+ if (pipeReader->state == Running && !pipeReader->winEventActPosted) {
+ pipeReader->winEventActPosted = true;
+ pipeReader->mutex.unlock();
+ QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct));
+ } else {
+ pipeReader->mutex.unlock();
+ }
+ } else {
+ pipeReader->mutex.unlock();
}
+
+ // We set the event only after unlocking to avoid additional context
+ // switches due to the released thread immediately running into the lock.
+ SetEvent(pipeReader->syncHandle);
}
/*!
- \internal
- Starts an asynchronous read sequence on the pipe.
+ Will be called whenever the read operation completes. Returns \c true if
+ no error occurred; otherwise returns \c false.
*/
-void QWindowsPipeReader::startAsyncRead()
+bool QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
{
- if (readSequenceStarted)
- return;
+ // ERROR_MORE_DATA is not an error. We're connected to a message mode
+ // pipe and the message didn't fit into the pipe's system
+ // buffer. We will read the remaining data in the next call.
+ if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
+ readyReadPending = true;
+ pendingReadBytes += numberOfBytesRead;
+ readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
+ return true;
+ }
- state = Running;
- startAsyncReadHelper(qMax(checkPipeState(), minReadBufferSize));
+ lastError = errorCode;
+ return false;
}
/*!
- \internal
- Starts a new read sequence.
+ Receives notification that the read operation has completed.
*/
-void QWindowsPipeReader::startAsyncReadHelper(qint64 bytesToRead)
+bool QWindowsPipeReader::event(QEvent *e)
{
- Q_ASSERT(bytesToRead != 0);
+ if (e->type() == QEvent::WinEventAct) {
+ consumePendingAndEmit(true);
+ return true;
+ }
+ return QObject::event(e);
+}
- if (pipeBroken)
- return;
+/*!
+ Updates the read buffer size and emits pending signals in the main thread.
+ Returns \c true, if readyRead() was emitted.
+ */
+bool QWindowsPipeReader::consumePendingAndEmit(bool allowWinActPosting)
+{
+ mutex.lock();
- if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
- bytesToRead = readBufferMaxSize - readBuffer.size();
- if (bytesToRead <= 0) {
- // Buffer is full. User must read data from the buffer
- // before we can read more from the pipe.
- return;
- }
+ // Enable QEvent::WinEventAct posting.
+ if (allowWinActPosting)
+ winEventActPosted = false;
+
+ const bool emitReadyRead = consumePending();
+ const DWORD dwError = lastError;
+
+ mutex.unlock();
+
+ // Disable any further processing, if the pipe was stopped.
+ // We are not allowed to emit signals in either 'Stopped'
+ // or 'Draining' state.
+ if (state != Running)
+ return false;
+
+ if (emitReadyRead && !inReadyRead) {
+ QScopedValueRollback<bool> guard(inReadyRead, true);
+ emit readyRead();
}
- char *ptr = readBuffer.reserve(bytesToRead);
-
- readSequenceStarted = true;
- overlapped.clear();
- if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) {
- readSequenceStarted = false;
-
- const DWORD dwError = GetLastError();
- switch (dwError) {
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- // It may happen, that the other side closes the connection directly
- // after writing data. Then we must set the appropriate socket state.
- pipeBroken = true;
- emit pipeClosed();
- break;
- default:
- emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead"));
- break;
- }
+ // Trigger 'pipeBroken' only once.
+ if (dwError != ERROR_SUCCESS && !pipeBroken) {
+ pipeBroken = true;
+ if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
+ emit winError(dwError, QLatin1String("QWindowsPipeReader::consumePendingAndEmit"));
+ emit pipeClosed();
}
+
+ return emitReadyRead;
}
/*!
- \internal
- Called when ReadFileEx finished the read operation.
+ Updates the read buffer size. Returns \c true, if readyRead()
+ should be emitted. Thread-safety should be ensured by the caller.
*/
-void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase)
+bool QWindowsPipeReader::consumePending()
{
- Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
- overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered);
+ if (readyReadPending) {
+ readyReadPending = false;
+ actualReadBufferSize += pendingReadBytes;
+ pendingReadBytes = 0;
+ return true;
+ }
+
+ return false;
}
/*!
- \internal
Returns the number of available bytes in the pipe.
- Sets QWindowsPipeReader::pipeBroken to true if the connection is broken.
*/
DWORD QWindowsPipeReader::checkPipeState()
{
DWORD bytes;
if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr))
return bytes;
- if (!pipeBroken) {
- pipeBroken = true;
- emitPipeClosed();
- }
+
+ lastError = GetLastError();
return 0;
}
-bool QWindowsPipeReader::waitForNotification(int timeout)
+bool QWindowsPipeReader::waitForNotification(const QDeadlineTimer &deadline)
{
- QElapsedTimer t;
- t.start();
- notifiedCalled = false;
- int msecs = timeout;
- while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
- if (notifiedCalled)
+ do {
+ DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE);
+ if (waitRet == WAIT_OBJECT_0)
return true;
- // Some other I/O completion routine was called. Wait some more.
- msecs = qt_subtract_from_timeout(timeout, t.elapsed());
- if (!msecs)
- break;
- }
- return notifiedCalled;
-}
+ if (waitRet != WAIT_IO_COMPLETION)
+ return false;
-void QWindowsPipeReader::emitPendingReadyRead()
-{
- if (readyReadPending) {
- readyReadPending = false;
- QScopedValueRollback<bool> guard(inReadyRead, true);
- emit readyRead();
- }
-}
+ // Some I/O completion routine was called. Wait some more.
+ } while (!deadline.hasExpired());
-void QWindowsPipeReader::emitPipeClosed()
-{
- // We are not allowed to emit signals in either 'Stopped'
- // or 'Draining' state.
- if (state == Running)
- emit pipeClosed();
+ return false;
}
/*!
@@ -366,22 +481,12 @@ void QWindowsPipeReader::emitPipeClosed()
*/
bool QWindowsPipeReader::waitForReadyRead(int msecs)
{
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
- return true;
- }
-
- if (!readSequenceStarted)
- return false;
-
- if (!waitForNotification(msecs))
- return false;
+ QDeadlineTimer timer(msecs);
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
- return true;
+ // Make sure that 'syncHandle' was triggered by the thread pool callback.
+ while (isReadOperationActive() && waitForNotification(timer)) {
+ if (consumePendingAndEmit(false))
+ return true;
}
return false;
@@ -393,15 +498,26 @@ bool QWindowsPipeReader::waitForReadyRead(int msecs)
bool QWindowsPipeReader::waitForPipeClosed(int msecs)
{
const int sleepTime = 10;
- QElapsedTimer stopWatch;
- stopWatch.start();
+ QDeadlineTimer timer(msecs);
+
+ while (waitForReadyRead(timer.remainingTime())) {}
+ if (pipeBroken)
+ return true;
+
+ if (timer.hasExpired())
+ return false;
+
+ // When the read buffer is full, the read sequence is not running,
+ // so we need to peek the pipe to detect disconnection.
forever {
- waitForReadyRead(0);
checkPipeState();
+ consumePendingAndEmit(false);
if (pipeBroken)
return true;
- if (stopWatch.hasExpired(msecs - sleepTime))
+
+ if (timer.hasExpired())
return false;
+
Sleep(sleepTime);
}
}