summaryrefslogtreecommitdiffstats
path: root/src/corelib/io/qwindowspipereader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/corelib/io/qwindowspipereader.cpp')
-rw-r--r--src/corelib/io/qwindowspipereader.cpp383
1 files changed, 256 insertions, 127 deletions
diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp
index c20909766d..3e1e212df2 100644
--- a/src/corelib/io/qwindowspipereader.cpp
+++ b/src/corelib/io/qwindowspipereader.cpp
@@ -1,6 +1,6 @@
/****************************************************************************
**
-** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2020 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -41,61 +41,75 @@
#include "qiodevice_p.h"
#include <qelapsedtimer.h>
#include <qscopedvaluerollback.h>
+#include <qcoreapplication.h>
+#include <QMutexLocker>
QT_BEGIN_NAMESPACE
-QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader)
- : pipeReader(reader)
-{
-}
-
-void QWindowsPipeReader::Overlapped::clear()
-{
- ZeroMemory(this, sizeof(OVERLAPPED));
-}
-
-
QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
: QObject(parent),
handle(INVALID_HANDLE_VALUE),
- overlapped(this),
+ eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
+ waitObject(NULL),
readBufferMaxSize(0),
actualReadBufferSize(0),
+ pendingReadBytes(0),
+ lastError(ERROR_SUCCESS),
stopped(true),
readSequenceStarted(false),
- notifiedCalled(false),
pipeBroken(false),
readyReadPending(false),
+ winEventActPosted(false),
inReadyRead(false)
{
- connect(this, &QWindowsPipeReader::_q_queueReadyRead,
- this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection);
+ ZeroMemory(&overlapped, sizeof(OVERLAPPED));
+ overlapped.hEvent = eventHandle;
+ waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
+ if (waitObject == NULL)
+ qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");
}
QWindowsPipeReader::~QWindowsPipeReader()
{
stop();
+ CloseThreadpoolWait(waitObject);
+ CloseHandle(eventHandle);
+ CloseHandle(syncHandle);
}
/*!
+ \internal
Sets the handle to read from. The handle must be valid.
+ Do not call this function if the pipe is running.
*/
void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
{
readBuffer.clear();
actualReadBufferSize = 0;
+ readyReadPending = false;
+ pendingReadBytes = 0;
handle = hPipeReadEnd;
pipeBroken = false;
+ lastError = ERROR_SUCCESS;
}
/*!
+ \internal
Stops the asynchronous read sequence.
If the read sequence is running then the I/O operation is canceled.
*/
void QWindowsPipeReader::stop()
{
+ if (stopped)
+ return;
+
+ mutex.lock();
stopped = true;
if (readSequenceStarted) {
+ // Trying to disable callback before canceling the operation.
+ // Callback invocation is unnecessary here.
+ SetThreadpoolWait(waitObject, NULL, NULL);
if (!CancelIoEx(handle, &overlapped)) {
const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) {
@@ -103,8 +117,33 @@ void QWindowsPipeReader::stop()
handle);
}
}
- waitForNotification(-1);
+ readSequenceStarted = false;
}
+ mutex.unlock();
+
+ WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
+}
+
+/*!
+ \internal
+ Sets the size of internal read buffer.
+ */
+void QWindowsPipeReader::setMaxReadBufferSize(qint64 size)
+{
+ QMutexLocker locker(&mutex);
+ readBufferMaxSize = size;
+}
+
+/*!
+ \internal
+ Returns \c true if async operation is in progress, there is
+ pending data to read, or a read error is pending.
+ */
+bool QWindowsPipeReader::isReadOperationActive() const
+{
+ QMutexLocker locker(&mutex);
+ return readSequenceStarted || readyReadPending
+ || (lastError != ERROR_SUCCESS && !pipeBroken);
}
/*!
@@ -123,6 +162,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
if (pipeBroken && actualReadBufferSize == 0)
return 0; // signal EOF
+ mutex.lock();
qint64 readSoFar;
// If startAsyncRead() has read data, copy it to its destination.
if (maxlen == 1 && actualReadBufferSize > 0) {
@@ -133,9 +173,10 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
actualReadBufferSize -= readSoFar;
}
+ mutex.unlock();
if (!pipeBroken) {
- if (!readSequenceStarted && !stopped)
+ if (!stopped)
startAsyncRead();
if (readSoFar == 0)
return -2; // signal EWOULDBLOCK
@@ -144,131 +185,220 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
return readSoFar;
}
+/*!
+ \internal
+ Returns \c true if a complete line of data can be read from the buffer.
+ */
bool QWindowsPipeReader::canReadLine() const
{
+ QMutexLocker locker(&mutex);
return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
}
/*!
\internal
- Will be called whenever the read operation completes.
+ Starts an asynchronous read sequence on the pipe.
*/
-void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead)
+void QWindowsPipeReader::startAsyncRead()
{
- notifiedCalled = true;
- readSequenceStarted = false;
-
- switch (errorCode) {
- case ERROR_SUCCESS:
- break;
- case ERROR_MORE_DATA:
- // This is not an error. We're connected to a message mode
- // pipe and the message didn't fit into the pipe's system
- // buffer. We will read the remaining data in the next call.
- break;
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- pipeBroken = true;
- break;
- case ERROR_OPERATION_ABORTED:
- if (stopped)
- break;
- Q_FALLTHROUGH();
- default:
- emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified"));
- pipeBroken = true;
- break;
- }
+ QMutexLocker locker(&mutex);
- // After the reader was stopped, the only reason why this function can be called is the
- // completion of a cancellation. No signals should be emitted, and no new read sequence should
- // be started in this case.
- if (stopped)
+ if (readSequenceStarted || lastError != ERROR_SUCCESS)
return;
- if (pipeBroken) {
- emit pipeClosed();
+ stopped = false;
+ startAsyncReadLocked();
+
+ // Do not post the event, if the read operation will be completed asynchronously.
+ if (!readyReadPending && lastError == ERROR_SUCCESS)
return;
- }
- actualReadBufferSize += numberOfBytesRead;
- readBuffer.truncate(actualReadBufferSize);
- startAsyncRead();
- if (!readyReadPending) {
- readyReadPending = true;
- emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal());
+ if (!winEventActPosted) {
+ winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
}
}
/*!
\internal
- Reads data from the pipe into the readbuffer.
+ Starts a new read sequence. Thread-safety should be ensured by the caller.
*/
-void QWindowsPipeReader::startAsyncRead()
+void QWindowsPipeReader::startAsyncReadLocked()
{
const DWORD minReadBufferSize = 4096;
- qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
- if (pipeBroken)
- return;
-
- if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
- bytesToRead = readBufferMaxSize - readBuffer.size();
- if (bytesToRead <= 0) {
- // Buffer is full. User must read data from the buffer
- // before we can read more from the pipe.
+ forever {
+ qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
+ if (lastError != ERROR_SUCCESS)
return;
+
+ if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
+ bytesToRead = readBufferMaxSize - readBuffer.size();
+ if (bytesToRead <= 0) {
+ // Buffer is full. User must read data from the buffer
+ // before we can read more from the pipe.
+ return;
+ }
}
+
+ char *ptr = readBuffer.reserve(bytesToRead);
+
+ // ReadFile() returns true, if the read operation completes synchronously.
+ // We don't need to call GetOverlappedResult() additionally, because
+ // 'numberOfBytesRead' is valid in this case.
+ DWORD numberOfBytesRead;
+ if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped))
+ break;
+
+ readCompleted(ERROR_SUCCESS, numberOfBytesRead);
+ }
+
+ const DWORD dwError = GetLastError();
+ if (dwError == ERROR_IO_PENDING) {
+ // Operation has been queued and will complete in the future.
+ readSequenceStarted = true;
+ SetThreadpoolWait(waitObject, eventHandle, NULL);
+ } else {
+ // Any other errors are treated as EOF.
+ readCompleted(dwError, 0);
}
+}
- char *ptr = readBuffer.reserve(bytesToRead);
+/*!
+ \internal
+ Thread pool callback procedure.
+ */
+void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult)
+{
+ Q_UNUSED(instance);
+ Q_UNUSED(wait);
+ Q_UNUSED(waitResult);
+ QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context);
- stopped = false;
- readSequenceStarted = true;
- overlapped.clear();
- if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) {
- readSequenceStarted = false;
+ // Get the result of the asynchronous operation.
+ DWORD numberOfBytesTransfered = 0;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
+ &numberOfBytesTransfered, FALSE))
+ errorCode = GetLastError();
- const DWORD dwError = GetLastError();
- switch (dwError) {
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- // It may happen, that the other side closes the connection directly
- // after writing data. Then we must set the appropriate socket state.
- pipeBroken = true;
- emit pipeClosed();
- break;
- default:
- emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead"));
- break;
- }
+ QMutexLocker locker(&pipeReader->mutex);
+
+ // After the reader was stopped, the only reason why this function can be called is the
+ // completion of a cancellation. No signals should be emitted, and no new read sequence should
+ // be started in this case.
+ if (pipeReader->stopped)
+ return;
+
+ pipeReader->readSequenceStarted = false;
+
+ // Do not overwrite error code, if error has been detected by
+ // checkPipeState() in waitForPipeClosed().
+ if (pipeReader->lastError != ERROR_SUCCESS)
+ return;
+
+ pipeReader->readCompleted(errorCode, numberOfBytesTransfered);
+ if (pipeReader->lastError == ERROR_SUCCESS)
+ pipeReader->startAsyncReadLocked();
+
+ if (!pipeReader->winEventActPosted) {
+ pipeReader->winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
+ }
+ SetEvent(pipeReader->syncHandle);
+}
+
+/*!
+ \internal
+ Will be called whenever the read operation completes.
+ */
+void QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
+{
+ // ERROR_MORE_DATA is not an error. We're connected to a message mode
+ // pipe and the message didn't fit into the pipe's system
+ // buffer. We will read the remaining data in the next call.
+ if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
+ readyReadPending = true;
+ pendingReadBytes += numberOfBytesRead;
+ readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
+ } else {
+ lastError = errorCode;
+ }
+}
+
+/*!
+ \internal
+ Receives notification that the read operation has completed.
+ */
+bool QWindowsPipeReader::event(QEvent *e)
+{
+ if (e->type() == QEvent::WinEventAct) {
+ emitPendingSignals(true);
+ return true;
}
+ return QObject::event(e);
}
/*!
\internal
- Called when ReadFileEx finished the read operation.
+ Emits pending signals in the main thread. Returns \c true,
+ if readyRead() was emitted.
*/
-void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase)
+bool QWindowsPipeReader::emitPendingSignals(bool allowWinActPosting)
{
- Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
- overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered);
+ mutex.lock();
+
+ // Enable QEvent::WinEventAct posting.
+ if (allowWinActPosting)
+ winEventActPosted = false;
+
+ bool emitReadyRead = false;
+ if (readyReadPending) {
+ readyReadPending = false;
+ actualReadBufferSize += pendingReadBytes;
+ pendingReadBytes = 0;
+ emitReadyRead = true;
+ }
+ const DWORD dwError = lastError;
+
+ mutex.unlock();
+
+ // Disable any further processing, if the pipe was stopped.
+ if (stopped)
+ return false;
+
+ if (emitReadyRead && !inReadyRead) {
+ QScopedValueRollback<bool> guard(inReadyRead, true);
+ emit readyRead();
+ }
+
+ // Trigger 'pipeBroken' only once.
+ if (dwError != ERROR_SUCCESS && !pipeBroken) {
+ pipeBroken = true;
+ if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
+ emit winError(dwError, QLatin1String("QWindowsPipeReader::emitPendingSignals"));
+ emit pipeClosed();
+ }
+
+ return emitReadyRead;
}
/*!
\internal
Returns the number of available bytes in the pipe.
- Sets QWindowsPipeReader::pipeBroken to true if the connection is broken.
+ Sets QWindowsPipeReader::lastError if the connection is broken.
*/
DWORD QWindowsPipeReader::checkPipeState()
{
DWORD bytes;
if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr))
return bytes;
- if (!pipeBroken) {
- pipeBroken = true;
- emit pipeClosed();
- }
+
+ readCompleted(GetLastError(), 0);
return 0;
}
@@ -276,27 +406,21 @@ bool QWindowsPipeReader::waitForNotification(int timeout)
{
QElapsedTimer t;
t.start();
- notifiedCalled = false;
int msecs = timeout;
- while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
- if (notifiedCalled)
+ do {
+ DWORD waitRet = WaitForSingleObjectEx(syncHandle,
+ msecs == -1 ? INFINITE : msecs, TRUE);
+ if (waitRet == WAIT_OBJECT_0)
return true;
- // Some other I/O completion routine was called. Wait some more.
+ if (waitRet != WAIT_IO_COMPLETION)
+ return false;
+
+ // Some I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
- if (!msecs)
- break;
- }
- return notifiedCalled;
-}
+ } while (msecs != 0);
-void QWindowsPipeReader::emitPendingReadyRead()
-{
- if (readyReadPending) {
- readyReadPending = false;
- QScopedValueRollback<bool> guard(inReadyRead, true);
- emit readyRead();
- }
+ return false;
}
/*!
@@ -306,25 +430,21 @@ void QWindowsPipeReader::emitPendingReadyRead()
*/
bool QWindowsPipeReader::waitForReadyRead(int msecs)
{
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
- return true;
- }
-
- if (!readSequenceStarted)
+ if (readBufferMaxSize && actualReadBufferSize >= readBufferMaxSize)
return false;
- if (!waitForNotification(msecs))
- return false;
+ // Prepare handle for waiting.
+ ResetEvent(syncHandle);
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
+ // It is necessary to check if there is already data in the queue.
+ if (emitPendingSignals(false))
return true;
- }
- return false;
+ // Make sure that 'syncHandle' was triggered by the thread pool callback.
+ if (pipeBroken || !waitForNotification(msecs))
+ return false;
+
+ return emitPendingSignals(false);
}
/*!
@@ -337,9 +457,18 @@ bool QWindowsPipeReader::waitForPipeClosed(int msecs)
stopWatch.start();
forever {
waitForReadyRead(0);
+ if (pipeBroken)
+ return true;
+
+ // When the read buffer is full, the read sequence is not running.
+ // So, we should peek the pipe to detect disconnect.
+ mutex.lock();
checkPipeState();
+ mutex.unlock();
+ emitPendingSignals(false);
if (pipeBroken)
return true;
+
if (stopWatch.hasExpired(msecs - sleepTime))
return false;
Sleep(sleepTime);