diff options
Diffstat (limited to 'src/corelib/io/qwindowspipereader.cpp')
-rw-r--r-- | src/corelib/io/qwindowspipereader.cpp | 580 |
1 files changed, 369 insertions, 211 deletions
diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp index 1f03ac5d5a..31d0dc1417 100644 --- a/src/corelib/io/qwindowspipereader.cpp +++ b/src/corelib/io/qwindowspipereader.cpp @@ -1,92 +1,66 @@ -/**************************************************************************** -** -** Copyright (C) 2016 The Qt Company Ltd. -** Contact: https://www.qt.io/licensing/ -** -** This file is part of the QtCore module of the Qt Toolkit. -** -** $QT_BEGIN_LICENSE:LGPL$ -** Commercial License Usage -** Licensees holding valid commercial Qt licenses may use this file in -** accordance with the commercial license agreement provided with the -** Software or, alternatively, in accordance with the terms contained in -** a written agreement between you and The Qt Company. For licensing terms -** and conditions see https://www.qt.io/terms-conditions. For further -** information use the contact form at https://www.qt.io/contact-us. -** -** GNU Lesser General Public License Usage -** Alternatively, this file may be used under the terms of the GNU Lesser -** General Public License version 3 as published by the Free Software -** Foundation and appearing in the file LICENSE.LGPL3 included in the -** packaging of this file. Please review the following information to -** ensure the GNU Lesser General Public License version 3 requirements -** will be met: https://www.gnu.org/licenses/lgpl-3.0.html. -** -** GNU General Public License Usage -** Alternatively, this file may be used under the terms of the GNU -** General Public License version 2.0 or (at your option) the GNU General -** Public license version 3 or any later version approved by the KDE Free -** Qt Foundation. The licenses are as published by the Free Software -** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3 -** included in the packaging of this file. Please review the following -** information to ensure the GNU General Public License requirements will -** be met: https://www.gnu.org/licenses/gpl-2.0.html and -** https://www.gnu.org/licenses/gpl-3.0.html. -** -** $QT_END_LICENSE$ -** -****************************************************************************/ +// Copyright (C) 2016 The Qt Company Ltd. +// Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com> +// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only #include "qwindowspipereader_p.h" -#include "qiodevice_p.h" -#include <qelapsedtimer.h> -#include <qscopedvaluerollback.h> +#include <qcoreapplication.h> +#include <QMutexLocker> +#include <QPointer> QT_BEGIN_NAMESPACE -QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader) - : pipeReader(reader) -{ -} - -void QWindowsPipeReader::Overlapped::clear() -{ - ZeroMemory(this, sizeof(OVERLAPPED)); -} +using namespace Qt::StringLiterals; +static const DWORD minReadBufferSize = 4096; QWindowsPipeReader::QWindowsPipeReader(QObject *parent) : QObject(parent), handle(INVALID_HANDLE_VALUE), - overlapped(nullptr), + eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)), + syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)), + waitObject(NULL), readBufferMaxSize(0), actualReadBufferSize(0), - stopped(true), + pendingReadBytes(0), + lastError(ERROR_SUCCESS), + state(Stopped), readSequenceStarted(false), - notifiedCalled(false), - pipeBroken(false), + pipeBroken(true), readyReadPending(false), - inReadyRead(false) + winEventActPosted(false) { - connect(this, &QWindowsPipeReader::_q_queueReadyRead, - this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection); + ZeroMemory(&overlapped, sizeof(OVERLAPPED)); + overlapped.hEvent = eventHandle; + waitObject = CreateThreadpoolWait(waitCallback, this, NULL); + if (waitObject == NULL) + qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed."); } QWindowsPipeReader::~QWindowsPipeReader() { stop(); - delete overlapped; + + // Wait for thread pool callback to complete, as it can be still + // executing some completion code. + WaitForThreadpoolWaitCallbacks(waitObject, FALSE); + CloseThreadpoolWait(waitObject); + CloseHandle(eventHandle); + CloseHandle(syncHandle); } /*! Sets the handle to read from. The handle must be valid. + Do not call this function while the pipe is running. */ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd) { readBuffer.clear(); actualReadBufferSize = 0; + readyReadPending = false; + pendingReadBytes = 0; handle = hPipeReadEnd; pipeBroken = false; + lastError = ERROR_SUCCESS; } /*! @@ -95,19 +69,90 @@ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd) */ void QWindowsPipeReader::stop() { - stopped = true; + cancelAsyncRead(Stopped); + pipeBroken = true; +} + +/*! + Stops the asynchronous read sequence. + Reads all pending bytes into the internal buffer. + */ +void QWindowsPipeReader::drainAndStop() +{ + cancelAsyncRead(Draining); + pipeBroken = true; +} + +/*! + Stops the asynchronous read sequence. + Clears the internal buffer and discards any pending data. + */ +void QWindowsPipeReader::stopAndClear() +{ + cancelAsyncRead(Stopped); + readBuffer.clear(); + actualReadBufferSize = 0; + // QLocalSocket is supposed to write data in the 'Closing' + // state, so we don't set 'pipeBroken' flag here. Also, avoid + // setting this flag in checkForReadyRead(). + lastError = ERROR_SUCCESS; +} + +/*! + Stops the asynchronous read sequence. + */ +void QWindowsPipeReader::cancelAsyncRead(State newState) +{ + if (state != Running) + return; + + mutex.lock(); + state = newState; if (readSequenceStarted) { - overlapped->pipeReader = nullptr; - if (!CancelIoEx(handle, overlapped)) { + // This can legitimately fail due to the GetOverlappedResult() + // in the callback not being locked. We ignore ERROR_NOT_FOUND + // in this case. + if (!CancelIoEx(handle, &overlapped)) { const DWORD dwError = GetLastError(); if (dwError != ERROR_NOT_FOUND) { qErrnoWarning(dwError, "QWindowsPipeReader: CancelIoEx on handle %p failed.", handle); } } - overlapped = nullptr; // The object will be deleted in the I/O callback. - readSequenceStarted = false; + + // Wait for callback to complete. + do { + mutex.unlock(); + waitForNotification(); + mutex.lock(); + } while (readSequenceStarted); } + mutex.unlock(); + + // Finish reading to keep the class state consistent. Note that + // signals are not emitted in the call below, as the caller is + // expected to do that synchronously. + consumePending(); +} + +/*! + Sets the size of internal read buffer. + */ +void QWindowsPipeReader::setMaxReadBufferSize(qint64 size) +{ + QMutexLocker locker(&mutex); + readBufferMaxSize = size; +} + +/*! + Returns \c true if async operation is in progress, there is + pending data to read, or a read error is pending. +*/ +bool QWindowsPipeReader::isReadOperationActive() const +{ + QMutexLocker locker(&mutex); + return readSequenceStarted || readyReadPending + || (lastError != ERROR_SUCCESS && !pipeBroken); } /*! @@ -123,10 +168,9 @@ qint64 QWindowsPipeReader::bytesAvailable() const */ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) { - if (pipeBroken && actualReadBufferSize == 0) - return 0; // signal EOF - + QMutexLocker locker(&mutex); qint64 readSoFar; + // If startAsyncRead() has read data, copy it to its destination. if (maxlen == 1 && actualReadBufferSize > 0) { *data = readBuffer.getChar(); @@ -138,8 +182,31 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) } if (!pipeBroken) { - if (!readSequenceStarted && !stopped) - startAsyncRead(); + startAsyncReadHelper(&locker); + if (readSoFar == 0) + return -2; // signal EWOULDBLOCK + } + + return readSoFar; +} + +/*! + Reads a line from the internal buffer, but no more than \c{maxlen} + characters. A terminating '\0' byte is always appended to \c{data}, + so \c{maxlen} must be larger than 1. + */ +qint64 QWindowsPipeReader::readLine(char *data, qint64 maxlen) +{ + QMutexLocker locker(&mutex); + qint64 readSoFar = 0; + + if (actualReadBufferSize > 0) { + readSoFar = readBuffer.readLine(data, qMin(actualReadBufferSize + 1, maxlen)); + actualReadBufferSize -= readSoFar; + } + + if (!pipeBroken) { + startAsyncReadHelper(&locker); if (readSoFar == 0) return -2; // signal EWOULDBLOCK } @@ -147,188 +214,269 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) return readSoFar; } +/*! + Skips up to \c{maxlen} bytes from the internal read buffer. + */ +qint64 QWindowsPipeReader::skip(qint64 maxlen) +{ + QMutexLocker locker(&mutex); + + const qint64 skippedSoFar = readBuffer.skip(qMin(actualReadBufferSize, maxlen)); + actualReadBufferSize -= skippedSoFar; + + if (!pipeBroken) { + startAsyncReadHelper(&locker); + if (skippedSoFar == 0) + return -2; // signal EWOULDBLOCK + } + + return skippedSoFar; +} + +/*! + Returns \c true if a complete line of data can be read from the buffer. + */ bool QWindowsPipeReader::canReadLine() const { + QMutexLocker locker(&mutex); return readBuffer.indexOf('\n', actualReadBufferSize) >= 0; } /*! - \internal - Will be called whenever the read operation completes. + Starts an asynchronous read sequence on the pipe. */ -void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead) +void QWindowsPipeReader::startAsyncRead() { - notifiedCalled = true; - readSequenceStarted = false; - - switch (errorCode) { - case ERROR_SUCCESS: - break; - case ERROR_MORE_DATA: - // This is not an error. We're connected to a message mode - // pipe and the message didn't fit into the pipe's system - // buffer. We will read the remaining data in the next call. - break; - case ERROR_BROKEN_PIPE: - case ERROR_PIPE_NOT_CONNECTED: - pipeBroken = true; - break; - case ERROR_OPERATION_ABORTED: - if (stopped) - break; - Q_FALLTHROUGH(); - default: - emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified")); - pipeBroken = true; - break; - } + QMutexLocker locker(&mutex); + startAsyncReadHelper(&locker); +} - // After the reader was stopped, the only reason why this function can be called is the - // completion of a cancellation. No signals should be emitted, and no new read sequence should - // be started in this case. - if (stopped) +void QWindowsPipeReader::startAsyncReadHelper(QMutexLocker<QMutex> *locker) +{ + if (readSequenceStarted || lastError != ERROR_SUCCESS) return; - if (pipeBroken) { - emit pipeClosed(); + state = Running; + startAsyncReadLocked(); + + // Do not post the event, if the read operation will be completed asynchronously. + if (!readyReadPending && lastError == ERROR_SUCCESS) return; - } - actualReadBufferSize += numberOfBytesRead; - readBuffer.truncate(actualReadBufferSize); - startAsyncRead(); - if (!readyReadPending) { - readyReadPending = true; - emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal()); + if (!winEventActPosted) { + winEventActPosted = true; + locker->unlock(); + QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct)); + } else { + locker->unlock(); } + + SetEvent(syncHandle); } /*! - \internal - Reads data from the pipe into the readbuffer. + Starts a new read sequence. Thread-safety should be ensured + by the caller. */ -void QWindowsPipeReader::startAsyncRead() +void QWindowsPipeReader::startAsyncReadLocked() { - const DWORD minReadBufferSize = 4096; - qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize); - if (pipeBroken) + // Determine the number of bytes to read. + qint64 bytesToRead = qMax(checkPipeState(), state == Running ? minReadBufferSize : 0); + + // This can happen only while draining; just do nothing in this case. + if (bytesToRead == 0) return; - if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { - bytesToRead = readBufferMaxSize - readBuffer.size(); - if (bytesToRead <= 0) { - // Buffer is full. User must read data from the buffer - // before we can read more from the pipe. - return; + while (lastError == ERROR_SUCCESS) { + if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { + bytesToRead = readBufferMaxSize - readBuffer.size(); + if (bytesToRead <= 0) { + // Buffer is full. User must read data from the buffer + // before we can read more from the pipe. + return; + } } - } - char *ptr = readBuffer.reserve(bytesToRead); - - stopped = false; - readSequenceStarted = true; - if (!overlapped) - overlapped = new Overlapped(this); - overlapped->clear(); - if (!ReadFileEx(handle, ptr, bytesToRead, overlapped, &readFileCompleted)) { - readSequenceStarted = false; - - const DWORD dwError = GetLastError(); - switch (dwError) { - case ERROR_BROKEN_PIPE: - case ERROR_PIPE_NOT_CONNECTED: - // It may happen, that the other side closes the connection directly - // after writing data. Then we must set the appropriate socket state. - pipeBroken = true; - emit pipeClosed(); - break; - default: - emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead")); - break; + char *ptr = readBuffer.reserve(bytesToRead); + + // ReadFile() returns true, if the read operation completes synchronously. + // We don't need to call GetOverlappedResult() additionally, because + // 'numberOfBytesRead' is valid in this case. + DWORD numberOfBytesRead; + DWORD errorCode = ERROR_SUCCESS; + if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) { + errorCode = GetLastError(); + if (errorCode == ERROR_IO_PENDING) { + Q_ASSERT(state == Running); + // Operation has been queued and will complete in the future. + readSequenceStarted = true; + SetThreadpoolWait(waitObject, eventHandle, NULL); + return; + } + } + + if (!readCompleted(errorCode, numberOfBytesRead)) + return; + + // In the 'Draining' state, we have to get all the data with one call + // to ReadFile(). Note that message mode pipes are not supported here. + if (state == Draining) { + Q_ASSERT(bytesToRead == qint64(numberOfBytesRead)); + return; } + + // We need to loop until all pending data has been read and an + // operation is queued for asynchronous completion. + // If the pipe is configured to work in message mode, we read + // the data in chunks. + bytesToRead = qMax(checkPipeState(), minReadBufferSize); } } /*! \internal - Called when ReadFileEx finished the read operation. + + Thread pool callback procedure. */ -void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered, - OVERLAPPED *overlappedBase) +void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, + PTP_WAIT wait, TP_WAIT_RESULT waitResult) { - Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase); - if (overlapped->pipeReader) - overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered); - else - delete overlapped; + Q_UNUSED(instance); + Q_UNUSED(wait); + Q_UNUSED(waitResult); + QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context); + + // Get the result of the asynchronous operation. + DWORD numberOfBytesTransfered = 0; + DWORD errorCode = ERROR_SUCCESS; + if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped, + &numberOfBytesTransfered, FALSE)) + errorCode = GetLastError(); + + pipeReader->mutex.lock(); + + pipeReader->readSequenceStarted = false; + + // Do not overwrite error code, if error has been detected by + // checkPipeState() in waitForPipeClosed(). Also, if the reader was + // stopped, the only reason why this function can be called is the + // completion of a cancellation. No signals should be emitted, and + // no new read sequence should be started in this case. + if (pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) { + // Ignore ERROR_OPERATION_ABORTED. We have canceled the I/O operation + // specifically for flushing the pipe. + if (pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED) + errorCode = ERROR_SUCCESS; + + if (pipeReader->readCompleted(errorCode, numberOfBytesTransfered)) + pipeReader->startAsyncReadLocked(); + + if (pipeReader->state == Running && !pipeReader->winEventActPosted) { + pipeReader->winEventActPosted = true; + pipeReader->mutex.unlock(); + QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct)); + } else { + pipeReader->mutex.unlock(); + } + } else { + pipeReader->mutex.unlock(); + } + + // We set the event only after unlocking to avoid additional context + // switches due to the released thread immediately running into the lock. + SetEvent(pipeReader->syncHandle); } /*! - \internal - Returns the number of available bytes in the pipe. - Sets QWindowsPipeReader::pipeBroken to true if the connection is broken. + Will be called whenever the read operation completes. Returns \c true if + no error occurred; otherwise returns \c false. */ -DWORD QWindowsPipeReader::checkPipeState() +bool QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead) { - DWORD bytes; - if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr)) - return bytes; - if (!pipeBroken) { - pipeBroken = true; - emit pipeClosed(); + // ERROR_MORE_DATA is not an error. We're connected to a message mode + // pipe and the message didn't fit into the pipe's system + // buffer. We will read the remaining data in the next call. + if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) { + readyReadPending = true; + pendingReadBytes += numberOfBytesRead; + readBuffer.truncate(actualReadBufferSize + pendingReadBytes); + return true; } - return 0; -} -bool QWindowsPipeReader::waitForNotification(int timeout) -{ - QElapsedTimer t; - t.start(); - notifiedCalled = false; - int msecs = timeout; - while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) { - if (notifiedCalled) - return true; - - // Some other I/O completion routine was called. Wait some more. - msecs = qt_subtract_from_timeout(timeout, t.elapsed()); - if (!msecs) - break; - } - return notifiedCalled; + lastError = errorCode; + return false; } -void QWindowsPipeReader::emitPendingReadyRead() +/*! + Receives notification that the read operation has completed. + */ +bool QWindowsPipeReader::event(QEvent *e) { - if (readyReadPending) { - readyReadPending = false; - QScopedValueRollback<bool> guard(inReadyRead, true); - emit readyRead(); + if (e->type() == QEvent::WinEventAct) { + consumePendingAndEmit(true); + return true; } + return QObject::event(e); } /*! - Waits for the completion of the asynchronous read operation. - Returns \c true, if we've emitted the readyRead signal (non-recursive case) - or readyRead will be emitted by the event loop (recursive case). + Updates the read buffer size and emits pending signals in the main thread. + Returns \c true, if readyRead() was emitted. */ -bool QWindowsPipeReader::waitForReadyRead(int msecs) +bool QWindowsPipeReader::consumePendingAndEmit(bool allowWinActPosting) { - if (readyReadPending) { - if (!inReadyRead) - emitPendingReadyRead(); - return true; - } + ResetEvent(syncHandle); + mutex.lock(); - if (!readSequenceStarted) - return false; + // Enable QEvent::WinEventAct posting. + if (allowWinActPosting) + winEventActPosted = false; + + const bool emitReadyRead = consumePending(); + const DWORD dwError = lastError; - if (!waitForNotification(msecs)) + mutex.unlock(); + + // Trigger 'pipeBroken' only once. This flag must be updated before + // emitting the readyRead() signal. Otherwise, the read sequence will + // be considered not finished, and we may hang if a slot connected + // to readyRead() calls waitForReadyRead(). + const bool emitPipeClosed = (dwError != ERROR_SUCCESS && !pipeBroken); + if (emitPipeClosed) + pipeBroken = true; + + // Disable any further processing, if the pipe was stopped. + // We are not allowed to emit signals in either 'Stopped' + // or 'Draining' state. + if (state != Running) return false; + if (!emitPipeClosed) { + if (emitReadyRead) + emit readyRead(); + } else { + QPointer<QWindowsPipeReader> alive(this); + if (emitReadyRead) + emit readyRead(); + if (alive && dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED) + emit winError(dwError, "QWindowsPipeReader::consumePendingAndEmit"_L1); + if (alive) + emit pipeClosed(); + } + + return emitReadyRead; +} + +/*! + Updates the read buffer size. Returns \c true, if readyRead() + should be emitted. Thread-safety should be ensured by the caller. + */ +bool QWindowsPipeReader::consumePending() +{ if (readyReadPending) { - if (!inReadyRead) - emitPendingReadyRead(); + readyReadPending = false; + actualReadBufferSize += pendingReadBytes; + pendingReadBytes = 0; return true; } @@ -336,22 +484,32 @@ bool QWindowsPipeReader::waitForReadyRead(int msecs) } /*! - Waits until the pipe is closed. + Returns the number of available bytes in the pipe. */ -bool QWindowsPipeReader::waitForPipeClosed(int msecs) +DWORD QWindowsPipeReader::checkPipeState() { - const int sleepTime = 10; - QElapsedTimer stopWatch; - stopWatch.start(); - forever { - waitForReadyRead(0); - checkPipeState(); - if (pipeBroken) + DWORD bytes; + if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr)) + return bytes; + + lastError = GetLastError(); + return 0; +} + +bool QWindowsPipeReader::waitForNotification() +{ + DWORD waitRet; + do { + waitRet = WaitForSingleObjectEx(syncHandle, INFINITE, TRUE); + if (waitRet == WAIT_OBJECT_0) return true; - if (stopWatch.hasExpired(msecs - sleepTime)) - return false; - Sleep(sleepTime); - } + + // Some I/O completion routine was called. Wait some more. + } while (waitRet == WAIT_IO_COMPLETION); + + return false; } QT_END_NAMESPACE + +#include "moc_qwindowspipereader_p.cpp" |