diff options
Diffstat (limited to 'src/corelib/io/qwindowspipereader.cpp')
-rw-r--r-- | src/corelib/io/qwindowspipereader.cpp | 383 |
1 files changed, 127 insertions, 256 deletions
diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp index 3e1e212df2..c20909766d 100644 --- a/src/corelib/io/qwindowspipereader.cpp +++ b/src/corelib/io/qwindowspipereader.cpp @@ -1,6 +1,6 @@ /**************************************************************************** ** -** Copyright (C) 2020 The Qt Company Ltd. +** Copyright (C) 2016 The Qt Company Ltd. ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -41,75 +41,61 @@ #include "qiodevice_p.h" #include <qelapsedtimer.h> #include <qscopedvaluerollback.h> -#include <qcoreapplication.h> -#include <QMutexLocker> QT_BEGIN_NAMESPACE +QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader) + : pipeReader(reader) +{ +} + +void QWindowsPipeReader::Overlapped::clear() +{ + ZeroMemory(this, sizeof(OVERLAPPED)); +} + + QWindowsPipeReader::QWindowsPipeReader(QObject *parent) : QObject(parent), handle(INVALID_HANDLE_VALUE), - eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)), - syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)), - waitObject(NULL), + overlapped(this), readBufferMaxSize(0), actualReadBufferSize(0), - pendingReadBytes(0), - lastError(ERROR_SUCCESS), stopped(true), readSequenceStarted(false), + notifiedCalled(false), pipeBroken(false), readyReadPending(false), - winEventActPosted(false), inReadyRead(false) { - ZeroMemory(&overlapped, sizeof(OVERLAPPED)); - overlapped.hEvent = eventHandle; - waitObject = CreateThreadpoolWait(waitCallback, this, NULL); - if (waitObject == NULL) - qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed."); + connect(this, &QWindowsPipeReader::_q_queueReadyRead, + this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection); } QWindowsPipeReader::~QWindowsPipeReader() { stop(); - CloseThreadpoolWait(waitObject); - CloseHandle(eventHandle); - CloseHandle(syncHandle); } /*! - \internal Sets the handle to read from. The handle must be valid. - Do not call this function if the pipe is running. */ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd) { readBuffer.clear(); actualReadBufferSize = 0; - readyReadPending = false; - pendingReadBytes = 0; handle = hPipeReadEnd; pipeBroken = false; - lastError = ERROR_SUCCESS; } /*! - \internal Stops the asynchronous read sequence. If the read sequence is running then the I/O operation is canceled. */ void QWindowsPipeReader::stop() { - if (stopped) - return; - - mutex.lock(); stopped = true; if (readSequenceStarted) { - // Trying to disable callback before canceling the operation. - // Callback invocation is unnecessary here. - SetThreadpoolWait(waitObject, NULL, NULL); if (!CancelIoEx(handle, &overlapped)) { const DWORD dwError = GetLastError(); if (dwError != ERROR_NOT_FOUND) { @@ -117,33 +103,8 @@ void QWindowsPipeReader::stop() handle); } } - readSequenceStarted = false; + waitForNotification(-1); } - mutex.unlock(); - - WaitForThreadpoolWaitCallbacks(waitObject, TRUE); -} - -/*! - \internal - Sets the size of internal read buffer. - */ -void QWindowsPipeReader::setMaxReadBufferSize(qint64 size) -{ - QMutexLocker locker(&mutex); - readBufferMaxSize = size; -} - -/*! - \internal - Returns \c true if async operation is in progress, there is - pending data to read, or a read error is pending. - */ -bool QWindowsPipeReader::isReadOperationActive() const -{ - QMutexLocker locker(&mutex); - return readSequenceStarted || readyReadPending - || (lastError != ERROR_SUCCESS && !pipeBroken); } /*! @@ -162,7 +123,6 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) if (pipeBroken && actualReadBufferSize == 0) return 0; // signal EOF - mutex.lock(); qint64 readSoFar; // If startAsyncRead() has read data, copy it to its destination. if (maxlen == 1 && actualReadBufferSize > 0) { @@ -173,10 +133,9 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen)); actualReadBufferSize -= readSoFar; } - mutex.unlock(); if (!pipeBroken) { - if (!stopped) + if (!readSequenceStarted && !stopped) startAsyncRead(); if (readSoFar == 0) return -2; // signal EWOULDBLOCK @@ -185,220 +144,131 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) return readSoFar; } -/*! - \internal - Returns \c true if a complete line of data can be read from the buffer. - */ bool QWindowsPipeReader::canReadLine() const { - QMutexLocker locker(&mutex); return readBuffer.indexOf('\n', actualReadBufferSize) >= 0; } /*! \internal - Starts an asynchronous read sequence on the pipe. - */ -void QWindowsPipeReader::startAsyncRead() -{ - QMutexLocker locker(&mutex); - - if (readSequenceStarted || lastError != ERROR_SUCCESS) - return; - - stopped = false; - startAsyncReadLocked(); - - // Do not post the event, if the read operation will be completed asynchronously. - if (!readyReadPending && lastError == ERROR_SUCCESS) - return; - - if (!winEventActPosted) { - winEventActPosted = true; - locker.unlock(); - QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct)); - } -} - -/*! - \internal - Starts a new read sequence. Thread-safety should be ensured by the caller. + Will be called whenever the read operation completes. */ -void QWindowsPipeReader::startAsyncReadLocked() +void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead) { - const DWORD minReadBufferSize = 4096; - forever { - qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize); - if (lastError != ERROR_SUCCESS) - return; - - if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { - bytesToRead = readBufferMaxSize - readBuffer.size(); - if (bytesToRead <= 0) { - // Buffer is full. User must read data from the buffer - // before we can read more from the pipe. - return; - } - } - - char *ptr = readBuffer.reserve(bytesToRead); - - // ReadFile() returns true, if the read operation completes synchronously. - // We don't need to call GetOverlappedResult() additionally, because - // 'numberOfBytesRead' is valid in this case. - DWORD numberOfBytesRead; - if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) + notifiedCalled = true; + readSequenceStarted = false; + + switch (errorCode) { + case ERROR_SUCCESS: + break; + case ERROR_MORE_DATA: + // This is not an error. We're connected to a message mode + // pipe and the message didn't fit into the pipe's system + // buffer. We will read the remaining data in the next call. + break; + case ERROR_BROKEN_PIPE: + case ERROR_PIPE_NOT_CONNECTED: + pipeBroken = true; + break; + case ERROR_OPERATION_ABORTED: + if (stopped) break; - - readCompleted(ERROR_SUCCESS, numberOfBytesRead); - } - - const DWORD dwError = GetLastError(); - if (dwError == ERROR_IO_PENDING) { - // Operation has been queued and will complete in the future. - readSequenceStarted = true; - SetThreadpoolWait(waitObject, eventHandle, NULL); - } else { - // Any other errors are treated as EOF. - readCompleted(dwError, 0); + Q_FALLTHROUGH(); + default: + emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified")); + pipeBroken = true; + break; } -} - -/*! - \internal - Thread pool callback procedure. - */ -void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, - PTP_WAIT wait, TP_WAIT_RESULT waitResult) -{ - Q_UNUSED(instance); - Q_UNUSED(wait); - Q_UNUSED(waitResult); - QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context); - - // Get the result of the asynchronous operation. - DWORD numberOfBytesTransfered = 0; - DWORD errorCode = ERROR_SUCCESS; - if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped, - &numberOfBytesTransfered, FALSE)) - errorCode = GetLastError(); - - QMutexLocker locker(&pipeReader->mutex); // After the reader was stopped, the only reason why this function can be called is the // completion of a cancellation. No signals should be emitted, and no new read sequence should // be started in this case. - if (pipeReader->stopped) + if (stopped) return; - pipeReader->readSequenceStarted = false; - - // Do not overwrite error code, if error has been detected by - // checkPipeState() in waitForPipeClosed(). - if (pipeReader->lastError != ERROR_SUCCESS) + if (pipeBroken) { + emit pipeClosed(); return; - - pipeReader->readCompleted(errorCode, numberOfBytesTransfered); - if (pipeReader->lastError == ERROR_SUCCESS) - pipeReader->startAsyncReadLocked(); - - if (!pipeReader->winEventActPosted) { - pipeReader->winEventActPosted = true; - locker.unlock(); - QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct)); - } else { - locker.unlock(); } - SetEvent(pipeReader->syncHandle); -} -/*! - \internal - Will be called whenever the read operation completes. - */ -void QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead) -{ - // ERROR_MORE_DATA is not an error. We're connected to a message mode - // pipe and the message didn't fit into the pipe's system - // buffer. We will read the remaining data in the next call. - if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) { + actualReadBufferSize += numberOfBytesRead; + readBuffer.truncate(actualReadBufferSize); + startAsyncRead(); + if (!readyReadPending) { readyReadPending = true; - pendingReadBytes += numberOfBytesRead; - readBuffer.truncate(actualReadBufferSize + pendingReadBytes); - } else { - lastError = errorCode; - } -} - -/*! - \internal - Receives notification that the read operation has completed. - */ -bool QWindowsPipeReader::event(QEvent *e) -{ - if (e->type() == QEvent::WinEventAct) { - emitPendingSignals(true); - return true; + emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal()); } - return QObject::event(e); } /*! \internal - Emits pending signals in the main thread. Returns \c true, - if readyRead() was emitted. + Reads data from the pipe into the readbuffer. */ -bool QWindowsPipeReader::emitPendingSignals(bool allowWinActPosting) +void QWindowsPipeReader::startAsyncRead() { - mutex.lock(); - - // Enable QEvent::WinEventAct posting. - if (allowWinActPosting) - winEventActPosted = false; + const DWORD minReadBufferSize = 4096; + qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize); + if (pipeBroken) + return; - bool emitReadyRead = false; - if (readyReadPending) { - readyReadPending = false; - actualReadBufferSize += pendingReadBytes; - pendingReadBytes = 0; - emitReadyRead = true; + if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { + bytesToRead = readBufferMaxSize - readBuffer.size(); + if (bytesToRead <= 0) { + // Buffer is full. User must read data from the buffer + // before we can read more from the pipe. + return; + } } - const DWORD dwError = lastError; - mutex.unlock(); + char *ptr = readBuffer.reserve(bytesToRead); - // Disable any further processing, if the pipe was stopped. - if (stopped) - return false; - - if (emitReadyRead && !inReadyRead) { - QScopedValueRollback<bool> guard(inReadyRead, true); - emit readyRead(); - } + stopped = false; + readSequenceStarted = true; + overlapped.clear(); + if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) { + readSequenceStarted = false; - // Trigger 'pipeBroken' only once. - if (dwError != ERROR_SUCCESS && !pipeBroken) { - pipeBroken = true; - if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED) - emit winError(dwError, QLatin1String("QWindowsPipeReader::emitPendingSignals")); - emit pipeClosed(); + const DWORD dwError = GetLastError(); + switch (dwError) { + case ERROR_BROKEN_PIPE: + case ERROR_PIPE_NOT_CONNECTED: + // It may happen, that the other side closes the connection directly + // after writing data. Then we must set the appropriate socket state. + pipeBroken = true; + emit pipeClosed(); + break; + default: + emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead")); + break; + } } +} - return emitReadyRead; +/*! + \internal + Called when ReadFileEx finished the read operation. + */ +void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered, + OVERLAPPED *overlappedBase) +{ + Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase); + overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered); } /*! \internal Returns the number of available bytes in the pipe. - Sets QWindowsPipeReader::lastError if the connection is broken. + Sets QWindowsPipeReader::pipeBroken to true if the connection is broken. */ DWORD QWindowsPipeReader::checkPipeState() { DWORD bytes; if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr)) return bytes; - - readCompleted(GetLastError(), 0); + if (!pipeBroken) { + pipeBroken = true; + emit pipeClosed(); + } return 0; } @@ -406,21 +276,27 @@ bool QWindowsPipeReader::waitForNotification(int timeout) { QElapsedTimer t; t.start(); + notifiedCalled = false; int msecs = timeout; - do { - DWORD waitRet = WaitForSingleObjectEx(syncHandle, - msecs == -1 ? INFINITE : msecs, TRUE); - if (waitRet == WAIT_OBJECT_0) + while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) { + if (notifiedCalled) return true; - if (waitRet != WAIT_IO_COMPLETION) - return false; - - // Some I/O completion routine was called. Wait some more. + // Some other I/O completion routine was called. Wait some more. msecs = qt_subtract_from_timeout(timeout, t.elapsed()); - } while (msecs != 0); + if (!msecs) + break; + } + return notifiedCalled; +} - return false; +void QWindowsPipeReader::emitPendingReadyRead() +{ + if (readyReadPending) { + readyReadPending = false; + QScopedValueRollback<bool> guard(inReadyRead, true); + emit readyRead(); + } } /*! @@ -430,21 +306,25 @@ bool QWindowsPipeReader::waitForNotification(int timeout) */ bool QWindowsPipeReader::waitForReadyRead(int msecs) { - if (readBufferMaxSize && actualReadBufferSize >= readBufferMaxSize) + if (readyReadPending) { + if (!inReadyRead) + emitPendingReadyRead(); + return true; + } + + if (!readSequenceStarted) return false; - // Prepare handle for waiting. - ResetEvent(syncHandle); + if (!waitForNotification(msecs)) + return false; - // It is necessary to check if there is already data in the queue. - if (emitPendingSignals(false)) + if (readyReadPending) { + if (!inReadyRead) + emitPendingReadyRead(); return true; + } - // Make sure that 'syncHandle' was triggered by the thread pool callback. - if (pipeBroken || !waitForNotification(msecs)) - return false; - - return emitPendingSignals(false); + return false; } /*! @@ -457,18 +337,9 @@ bool QWindowsPipeReader::waitForPipeClosed(int msecs) stopWatch.start(); forever { waitForReadyRead(0); - if (pipeBroken) - return true; - - // When the read buffer is full, the read sequence is not running. - // So, we should peek the pipe to detect disconnect. - mutex.lock(); checkPipeState(); - mutex.unlock(); - emitPendingSignals(false); if (pipeBroken) return true; - if (stopWatch.hasExpired(msecs - sleepTime)) return false; Sleep(sleepTime); |