summaryrefslogtreecommitdiffstats
path: root/src/corelib/io/qwindowspipewriter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/corelib/io/qwindowspipewriter.cpp')
-rw-r--r--src/corelib/io/qwindowspipewriter.cpp348
1 files changed, 228 insertions, 120 deletions
diff --git a/src/corelib/io/qwindowspipewriter.cpp b/src/corelib/io/qwindowspipewriter.cpp
index e374034a06..5ed584c6e3 100644
--- a/src/corelib/io/qwindowspipewriter.cpp
+++ b/src/corelib/io/qwindowspipewriter.cpp
@@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -38,189 +39,296 @@
****************************************************************************/
#include "qwindowspipewriter_p.h"
-#include "qiodevice_p.h"
#include <qscopedvaluerollback.h>
+#include <qcoreapplication.h>
+#include <QMutexLocker>
QT_BEGIN_NAMESPACE
-QWindowsPipeWriter::Overlapped::Overlapped(QWindowsPipeWriter *pipeWriter)
- : pipeWriter(pipeWriter)
-{
-}
-
-void QWindowsPipeWriter::Overlapped::clear()
-{
- ZeroMemory(this, sizeof(OVERLAPPED));
-}
-
-
QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent)
: QObject(parent),
handle(pipeWriteEnd),
- overlapped(this),
+ eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ syncHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ waitObject(NULL),
pendingBytesWrittenValue(0),
+ lastError(ERROR_SUCCESS),
stopped(true),
writeSequenceStarted(false),
- notifiedCalled(false),
bytesWrittenPending(false),
+ winEventActPosted(false),
inBytesWritten(false)
{
- connect(this, &QWindowsPipeWriter::_q_queueBytesWritten,
- this, &QWindowsPipeWriter::emitPendingBytesWrittenValue, Qt::QueuedConnection);
+ ZeroMemory(&overlapped, sizeof(OVERLAPPED));
+ overlapped.hEvent = eventHandle;
+ waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
+ if (waitObject == NULL)
+ qErrnoWarning("QWindowsPipeWriter: CreateThreadpollWait failed.");
}
QWindowsPipeWriter::~QWindowsPipeWriter()
{
stop();
+ CloseThreadpoolWait(waitObject);
+ CloseHandle(eventHandle);
+ CloseHandle(syncHandle);
}
-bool QWindowsPipeWriter::waitForWrite(int msecs)
+/*!
+ Stops the asynchronous write sequence.
+ If the write sequence is running then the I/O operation is canceled.
+ */
+void QWindowsPipeWriter::stop()
{
- if (bytesWrittenPending) {
- emitPendingBytesWrittenValue();
- return true;
- }
-
- if (!writeSequenceStarted)
- return false;
-
- if (!waitForNotification(msecs))
- return false;
+ if (stopped)
+ return;
- if (bytesWrittenPending) {
- emitPendingBytesWrittenValue();
- return true;
+ mutex.lock();
+ stopped = true;
+ if (writeSequenceStarted) {
+ // Trying to disable callback before canceling the operation.
+ // Callback invocation is unnecessary here.
+ SetThreadpoolWait(waitObject, NULL, NULL);
+ if (!CancelIoEx(handle, &overlapped)) {
+ const DWORD dwError = GetLastError();
+ if (dwError != ERROR_NOT_FOUND) {
+ qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.",
+ handle);
+ }
+ }
+ writeSequenceStarted = false;
}
+ mutex.unlock();
- return false;
+ WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
}
+/*!
+ Returns \c true if async operation is in progress or a bytesWritten
+ signal is pending.
+ */
+bool QWindowsPipeWriter::isWriteOperationActive() const
+{
+ QMutexLocker locker(&mutex);
+ return writeSequenceStarted || bytesWrittenPending;
+}
+
+/*!
+ Returns the number of bytes that are waiting to be written.
+ */
qint64 QWindowsPipeWriter::bytesToWrite() const
{
- return buffer.size() + pendingBytesWrittenValue;
+ QMutexLocker locker(&mutex);
+ return writeBuffer.size() + pendingBytesWrittenValue;
}
-void QWindowsPipeWriter::emitPendingBytesWrittenValue()
+/*!
+ Writes data to the pipe.
+ */
+bool QWindowsPipeWriter::write(const QByteArray &ba)
{
- if (bytesWrittenPending) {
- // Reset the state even if we don't emit bytesWritten().
- // It's a defined behavior to not re-emit this signal recursively.
- bytesWrittenPending = false;
- const qint64 bytes = pendingBytesWrittenValue;
- pendingBytesWrittenValue = 0;
-
- emit canWrite();
- if (!inBytesWritten) {
- QScopedValueRollback<bool> guard(inBytesWritten, true);
- emit bytesWritten(bytes);
- }
+ QMutexLocker locker(&mutex);
+
+ if (lastError != ERROR_SUCCESS)
+ return false;
+
+ writeBuffer.append(ba);
+ if (writeSequenceStarted)
+ return true;
+
+ stopped = false;
+ startAsyncWriteLocked();
+
+ // Do not post the event, if the write operation will be completed asynchronously.
+ if (!bytesWrittenPending)
+ return true;
+
+ if (!winEventActPosted) {
+ winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
}
+
+ SetEvent(syncHandle);
+ return true;
}
-void QWindowsPipeWriter::writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase)
+/*!
+ Starts a new write sequence. Thread-safety should be ensured by the caller.
+ */
+void QWindowsPipeWriter::startAsyncWriteLocked()
{
- Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
- overlapped->pipeWriter->notified(errorCode, numberOfBytesTransfered);
+ while (!writeBuffer.isEmpty()) {
+ // WriteFile() returns true, if the write operation completes synchronously.
+ // We don't need to call GetOverlappedResult() additionally, because
+ // 'numberOfBytesWritten' is valid in this case.
+ DWORD numberOfBytesWritten;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(),
+ &numberOfBytesWritten, &overlapped)) {
+ errorCode = GetLastError();
+ if (errorCode == ERROR_IO_PENDING) {
+ // Operation has been queued and will complete in the future.
+ writeSequenceStarted = true;
+ SetThreadpoolWait(waitObject, eventHandle, NULL);
+ return;
+ }
+ }
+
+ if (!writeCompleted(errorCode, numberOfBytesWritten))
+ return;
+ }
}
/*!
- \internal
- Will be called whenever the write operation completes.
+ Thread pool callback procedure.
*/
-void QWindowsPipeWriter::notified(DWORD errorCode, DWORD numberOfBytesWritten)
+void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult)
{
- notifiedCalled = true;
- writeSequenceStarted = false;
- Q_ASSERT(errorCode != ERROR_SUCCESS || numberOfBytesWritten == DWORD(buffer.size()));
- buffer.clear();
-
- switch (errorCode) {
- case ERROR_SUCCESS:
- break;
- case ERROR_OPERATION_ABORTED:
- if (stopped)
- break;
- Q_FALLTHROUGH();
- default:
- qErrnoWarning(errorCode, "QWindowsPipeWriter: asynchronous write failed.");
- break;
- }
+ Q_UNUSED(instance);
+ Q_UNUSED(wait);
+ Q_UNUSED(waitResult);
+ QWindowsPipeWriter *pipeWriter = reinterpret_cast<QWindowsPipeWriter *>(context);
+
+ // Get the result of the asynchronous operation.
+ DWORD numberOfBytesTransfered = 0;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped,
+ &numberOfBytesTransfered, FALSE))
+ errorCode = GetLastError();
+
+ QMutexLocker locker(&pipeWriter->mutex);
// After the writer was stopped, the only reason why this function can be called is the
- // completion of a cancellation. No signals should be emitted, and no new write sequence should
- // be started in this case.
- if (stopped)
+ // completion of a cancellation. No signals should be emitted, and no new write sequence
+ // should be started in this case.
+ if (pipeWriter->stopped)
return;
- pendingBytesWrittenValue += qint64(numberOfBytesWritten);
- if (!bytesWrittenPending) {
- bytesWrittenPending = true;
- emit _q_queueBytesWritten(QWindowsPipeWriter::QPrivateSignal());
+ pipeWriter->writeSequenceStarted = false;
+
+ if (pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered))
+ pipeWriter->startAsyncWriteLocked();
+
+ if (pipeWriter->lastError == ERROR_SUCCESS && !pipeWriter->winEventActPosted) {
+ pipeWriter->winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(pipeWriter, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
}
+
+ // We set the event only after unlocking to avoid additional context
+ // switches due to the released thread immediately running into the lock.
+ SetEvent(pipeWriter->syncHandle);
}
-bool QWindowsPipeWriter::waitForNotification(int timeout)
+/*!
+ Will be called whenever the write operation completes. Returns \c true if
+ no error occurred; otherwise returns \c false.
+ */
+bool QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten)
{
- QElapsedTimer t;
- t.start();
- notifiedCalled = false;
- int msecs = timeout;
- while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
- if (notifiedCalled)
- return true;
+ if (errorCode == ERROR_SUCCESS) {
+ Q_ASSERT(numberOfBytesWritten == DWORD(writeBuffer.nextDataBlockSize()));
- // Some other I/O completion routine was called. Wait some more.
- msecs = qt_subtract_from_timeout(timeout, t.elapsed());
- if (!msecs)
- break;
+ bytesWrittenPending = true;
+ pendingBytesWrittenValue += numberOfBytesWritten;
+ writeBuffer.free(numberOfBytesWritten);
+ return true;
}
- return notifiedCalled;
+
+ lastError = errorCode;
+ writeBuffer.clear();
+ // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
+ if (errorCode != ERROR_OPERATION_ABORTED && errorCode != ERROR_NO_DATA)
+ qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed.");
+ return false;
}
-bool QWindowsPipeWriter::write(const QByteArray &ba)
+/*!
+ Receives notification that the write operation has completed.
+ */
+bool QWindowsPipeWriter::event(QEvent *e)
{
- if (writeSequenceStarted)
+ if (e->type() == QEvent::WinEventAct) {
+ consumePendingAndEmit(true);
+ return true;
+ }
+ return QObject::event(e);
+}
+
+/*!
+ Updates the state and emits pending signals in the main thread.
+ Returns \c true, if bytesWritten() was emitted.
+ */
+bool QWindowsPipeWriter::consumePendingAndEmit(bool allowWinActPosting)
+{
+ QMutexLocker locker(&mutex);
+
+ // Enable QEvent::WinEventAct posting.
+ if (allowWinActPosting)
+ winEventActPosted = false;
+
+ if (!bytesWrittenPending)
return false;
- overlapped.clear();
- buffer = ba;
- stopped = false;
- writeSequenceStarted = true;
- if (!WriteFileEx(handle, buffer.constData(), buffer.size(),
- &overlapped, &writeFileCompleted)) {
- writeSequenceStarted = false;
- buffer.clear();
-
- const DWORD errorCode = GetLastError();
- switch (errorCode) {
- case ERROR_NO_DATA: // "The pipe is being closed."
- // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
- break;
- default:
- qErrnoWarning(errorCode, "QWindowsPipeWriter::write failed.");
- }
+ // Reset the state even if we don't emit bytesWritten().
+ // It's a defined behavior to not re-emit this signal recursively.
+ bytesWrittenPending = false;
+ qint64 numberOfBytesWritten = pendingBytesWrittenValue;
+ pendingBytesWrittenValue = 0;
+
+ locker.unlock();
+
+ // Disable any further processing, if the pipe was stopped.
+ if (stopped)
return false;
+
+ emit canWrite();
+ if (!inBytesWritten) {
+ QScopedValueRollback<bool> guard(inBytesWritten, true);
+ emit bytesWritten(numberOfBytesWritten);
}
return true;
}
-void QWindowsPipeWriter::stop()
+bool QWindowsPipeWriter::waitForNotification(const QDeadlineTimer &deadline)
{
- stopped = true;
- bytesWrittenPending = false;
- pendingBytesWrittenValue = 0;
- if (writeSequenceStarted) {
- if (!CancelIoEx(handle, &overlapped)) {
- const DWORD dwError = GetLastError();
- if (dwError != ERROR_NOT_FOUND) {
- qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.",
- handle);
- }
- }
- waitForNotification(-1);
+ do {
+ DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE);
+ if (waitRet == WAIT_OBJECT_0)
+ return true;
+
+ if (waitRet != WAIT_IO_COMPLETION)
+ return false;
+
+ // Some I/O completion routine was called. Wait some more.
+ } while (!deadline.hasExpired());
+
+ return false;
+}
+
+/*!
+ Waits for the completion of the asynchronous write operation.
+ Returns \c true, if we've emitted the bytesWritten signal (non-recursive case)
+ or bytesWritten will be emitted by the event loop (recursive case).
+ */
+bool QWindowsPipeWriter::waitForWrite(int msecs)
+{
+ QDeadlineTimer timer(msecs);
+
+ // Make sure that 'syncHandle' was triggered by the thread pool callback.
+ while (isWriteOperationActive() && waitForNotification(timer)) {
+ if (consumePendingAndEmit(false))
+ return true;
}
+
+ return false;
}
QT_END_NAMESPACE