summaryrefslogtreecommitdiffstats
path: root/src/corelib/io/qwindowspipereader.cpp
diff options
context:
space:
mode:
authorAlex Trotsenko <alex1973tr@gmail.com>2020-10-05 19:22:49 +0300
committerAlex Trotsenko <alex1973tr@gmail.com>2020-11-17 12:45:50 +0200
commitee122077b09430da54ca09750589b37326a22d85 (patch)
treefa06b0006bddc56fa68045d827275dc52c14f1ee /src/corelib/io/qwindowspipereader.cpp
parent6be39809b038768a665b0e29a3a3668fdc424d9a (diff)
Allow QWindowsPipe{Reader,Writer} to work with foreign event loops
When a foreign event loop that does not enter an alertable wait state is running (which is also the case when a native dialog window is modal), pipe handlers would freeze temporarily due to their APC callbacks not being invoked. We address this problem by moving the I/O callbacks to the Windows thread pool, and only posting completion events to the main loop from there. That makes the actual I/O completely independent from any main loop, while the signal delivery works also with foreign loops (because Qt event delivery uses Windows messages, which foreign loops typically handle correctly). As a nice side effect, performance (and in particular scalability) is improved. Several other approaches have been tried: 1) Using QWinEventNotifier was about a quarter slower and scaled much worse. Additionally, it also required a rather egregious hack to handle the (pathological) case of a single thread talking to both ends of a QLocalSocket synchronously. 2) Queuing APCs from the thread pool to the main thread and also posting wake-up events to its event loop, and handling I/O on the main thread; this performed roughly like this solution , but scaled half as well, and the separate wake-up path was still deemed hacky. 3) Only posting wake-up events to the main thread from the thread pool, and still handling I/O on the main thread; this still performed comparably to 2), and the pathological case was not handled at all. 4) Using this approach for reads and that of 3) for writes was slightly faster with big amounts of data, but scaled slightly worse, and the diverging implementations were deemed not desirable. Fixes: QTBUG-64443 Change-Id: I1cd87c07db39f3b46a2683ce236d7eb67b5be549 Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@gmx.de>
Diffstat (limited to 'src/corelib/io/qwindowspipereader.cpp')
-rw-r--r--src/corelib/io/qwindowspipereader.cpp383
1 files changed, 256 insertions, 127 deletions
diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp
index c20909766d..3e1e212df2 100644
--- a/src/corelib/io/qwindowspipereader.cpp
+++ b/src/corelib/io/qwindowspipereader.cpp
@@ -1,6 +1,6 @@
/****************************************************************************
**
-** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2020 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -41,61 +41,75 @@
#include "qiodevice_p.h"
#include <qelapsedtimer.h>
#include <qscopedvaluerollback.h>
+#include <qcoreapplication.h>
+#include <QMutexLocker>
QT_BEGIN_NAMESPACE
-QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader)
- : pipeReader(reader)
-{
-}
-
-void QWindowsPipeReader::Overlapped::clear()
-{
- ZeroMemory(this, sizeof(OVERLAPPED));
-}
-
-
QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
: QObject(parent),
handle(INVALID_HANDLE_VALUE),
- overlapped(this),
+ eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
+ waitObject(NULL),
readBufferMaxSize(0),
actualReadBufferSize(0),
+ pendingReadBytes(0),
+ lastError(ERROR_SUCCESS),
stopped(true),
readSequenceStarted(false),
- notifiedCalled(false),
pipeBroken(false),
readyReadPending(false),
+ winEventActPosted(false),
inReadyRead(false)
{
- connect(this, &QWindowsPipeReader::_q_queueReadyRead,
- this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection);
+ ZeroMemory(&overlapped, sizeof(OVERLAPPED));
+ overlapped.hEvent = eventHandle;
+ waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
+ if (waitObject == NULL)
+ qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");
}
QWindowsPipeReader::~QWindowsPipeReader()
{
stop();
+ CloseThreadpoolWait(waitObject);
+ CloseHandle(eventHandle);
+ CloseHandle(syncHandle);
}
/*!
+ \internal
Sets the handle to read from. The handle must be valid.
+ Do not call this function if the pipe is running.
*/
void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
{
readBuffer.clear();
actualReadBufferSize = 0;
+ readyReadPending = false;
+ pendingReadBytes = 0;
handle = hPipeReadEnd;
pipeBroken = false;
+ lastError = ERROR_SUCCESS;
}
/*!
+ \internal
Stops the asynchronous read sequence.
If the read sequence is running then the I/O operation is canceled.
*/
void QWindowsPipeReader::stop()
{
+ if (stopped)
+ return;
+
+ mutex.lock();
stopped = true;
if (readSequenceStarted) {
+ // Trying to disable callback before canceling the operation.
+ // Callback invocation is unnecessary here.
+ SetThreadpoolWait(waitObject, NULL, NULL);
if (!CancelIoEx(handle, &overlapped)) {
const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) {
@@ -103,8 +117,33 @@ void QWindowsPipeReader::stop()
handle);
}
}
- waitForNotification(-1);
+ readSequenceStarted = false;
}
+ mutex.unlock();
+
+ WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
+}
+
+/*!
+ \internal
+ Sets the size of internal read buffer.
+ */
+void QWindowsPipeReader::setMaxReadBufferSize(qint64 size)
+{
+ QMutexLocker locker(&mutex);
+ readBufferMaxSize = size;
+}
+
+/*!
+ \internal
+ Returns \c true if async operation is in progress, there is
+ pending data to read, or a read error is pending.
+ */
+bool QWindowsPipeReader::isReadOperationActive() const
+{
+ QMutexLocker locker(&mutex);
+ return readSequenceStarted || readyReadPending
+ || (lastError != ERROR_SUCCESS && !pipeBroken);
}
/*!
@@ -123,6 +162,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
if (pipeBroken && actualReadBufferSize == 0)
return 0; // signal EOF
+ mutex.lock();
qint64 readSoFar;
// If startAsyncRead() has read data, copy it to its destination.
if (maxlen == 1 && actualReadBufferSize > 0) {
@@ -133,9 +173,10 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
actualReadBufferSize -= readSoFar;
}
+ mutex.unlock();
if (!pipeBroken) {
- if (!readSequenceStarted && !stopped)
+ if (!stopped)
startAsyncRead();
if (readSoFar == 0)
return -2; // signal EWOULDBLOCK
@@ -144,131 +185,220 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
return readSoFar;
}
+/*!
+ \internal
+ Returns \c true if a complete line of data can be read from the buffer.
+ */
bool QWindowsPipeReader::canReadLine() const
{
+ QMutexLocker locker(&mutex);
return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
}
/*!
\internal
- Will be called whenever the read operation completes.
+ Starts an asynchronous read sequence on the pipe.
*/
-void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead)
+void QWindowsPipeReader::startAsyncRead()
{
- notifiedCalled = true;
- readSequenceStarted = false;
-
- switch (errorCode) {
- case ERROR_SUCCESS:
- break;
- case ERROR_MORE_DATA:
- // This is not an error. We're connected to a message mode
- // pipe and the message didn't fit into the pipe's system
- // buffer. We will read the remaining data in the next call.
- break;
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- pipeBroken = true;
- break;
- case ERROR_OPERATION_ABORTED:
- if (stopped)
- break;
- Q_FALLTHROUGH();
- default:
- emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified"));
- pipeBroken = true;
- break;
- }
+ QMutexLocker locker(&mutex);
- // After the reader was stopped, the only reason why this function can be called is the
- // completion of a cancellation. No signals should be emitted, and no new read sequence should
- // be started in this case.
- if (stopped)
+ if (readSequenceStarted || lastError != ERROR_SUCCESS)
return;
- if (pipeBroken) {
- emit pipeClosed();
+ stopped = false;
+ startAsyncReadLocked();
+
+ // Do not post the event, if the read operation will be completed asynchronously.
+ if (!readyReadPending && lastError == ERROR_SUCCESS)
return;
- }
- actualReadBufferSize += numberOfBytesRead;
- readBuffer.truncate(actualReadBufferSize);
- startAsyncRead();
- if (!readyReadPending) {
- readyReadPending = true;
- emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal());
+ if (!winEventActPosted) {
+ winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
}
}
/*!
\internal
- Reads data from the pipe into the readbuffer.
+ Starts a new read sequence. Thread-safety should be ensured by the caller.
*/
-void QWindowsPipeReader::startAsyncRead()
+void QWindowsPipeReader::startAsyncReadLocked()
{
const DWORD minReadBufferSize = 4096;
- qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
- if (pipeBroken)
- return;
-
- if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
- bytesToRead = readBufferMaxSize - readBuffer.size();
- if (bytesToRead <= 0) {
- // Buffer is full. User must read data from the buffer
- // before we can read more from the pipe.
+ forever {
+ qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
+ if (lastError != ERROR_SUCCESS)
return;
+
+ if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
+ bytesToRead = readBufferMaxSize - readBuffer.size();
+ if (bytesToRead <= 0) {
+ // Buffer is full. User must read data from the buffer
+ // before we can read more from the pipe.
+ return;
+ }
}
+
+ char *ptr = readBuffer.reserve(bytesToRead);
+
+ // ReadFile() returns true, if the read operation completes synchronously.
+ // We don't need to call GetOverlappedResult() additionally, because
+ // 'numberOfBytesRead' is valid in this case.
+ DWORD numberOfBytesRead;
+ if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped))
+ break;
+
+ readCompleted(ERROR_SUCCESS, numberOfBytesRead);
+ }
+
+ const DWORD dwError = GetLastError();
+ if (dwError == ERROR_IO_PENDING) {
+ // Operation has been queued and will complete in the future.
+ readSequenceStarted = true;
+ SetThreadpoolWait(waitObject, eventHandle, NULL);
+ } else {
+ // Any other errors are treated as EOF.
+ readCompleted(dwError, 0);
}
+}
- char *ptr = readBuffer.reserve(bytesToRead);
+/*!
+ \internal
+ Thread pool callback procedure.
+ */
+void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult)
+{
+ Q_UNUSED(instance);
+ Q_UNUSED(wait);
+ Q_UNUSED(waitResult);
+ QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context);
- stopped = false;
- readSequenceStarted = true;
- overlapped.clear();
- if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) {
- readSequenceStarted = false;
+ // Get the result of the asynchronous operation.
+ DWORD numberOfBytesTransfered = 0;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
+ &numberOfBytesTransfered, FALSE))
+ errorCode = GetLastError();
- const DWORD dwError = GetLastError();
- switch (dwError) {
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- // It may happen, that the other side closes the connection directly
- // after writing data. Then we must set the appropriate socket state.
- pipeBroken = true;
- emit pipeClosed();
- break;
- default:
- emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead"));
- break;
- }
+ QMutexLocker locker(&pipeReader->mutex);
+
+ // After the reader was stopped, the only reason why this function can be called is the
+ // completion of a cancellation. No signals should be emitted, and no new read sequence should
+ // be started in this case.
+ if (pipeReader->stopped)
+ return;
+
+ pipeReader->readSequenceStarted = false;
+
+ // Do not overwrite error code, if error has been detected by
+ // checkPipeState() in waitForPipeClosed().
+ if (pipeReader->lastError != ERROR_SUCCESS)
+ return;
+
+ pipeReader->readCompleted(errorCode, numberOfBytesTransfered);
+ if (pipeReader->lastError == ERROR_SUCCESS)
+ pipeReader->startAsyncReadLocked();
+
+ if (!pipeReader->winEventActPosted) {
+ pipeReader->winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
+ }
+ SetEvent(pipeReader->syncHandle);
+}
+
+/*!
+ \internal
+ Will be called whenever the read operation completes.
+ */
+void QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
+{
+ // ERROR_MORE_DATA is not an error. We're connected to a message mode
+ // pipe and the message didn't fit into the pipe's system
+ // buffer. We will read the remaining data in the next call.
+ if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
+ readyReadPending = true;
+ pendingReadBytes += numberOfBytesRead;
+ readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
+ } else {
+ lastError = errorCode;
+ }
+}
+
+/*!
+ \internal
+ Receives notification that the read operation has completed.
+ */
+bool QWindowsPipeReader::event(QEvent *e)
+{
+ if (e->type() == QEvent::WinEventAct) {
+ emitPendingSignals(true);
+ return true;
}
+ return QObject::event(e);
}
/*!
\internal
- Called when ReadFileEx finished the read operation.
+ Emits pending signals in the main thread. Returns \c true,
+ if readyRead() was emitted.
*/
-void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase)
+bool QWindowsPipeReader::emitPendingSignals(bool allowWinActPosting)
{
- Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
- overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered);
+ mutex.lock();
+
+ // Enable QEvent::WinEventAct posting.
+ if (allowWinActPosting)
+ winEventActPosted = false;
+
+ bool emitReadyRead = false;
+ if (readyReadPending) {
+ readyReadPending = false;
+ actualReadBufferSize += pendingReadBytes;
+ pendingReadBytes = 0;
+ emitReadyRead = true;
+ }
+ const DWORD dwError = lastError;
+
+ mutex.unlock();
+
+ // Disable any further processing, if the pipe was stopped.
+ if (stopped)
+ return false;
+
+ if (emitReadyRead && !inReadyRead) {
+ QScopedValueRollback<bool> guard(inReadyRead, true);
+ emit readyRead();
+ }
+
+ // Trigger 'pipeBroken' only once.
+ if (dwError != ERROR_SUCCESS && !pipeBroken) {
+ pipeBroken = true;
+ if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
+ emit winError(dwError, QLatin1String("QWindowsPipeReader::emitPendingSignals"));
+ emit pipeClosed();
+ }
+
+ return emitReadyRead;
}
/*!
\internal
Returns the number of available bytes in the pipe.
- Sets QWindowsPipeReader::pipeBroken to true if the connection is broken.
+ Sets QWindowsPipeReader::lastError if the connection is broken.
*/
DWORD QWindowsPipeReader::checkPipeState()
{
DWORD bytes;
if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr))
return bytes;
- if (!pipeBroken) {
- pipeBroken = true;
- emit pipeClosed();
- }
+
+ readCompleted(GetLastError(), 0);
return 0;
}
@@ -276,27 +406,21 @@ bool QWindowsPipeReader::waitForNotification(int timeout)
{
QElapsedTimer t;
t.start();
- notifiedCalled = false;
int msecs = timeout;
- while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
- if (notifiedCalled)
+ do {
+ DWORD waitRet = WaitForSingleObjectEx(syncHandle,
+ msecs == -1 ? INFINITE : msecs, TRUE);
+ if (waitRet == WAIT_OBJECT_0)
return true;
- // Some other I/O completion routine was called. Wait some more.
+ if (waitRet != WAIT_IO_COMPLETION)
+ return false;
+
+ // Some I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
- if (!msecs)
- break;
- }
- return notifiedCalled;
-}
+ } while (msecs != 0);
-void QWindowsPipeReader::emitPendingReadyRead()
-{
- if (readyReadPending) {
- readyReadPending = false;
- QScopedValueRollback<bool> guard(inReadyRead, true);
- emit readyRead();
- }
+ return false;
}
/*!
@@ -306,25 +430,21 @@ void QWindowsPipeReader::emitPendingReadyRead()
*/
bool QWindowsPipeReader::waitForReadyRead(int msecs)
{
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
- return true;
- }
-
- if (!readSequenceStarted)
+ if (readBufferMaxSize && actualReadBufferSize >= readBufferMaxSize)
return false;
- if (!waitForNotification(msecs))
- return false;
+ // Prepare handle for waiting.
+ ResetEvent(syncHandle);
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
+ // It is necessary to check if there is already data in the queue.
+ if (emitPendingSignals(false))
return true;
- }
- return false;
+ // Make sure that 'syncHandle' was triggered by the thread pool callback.
+ if (pipeBroken || !waitForNotification(msecs))
+ return false;
+
+ return emitPendingSignals(false);
}
/*!
@@ -337,9 +457,18 @@ bool QWindowsPipeReader::waitForPipeClosed(int msecs)
stopWatch.start();
forever {
waitForReadyRead(0);
+ if (pipeBroken)
+ return true;
+
+ // When the read buffer is full, the read sequence is not running.
+ // So, we should peek the pipe to detect disconnect.
+ mutex.lock();
checkPipeState();
+ mutex.unlock();
+ emitPendingSignals(false);
if (pipeBroken)
return true;
+
if (stopWatch.hasExpired(msecs - sleepTime))
return false;
Sleep(sleepTime);