diff options
-rw-r--r-- | src/corelib/io/qwindowspipereader.cpp | 468 | ||||
-rw-r--r-- | src/corelib/io/qwindowspipereader_p.h | 51 | ||||
-rw-r--r-- | src/corelib/io/qwindowspipewriter.cpp | 348 | ||||
-rw-r--r-- | src/corelib/io/qwindowspipewriter_p.h | 44 | ||||
-rw-r--r-- | tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp | 42 | ||||
-rw-r--r-- | tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp | 20 | ||||
-rw-r--r-- | tests/benchmarks/network/socket/CMakeLists.txt | 1 | ||||
-rw-r--r-- | tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt | 14 | ||||
-rw-r--r-- | tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp | 225 |
9 files changed, 852 insertions, 361 deletions
diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp index b525e88282..bf03737c39 100644 --- a/src/corelib/io/qwindowspipereader.cpp +++ b/src/corelib/io/qwindowspipereader.cpp @@ -1,6 +1,7 @@ /**************************************************************************** ** ** Copyright (C) 2016 The Qt Company Ltd. +** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com> ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -38,58 +39,63 @@ ****************************************************************************/ #include "qwindowspipereader_p.h" -#include "qiodevice_p.h" -#include <qelapsedtimer.h> #include <qscopedvaluerollback.h> +#include <qcoreapplication.h> +#include <QMutexLocker> QT_BEGIN_NAMESPACE static const DWORD minReadBufferSize = 4096; -QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader) - : pipeReader(reader) -{ -} - -void QWindowsPipeReader::Overlapped::clear() -{ - ZeroMemory(this, sizeof(OVERLAPPED)); -} - - QWindowsPipeReader::QWindowsPipeReader(QObject *parent) : QObject(parent), handle(INVALID_HANDLE_VALUE), - overlapped(this), + eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)), + syncHandle(CreateEvent(NULL, FALSE, FALSE, NULL)), + waitObject(NULL), readBufferMaxSize(0), actualReadBufferSize(0), - bytesPending(0), + pendingReadBytes(0), + lastError(ERROR_SUCCESS), state(Stopped), readSequenceStarted(false), - notifiedCalled(false), pipeBroken(false), readyReadPending(false), + winEventActPosted(false), inReadyRead(false) { - connect(this, &QWindowsPipeReader::_q_queueReadyRead, - this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection); + ZeroMemory(&overlapped, sizeof(OVERLAPPED)); + overlapped.hEvent = eventHandle; + waitObject = CreateThreadpoolWait(waitCallback, this, NULL); + if (waitObject == NULL) + qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed."); } QWindowsPipeReader::~QWindowsPipeReader() { stop(); + + // Wait for thread pool callback to complete, as it can be still + // executing some completion code. + WaitForThreadpoolWaitCallbacks(waitObject, FALSE); + CloseThreadpoolWait(waitObject); + CloseHandle(eventHandle); + CloseHandle(syncHandle); } /*! Sets the handle to read from. The handle must be valid. + Do not call this function while the pipe is running. */ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd) { readBuffer.clear(); actualReadBufferSize = 0; - bytesPending = 0; + readyReadPending = false; + pendingReadBytes = 0; handle = hPipeReadEnd; pipeBroken = false; + lastError = ERROR_SUCCESS; } /*! @@ -98,8 +104,7 @@ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd) */ void QWindowsPipeReader::stop() { - state = Stopped; - cancelAsyncRead(); + cancelAsyncRead(Stopped); } /*! @@ -108,16 +113,27 @@ void QWindowsPipeReader::stop() */ void QWindowsPipeReader::drainAndStop() { - state = Draining; - cancelAsyncRead(); + cancelAsyncRead(Draining); + + // Note that signals are not emitted in the call below, as the caller + // is expected to do that synchronously. + consumePending(); } /*! Stops the asynchronous read sequence. */ -void QWindowsPipeReader::cancelAsyncRead() +void QWindowsPipeReader::cancelAsyncRead(State newState) { + if (state != Running) + return; + + QMutexLocker locker(&mutex); + state = newState; if (readSequenceStarted) { + // This can legitimately fail due to the GetOverlappedResult() + // in the callback not being locked. We ignore ERROR_NOT_FOUND + // in this case. if (!CancelIoEx(handle, &overlapped)) { const DWORD dwError = GetLastError(); if (dwError != ERROR_NOT_FOUND) { @@ -125,11 +141,37 @@ void QWindowsPipeReader::cancelAsyncRead() handle); } } - waitForNotification(-1); + + // Wait for callback to complete. + do { + locker.unlock(); + waitForNotification(QDeadlineTimer(-1)); + locker.relock(); + } while (readSequenceStarted); } } /*! + Sets the size of internal read buffer. + */ +void QWindowsPipeReader::setMaxReadBufferSize(qint64 size) +{ + QMutexLocker locker(&mutex); + readBufferMaxSize = size; +} + +/*! + Returns \c true if async operation is in progress, there is + pending data to read, or a read error is pending. +*/ +bool QWindowsPipeReader::isReadOperationActive() const +{ + QMutexLocker locker(&mutex); + return readSequenceStarted || readyReadPending + || (lastError != ERROR_SUCCESS && !pipeBroken); +} + +/*! Returns the number of bytes we've read so far. */ qint64 QWindowsPipeReader::bytesAvailable() const @@ -145,6 +187,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) if (pipeBroken && actualReadBufferSize == 0) return 0; // signal EOF + mutex.lock(); qint64 readSoFar; // If startAsyncRead() has read data, copy it to its destination. if (maxlen == 1 && actualReadBufferSize > 0) { @@ -155,6 +198,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen)); actualReadBufferSize -= readSoFar; } + mutex.unlock(); if (!pipeBroken) { if (state == Running) @@ -166,197 +210,268 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen) return readSoFar; } +/*! + Returns \c true if a complete line of data can be read from the buffer. + */ bool QWindowsPipeReader::canReadLine() const { + QMutexLocker locker(&mutex); return readBuffer.indexOf('\n', actualReadBufferSize) >= 0; } /*! - \internal - Will be called whenever the read operation completes. + Starts an asynchronous read sequence on the pipe. */ -void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead) +void QWindowsPipeReader::startAsyncRead() { - notifiedCalled = true; - readSequenceStarted = false; - - switch (errorCode) { - case ERROR_SUCCESS: - break; - case ERROR_MORE_DATA: - // This is not an error. We're connected to a message mode - // pipe and the message didn't fit into the pipe's system - // buffer. We will read the remaining data in the next call. - break; - case ERROR_BROKEN_PIPE: - case ERROR_PIPE_NOT_CONNECTED: - pipeBroken = true; - break; - case ERROR_OPERATION_ABORTED: - if (state != Running) - break; - Q_FALLTHROUGH(); - default: - emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified")); - pipeBroken = true; - break; - } + QMutexLocker locker(&mutex); - // After the reader was stopped, the only reason why this function can be called is the - // completion of a cancellation. No signals should be emitted, and no new read sequence should - // be started in this case. - if (state == Stopped) + if (readSequenceStarted || lastError != ERROR_SUCCESS) return; - if (pipeBroken) { - emitPipeClosed(); + state = Running; + startAsyncReadLocked(); + + // Do not post the event, if the read operation will be completed asynchronously. + if (!readyReadPending && lastError == ERROR_SUCCESS) return; - } - actualReadBufferSize += numberOfBytesRead; - readBuffer.truncate(actualReadBufferSize); + if (!winEventActPosted) { + winEventActPosted = true; + locker.unlock(); + QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct)); + } else { + locker.unlock(); + } - // Read all pending data from the pipe's buffer in 'Draining' state. - if (state == Draining) { - // Determine the number of pending bytes on the first iteration. - if (bytesPending == 0) - bytesPending = checkPipeState(); - else - bytesPending -= numberOfBytesRead; + SetEvent(syncHandle); +} - if (bytesPending == 0) // all data received - return; // unblock waitForNotification() in cancelAsyncRead() +/*! + Starts a new read sequence. Thread-safety should be ensured + by the caller. + */ +void QWindowsPipeReader::startAsyncReadLocked() +{ + // Determine the number of bytes to read. + qint64 bytesToRead = qMax(checkPipeState(), state == Running ? minReadBufferSize : 0); - startAsyncReadHelper(bytesPending); - if (readSequenceStarted) - notifiedCalled = false; // wait for more data + // This can happen only while draining; just do nothing in this case. + if (bytesToRead == 0) return; + + while (lastError == ERROR_SUCCESS) { + if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { + bytesToRead = readBufferMaxSize - readBuffer.size(); + if (bytesToRead <= 0) { + // Buffer is full. User must read data from the buffer + // before we can read more from the pipe. + return; + } + } + + char *ptr = readBuffer.reserve(bytesToRead); + + // ReadFile() returns true, if the read operation completes synchronously. + // We don't need to call GetOverlappedResult() additionally, because + // 'numberOfBytesRead' is valid in this case. + DWORD numberOfBytesRead; + DWORD errorCode = ERROR_SUCCESS; + if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) { + errorCode = GetLastError(); + if (errorCode == ERROR_IO_PENDING) { + Q_ASSERT(state == Running); + // Operation has been queued and will complete in the future. + readSequenceStarted = true; + SetThreadpoolWait(waitObject, eventHandle, NULL); + return; + } + } + + if (!readCompleted(errorCode, numberOfBytesRead)) + return; + + // In the 'Draining' state, we have to get all the data with one call + // to ReadFile(). Note that message mode pipes are not supported here. + if (state == Draining) { + Q_ASSERT(bytesToRead == qint64(numberOfBytesRead)); + return; + } + + // We need to loop until all pending data has been read and an + // operation is queued for asynchronous completion. + // If the pipe is configured to work in message mode, we read + // the data in chunks. + bytesToRead = qMax(checkPipeState(), minReadBufferSize); } +} - startAsyncRead(); - if (!readyReadPending) { - readyReadPending = true; - emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal()); +/*! + Thread pool callback procedure. + */ +void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, + PTP_WAIT wait, TP_WAIT_RESULT waitResult) +{ + Q_UNUSED(instance); + Q_UNUSED(wait); + Q_UNUSED(waitResult); + QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context); + + // Get the result of the asynchronous operation. + DWORD numberOfBytesTransfered = 0; + DWORD errorCode = ERROR_SUCCESS; + if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped, + &numberOfBytesTransfered, FALSE)) + errorCode = GetLastError(); + + pipeReader->mutex.lock(); + + pipeReader->readSequenceStarted = false; + + // Do not overwrite error code, if error has been detected by + // checkPipeState() in waitForPipeClosed(). Also, if the reader was + // stopped, the only reason why this function can be called is the + // completion of a cancellation. No signals should be emitted, and + // no new read sequence should be started in this case. + if (pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) { + // Ignore ERROR_OPERATION_ABORTED. We have canceled the I/O operation + // specifically for flushing the pipe. + if (pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED) + errorCode = ERROR_SUCCESS; + + if (pipeReader->readCompleted(errorCode, numberOfBytesTransfered)) + pipeReader->startAsyncReadLocked(); + + if (pipeReader->state == Running && !pipeReader->winEventActPosted) { + pipeReader->winEventActPosted = true; + pipeReader->mutex.unlock(); + QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct)); + } else { + pipeReader->mutex.unlock(); + } + } else { + pipeReader->mutex.unlock(); } + + // We set the event only after unlocking to avoid additional context + // switches due to the released thread immediately running into the lock. + SetEvent(pipeReader->syncHandle); } /*! - \internal - Starts an asynchronous read sequence on the pipe. + Will be called whenever the read operation completes. Returns \c true if + no error occurred; otherwise returns \c false. */ -void QWindowsPipeReader::startAsyncRead() +bool QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead) { - if (readSequenceStarted) - return; + // ERROR_MORE_DATA is not an error. We're connected to a message mode + // pipe and the message didn't fit into the pipe's system + // buffer. We will read the remaining data in the next call. + if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) { + readyReadPending = true; + pendingReadBytes += numberOfBytesRead; + readBuffer.truncate(actualReadBufferSize + pendingReadBytes); + return true; + } - state = Running; - startAsyncReadHelper(qMax(checkPipeState(), minReadBufferSize)); + lastError = errorCode; + return false; } /*! - \internal - Starts a new read sequence. + Receives notification that the read operation has completed. */ -void QWindowsPipeReader::startAsyncReadHelper(qint64 bytesToRead) +bool QWindowsPipeReader::event(QEvent *e) { - Q_ASSERT(bytesToRead != 0); + if (e->type() == QEvent::WinEventAct) { + consumePendingAndEmit(true); + return true; + } + return QObject::event(e); +} - if (pipeBroken) - return; +/*! + Updates the read buffer size and emits pending signals in the main thread. + Returns \c true, if readyRead() was emitted. + */ +bool QWindowsPipeReader::consumePendingAndEmit(bool allowWinActPosting) +{ + mutex.lock(); - if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { - bytesToRead = readBufferMaxSize - readBuffer.size(); - if (bytesToRead <= 0) { - // Buffer is full. User must read data from the buffer - // before we can read more from the pipe. - return; - } + // Enable QEvent::WinEventAct posting. + if (allowWinActPosting) + winEventActPosted = false; + + const bool emitReadyRead = consumePending(); + const DWORD dwError = lastError; + + mutex.unlock(); + + // Disable any further processing, if the pipe was stopped. + // We are not allowed to emit signals in either 'Stopped' + // or 'Draining' state. + if (state != Running) + return false; + + if (emitReadyRead && !inReadyRead) { + QScopedValueRollback<bool> guard(inReadyRead, true); + emit readyRead(); } - char *ptr = readBuffer.reserve(bytesToRead); - - readSequenceStarted = true; - overlapped.clear(); - if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) { - readSequenceStarted = false; - - const DWORD dwError = GetLastError(); - switch (dwError) { - case ERROR_BROKEN_PIPE: - case ERROR_PIPE_NOT_CONNECTED: - // It may happen, that the other side closes the connection directly - // after writing data. Then we must set the appropriate socket state. - pipeBroken = true; - emit pipeClosed(); - break; - default: - emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead")); - break; - } + // Trigger 'pipeBroken' only once. + if (dwError != ERROR_SUCCESS && !pipeBroken) { + pipeBroken = true; + if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED) + emit winError(dwError, QLatin1String("QWindowsPipeReader::consumePendingAndEmit")); + emit pipeClosed(); } + + return emitReadyRead; } /*! - \internal - Called when ReadFileEx finished the read operation. + Updates the read buffer size. Returns \c true, if readyRead() + should be emitted. Thread-safety should be ensured by the caller. */ -void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered, - OVERLAPPED *overlappedBase) +bool QWindowsPipeReader::consumePending() { - Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase); - overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered); + if (readyReadPending) { + readyReadPending = false; + actualReadBufferSize += pendingReadBytes; + pendingReadBytes = 0; + return true; + } + + return false; } /*! - \internal Returns the number of available bytes in the pipe. - Sets QWindowsPipeReader::pipeBroken to true if the connection is broken. */ DWORD QWindowsPipeReader::checkPipeState() { DWORD bytes; if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr)) return bytes; - if (!pipeBroken) { - pipeBroken = true; - emitPipeClosed(); - } + + lastError = GetLastError(); return 0; } -bool QWindowsPipeReader::waitForNotification(int timeout) +bool QWindowsPipeReader::waitForNotification(const QDeadlineTimer &deadline) { - QElapsedTimer t; - t.start(); - notifiedCalled = false; - int msecs = timeout; - while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) { - if (notifiedCalled) + do { + DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE); + if (waitRet == WAIT_OBJECT_0) return true; - // Some other I/O completion routine was called. Wait some more. - msecs = qt_subtract_from_timeout(timeout, t.elapsed()); - if (!msecs) - break; - } - return notifiedCalled; -} + if (waitRet != WAIT_IO_COMPLETION) + return false; -void QWindowsPipeReader::emitPendingReadyRead() -{ - if (readyReadPending) { - readyReadPending = false; - QScopedValueRollback<bool> guard(inReadyRead, true); - emit readyRead(); - } -} + // Some I/O completion routine was called. Wait some more. + } while (!deadline.hasExpired()); -void QWindowsPipeReader::emitPipeClosed() -{ - // We are not allowed to emit signals in either 'Stopped' - // or 'Draining' state. - if (state == Running) - emit pipeClosed(); + return false; } /*! @@ -366,22 +481,12 @@ void QWindowsPipeReader::emitPipeClosed() */ bool QWindowsPipeReader::waitForReadyRead(int msecs) { - if (readyReadPending) { - if (!inReadyRead) - emitPendingReadyRead(); - return true; - } - - if (!readSequenceStarted) - return false; - - if (!waitForNotification(msecs)) - return false; + QDeadlineTimer timer(msecs); - if (readyReadPending) { - if (!inReadyRead) - emitPendingReadyRead(); - return true; + // Make sure that 'syncHandle' was triggered by the thread pool callback. + while (isReadOperationActive() && waitForNotification(timer)) { + if (consumePendingAndEmit(false)) + return true; } return false; @@ -393,15 +498,26 @@ bool QWindowsPipeReader::waitForReadyRead(int msecs) bool QWindowsPipeReader::waitForPipeClosed(int msecs) { const int sleepTime = 10; - QElapsedTimer stopWatch; - stopWatch.start(); + QDeadlineTimer timer(msecs); + + while (waitForReadyRead(timer.remainingTime())) {} + if (pipeBroken) + return true; + + if (timer.hasExpired()) + return false; + + // When the read buffer is full, the read sequence is not running, + // so we need to peek the pipe to detect disconnection. forever { - waitForReadyRead(0); checkPipeState(); + consumePendingAndEmit(false); if (pipeBroken) return true; - if (stopWatch.hasExpired(msecs - sleepTime)) + + if (timer.hasExpired()) return false; + Sleep(sleepTime); } } diff --git a/src/corelib/io/qwindowspipereader_p.h b/src/corelib/io/qwindowspipereader_p.h index c61018d87d..a284f55b3b 100644 --- a/src/corelib/io/qwindowspipereader_p.h +++ b/src/corelib/io/qwindowspipereader_p.h @@ -1,6 +1,7 @@ /**************************************************************************** ** ** Copyright (C) 2016 The Qt Company Ltd. +** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com> ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -52,6 +53,8 @@ // #include <qobject.h> +#include <qdeadlinetimer.h> +#include <qmutex.h> #include <private/qringbuffer_p.h> #include <qt_windows.h> @@ -70,7 +73,7 @@ public: void stop(); void drainAndStop(); - void setMaxReadBufferSize(qint64 size) { readBufferMaxSize = size; } + void setMaxReadBufferSize(qint64 size); qint64 maxReadBufferSize() const { return readBufferMaxSize; } bool isPipeClosed() const { return pipeBroken; } @@ -80,46 +83,46 @@ public: bool waitForReadyRead(int msecs); bool waitForPipeClosed(int msecs); - bool isReadOperationActive() const { return readSequenceStarted; } + bool isReadOperationActive() const; Q_SIGNALS: void winError(ulong, const QString &); void readyRead(); void pipeClosed(); - void _q_queueReadyRead(QPrivateSignal); + +protected: + bool event(QEvent *e) override; private: - void startAsyncReadHelper(qint64 bytesToRead); - void cancelAsyncRead(); - static void CALLBACK readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered, - OVERLAPPED *overlappedBase); - void notified(DWORD errorCode, DWORD numberOfBytesRead); + enum State { Stopped, Running, Draining }; + + void startAsyncReadLocked(); + void cancelAsyncRead(State newState); + static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, + PTP_WAIT wait, TP_WAIT_RESULT waitResult); + bool readCompleted(DWORD errorCode, DWORD numberOfBytesRead); DWORD checkPipeState(); - bool waitForNotification(int timeout); - void emitPendingReadyRead(); - void emitPipeClosed(); - - class Overlapped : public OVERLAPPED - { - Q_DISABLE_COPY_MOVE(Overlapped) - public: - explicit Overlapped(QWindowsPipeReader *reader); - void clear(); - QWindowsPipeReader *pipeReader; - }; + bool waitForNotification(const QDeadlineTimer &deadline); + bool consumePendingAndEmit(bool allowWinActPosting); + bool consumePending(); HANDLE handle; - Overlapped overlapped; + HANDLE eventHandle; + HANDLE syncHandle; + PTP_WAIT waitObject; + OVERLAPPED overlapped; qint64 readBufferMaxSize; QRingBuffer readBuffer; qint64 actualReadBufferSize; - qint64 bytesPending; + qint64 pendingReadBytes; + mutable QMutex mutex; + DWORD lastError; - enum State { Stopped, Running, Draining } state; + State state; bool readSequenceStarted; - bool notifiedCalled; bool pipeBroken; bool readyReadPending; + bool winEventActPosted; bool inReadyRead; }; diff --git a/src/corelib/io/qwindowspipewriter.cpp b/src/corelib/io/qwindowspipewriter.cpp index e374034a06..5ed584c6e3 100644 --- a/src/corelib/io/qwindowspipewriter.cpp +++ b/src/corelib/io/qwindowspipewriter.cpp @@ -1,6 +1,7 @@ /**************************************************************************** ** ** Copyright (C) 2016 The Qt Company Ltd. +** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com> ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -38,189 +39,296 @@ ****************************************************************************/ #include "qwindowspipewriter_p.h" -#include "qiodevice_p.h" #include <qscopedvaluerollback.h> +#include <qcoreapplication.h> +#include <QMutexLocker> QT_BEGIN_NAMESPACE -QWindowsPipeWriter::Overlapped::Overlapped(QWindowsPipeWriter *pipeWriter) - : pipeWriter(pipeWriter) -{ -} - -void QWindowsPipeWriter::Overlapped::clear() -{ - ZeroMemory(this, sizeof(OVERLAPPED)); -} - - QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent) : QObject(parent), handle(pipeWriteEnd), - overlapped(this), + eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)), + syncHandle(CreateEvent(NULL, FALSE, FALSE, NULL)), + waitObject(NULL), pendingBytesWrittenValue(0), + lastError(ERROR_SUCCESS), stopped(true), writeSequenceStarted(false), - notifiedCalled(false), bytesWrittenPending(false), + winEventActPosted(false), inBytesWritten(false) { - connect(this, &QWindowsPipeWriter::_q_queueBytesWritten, - this, &QWindowsPipeWriter::emitPendingBytesWrittenValue, Qt::QueuedConnection); + ZeroMemory(&overlapped, sizeof(OVERLAPPED)); + overlapped.hEvent = eventHandle; + waitObject = CreateThreadpoolWait(waitCallback, this, NULL); + if (waitObject == NULL) + qErrnoWarning("QWindowsPipeWriter: CreateThreadpollWait failed."); } QWindowsPipeWriter::~QWindowsPipeWriter() { stop(); + CloseThreadpoolWait(waitObject); + CloseHandle(eventHandle); + CloseHandle(syncHandle); } -bool QWindowsPipeWriter::waitForWrite(int msecs) +/*! + Stops the asynchronous write sequence. + If the write sequence is running then the I/O operation is canceled. + */ +void QWindowsPipeWriter::stop() { - if (bytesWrittenPending) { - emitPendingBytesWrittenValue(); - return true; - } - - if (!writeSequenceStarted) - return false; - - if (!waitForNotification(msecs)) - return false; + if (stopped) + return; - if (bytesWrittenPending) { - emitPendingBytesWrittenValue(); - return true; + mutex.lock(); + stopped = true; + if (writeSequenceStarted) { + // Trying to disable callback before canceling the operation. + // Callback invocation is unnecessary here. + SetThreadpoolWait(waitObject, NULL, NULL); + if (!CancelIoEx(handle, &overlapped)) { + const DWORD dwError = GetLastError(); + if (dwError != ERROR_NOT_FOUND) { + qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.", + handle); + } + } + writeSequenceStarted = false; } + mutex.unlock(); - return false; + WaitForThreadpoolWaitCallbacks(waitObject, TRUE); } +/*! + Returns \c true if async operation is in progress or a bytesWritten + signal is pending. + */ +bool QWindowsPipeWriter::isWriteOperationActive() const +{ + QMutexLocker locker(&mutex); + return writeSequenceStarted || bytesWrittenPending; +} + +/*! + Returns the number of bytes that are waiting to be written. + */ qint64 QWindowsPipeWriter::bytesToWrite() const { - return buffer.size() + pendingBytesWrittenValue; + QMutexLocker locker(&mutex); + return writeBuffer.size() + pendingBytesWrittenValue; } -void QWindowsPipeWriter::emitPendingBytesWrittenValue() +/*! + Writes data to the pipe. + */ +bool QWindowsPipeWriter::write(const QByteArray &ba) { - if (bytesWrittenPending) { - // Reset the state even if we don't emit bytesWritten(). - // It's a defined behavior to not re-emit this signal recursively. - bytesWrittenPending = false; - const qint64 bytes = pendingBytesWrittenValue; - pendingBytesWrittenValue = 0; - - emit canWrite(); - if (!inBytesWritten) { - QScopedValueRollback<bool> guard(inBytesWritten, true); - emit bytesWritten(bytes); - } + QMutexLocker locker(&mutex); + + if (lastError != ERROR_SUCCESS) + return false; + + writeBuffer.append(ba); + if (writeSequenceStarted) + return true; + + stopped = false; + startAsyncWriteLocked(); + + // Do not post the event, if the write operation will be completed asynchronously. + if (!bytesWrittenPending) + return true; + + if (!winEventActPosted) { + winEventActPosted = true; + locker.unlock(); + QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct)); + } else { + locker.unlock(); } + + SetEvent(syncHandle); + return true; } -void QWindowsPipeWriter::writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered, - OVERLAPPED *overlappedBase) +/*! + Starts a new write sequence. Thread-safety should be ensured by the caller. + */ +void QWindowsPipeWriter::startAsyncWriteLocked() { - Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase); - overlapped->pipeWriter->notified(errorCode, numberOfBytesTransfered); + while (!writeBuffer.isEmpty()) { + // WriteFile() returns true, if the write operation completes synchronously. + // We don't need to call GetOverlappedResult() additionally, because + // 'numberOfBytesWritten' is valid in this case. + DWORD numberOfBytesWritten; + DWORD errorCode = ERROR_SUCCESS; + if (!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(), + &numberOfBytesWritten, &overlapped)) { + errorCode = GetLastError(); + if (errorCode == ERROR_IO_PENDING) { + // Operation has been queued and will complete in the future. + writeSequenceStarted = true; + SetThreadpoolWait(waitObject, eventHandle, NULL); + return; + } + } + + if (!writeCompleted(errorCode, numberOfBytesWritten)) + return; + } } /*! - \internal - Will be called whenever the write operation completes. + Thread pool callback procedure. */ -void QWindowsPipeWriter::notified(DWORD errorCode, DWORD numberOfBytesWritten) +void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, + PTP_WAIT wait, TP_WAIT_RESULT waitResult) { - notifiedCalled = true; - writeSequenceStarted = false; - Q_ASSERT(errorCode != ERROR_SUCCESS || numberOfBytesWritten == DWORD(buffer.size())); - buffer.clear(); - - switch (errorCode) { - case ERROR_SUCCESS: - break; - case ERROR_OPERATION_ABORTED: - if (stopped) - break; - Q_FALLTHROUGH(); - default: - qErrnoWarning(errorCode, "QWindowsPipeWriter: asynchronous write failed."); - break; - } + Q_UNUSED(instance); + Q_UNUSED(wait); + Q_UNUSED(waitResult); + QWindowsPipeWriter *pipeWriter = reinterpret_cast<QWindowsPipeWriter *>(context); + + // Get the result of the asynchronous operation. + DWORD numberOfBytesTransfered = 0; + DWORD errorCode = ERROR_SUCCESS; + if (!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped, + &numberOfBytesTransfered, FALSE)) + errorCode = GetLastError(); + + QMutexLocker locker(&pipeWriter->mutex); // After the writer was stopped, the only reason why this function can be called is the - // completion of a cancellation. No signals should be emitted, and no new write sequence should - // be started in this case. - if (stopped) + // completion of a cancellation. No signals should be emitted, and no new write sequence + // should be started in this case. + if (pipeWriter->stopped) return; - pendingBytesWrittenValue += qint64(numberOfBytesWritten); - if (!bytesWrittenPending) { - bytesWrittenPending = true; - emit _q_queueBytesWritten(QWindowsPipeWriter::QPrivateSignal()); + pipeWriter->writeSequenceStarted = false; + + if (pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered)) + pipeWriter->startAsyncWriteLocked(); + + if (pipeWriter->lastError == ERROR_SUCCESS && !pipeWriter->winEventActPosted) { + pipeWriter->winEventActPosted = true; + locker.unlock(); + QCoreApplication::postEvent(pipeWriter, new QEvent(QEvent::WinEventAct)); + } else { + locker.unlock(); } + + // We set the event only after unlocking to avoid additional context + // switches due to the released thread immediately running into the lock. + SetEvent(pipeWriter->syncHandle); } -bool QWindowsPipeWriter::waitForNotification(int timeout) +/*! + Will be called whenever the write operation completes. Returns \c true if + no error occurred; otherwise returns \c false. + */ +bool QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten) { - QElapsedTimer t; - t.start(); - notifiedCalled = false; - int msecs = timeout; - while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) { - if (notifiedCalled) - return true; + if (errorCode == ERROR_SUCCESS) { + Q_ASSERT(numberOfBytesWritten == DWORD(writeBuffer.nextDataBlockSize())); - // Some other I/O completion routine was called. Wait some more. - msecs = qt_subtract_from_timeout(timeout, t.elapsed()); - if (!msecs) - break; + bytesWrittenPending = true; + pendingBytesWrittenValue += numberOfBytesWritten; + writeBuffer.free(numberOfBytesWritten); + return true; } - return notifiedCalled; + + lastError = errorCode; + writeBuffer.clear(); + // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn. + if (errorCode != ERROR_OPERATION_ABORTED && errorCode != ERROR_NO_DATA) + qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed."); + return false; } -bool QWindowsPipeWriter::write(const QByteArray &ba) +/*! + Receives notification that the write operation has completed. + */ +bool QWindowsPipeWriter::event(QEvent *e) { - if (writeSequenceStarted) + if (e->type() == QEvent::WinEventAct) { + consumePendingAndEmit(true); + return true; + } + return QObject::event(e); +} + +/*! + Updates the state and emits pending signals in the main thread. + Returns \c true, if bytesWritten() was emitted. + */ +bool QWindowsPipeWriter::consumePendingAndEmit(bool allowWinActPosting) +{ + QMutexLocker locker(&mutex); + + // Enable QEvent::WinEventAct posting. + if (allowWinActPosting) + winEventActPosted = false; + + if (!bytesWrittenPending) return false; - overlapped.clear(); - buffer = ba; - stopped = false; - writeSequenceStarted = true; - if (!WriteFileEx(handle, buffer.constData(), buffer.size(), - &overlapped, &writeFileCompleted)) { - writeSequenceStarted = false; - buffer.clear(); - - const DWORD errorCode = GetLastError(); - switch (errorCode) { - case ERROR_NO_DATA: // "The pipe is being closed." - // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn. - break; - default: - qErrnoWarning(errorCode, "QWindowsPipeWriter::write failed."); - } + // Reset the state even if we don't emit bytesWritten(). + // It's a defined behavior to not re-emit this signal recursively. + bytesWrittenPending = false; + qint64 numberOfBytesWritten = pendingBytesWrittenValue; + pendingBytesWrittenValue = 0; + + locker.unlock(); + + // Disable any further processing, if the pipe was stopped. + if (stopped) return false; + + emit canWrite(); + if (!inBytesWritten) { + QScopedValueRollback<bool> guard(inBytesWritten, true); + emit bytesWritten(numberOfBytesWritten); } return true; } -void QWindowsPipeWriter::stop() +bool QWindowsPipeWriter::waitForNotification(const QDeadlineTimer &deadline) { - stopped = true; - bytesWrittenPending = false; - pendingBytesWrittenValue = 0; - if (writeSequenceStarted) { - if (!CancelIoEx(handle, &overlapped)) { - const DWORD dwError = GetLastError(); - if (dwError != ERROR_NOT_FOUND) { - qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.", - handle); - } - } - waitForNotification(-1); + do { + DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE); + if (waitRet == WAIT_OBJECT_0) + return true; + + if (waitRet != WAIT_IO_COMPLETION) + return false; + + // Some I/O completion routine was called. Wait some more. + } while (!deadline.hasExpired()); + + return false; +} + +/*! + Waits for the completion of the asynchronous write operation. + Returns \c true, if we've emitted the bytesWritten signal (non-recursive case) + or bytesWritten will be emitted by the event loop (recursive case). + */ +bool QWindowsPipeWriter::waitForWrite(int msecs) +{ + QDeadlineTimer timer(msecs); + + // Make sure that 'syncHandle' was triggered by the thread pool callback. + while (isWriteOperationActive() && waitForNotification(timer)) { + if (consumePendingAndEmit(false)) + return true; } + + return false; } QT_END_NAMESPACE diff --git a/src/corelib/io/qwindowspipewriter_p.h b/src/corelib/io/qwindowspipewriter_p.h index 39e8ffe40a..b648d7b846 100644 --- a/src/corelib/io/qwindowspipewriter_p.h +++ b/src/corelib/io/qwindowspipewriter_p.h @@ -1,6 +1,7 @@ /**************************************************************************** ** ** Copyright (C) 2016 The Qt Company Ltd. +** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com> ** Contact: https://www.qt.io/licensing/ ** ** This file is part of the QtCore module of the Qt Toolkit. @@ -54,7 +55,10 @@ #include <QtCore/private/qglobal_p.h> #include <qelapsedtimer.h> #include <qobject.h> -#include <qbytearray.h> +#include <qdeadlinetimer.h> +#include <qmutex.h> +#include <private/qringbuffer_p.h> + #include <qt_windows.h> QT_BEGIN_NAMESPACE @@ -117,39 +121,37 @@ public: bool write(const QByteArray &ba); void stop(); bool waitForWrite(int msecs); - bool isWriteOperationActive() const { return writeSequenceStarted; } + bool isWriteOperationActive() const; qint64 bytesToWrite() const; Q_SIGNALS: void canWrite(); void bytesWritten(qint64 bytes); - void _q_queueBytesWritten(QPrivateSignal); -private: - static void CALLBACK writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered, - OVERLAPPED *overlappedBase); - void notified(DWORD errorCode, DWORD numberOfBytesWritten); - bool waitForNotification(int timeout); - void emitPendingBytesWrittenValue(); +protected: + bool event(QEvent *e) override; - class Overlapped : public OVERLAPPED - { - Q_DISABLE_COPY_MOVE(Overlapped) - public: - explicit Overlapped(QWindowsPipeWriter *pipeWriter); - void clear(); - - QWindowsPipeWriter *pipeWriter; - }; +private: + void startAsyncWriteLocked(); + static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context, + PTP_WAIT wait, TP_WAIT_RESULT waitResult); + bool writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten); + bool waitForNotification(const QDeadlineTimer &deadline); + bool consumePendingAndEmit(bool allowWinActPosting); HANDLE handle; - Overlapped overlapped; - QByteArray buffer; + HANDLE eventHandle; + HANDLE syncHandle; + PTP_WAIT waitObject; + OVERLAPPED overlapped; + QRingBuffer writeBuffer; qint64 pendingBytesWrittenValue; + mutable QMutex mutex; + DWORD lastError; bool stopped; bool writeSequenceStarted; - bool notifiedCalled; bool bytesWrittenPending; + bool winEventActPosted; bool inBytesWritten; }; diff --git a/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp b/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp index eba4aa4790..65f2329e3d 100644 --- a/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp +++ b/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp @@ -29,12 +29,15 @@ #include <QTest> #include <QEvent> +#include <QtTest/QSignalSpy> #include <QtCore/qthread.h> #include <QtGui/qguiapplication.h> #include <QtGui/qpainter.h> #include <QtGui/qrasterwindow.h> #include <QtNetwork/qtcpserver.h> #include <QtNetwork/qtcpsocket.h> +#include <QtNetwork/qlocalserver.h> +#include <QtNetwork/qlocalsocket.h> #include <QtCore/qelapsedtimer.h> #include <QtCore/qtimer.h> #include <QtCore/qwineventnotifier.h> @@ -51,6 +54,7 @@ class tst_NoQtEventLoop : public QObject private slots: void consumeMouseEvents(); void consumeSocketEvents(); + void consumeLocalSocketEvents(); void consumeWinEvents_data(); void consumeWinEvents(); void deliverEventsInLivelock(); @@ -318,6 +322,44 @@ void tst_NoQtEventLoop::consumeSocketEvents() QVERIFY(server.hasPendingConnections()); } +void tst_NoQtEventLoop::consumeLocalSocketEvents() +{ + int argc = 1; + char *argv[] = { const_cast<char *>("test"), 0 }; + QGuiApplication app(argc, argv); + QLocalServer server; + QLocalSocket client; + QSignalSpy readyReadSpy(&client, &QIODevice::readyRead); + + QVERIFY(server.listen("consumeLocalSocketEvents")); + client.connectToServer("consumeLocalSocketEvents"); + QVERIFY(client.waitForConnected(200)); + QVERIFY(server.waitForNewConnection(200)); + QLocalSocket *clientSocket = server.nextPendingConnection(); + QVERIFY(clientSocket); + QSignalSpy bytesWrittenSpy(clientSocket, &QIODevice::bytesWritten); + server.close(); + + bool timeExpired = false; + QTimer::singleShot(3000, Qt::CoarseTimer, [&timeExpired]() { + timeExpired = true; + }); + QVERIFY(clientSocket->putChar(0)); + + // Exec own message loop + MSG msg; + while (::GetMessage(&msg, NULL, 0, 0)) { + ::TranslateMessage(&msg); + ::DispatchMessage(&msg); + + if (timeExpired || readyReadSpy.count() != 0) + break; + } + QVERIFY(!timeExpired); + QCOMPARE(bytesWrittenSpy.count(), 1); + QCOMPARE(readyReadSpy.count(), 1); +} + void tst_NoQtEventLoop::consumeWinEvents_data() { QTest::addColumn<bool>("peeking"); diff --git a/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp b/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp index c99ca990da..8b2b4ea4da 100644 --- a/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp +++ b/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp @@ -639,26 +639,6 @@ void tst_QLocalSocket::readBufferOverflow() QCOMPARE(client.read(buffer, readBufferSize), qint64(readBufferSize)); // no more bytes available QCOMPARE(client.bytesAvailable(), 0); - -#ifdef Q_OS_WIN - serverSocket->write(buffer, readBufferSize); - QVERIFY(serverSocket->waitForBytesWritten()); - - // ensure the read completion routine is called - SleepEx(100, true); - QVERIFY(client.waitForReadyRead()); - QCOMPARE(client.read(buffer, readBufferSize), qint64(readBufferSize)); - - // Test overflow caused by an asynchronous pipe operation. - client.setReadBufferSize(1); - serverSocket->write(buffer, 2); - - QVERIFY(client.waitForReadyRead()); - // socket disconnects, if there any error on pipe - QCOMPARE(client.state(), QLocalSocket::ConnectedState); - QCOMPARE(client.bytesAvailable(), qint64(2)); - QCOMPARE(client.read(buffer, 2), qint64(2)); -#endif } static qint64 writeCommand(const QVariant &command, QIODevice *device, int commandCounter) diff --git a/tests/benchmarks/network/socket/CMakeLists.txt b/tests/benchmarks/network/socket/CMakeLists.txt index 6d54bc05f5..7c122a73ef 100644 --- a/tests/benchmarks/network/socket/CMakeLists.txt +++ b/tests/benchmarks/network/socket/CMakeLists.txt @@ -1,4 +1,5 @@ # Generated from socket.pro. +add_subdirectory(qlocalsocket) add_subdirectory(qtcpserver) add_subdirectory(qudpsocket) diff --git a/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt b/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt new file mode 100644 index 0000000000..7c56b0b946 --- /dev/null +++ b/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt @@ -0,0 +1,14 @@ +##################################################################### +## tst_bench_qlocalsocket Binary: +##################################################################### + +qt_internal_add_benchmark(tst_bench_qlocalsocket + SOURCES + tst_qlocalsocket.cpp + PUBLIC_LIBRARIES + Qt::Network + Qt::Test +) + +#### Keys ignored in scope 1:.:.:qlocalsocket.pro:<TRUE>: +# TEMPLATE = "app" diff --git a/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp b/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp new file mode 100644 index 0000000000..86112f442d --- /dev/null +++ b/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp @@ -0,0 +1,225 @@ +/**************************************************************************** +** +** Copyright (C) 2021 The Qt Company Ltd. +** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com> +** Contact: https://www.qt.io/licensing/ +** +** This file is part of the test suite of the Qt Toolkit. +** +** $QT_BEGIN_LICENSE:GPL-EXCEPT$ +** Commercial License Usage +** Licensees holding valid commercial Qt licenses may use this file in +** accordance with the commercial license agreement provided with the +** Software or, alternatively, in accordance with the terms contained in +** a written agreement between you and The Qt Company. For licensing terms +** and conditions see https://www.qt.io/terms-conditions. For further +** information use the contact form at https://www.qt.io/contact-us. +** +** GNU General Public License Usage +** Alternatively, this file may be used under the terms of the GNU +** General Public License version 3 as published by the Free Software +** Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT +** included in the packaging of this file. Please review the following +** information to ensure the GNU General Public License requirements will +** be met: https://www.gnu.org/licenses/gpl-3.0.html. +** +** $QT_END_LICENSE$ +** +****************************************************************************/ + +#include <QTest> +#include <QtCore/qglobal.h> +#include <QtCore/qthread.h> +#include <QtCore/qsemaphore.h> +#include <QtCore/qbytearray.h> +#include <QtCore/qeventloop.h> +#include <QtCore/qvector.h> +#include <QtCore/qelapsedtimer.h> +#include <QtNetwork/qlocalsocket.h> +#include <QtNetwork/qlocalserver.h> + +class tst_QLocalSocket : public QObject +{ + Q_OBJECT + +private slots: + void pingPong_data(); + void pingPong(); + void dataExchange_data(); + void dataExchange(); +}; + +class ServerThread : public QThread +{ +public: + QSemaphore running; + + explicit ServerThread(int chunkSize) + { + buffer.resize(chunkSize); + } + + void run() override + { + QLocalServer server; + + connect(&server, &QLocalServer::newConnection, [this, &server]() { + auto socket = server.nextPendingConnection(); + + connect(socket, &QLocalSocket::readyRead, [this, socket]() { + const qint64 bytesAvailable = socket->bytesAvailable(); + Q_ASSERT(bytesAvailable <= this->buffer.size()); + + QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable); + QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable); + }); + }); + + QVERIFY(server.listen("foo")); + running.release(); + exec(); + } + +protected: + QByteArray buffer; +}; + +class SocketFactory : public QObject +{ + Q_OBJECT + +public: + bool stopped = false; + + explicit SocketFactory(int chunkSize, int connections) + { + buffer.resize(chunkSize); + for (int i = 0; i < connections; ++i) { + QLocalSocket *socket = new QLocalSocket(this); + Q_CHECK_PTR(socket); + + connect(this, &SocketFactory::start, [this, socket]() { + QCOMPARE(socket->write(this->buffer), this->buffer.size()); + }); + + connect(socket, &QLocalSocket::readyRead, [i, this, socket]() { + const qint64 bytesAvailable = socket->bytesAvailable(); + Q_ASSERT(bytesAvailable <= this->buffer.size()); + + QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable); + emit this->bytesReceived(i, bytesAvailable); + + if (!this->stopped) + QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable); + }); + + socket->connectToServer("foo"); + QCOMPARE(socket->state(), QLocalSocket::ConnectedState); + } + } + +signals: + void start(); + void bytesReceived(int channel, qint64 bytes); + +protected: + QByteArray buffer; +}; + +void tst_QLocalSocket::pingPong_data() +{ + QTest::addColumn<int>("connections"); + for (int value : {10, 50, 100, 1000, 5000}) + QTest::addRow("connections: %d", value) << value; +} + +void tst_QLocalSocket::pingPong() +{ + QFETCH(int, connections); + + const int iterations = 100000; + Q_ASSERT(iterations >= connections && connections > 0); + + ServerThread serverThread(1); + serverThread.start(); + // Wait for server to start. + QVERIFY(serverThread.running.tryAcquire(1, 3000)); + + SocketFactory factory(1, connections); + QEventLoop eventLoop; + QVector<qint64> bytesToRead; + QElapsedTimer timer; + + bytesToRead.fill(iterations / connections, connections); + connect(&factory, &SocketFactory::bytesReceived, + [&bytesToRead, &connections, &factory, &eventLoop](int channel, qint64 bytes) { + Q_UNUSED(bytes); + + if (--bytesToRead[channel] == 0 && --connections == 0) { + factory.stopped = true; + eventLoop.quit(); + } + }); + + timer.start(); + emit factory.start(); + eventLoop.exec(); + + qDebug("Elapsed time: %.1f s", timer.elapsed() / 1000.0); + serverThread.quit(); + serverThread.wait(); +} + +void tst_QLocalSocket::dataExchange_data() +{ + QTest::addColumn<int>("connections"); + QTest::addColumn<int>("chunkSize"); + for (int connections : {1, 5, 10}) { + for (int chunkSize : {100, 1000, 10000, 100000}) { + QTest::addRow("connections: %d, chunk size: %d", + connections, chunkSize) << connections << chunkSize; + } + } +} + +void tst_QLocalSocket::dataExchange() +{ + QFETCH(int, connections); + QFETCH(int, chunkSize); + + Q_ASSERT(chunkSize > 0 && connections > 0); + const qint64 timeToTest = 5000; + + ServerThread serverThread(chunkSize); + serverThread.start(); + // Wait for server to start. + QVERIFY(serverThread.running.tryAcquire(1, 3000)); + + SocketFactory factory(chunkSize, connections); + QEventLoop eventLoop; + qint64 totalReceived = 0; + QElapsedTimer timer; + + connect(&factory, &SocketFactory::bytesReceived, + [&totalReceived, &timer, timeToTest, &factory, &eventLoop](int channel, qint64 bytes) { + Q_UNUSED(channel); + + totalReceived += bytes; + if (timer.elapsed() >= timeToTest) { + factory.stopped = true; + eventLoop.quit(); + } + }); + + timer.start(); + emit factory.start(); + eventLoop.exec(); + + qDebug("Transfer rate: %.1f MB/s", totalReceived / 1048.576 / timer.elapsed()); + serverThread.quit(); + serverThread.wait(); +} + +QTEST_MAIN(tst_QLocalSocket) + +#include "tst_qlocalsocket.moc" |