diff options
Diffstat (limited to 'src/corelib/io/qwindowspipereader.cpp')
-rw-r--r-- | src/corelib/io/qwindowspipereader.cpp | 468 |
1 files changed, 292 insertions, 176 deletions
diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp index b525e88282..bf03737c39 100644 --- a/src/corelib/io/qwindowspipereader.cpp +++ b/src/corelib/io/qwindowspipereader.cpp @@ -1,6 +1,7 @@ /**************************************************************************** ** ** Copyright (C) 2016 The Qt Company Ltd. +** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com> ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -38,58 +39,63 @@ ****************************************************************************/ #include "qwindowspipereader_p.h" -#include "qiodevice_p.h" -#include <qelapsedtimer.h> #include <qscopedvaluerollback.h> +#include <qcoreapplication.h> +#include <QMutexLocker> QT_BEGIN_NAMESPACE static const DWORD minReadBufferSize = 4096; -QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader) - : pipeReader(reader) -{ -} - -void QWindowsPipeReader::Overlapped::clear() -{ - ZeroMemory(this, sizeof(OVERLAPPED)); -} - - QWindowsPipeReader::QWindowsPipeReader(QObject *parent) : QObject(parent), handle(INVALID_HANDLE_VALUE), - overlapped(this), + eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)), + syncHandle(CreateEvent(NULL, FALSE, FALSE, NULL)), + waitObject(NULL), readBufferMaxSize(0), actualReadBufferSize(0), - bytesPending(0), + pendingReadBytes(0), + lastError(ERROR_SUCCESS), state(Stopped), readSequenceStarted(false), - notifiedCalled(false), pipeBroken(false), readyReadPending(false), + winEventActPosted(false), inReadyRead(false) { - connect(this, &QWindowsPipeReader::_q_queueReadyRead, - this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection); + ZeroMemory(&overlapped, sizeof(OVERLAPPED)); + overlapped.hEvent = eventHandle; + waitObject = CreateThreadpoolWait(waitCallback, this, NULL); + if (waitObject == NULL) + qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed."); } QWindowsPipeReader::~QWindowsPipeReader() { stop(); + + // Wait for thread pool callback to complete, as it can be still + // executing some completion code. + WaitForThreadpoolWaitCallbacks(waitObject, FALSE); + CloseThreadpoolWait(waitObject); + CloseHandle(eventHandle); + CloseHandle(syncHandle); } /*! Sets the handle to read from. The handle must be valid. + Do not call this function while the pipe is running. */ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd) { readBuffer.clear(); actualReadBufferSize = 0; - bytesPending = 0; + readyReadPending = false; + pendingReadBytes = 0; handle = hPipeReadEnd; pipeBroken = false; + lastError = ERROR_SUCCESS; } /*! @@ -98,8 +104,7 @@ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd) */ void QWindowsPipeReader::stop() { - state = Stopped; - cancelAsyncRead(); + cancelAsyncRead(Stopped); } /*! @@ -108,16 +113,27 @@ void QWindowsPipeReader::stop() */ void QWindowsPipeReader::drainAndStop() { - state = Draining; - cancelAsyncRead(); + cancelAsyncRead(Draining); + + // Note that signals are not emitted in the call below, as the caller + // is expected to do that synchronously. + consumePending(); } /*! Stops the asynchronous read sequence. */ -void QWindowsPipeReader::cancelAsyncRead() +void QWindowsPipeReader::cancelAsyncRead(State newState) { + if (state != Running) + return; + + QMutexLocker locker(&mutex); + state = newState; if (readSequenceStarted) { + // This can legitimately fail due to the GetOverlappedResult() + // in the callback not being locked. We ignore ERROR_NOT_FOUND + // in this case. if (!CancelIoEx(handle, &overlapped)) { const DWORD dwError = GetLastError(); if (dwError != ERROR_NOT_FOUND) { @@ -125,11 +141,37 @@ void QWindowsPipeReader::cancelAsyncRead() handle); } } - waitForNotification(-1); + + // Wait for callback to complete. + do { + locker.unlock(); + waitForNotification(QDeadlineTimer(-1)); + locker.relock(); + } while (readSequenceStarted); } } /*! + Sets the size of internal read buffer. + */ +void QWindowsPipeReader::setMaxReadBufferSize(qint64 size) +{ + QMutexLocker locker(&mutex); + readBufferMaxSize = size; +} + +/*! + Returns \c true if async operation is in progress, there is + pending data to read, or a read error is pending. +*/ +bool QWindowsPipeReader::isReadOperationActive() const +{ + QMutexLocker locker(&mutex); + return readSequenceStarted || readyReadPending + || (lastError != ERROR_SUCCESS && !pipeBroken); +} + +/*! Returns the number of bytes we've read so far. */ qint64 QWindowsPipeReader::bytesAvailable() const @@ -145,6 +187,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) if (pipeBroken && actualReadBufferSize == 0) return 0; // signal EOF + mutex.lock(); qint64 readSoFar; // If startAsyncRead() has read data, copy it to its destination. if (maxlen == 1 && actualReadBufferSize > 0) { @@ -155,6 +198,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen)); actualReadBufferSize -= readSoFar; } + mutex.unlock(); if (!pipeBroken) { if (state == Running) @@ -166,197 +210,268 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) return readSoFar; } +/*! + Returns \c true if a complete line of data can be read from the buffer. + */ bool QWindowsPipeReader::canReadLine() const { + QMutexLocker locker(&mutex); return readBuffer.indexOf('\n', actualReadBufferSize) >= 0; } /*! - \internal - Will be called whenever the read operation completes. + Starts an asynchronous read sequence on the pipe. */ -void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead) +void QWindowsPipeReader::startAsyncRead() { - notifiedCalled = true; - readSequenceStarted = false; - - switch (errorCode) { - case ERROR_SUCCESS: - break; - case ERROR_MORE_DATA: - // This is not an error. We're connected to a message mode - // pipe and the message didn't fit into the pipe's system - // buffer. We will read the remaining data in the next call. - break; - case ERROR_BROKEN_PIPE: - case ERROR_PIPE_NOT_CONNECTED: - pipeBroken = true; - break; - case ERROR_OPERATION_ABORTED: - if (state != Running) - break; - Q_FALLTHROUGH(); - default: - emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified")); - pipeBroken = true; - break; - } + QMutexLocker locker(&mutex); - // After the reader was stopped, the only reason why this function can be called is the - // completion of a cancellation. No signals should be emitted, and no new read sequence should - // be started in this case. - if (state == Stopped) + if (readSequenceStarted || lastError != ERROR_SUCCESS) return; - if (pipeBroken) { - emitPipeClosed(); + state = Running; + startAsyncReadLocked(); + + // Do not post the event, if the read operation will be completed asynchronously. + if (!readyReadPending && lastError == ERROR_SUCCESS) return; - } - actualReadBufferSize += numberOfBytesRead; - readBuffer.truncate(actualReadBufferSize); + if (!winEventActPosted) { + winEventActPosted = true; + locker.unlock(); + QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct)); + } else { + locker.unlock(); + } - // Read all pending data from the pipe's buffer in 'Draining' state. - if (state == Draining) { - // Determine the number of pending bytes on the first iteration. - if (bytesPending == 0) - bytesPending = checkPipeState(); - else - bytesPending -= numberOfBytesRead; + SetEvent(syncHandle); +} - if (bytesPending == 0) // all data received - return; // unblock waitForNotification() in cancelAsyncRead() +/*! + Starts a new read sequence. Thread-safety should be ensured + by the caller. + */ +void QWindowsPipeReader::startAsyncReadLocked() +{ + // Determine the number of bytes to read. + qint64 bytesToRead = qMax(checkPipeState(), state == Running ? minReadBufferSize : 0); - startAsyncReadHelper(bytesPending); - if (readSequenceStarted) - notifiedCalled = false; // wait for more data + // This can happen only while draining; just do nothing in this case. + if (bytesToRead == 0) return; + + while (lastError == ERROR_SUCCESS) { + if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { + bytesToRead = readBufferMaxSize - readBuffer.size(); + if (bytesToRead <= 0) { + // Buffer is full. User must read data from the buffer + // before we can read more from the pipe. + return; + } + } + + char *ptr = readBuffer.reserve(bytesToRead); + + // ReadFile() returns true, if the read operation completes synchronously. + // We don't need to call GetOverlappedResult() additionally, because + // 'numberOfBytesRead' is valid in this case. + DWORD numberOfBytesRead; + DWORD errorCode = ERROR_SUCCESS; + if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) { + errorCode = GetLastError(); + if (errorCode == ERROR_IO_PENDING) { + Q_ASSERT(state == Running); + // Operation has been queued and will complete in the future. + readSequenceStarted = true; + SetThreadpoolWait(waitObject, eventHandle, NULL); + return; + } + } + + if (!readCompleted(errorCode, numberOfBytesRead)) + return; + + // In the 'Draining' state, we have to get all the data with one call + // to ReadFile(). Note that message mode pipes are not supported here. + if (state == Draining) { + Q_ASSERT(bytesToRead == qint64(numberOfBytesRead)); + return; + } + + // We need to loop until all pending data has been read and an + // operation is queued for asynchronous completion. + // If the pipe is configured to work in message mode, we read + // the data in chunks. + bytesToRead = qMax(checkPipeState(), minReadBufferSize); } +} - startAsyncRead(); - if (!readyReadPending) { - readyReadPending = true; - emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal()); +/*! + Thread pool callback procedure. + */ +void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, + PTP_WAIT wait, TP_WAIT_RESULT waitResult) +{ + Q_UNUSED(instance); + Q_UNUSED(wait); + Q_UNUSED(waitResult); + QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context); + + // Get the result of the asynchronous operation. + DWORD numberOfBytesTransfered = 0; + DWORD errorCode = ERROR_SUCCESS; + if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped, + &numberOfBytesTransfered, FALSE)) + errorCode = GetLastError(); + + pipeReader->mutex.lock(); + + pipeReader->readSequenceStarted = false; + + // Do not overwrite error code, if error has been detected by + // checkPipeState() in waitForPipeClosed(). Also, if the reader was + // stopped, the only reason why this function can be called is the + // completion of a cancellation. No signals should be emitted, and + // no new read sequence should be started in this case. + if (pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) { + // Ignore ERROR_OPERATION_ABORTED. We have canceled the I/O operation + // specifically for flushing the pipe. + if (pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED) + errorCode = ERROR_SUCCESS; + + if (pipeReader->readCompleted(errorCode, numberOfBytesTransfered)) + pipeReader->startAsyncReadLocked(); + + if (pipeReader->state == Running && !pipeReader->winEventActPosted) { + pipeReader->winEventActPosted = true; + pipeReader->mutex.unlock(); + QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct)); + } else { + pipeReader->mutex.unlock(); + } + } else { + pipeReader->mutex.unlock(); } + + // We set the event only after unlocking to avoid additional context + // switches due to the released thread immediately running into the lock. + SetEvent(pipeReader->syncHandle); } /*! - \internal - Starts an asynchronous read sequence on the pipe. + Will be called whenever the read operation completes. Returns \c true if + no error occurred; otherwise returns \c false. */ -void QWindowsPipeReader::startAsyncRead() +bool QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead) { - if (readSequenceStarted) - return; + // ERROR_MORE_DATA is not an error. We're connected to a message mode + // pipe and the message didn't fit into the pipe's system + // buffer. We will read the remaining data in the next call. + if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) { + readyReadPending = true; + pendingReadBytes += numberOfBytesRead; + readBuffer.truncate(actualReadBufferSize + pendingReadBytes); + return true; + } - state = Running; - startAsyncReadHelper(qMax(checkPipeState(), minReadBufferSize)); + lastError = errorCode; + return false; } /*! - \internal - Starts a new read sequence. + Receives notification that the read operation has completed. */ -void QWindowsPipeReader::startAsyncReadHelper(qint64 bytesToRead) +bool QWindowsPipeReader::event(QEvent *e) { - Q_ASSERT(bytesToRead != 0); + if (e->type() == QEvent::WinEventAct) { + consumePendingAndEmit(true); + return true; + } + return QObject::event(e); +} - if (pipeBroken) - return; +/*! + Updates the read buffer size and emits pending signals in the main thread. + Returns \c true, if readyRead() was emitted. + */ +bool QWindowsPipeReader::consumePendingAndEmit(bool allowWinActPosting) +{ + mutex.lock(); - if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { - bytesToRead = readBufferMaxSize - readBuffer.size(); - if (bytesToRead <= 0) { - // Buffer is full. User must read data from the buffer - // before we can read more from the pipe. - return; - } + // Enable QEvent::WinEventAct posting. + if (allowWinActPosting) + winEventActPosted = false; + + const bool emitReadyRead = consumePending(); + const DWORD dwError = lastError; + + mutex.unlock(); + + // Disable any further processing, if the pipe was stopped. + // We are not allowed to emit signals in either 'Stopped' + // or 'Draining' state. + if (state != Running) + return false; + + if (emitReadyRead && !inReadyRead) { + QScopedValueRollback<bool> guard(inReadyRead, true); + emit readyRead(); } - char *ptr = readBuffer.reserve(bytesToRead); - - readSequenceStarted = true; - overlapped.clear(); - if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) { - readSequenceStarted = false; - - const DWORD dwError = GetLastError(); - switch (dwError) { - case ERROR_BROKEN_PIPE: - case ERROR_PIPE_NOT_CONNECTED: - // It may happen, that the other side closes the connection directly - // after writing data. Then we must set the appropriate socket state. - pipeBroken = true; - emit pipeClosed(); - break; - default: - emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead")); - break; - } + // Trigger 'pipeBroken' only once. + if (dwError != ERROR_SUCCESS && !pipeBroken) { + pipeBroken = true; + if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED) + emit winError(dwError, QLatin1String("QWindowsPipeReader::consumePendingAndEmit")); + emit pipeClosed(); } + + return emitReadyRead; } /*! - \internal - Called when ReadFileEx finished the read operation. + Updates the read buffer size. Returns \c true, if readyRead() + should be emitted. Thread-safety should be ensured by the caller. */ -void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered, - OVERLAPPED *overlappedBase) +bool QWindowsPipeReader::consumePending() { - Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase); - overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered); + if (readyReadPending) { + readyReadPending = false; + actualReadBufferSize += pendingReadBytes; + pendingReadBytes = 0; + return true; + } + + return false; } /*! - \internal Returns the number of available bytes in the pipe. - Sets QWindowsPipeReader::pipeBroken to true if the connection is broken. */ DWORD QWindowsPipeReader::checkPipeState() { DWORD bytes; if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr)) return bytes; - if (!pipeBroken) { - pipeBroken = true; - emitPipeClosed(); - } + + lastError = GetLastError(); return 0; } -bool QWindowsPipeReader::waitForNotification(int timeout) +bool QWindowsPipeReader::waitForNotification(const QDeadlineTimer &deadline) { - QElapsedTimer t; - t.start(); - notifiedCalled = false; - int msecs = timeout; - while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) { - if (notifiedCalled) + do { + DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE); + if (waitRet == WAIT_OBJECT_0) return true; - // Some other I/O completion routine was called. Wait some more. - msecs = qt_subtract_from_timeout(timeout, t.elapsed()); - if (!msecs) - break; - } - return notifiedCalled; -} + if (waitRet != WAIT_IO_COMPLETION) + return false; -void QWindowsPipeReader::emitPendingReadyRead() -{ - if (readyReadPending) { - readyReadPending = false; - QScopedValueRollback<bool> guard(inReadyRead, true); - emit readyRead(); - } -} + // Some I/O completion routine was called. Wait some more. + } while (!deadline.hasExpired()); -void QWindowsPipeReader::emitPipeClosed() -{ - // We are not allowed to emit signals in either 'Stopped' - // or 'Draining' state. - if (state == Running) - emit pipeClosed(); + return false; } /*! @@ -366,22 +481,12 @@ void QWindowsPipeReader::emitPipeClosed() */ bool QWindowsPipeReader::waitForReadyRead(int msecs) { - if (readyReadPending) { - if (!inReadyRead) - emitPendingReadyRead(); - return true; - } - - if (!readSequenceStarted) - return false; - - if (!waitForNotification(msecs)) - return false; + QDeadlineTimer timer(msecs); - if (readyReadPending) { - if (!inReadyRead) - emitPendingReadyRead(); - return true; + // Make sure that 'syncHandle' was triggered by the thread pool callback. + while (isReadOperationActive() && waitForNotification(timer)) { + if (consumePendingAndEmit(false)) + return true; } return false; @@ -393,15 +498,26 @@ bool QWindowsPipeReader::waitForReadyRead(int msecs) bool QWindowsPipeReader::waitForPipeClosed(int msecs) { const int sleepTime = 10; - QElapsedTimer stopWatch; - stopWatch.start(); + QDeadlineTimer timer(msecs); + + while (waitForReadyRead(timer.remainingTime())) {} + if (pipeBroken) + return true; + + if (timer.hasExpired()) + return false; + + // When the read buffer is full, the read sequence is not running, + // so we need to peek the pipe to detect disconnection. forever { - waitForReadyRead(0); checkPipeState(); + consumePendingAndEmit(false); if (pipeBroken) return true; - if (stopWatch.hasExpired(msecs - sleepTime)) + + if (timer.hasExpired()) return false; + Sleep(sleepTime); } } |