summaryrefslogtreecommitdiffstats
path: root/src/corelib/io/qwindowspipewriter.cpp
diff options
context:
space:
mode:
authorAlex Trotsenko <alex1973tr@gmail.com>2020-10-05 19:22:49 +0300
committerAlex Trotsenko <alex1973tr@gmail.com>2020-11-17 12:45:50 +0200
commitee122077b09430da54ca09750589b37326a22d85 (patch)
treefa06b0006bddc56fa68045d827275dc52c14f1ee /src/corelib/io/qwindowspipewriter.cpp
parent6be39809b038768a665b0e29a3a3668fdc424d9a (diff)
Allow QWindowsPipe{Reader,Writer} to work with foreign event loops
When a foreign event loop that does not enter an alertable wait state is running (which is also the case when a native dialog window is modal), pipe handlers would freeze temporarily due to their APC callbacks not being invoked. We address this problem by moving the I/O callbacks to the Windows thread pool, and only posting completion events to the main loop from there. That makes the actual I/O completely independent from any main loop, while the signal delivery works also with foreign loops (because Qt event delivery uses Windows messages, which foreign loops typically handle correctly). As a nice side effect, performance (and in particular scalability) is improved. Several other approaches have been tried: 1) Using QWinEventNotifier was about a quarter slower and scaled much worse. Additionally, it also required a rather egregious hack to handle the (pathological) case of a single thread talking to both ends of a QLocalSocket synchronously. 2) Queuing APCs from the thread pool to the main thread and also posting wake-up events to its event loop, and handling I/O on the main thread; this performed roughly like this solution , but scaled half as well, and the separate wake-up path was still deemed hacky. 3) Only posting wake-up events to the main thread from the thread pool, and still handling I/O on the main thread; this still performed comparably to 2), and the pathological case was not handled at all. 4) Using this approach for reads and that of 3) for writes was slightly faster with big amounts of data, but scaled slightly worse, and the diverging implementations were deemed not desirable. Fixes: QTBUG-64443 Change-Id: I1cd87c07db39f3b46a2683ce236d7eb67b5be549 Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@gmx.de>
Diffstat (limited to 'src/corelib/io/qwindowspipewriter.cpp')
-rw-r--r--src/corelib/io/qwindowspipewriter.cpp353
1 files changed, 236 insertions, 117 deletions
diff --git a/src/corelib/io/qwindowspipewriter.cpp b/src/corelib/io/qwindowspipewriter.cpp
index e374034a06..6cea9f3a5e 100644
--- a/src/corelib/io/qwindowspipewriter.cpp
+++ b/src/corelib/io/qwindowspipewriter.cpp
@@ -1,6 +1,6 @@
/****************************************************************************
**
-** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2020 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -40,187 +40,306 @@
#include "qwindowspipewriter_p.h"
#include "qiodevice_p.h"
#include <qscopedvaluerollback.h>
+#include <qcoreapplication.h>
QT_BEGIN_NAMESPACE
-QWindowsPipeWriter::Overlapped::Overlapped(QWindowsPipeWriter *pipeWriter)
- : pipeWriter(pipeWriter)
-{
-}
-
-void QWindowsPipeWriter::Overlapped::clear()
-{
- ZeroMemory(this, sizeof(OVERLAPPED));
-}
-
-
QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent)
: QObject(parent),
handle(pipeWriteEnd),
- overlapped(this),
+ eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
+ waitObject(NULL),
pendingBytesWrittenValue(0),
+ lastError(ERROR_SUCCESS),
stopped(true),
writeSequenceStarted(false),
- notifiedCalled(false),
bytesWrittenPending(false),
+ winEventActPosted(false),
inBytesWritten(false)
{
- connect(this, &QWindowsPipeWriter::_q_queueBytesWritten,
- this, &QWindowsPipeWriter::emitPendingBytesWrittenValue, Qt::QueuedConnection);
+ ZeroMemory(&overlapped, sizeof(OVERLAPPED));
+ overlapped.hEvent = eventHandle;
+ waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
+ if (waitObject == NULL)
+ qErrnoWarning("QWindowsPipeWriter: CreateThreadpollWait failed.");
}
QWindowsPipeWriter::~QWindowsPipeWriter()
{
stop();
+ CloseThreadpoolWait(waitObject);
+ CloseHandle(eventHandle);
+ CloseHandle(syncHandle);
}
-bool QWindowsPipeWriter::waitForWrite(int msecs)
+/*!
+ \internal
+ Stops the asynchronous write sequence.
+ If the write sequence is running then the I/O operation is canceled.
+ */
+void QWindowsPipeWriter::stop()
{
- if (bytesWrittenPending) {
- emitPendingBytesWrittenValue();
- return true;
+ if (stopped)
+ return;
+
+ mutex.lock();
+ stopped = true;
+ if (writeSequenceStarted) {
+ // Trying to disable callback before canceling the operation.
+ // Callback invocation is unnecessary here.
+ SetThreadpoolWait(waitObject, NULL, NULL);
+ if (!CancelIoEx(handle, &overlapped)) {
+ const DWORD dwError = GetLastError();
+ if (dwError != ERROR_NOT_FOUND) {
+ qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.",
+ handle);
+ }
+ }
+ writeSequenceStarted = false;
}
+ mutex.unlock();
- if (!writeSequenceStarted)
- return false;
+ WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
+}
- if (!waitForNotification(msecs))
+/*!
+ \internal
+ Returns \c true if async operation is in progress or a bytesWritten
+ signal is pending.
+ */
+bool QWindowsPipeWriter::isWriteOperationActive() const
+{
+ QMutexLocker locker(&mutex);
+ return writeSequenceStarted || bytesWrittenPending;
+}
+
+/*!
+ \internal
+ Returns the number of bytes that are waiting to be written.
+ */
+qint64 QWindowsPipeWriter::bytesToWrite() const
+{
+ QMutexLocker locker(&mutex);
+ return writeBuffer.size() + pendingBytesWrittenValue;
+}
+
+/*!
+ \internal
+ Writes data to the pipe.
+ */
+bool QWindowsPipeWriter::write(const QByteArray &ba)
+{
+ QMutexLocker locker(&mutex);
+
+ if (lastError != ERROR_SUCCESS)
return false;
- if (bytesWrittenPending) {
- emitPendingBytesWrittenValue();
+ writeBuffer.append(ba);
+ if (writeSequenceStarted)
return true;
- }
- return false;
-}
+ stopped = false;
+ startAsyncWriteLocked();
-qint64 QWindowsPipeWriter::bytesToWrite() const
-{
- return buffer.size() + pendingBytesWrittenValue;
+ // Do not post the event, if the write operation will be completed asynchronously.
+ if (bytesWrittenPending && !winEventActPosted) {
+ winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
+ }
+ return true;
}
-void QWindowsPipeWriter::emitPendingBytesWrittenValue()
+/*!
+ \internal
+ Starts a new write sequence. Thread-safety should be ensured by the caller.
+ */
+void QWindowsPipeWriter::startAsyncWriteLocked()
{
- if (bytesWrittenPending) {
- // Reset the state even if we don't emit bytesWritten().
- // It's a defined behavior to not re-emit this signal recursively.
- bytesWrittenPending = false;
- const qint64 bytes = pendingBytesWrittenValue;
- pendingBytesWrittenValue = 0;
-
- emit canWrite();
- if (!inBytesWritten) {
- QScopedValueRollback<bool> guard(inBytesWritten, true);
- emit bytesWritten(bytes);
+ forever {
+ if (writeBuffer.isEmpty())
+ return;
+
+ // WriteFile() returns true, if the write operation completes synchronously.
+ // We don't need to call GetOverlappedResult() additionally, because
+ // 'numberOfBytesWritten' is valid in this case.
+ DWORD numberOfBytesWritten;
+ if (!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(),
+ &numberOfBytesWritten, &overlapped)) {
+ break;
}
+
+ writeCompleted(ERROR_SUCCESS, numberOfBytesWritten);
+ }
+
+ const DWORD dwError = GetLastError();
+ if (dwError == ERROR_IO_PENDING) {
+ // Operation has been queued and will complete in the future.
+ writeSequenceStarted = true;
+ SetThreadpoolWait(waitObject, eventHandle, NULL);
+ } else {
+ // Other return values are actual errors.
+ writeCompleted(dwError, 0);
}
}
-void QWindowsPipeWriter::writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase)
+/*!
+ \internal
+ Thread pool callback procedure.
+ */
+void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult)
{
- Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
- overlapped->pipeWriter->notified(errorCode, numberOfBytesTransfered);
+ Q_UNUSED(instance);
+ Q_UNUSED(wait);
+ Q_UNUSED(waitResult);
+ QWindowsPipeWriter *pipeWriter = reinterpret_cast<QWindowsPipeWriter *>(context);
+
+ // Get the result of the asynchronous operation.
+ DWORD numberOfBytesTransfered = 0;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped,
+ &numberOfBytesTransfered, FALSE))
+ errorCode = GetLastError();
+
+ QMutexLocker locker(&pipeWriter->mutex);
+
+ // After the writer was stopped, the only reason why this function can be called is the
+ // completion of a cancellation. No signals should be emitted, and no new write sequence
+ // should be started in this case.
+ if (pipeWriter->stopped)
+ return;
+
+ pipeWriter->writeSequenceStarted = false;
+ pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered);
+ if (pipeWriter->lastError != ERROR_SUCCESS)
+ return;
+
+ pipeWriter->startAsyncWriteLocked();
+
+ if (!pipeWriter->winEventActPosted) {
+ pipeWriter->winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(pipeWriter, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
+ }
+ SetEvent(pipeWriter->syncHandle);
}
/*!
\internal
Will be called whenever the write operation completes.
*/
-void QWindowsPipeWriter::notified(DWORD errorCode, DWORD numberOfBytesWritten)
+void QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten)
{
- notifiedCalled = true;
- writeSequenceStarted = false;
- Q_ASSERT(errorCode != ERROR_SUCCESS || numberOfBytesWritten == DWORD(buffer.size()));
- buffer.clear();
-
- switch (errorCode) {
- case ERROR_SUCCESS:
- break;
- case ERROR_OPERATION_ABORTED:
- if (stopped)
- break;
- Q_FALLTHROUGH();
- default:
- qErrnoWarning(errorCode, "QWindowsPipeWriter: asynchronous write failed.");
- break;
+ if (errorCode == ERROR_SUCCESS) {
+ Q_ASSERT(numberOfBytesWritten == DWORD(writeBuffer.nextDataBlockSize()));
+
+ bytesWrittenPending = true;
+ pendingBytesWrittenValue += numberOfBytesWritten;
+ writeBuffer.free(numberOfBytesWritten);
+ } else {
+ lastError = errorCode;
+ writeBuffer.clear();
+ // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
+ if (errorCode != ERROR_OPERATION_ABORTED && errorCode != ERROR_NO_DATA)
+ qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed.");
}
+}
- // After the writer was stopped, the only reason why this function can be called is the
- // completion of a cancellation. No signals should be emitted, and no new write sequence should
- // be started in this case.
+/*!
+ \internal
+ Receives notification that the write operation has completed.
+ */
+bool QWindowsPipeWriter::event(QEvent *e)
+{
+ if (e->type() == QEvent::WinEventAct) {
+ emitPendingSignals(true);
+ return true;
+ }
+ return QObject::event(e);
+}
+
+/*!
+ \internal
+ Emits pending signals in the main thread. Returns \c true,
+ if bytesWritten() was emitted.
+ */
+bool QWindowsPipeWriter::emitPendingSignals(bool allowWinActPosting)
+{
+ QMutexLocker locker(&mutex);
+
+ // Enable QEvent::WinEventAct posting.
+ if (allowWinActPosting)
+ winEventActPosted = false;
+
+ if (!bytesWrittenPending)
+ return false;
+
+ // Reset the state even if we don't emit bytesWritten().
+ // It's a defined behavior to not re-emit this signal recursively.
+ bytesWrittenPending = false;
+ qint64 numberOfBytesWritten = pendingBytesWrittenValue;
+ pendingBytesWrittenValue = 0;
+
+ locker.unlock();
+
+ // Disable any further processing, if the pipe was stopped.
if (stopped)
- return;
+ return false;
- pendingBytesWrittenValue += qint64(numberOfBytesWritten);
- if (!bytesWrittenPending) {
- bytesWrittenPending = true;
- emit _q_queueBytesWritten(QWindowsPipeWriter::QPrivateSignal());
+ emit canWrite();
+ if (!inBytesWritten) {
+ QScopedValueRollback<bool> guard(inBytesWritten, true);
+ emit bytesWritten(numberOfBytesWritten);
}
+
+ return true;
}
bool QWindowsPipeWriter::waitForNotification(int timeout)
{
QElapsedTimer t;
t.start();
- notifiedCalled = false;
int msecs = timeout;
- while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
- if (notifiedCalled)
+ do {
+ DWORD waitRet = WaitForSingleObjectEx(syncHandle,
+ msecs == -1 ? INFINITE : msecs, TRUE);
+ if (waitRet == WAIT_OBJECT_0)
return true;
- // Some other I/O completion routine was called. Wait some more.
+ if (waitRet != WAIT_IO_COMPLETION)
+ return false;
+
+ // Some I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
- if (!msecs)
- break;
- }
- return notifiedCalled;
+ } while (msecs != 0);
+
+ return false;
}
-bool QWindowsPipeWriter::write(const QByteArray &ba)
+/*!
+ \internal
+ Waits for the completion of the asynchronous write operation.
+ Returns \c true, if we've emitted the bytesWritten signal (non-recursive case)
+ or bytesWritten will be emitted by the event loop (recursive case).
+ */
+bool QWindowsPipeWriter::waitForWrite(int msecs)
{
- if (writeSequenceStarted)
- return false;
+ // Prepare handle for waiting.
+ ResetEvent(syncHandle);
- overlapped.clear();
- buffer = ba;
- stopped = false;
- writeSequenceStarted = true;
- if (!WriteFileEx(handle, buffer.constData(), buffer.size(),
- &overlapped, &writeFileCompleted)) {
- writeSequenceStarted = false;
- buffer.clear();
+ // It is necessary to check if there is already pending signal.
+ if (emitPendingSignals(false))
+ return true;
- const DWORD errorCode = GetLastError();
- switch (errorCode) {
- case ERROR_NO_DATA: // "The pipe is being closed."
- // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
- break;
- default:
- qErrnoWarning(errorCode, "QWindowsPipeWriter::write failed.");
- }
+ // Make sure that 'syncHandle' was triggered by the thread pool callback.
+ if (!isWriteOperationActive() || !waitForNotification(msecs))
return false;
- }
- return true;
-}
-
-void QWindowsPipeWriter::stop()
-{
- stopped = true;
- bytesWrittenPending = false;
- pendingBytesWrittenValue = 0;
- if (writeSequenceStarted) {
- if (!CancelIoEx(handle, &overlapped)) {
- const DWORD dwError = GetLastError();
- if (dwError != ERROR_NOT_FOUND) {
- qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.",
- handle);
- }
- }
- waitForNotification(-1);
- }
+ return emitPendingSignals(false);
}
QT_END_NAMESPACE