From ce29ce586f06f56a66198b934f8860946380e26e Mon Sep 17 00:00:00 2001 From: Kai Koehne Date: Thu, 19 Nov 2020 16:06:05 +0100 Subject: Revert "Allow QWindowsPipe{Reader,Writer} to work with foreign event loops" This reverts commit ee122077b09430da54ca09750589b37326a22d85. Reason for revert: This causes QProcess::readAll() to sometimes return nothing after the process has ended. Fixes: QTBUG-88624 Change-Id: I34fa27ae7fb38cc7c3a1e8eb2fdae2a5775584c2 Reviewed-by: Lars Knoll Reviewed-by: Paul Wicking (cherry picked from commit 23100ee61e33680d20f934dcbc96b57e8da29bf9) Reviewed-by: Qt Cherry-pick Bot --- src/corelib/io/qwindowspipereader.cpp | 383 +++++++++++----------------------- src/corelib/io/qwindowspipereader_p.h | 39 ++-- src/corelib/io/qwindowspipewriter.cpp | 353 +++++++++++-------------------- src/corelib/io/qwindowspipewriter_p.h | 42 ++-- 4 files changed, 284 insertions(+), 533 deletions(-) (limited to 'src/corelib/io') diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp index 3e1e212df2..c20909766d 100644 --- a/src/corelib/io/qwindowspipereader.cpp +++ b/src/corelib/io/qwindowspipereader.cpp @@ -1,6 +1,6 @@ /**************************************************************************** ** -** Copyright (C) 2020 The Qt Company Ltd. +** Copyright (C) 2016 The Qt Company Ltd. ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -41,75 +41,61 @@ #include "qiodevice_p.h" #include #include -#include -#include QT_BEGIN_NAMESPACE +QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader) + : pipeReader(reader) +{ +} + +void QWindowsPipeReader::Overlapped::clear() +{ + ZeroMemory(this, sizeof(OVERLAPPED)); +} + + QWindowsPipeReader::QWindowsPipeReader(QObject *parent) : QObject(parent), handle(INVALID_HANDLE_VALUE), - eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)), - syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)), - waitObject(NULL), + overlapped(this), readBufferMaxSize(0), actualReadBufferSize(0), - pendingReadBytes(0), - lastError(ERROR_SUCCESS), stopped(true), readSequenceStarted(false), + notifiedCalled(false), pipeBroken(false), readyReadPending(false), - winEventActPosted(false), inReadyRead(false) { - ZeroMemory(&overlapped, sizeof(OVERLAPPED)); - overlapped.hEvent = eventHandle; - waitObject = CreateThreadpoolWait(waitCallback, this, NULL); - if (waitObject == NULL) - qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed."); + connect(this, &QWindowsPipeReader::_q_queueReadyRead, + this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection); } QWindowsPipeReader::~QWindowsPipeReader() { stop(); - CloseThreadpoolWait(waitObject); - CloseHandle(eventHandle); - CloseHandle(syncHandle); } /*! - \internal Sets the handle to read from. The handle must be valid. - Do not call this function if the pipe is running. */ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd) { readBuffer.clear(); actualReadBufferSize = 0; - readyReadPending = false; - pendingReadBytes = 0; handle = hPipeReadEnd; pipeBroken = false; - lastError = ERROR_SUCCESS; } /*! - \internal Stops the asynchronous read sequence. If the read sequence is running then the I/O operation is canceled. */ void QWindowsPipeReader::stop() { - if (stopped) - return; - - mutex.lock(); stopped = true; if (readSequenceStarted) { - // Trying to disable callback before canceling the operation. - // Callback invocation is unnecessary here. - SetThreadpoolWait(waitObject, NULL, NULL); if (!CancelIoEx(handle, &overlapped)) { const DWORD dwError = GetLastError(); if (dwError != ERROR_NOT_FOUND) { @@ -117,33 +103,8 @@ void QWindowsPipeReader::stop() handle); } } - readSequenceStarted = false; + waitForNotification(-1); } - mutex.unlock(); - - WaitForThreadpoolWaitCallbacks(waitObject, TRUE); -} - -/*! - \internal - Sets the size of internal read buffer. - */ -void QWindowsPipeReader::setMaxReadBufferSize(qint64 size) -{ - QMutexLocker locker(&mutex); - readBufferMaxSize = size; -} - -/*! - \internal - Returns \c true if async operation is in progress, there is - pending data to read, or a read error is pending. - */ -bool QWindowsPipeReader::isReadOperationActive() const -{ - QMutexLocker locker(&mutex); - return readSequenceStarted || readyReadPending - || (lastError != ERROR_SUCCESS && !pipeBroken); } /*! @@ -162,7 +123,6 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) if (pipeBroken && actualReadBufferSize == 0) return 0; // signal EOF - mutex.lock(); qint64 readSoFar; // If startAsyncRead() has read data, copy it to its destination. if (maxlen == 1 && actualReadBufferSize > 0) { @@ -173,10 +133,9 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen)); actualReadBufferSize -= readSoFar; } - mutex.unlock(); if (!pipeBroken) { - if (!stopped) + if (!readSequenceStarted && !stopped) startAsyncRead(); if (readSoFar == 0) return -2; // signal EWOULDBLOCK @@ -185,220 +144,131 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) return readSoFar; } -/*! - \internal - Returns \c true if a complete line of data can be read from the buffer. - */ bool QWindowsPipeReader::canReadLine() const { - QMutexLocker locker(&mutex); return readBuffer.indexOf('\n', actualReadBufferSize) >= 0; } /*! \internal - Starts an asynchronous read sequence on the pipe. - */ -void QWindowsPipeReader::startAsyncRead() -{ - QMutexLocker locker(&mutex); - - if (readSequenceStarted || lastError != ERROR_SUCCESS) - return; - - stopped = false; - startAsyncReadLocked(); - - // Do not post the event, if the read operation will be completed asynchronously. - if (!readyReadPending && lastError == ERROR_SUCCESS) - return; - - if (!winEventActPosted) { - winEventActPosted = true; - locker.unlock(); - QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct)); - } -} - -/*! - \internal - Starts a new read sequence. Thread-safety should be ensured by the caller. + Will be called whenever the read operation completes. */ -void QWindowsPipeReader::startAsyncReadLocked() +void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead) { - const DWORD minReadBufferSize = 4096; - forever { - qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize); - if (lastError != ERROR_SUCCESS) - return; - - if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { - bytesToRead = readBufferMaxSize - readBuffer.size(); - if (bytesToRead <= 0) { - // Buffer is full. User must read data from the buffer - // before we can read more from the pipe. - return; - } - } - - char *ptr = readBuffer.reserve(bytesToRead); - - // ReadFile() returns true, if the read operation completes synchronously. - // We don't need to call GetOverlappedResult() additionally, because - // 'numberOfBytesRead' is valid in this case. - DWORD numberOfBytesRead; - if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) + notifiedCalled = true; + readSequenceStarted = false; + + switch (errorCode) { + case ERROR_SUCCESS: + break; + case ERROR_MORE_DATA: + // This is not an error. We're connected to a message mode + // pipe and the message didn't fit into the pipe's system + // buffer. We will read the remaining data in the next call. + break; + case ERROR_BROKEN_PIPE: + case ERROR_PIPE_NOT_CONNECTED: + pipeBroken = true; + break; + case ERROR_OPERATION_ABORTED: + if (stopped) break; - - readCompleted(ERROR_SUCCESS, numberOfBytesRead); - } - - const DWORD dwError = GetLastError(); - if (dwError == ERROR_IO_PENDING) { - // Operation has been queued and will complete in the future. - readSequenceStarted = true; - SetThreadpoolWait(waitObject, eventHandle, NULL); - } else { - // Any other errors are treated as EOF. - readCompleted(dwError, 0); + Q_FALLTHROUGH(); + default: + emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified")); + pipeBroken = true; + break; } -} - -/*! - \internal - Thread pool callback procedure. - */ -void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, - PTP_WAIT wait, TP_WAIT_RESULT waitResult) -{ - Q_UNUSED(instance); - Q_UNUSED(wait); - Q_UNUSED(waitResult); - QWindowsPipeReader *pipeReader = reinterpret_cast(context); - - // Get the result of the asynchronous operation. - DWORD numberOfBytesTransfered = 0; - DWORD errorCode = ERROR_SUCCESS; - if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped, - &numberOfBytesTransfered, FALSE)) - errorCode = GetLastError(); - - QMutexLocker locker(&pipeReader->mutex); // After the reader was stopped, the only reason why this function can be called is the // completion of a cancellation. No signals should be emitted, and no new read sequence should // be started in this case. - if (pipeReader->stopped) + if (stopped) return; - pipeReader->readSequenceStarted = false; - - // Do not overwrite error code, if error has been detected by - // checkPipeState() in waitForPipeClosed(). - if (pipeReader->lastError != ERROR_SUCCESS) + if (pipeBroken) { + emit pipeClosed(); return; - - pipeReader->readCompleted(errorCode, numberOfBytesTransfered); - if (pipeReader->lastError == ERROR_SUCCESS) - pipeReader->startAsyncReadLocked(); - - if (!pipeReader->winEventActPosted) { - pipeReader->winEventActPosted = true; - locker.unlock(); - QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct)); - } else { - locker.unlock(); } - SetEvent(pipeReader->syncHandle); -} -/*! - \internal - Will be called whenever the read operation completes. - */ -void QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead) -{ - // ERROR_MORE_DATA is not an error. We're connected to a message mode - // pipe and the message didn't fit into the pipe's system - // buffer. We will read the remaining data in the next call. - if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) { + actualReadBufferSize += numberOfBytesRead; + readBuffer.truncate(actualReadBufferSize); + startAsyncRead(); + if (!readyReadPending) { readyReadPending = true; - pendingReadBytes += numberOfBytesRead; - readBuffer.truncate(actualReadBufferSize + pendingReadBytes); - } else { - lastError = errorCode; - } -} - -/*! - \internal - Receives notification that the read operation has completed. - */ -bool QWindowsPipeReader::event(QEvent *e) -{ - if (e->type() == QEvent::WinEventAct) { - emitPendingSignals(true); - return true; + emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal()); } - return QObject::event(e); } /*! \internal - Emits pending signals in the main thread. Returns \c true, - if readyRead() was emitted. + Reads data from the pipe into the readbuffer. */ -bool QWindowsPipeReader::emitPendingSignals(bool allowWinActPosting) +void QWindowsPipeReader::startAsyncRead() { - mutex.lock(); - - // Enable QEvent::WinEventAct posting. - if (allowWinActPosting) - winEventActPosted = false; + const DWORD minReadBufferSize = 4096; + qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize); + if (pipeBroken) + return; - bool emitReadyRead = false; - if (readyReadPending) { - readyReadPending = false; - actualReadBufferSize += pendingReadBytes; - pendingReadBytes = 0; - emitReadyRead = true; + if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { + bytesToRead = readBufferMaxSize - readBuffer.size(); + if (bytesToRead <= 0) { + // Buffer is full. User must read data from the buffer + // before we can read more from the pipe. + return; + } } - const DWORD dwError = lastError; - mutex.unlock(); + char *ptr = readBuffer.reserve(bytesToRead); - // Disable any further processing, if the pipe was stopped. - if (stopped) - return false; - - if (emitReadyRead && !inReadyRead) { - QScopedValueRollback guard(inReadyRead, true); - emit readyRead(); - } + stopped = false; + readSequenceStarted = true; + overlapped.clear(); + if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) { + readSequenceStarted = false; - // Trigger 'pipeBroken' only once. - if (dwError != ERROR_SUCCESS && !pipeBroken) { - pipeBroken = true; - if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED) - emit winError(dwError, QLatin1String("QWindowsPipeReader::emitPendingSignals")); - emit pipeClosed(); + const DWORD dwError = GetLastError(); + switch (dwError) { + case ERROR_BROKEN_PIPE: + case ERROR_PIPE_NOT_CONNECTED: + // It may happen, that the other side closes the connection directly + // after writing data. Then we must set the appropriate socket state. + pipeBroken = true; + emit pipeClosed(); + break; + default: + emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead")); + break; + } } +} - return emitReadyRead; +/*! + \internal + Called when ReadFileEx finished the read operation. + */ +void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered, + OVERLAPPED *overlappedBase) +{ + Overlapped *overlapped = static_cast(overlappedBase); + overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered); } /*! \internal Returns the number of available bytes in the pipe. - Sets QWindowsPipeReader::lastError if the connection is broken. + Sets QWindowsPipeReader::pipeBroken to true if the connection is broken. */ DWORD QWindowsPipeReader::checkPipeState() { DWORD bytes; if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr)) return bytes; - - readCompleted(GetLastError(), 0); + if (!pipeBroken) { + pipeBroken = true; + emit pipeClosed(); + } return 0; } @@ -406,21 +276,27 @@ bool QWindowsPipeReader::waitForNotification(int timeout) { QElapsedTimer t; t.start(); + notifiedCalled = false; int msecs = timeout; - do { - DWORD waitRet = WaitForSingleObjectEx(syncHandle, - msecs == -1 ? INFINITE : msecs, TRUE); - if (waitRet == WAIT_OBJECT_0) + while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) { + if (notifiedCalled) return true; - if (waitRet != WAIT_IO_COMPLETION) - return false; - - // Some I/O completion routine was called. Wait some more. + // Some other I/O completion routine was called. Wait some more. msecs = qt_subtract_from_timeout(timeout, t.elapsed()); - } while (msecs != 0); + if (!msecs) + break; + } + return notifiedCalled; +} - return false; +void QWindowsPipeReader::emitPendingReadyRead() +{ + if (readyReadPending) { + readyReadPending = false; + QScopedValueRollback guard(inReadyRead, true); + emit readyRead(); + } } /*! @@ -430,21 +306,25 @@ bool QWindowsPipeReader::waitForNotification(int timeout) */ bool QWindowsPipeReader::waitForReadyRead(int msecs) { - if (readBufferMaxSize && actualReadBufferSize >= readBufferMaxSize) + if (readyReadPending) { + if (!inReadyRead) + emitPendingReadyRead(); + return true; + } + + if (!readSequenceStarted) return false; - // Prepare handle for waiting. - ResetEvent(syncHandle); + if (!waitForNotification(msecs)) + return false; - // It is necessary to check if there is already data in the queue. - if (emitPendingSignals(false)) + if (readyReadPending) { + if (!inReadyRead) + emitPendingReadyRead(); return true; + } - // Make sure that 'syncHandle' was triggered by the thread pool callback. - if (pipeBroken || !waitForNotification(msecs)) - return false; - - return emitPendingSignals(false); + return false; } /*! @@ -457,18 +337,9 @@ bool QWindowsPipeReader::waitForPipeClosed(int msecs) stopWatch.start(); forever { waitForReadyRead(0); - if (pipeBroken) - return true; - - // When the read buffer is full, the read sequence is not running. - // So, we should peek the pipe to detect disconnect. - mutex.lock(); checkPipeState(); - mutex.unlock(); - emitPendingSignals(false); if (pipeBroken) return true; - if (stopWatch.hasExpired(msecs - sleepTime)) return false; Sleep(sleepTime); diff --git a/src/corelib/io/qwindowspipereader_p.h b/src/corelib/io/qwindowspipereader_p.h index 5974833a86..2842343597 100644 --- a/src/corelib/io/qwindowspipereader_p.h +++ b/src/corelib/io/qwindowspipereader_p.h @@ -1,6 +1,6 @@ /**************************************************************************** ** -** Copyright (C) 2020 The Qt Company Ltd. +** Copyright (C) 2016 The Qt Company Ltd. ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -52,7 +52,6 @@ // #include -#include #include #include @@ -70,7 +69,7 @@ public: void startAsyncRead(); void stop(); - void setMaxReadBufferSize(qint64 size); + void setMaxReadBufferSize(qint64 size) { readBufferMaxSize = size; } qint64 maxReadBufferSize() const { return readBufferMaxSize; } bool isPipeClosed() const { return pipeBroken; } @@ -80,41 +79,41 @@ public: bool waitForReadyRead(int msecs); bool waitForPipeClosed(int msecs); - bool isReadOperationActive() const; + bool isReadOperationActive() const { return readSequenceStarted; } Q_SIGNALS: void winError(ulong, const QString &); void readyRead(); void pipeClosed(); - -protected: - bool event(QEvent *e) override; + void _q_queueReadyRead(QPrivateSignal); private: - void startAsyncReadLocked(); - static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, - PTP_WAIT wait, TP_WAIT_RESULT waitResult); - void readCompleted(DWORD errorCode, DWORD numberOfBytesRead); + static void CALLBACK readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered, + OVERLAPPED *overlappedBase); + void notified(DWORD errorCode, DWORD numberOfBytesRead); DWORD checkPipeState(); bool waitForNotification(int timeout); - bool emitPendingSignals(bool allowWinActPosting); + void emitPendingReadyRead(); + + class Overlapped : public OVERLAPPED + { + Q_DISABLE_COPY_MOVE(Overlapped) + public: + explicit Overlapped(QWindowsPipeReader *reader); + void clear(); + QWindowsPipeReader *pipeReader; + }; HANDLE handle; - HANDLE eventHandle; - HANDLE syncHandle; - PTP_WAIT waitObject; - OVERLAPPED overlapped; + Overlapped overlapped; qint64 readBufferMaxSize; QRingBuffer readBuffer; qint64 actualReadBufferSize; - qint64 pendingReadBytes; - mutable QMutex mutex; - DWORD lastError; bool stopped; bool readSequenceStarted; + bool notifiedCalled; bool pipeBroken; bool readyReadPending; - bool winEventActPosted; bool inReadyRead; }; diff --git a/src/corelib/io/qwindowspipewriter.cpp b/src/corelib/io/qwindowspipewriter.cpp index 6cea9f3a5e..e374034a06 100644 --- a/src/corelib/io/qwindowspipewriter.cpp +++ b/src/corelib/io/qwindowspipewriter.cpp @@ -1,6 +1,6 @@ /**************************************************************************** ** -** Copyright (C) 2020 The Qt Company Ltd. +** Copyright (C) 2016 The Qt Company Ltd. ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -40,306 +40,187 @@ #include "qwindowspipewriter_p.h" #include "qiodevice_p.h" #include -#include QT_BEGIN_NAMESPACE +QWindowsPipeWriter::Overlapped::Overlapped(QWindowsPipeWriter *pipeWriter) + : pipeWriter(pipeWriter) +{ +} + +void QWindowsPipeWriter::Overlapped::clear() +{ + ZeroMemory(this, sizeof(OVERLAPPED)); +} + + QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent) : QObject(parent), handle(pipeWriteEnd), - eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)), - syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)), - waitObject(NULL), + overlapped(this), pendingBytesWrittenValue(0), - lastError(ERROR_SUCCESS), stopped(true), writeSequenceStarted(false), + notifiedCalled(false), bytesWrittenPending(false), - winEventActPosted(false), inBytesWritten(false) { - ZeroMemory(&overlapped, sizeof(OVERLAPPED)); - overlapped.hEvent = eventHandle; - waitObject = CreateThreadpoolWait(waitCallback, this, NULL); - if (waitObject == NULL) - qErrnoWarning("QWindowsPipeWriter: CreateThreadpollWait failed."); + connect(this, &QWindowsPipeWriter::_q_queueBytesWritten, + this, &QWindowsPipeWriter::emitPendingBytesWrittenValue, Qt::QueuedConnection); } QWindowsPipeWriter::~QWindowsPipeWriter() { stop(); - CloseThreadpoolWait(waitObject); - CloseHandle(eventHandle); - CloseHandle(syncHandle); } -/*! - \internal - Stops the asynchronous write sequence. - If the write sequence is running then the I/O operation is canceled. - */ -void QWindowsPipeWriter::stop() +bool QWindowsPipeWriter::waitForWrite(int msecs) { - if (stopped) - return; - - mutex.lock(); - stopped = true; - if (writeSequenceStarted) { - // Trying to disable callback before canceling the operation. - // Callback invocation is unnecessary here. - SetThreadpoolWait(waitObject, NULL, NULL); - if (!CancelIoEx(handle, &overlapped)) { - const DWORD dwError = GetLastError(); - if (dwError != ERROR_NOT_FOUND) { - qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.", - handle); - } - } - writeSequenceStarted = false; + if (bytesWrittenPending) { + emitPendingBytesWrittenValue(); + return true; } - mutex.unlock(); - - WaitForThreadpoolWaitCallbacks(waitObject, TRUE); -} -/*! - \internal - Returns \c true if async operation is in progress or a bytesWritten - signal is pending. - */ -bool QWindowsPipeWriter::isWriteOperationActive() const -{ - QMutexLocker locker(&mutex); - return writeSequenceStarted || bytesWrittenPending; -} - -/*! - \internal - Returns the number of bytes that are waiting to be written. - */ -qint64 QWindowsPipeWriter::bytesToWrite() const -{ - QMutexLocker locker(&mutex); - return writeBuffer.size() + pendingBytesWrittenValue; -} - -/*! - \internal - Writes data to the pipe. - */ -bool QWindowsPipeWriter::write(const QByteArray &ba) -{ - QMutexLocker locker(&mutex); + if (!writeSequenceStarted) + return false; - if (lastError != ERROR_SUCCESS) + if (!waitForNotification(msecs)) return false; - writeBuffer.append(ba); - if (writeSequenceStarted) + if (bytesWrittenPending) { + emitPendingBytesWrittenValue(); return true; - - stopped = false; - startAsyncWriteLocked(); - - // Do not post the event, if the write operation will be completed asynchronously. - if (bytesWrittenPending && !winEventActPosted) { - winEventActPosted = true; - locker.unlock(); - QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct)); } - return true; -} -/*! - \internal - Starts a new write sequence. Thread-safety should be ensured by the caller. - */ -void QWindowsPipeWriter::startAsyncWriteLocked() -{ - forever { - if (writeBuffer.isEmpty()) - return; - - // WriteFile() returns true, if the write operation completes synchronously. - // We don't need to call GetOverlappedResult() additionally, because - // 'numberOfBytesWritten' is valid in this case. - DWORD numberOfBytesWritten; - if (!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(), - &numberOfBytesWritten, &overlapped)) { - break; - } - - writeCompleted(ERROR_SUCCESS, numberOfBytesWritten); - } - - const DWORD dwError = GetLastError(); - if (dwError == ERROR_IO_PENDING) { - // Operation has been queued and will complete in the future. - writeSequenceStarted = true; - SetThreadpoolWait(waitObject, eventHandle, NULL); - } else { - // Other return values are actual errors. - writeCompleted(dwError, 0); - } + return false; } -/*! - \internal - Thread pool callback procedure. - */ -void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, - PTP_WAIT wait, TP_WAIT_RESULT waitResult) +qint64 QWindowsPipeWriter::bytesToWrite() const { - Q_UNUSED(instance); - Q_UNUSED(wait); - Q_UNUSED(waitResult); - QWindowsPipeWriter *pipeWriter = reinterpret_cast(context); - - // Get the result of the asynchronous operation. - DWORD numberOfBytesTransfered = 0; - DWORD errorCode = ERROR_SUCCESS; - if (!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped, - &numberOfBytesTransfered, FALSE)) - errorCode = GetLastError(); - - QMutexLocker locker(&pipeWriter->mutex); - - // After the writer was stopped, the only reason why this function can be called is the - // completion of a cancellation. No signals should be emitted, and no new write sequence - // should be started in this case. - if (pipeWriter->stopped) - return; - - pipeWriter->writeSequenceStarted = false; - pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered); - if (pipeWriter->lastError != ERROR_SUCCESS) - return; - - pipeWriter->startAsyncWriteLocked(); - - if (!pipeWriter->winEventActPosted) { - pipeWriter->winEventActPosted = true; - locker.unlock(); - QCoreApplication::postEvent(pipeWriter, new QEvent(QEvent::WinEventAct)); - } else { - locker.unlock(); - } - SetEvent(pipeWriter->syncHandle); + return buffer.size() + pendingBytesWrittenValue; } -/*! - \internal - Will be called whenever the write operation completes. - */ -void QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten) +void QWindowsPipeWriter::emitPendingBytesWrittenValue() { - if (errorCode == ERROR_SUCCESS) { - Q_ASSERT(numberOfBytesWritten == DWORD(writeBuffer.nextDataBlockSize())); - - bytesWrittenPending = true; - pendingBytesWrittenValue += numberOfBytesWritten; - writeBuffer.free(numberOfBytesWritten); - } else { - lastError = errorCode; - writeBuffer.clear(); - // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn. - if (errorCode != ERROR_OPERATION_ABORTED && errorCode != ERROR_NO_DATA) - qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed."); + if (bytesWrittenPending) { + // Reset the state even if we don't emit bytesWritten(). + // It's a defined behavior to not re-emit this signal recursively. + bytesWrittenPending = false; + const qint64 bytes = pendingBytesWrittenValue; + pendingBytesWrittenValue = 0; + + emit canWrite(); + if (!inBytesWritten) { + QScopedValueRollback guard(inBytesWritten, true); + emit bytesWritten(bytes); + } } } -/*! - \internal - Receives notification that the write operation has completed. - */ -bool QWindowsPipeWriter::event(QEvent *e) +void QWindowsPipeWriter::writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered, + OVERLAPPED *overlappedBase) { - if (e->type() == QEvent::WinEventAct) { - emitPendingSignals(true); - return true; - } - return QObject::event(e); + Overlapped *overlapped = static_cast(overlappedBase); + overlapped->pipeWriter->notified(errorCode, numberOfBytesTransfered); } /*! \internal - Emits pending signals in the main thread. Returns \c true, - if bytesWritten() was emitted. + Will be called whenever the write operation completes. */ -bool QWindowsPipeWriter::emitPendingSignals(bool allowWinActPosting) +void QWindowsPipeWriter::notified(DWORD errorCode, DWORD numberOfBytesWritten) { - QMutexLocker locker(&mutex); - - // Enable QEvent::WinEventAct posting. - if (allowWinActPosting) - winEventActPosted = false; - - if (!bytesWrittenPending) - return false; - - // Reset the state even if we don't emit bytesWritten(). - // It's a defined behavior to not re-emit this signal recursively. - bytesWrittenPending = false; - qint64 numberOfBytesWritten = pendingBytesWrittenValue; - pendingBytesWrittenValue = 0; - - locker.unlock(); + notifiedCalled = true; + writeSequenceStarted = false; + Q_ASSERT(errorCode != ERROR_SUCCESS || numberOfBytesWritten == DWORD(buffer.size())); + buffer.clear(); + + switch (errorCode) { + case ERROR_SUCCESS: + break; + case ERROR_OPERATION_ABORTED: + if (stopped) + break; + Q_FALLTHROUGH(); + default: + qErrnoWarning(errorCode, "QWindowsPipeWriter: asynchronous write failed."); + break; + } - // Disable any further processing, if the pipe was stopped. + // After the writer was stopped, the only reason why this function can be called is the + // completion of a cancellation. No signals should be emitted, and no new write sequence should + // be started in this case. if (stopped) - return false; + return; - emit canWrite(); - if (!inBytesWritten) { - QScopedValueRollback guard(inBytesWritten, true); - emit bytesWritten(numberOfBytesWritten); + pendingBytesWrittenValue += qint64(numberOfBytesWritten); + if (!bytesWrittenPending) { + bytesWrittenPending = true; + emit _q_queueBytesWritten(QWindowsPipeWriter::QPrivateSignal()); } - - return true; } bool QWindowsPipeWriter::waitForNotification(int timeout) { QElapsedTimer t; t.start(); + notifiedCalled = false; int msecs = timeout; - do { - DWORD waitRet = WaitForSingleObjectEx(syncHandle, - msecs == -1 ? INFINITE : msecs, TRUE); - if (waitRet == WAIT_OBJECT_0) + while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) { + if (notifiedCalled) return true; - if (waitRet != WAIT_IO_COMPLETION) - return false; - - // Some I/O completion routine was called. Wait some more. + // Some other I/O completion routine was called. Wait some more. msecs = qt_subtract_from_timeout(timeout, t.elapsed()); - } while (msecs != 0); - - return false; + if (!msecs) + break; + } + return notifiedCalled; } -/*! - \internal - Waits for the completion of the asynchronous write operation. - Returns \c true, if we've emitted the bytesWritten signal (non-recursive case) - or bytesWritten will be emitted by the event loop (recursive case). - */ -bool QWindowsPipeWriter::waitForWrite(int msecs) +bool QWindowsPipeWriter::write(const QByteArray &ba) { - // Prepare handle for waiting. - ResetEvent(syncHandle); + if (writeSequenceStarted) + return false; - // It is necessary to check if there is already pending signal. - if (emitPendingSignals(false)) - return true; + overlapped.clear(); + buffer = ba; + stopped = false; + writeSequenceStarted = true; + if (!WriteFileEx(handle, buffer.constData(), buffer.size(), + &overlapped, &writeFileCompleted)) { + writeSequenceStarted = false; + buffer.clear(); - // Make sure that 'syncHandle' was triggered by the thread pool callback. - if (!isWriteOperationActive() || !waitForNotification(msecs)) + const DWORD errorCode = GetLastError(); + switch (errorCode) { + case ERROR_NO_DATA: // "The pipe is being closed." + // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn. + break; + default: + qErrnoWarning(errorCode, "QWindowsPipeWriter::write failed."); + } return false; + } - return emitPendingSignals(false); + return true; +} + +void QWindowsPipeWriter::stop() +{ + stopped = true; + bytesWrittenPending = false; + pendingBytesWrittenValue = 0; + if (writeSequenceStarted) { + if (!CancelIoEx(handle, &overlapped)) { + const DWORD dwError = GetLastError(); + if (dwError != ERROR_NOT_FOUND) { + qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.", + handle); + } + } + waitForNotification(-1); + } } QT_END_NAMESPACE diff --git a/src/corelib/io/qwindowspipewriter_p.h b/src/corelib/io/qwindowspipewriter_p.h index b5a48e926f..39e8ffe40a 100644 --- a/src/corelib/io/qwindowspipewriter_p.h +++ b/src/corelib/io/qwindowspipewriter_p.h @@ -1,6 +1,6 @@ /**************************************************************************** ** -** Copyright (C) 2020 The Qt Company Ltd. +** Copyright (C) 2016 The Qt Company Ltd. ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -54,9 +54,7 @@ #include #include #include -#include -#include - +#include #include QT_BEGIN_NAMESPACE @@ -119,37 +117,39 @@ public: bool write(const QByteArray &ba); void stop(); bool waitForWrite(int msecs); - bool isWriteOperationActive() const; + bool isWriteOperationActive() const { return writeSequenceStarted; } qint64 bytesToWrite() const; Q_SIGNALS: void canWrite(); void bytesWritten(qint64 bytes); - -protected: - bool event(QEvent *e) override; + void _q_queueBytesWritten(QPrivateSignal); private: - void startAsyncWriteLocked(); - static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, - PTP_WAIT wait, TP_WAIT_RESULT waitResult); - void writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten); + static void CALLBACK writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered, + OVERLAPPED *overlappedBase); + void notified(DWORD errorCode, DWORD numberOfBytesWritten); bool waitForNotification(int timeout); - bool emitPendingSignals(bool allowWinActPosting); + void emitPendingBytesWrittenValue(); + + class Overlapped : public OVERLAPPED + { + Q_DISABLE_COPY_MOVE(Overlapped) + public: + explicit Overlapped(QWindowsPipeWriter *pipeWriter); + void clear(); + + QWindowsPipeWriter *pipeWriter; + }; HANDLE handle; - HANDLE eventHandle; - HANDLE syncHandle; - PTP_WAIT waitObject; - OVERLAPPED overlapped; - QRingBuffer writeBuffer; + Overlapped overlapped; + QByteArray buffer; qint64 pendingBytesWrittenValue; - mutable QMutex mutex; - DWORD lastError; bool stopped; bool writeSequenceStarted; + bool notifiedCalled; bool bytesWrittenPending; - bool winEventActPosted; bool inBytesWritten; }; -- cgit v1.2.3