summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Trotsenko <alex1973tr@gmail.com>2020-10-05 19:22:49 +0300
committerAlex Trotsenko <alex1973tr@gmail.com>2020-11-17 12:45:50 +0200
commitee122077b09430da54ca09750589b37326a22d85 (patch)
treefa06b0006bddc56fa68045d827275dc52c14f1ee
parent6be39809b038768a665b0e29a3a3668fdc424d9a (diff)
Allow QWindowsPipe{Reader,Writer} to work with foreign event loops
When a foreign event loop that does not enter an alertable wait state is running (which is also the case when a native dialog window is modal), pipe handlers would freeze temporarily due to their APC callbacks not being invoked. We address this problem by moving the I/O callbacks to the Windows thread pool, and only posting completion events to the main loop from there. That makes the actual I/O completely independent from any main loop, while the signal delivery works also with foreign loops (because Qt event delivery uses Windows messages, which foreign loops typically handle correctly). As a nice side effect, performance (and in particular scalability) is improved. Several other approaches have been tried: 1) Using QWinEventNotifier was about a quarter slower and scaled much worse. Additionally, it also required a rather egregious hack to handle the (pathological) case of a single thread talking to both ends of a QLocalSocket synchronously. 2) Queuing APCs from the thread pool to the main thread and also posting wake-up events to its event loop, and handling I/O on the main thread; this performed roughly like this solution , but scaled half as well, and the separate wake-up path was still deemed hacky. 3) Only posting wake-up events to the main thread from the thread pool, and still handling I/O on the main thread; this still performed comparably to 2), and the pathological case was not handled at all. 4) Using this approach for reads and that of 3) for writes was slightly faster with big amounts of data, but scaled slightly worse, and the diverging implementations were deemed not desirable. Fixes: QTBUG-64443 Change-Id: I1cd87c07db39f3b46a2683ce236d7eb67b5be549 Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@gmx.de>
-rw-r--r--src/corelib/io/qwindowspipereader.cpp383
-rw-r--r--src/corelib/io/qwindowspipereader_p.h39
-rw-r--r--src/corelib/io/qwindowspipewriter.cpp353
-rw-r--r--src/corelib/io/qwindowspipewriter_p.h42
-rw-r--r--tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp41
-rw-r--r--tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp5
-rw-r--r--tests/benchmarks/network/socket/CMakeLists.txt1
-rw-r--r--tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt16
-rw-r--r--tests/benchmarks/network/socket/qlocalsocket/qlocalsocket.pro8
-rw-r--r--tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp224
-rw-r--r--tests/benchmarks/network/socket/socket.pro1
11 files changed, 827 insertions, 286 deletions
diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp
index c20909766d..3e1e212df2 100644
--- a/src/corelib/io/qwindowspipereader.cpp
+++ b/src/corelib/io/qwindowspipereader.cpp
@@ -1,6 +1,6 @@
/****************************************************************************
**
-** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2020 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -41,61 +41,75 @@
#include "qiodevice_p.h"
#include <qelapsedtimer.h>
#include <qscopedvaluerollback.h>
+#include <qcoreapplication.h>
+#include <QMutexLocker>
QT_BEGIN_NAMESPACE
-QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader)
- : pipeReader(reader)
-{
-}
-
-void QWindowsPipeReader::Overlapped::clear()
-{
- ZeroMemory(this, sizeof(OVERLAPPED));
-}
-
-
QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
: QObject(parent),
handle(INVALID_HANDLE_VALUE),
- overlapped(this),
+ eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
+ waitObject(NULL),
readBufferMaxSize(0),
actualReadBufferSize(0),
+ pendingReadBytes(0),
+ lastError(ERROR_SUCCESS),
stopped(true),
readSequenceStarted(false),
- notifiedCalled(false),
pipeBroken(false),
readyReadPending(false),
+ winEventActPosted(false),
inReadyRead(false)
{
- connect(this, &QWindowsPipeReader::_q_queueReadyRead,
- this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection);
+ ZeroMemory(&overlapped, sizeof(OVERLAPPED));
+ overlapped.hEvent = eventHandle;
+ waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
+ if (waitObject == NULL)
+ qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");
}
QWindowsPipeReader::~QWindowsPipeReader()
{
stop();
+ CloseThreadpoolWait(waitObject);
+ CloseHandle(eventHandle);
+ CloseHandle(syncHandle);
}
/*!
+ \internal
Sets the handle to read from. The handle must be valid.
+ Do not call this function if the pipe is running.
*/
void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
{
readBuffer.clear();
actualReadBufferSize = 0;
+ readyReadPending = false;
+ pendingReadBytes = 0;
handle = hPipeReadEnd;
pipeBroken = false;
+ lastError = ERROR_SUCCESS;
}
/*!
+ \internal
Stops the asynchronous read sequence.
If the read sequence is running then the I/O operation is canceled.
*/
void QWindowsPipeReader::stop()
{
+ if (stopped)
+ return;
+
+ mutex.lock();
stopped = true;
if (readSequenceStarted) {
+ // Trying to disable callback before canceling the operation.
+ // Callback invocation is unnecessary here.
+ SetThreadpoolWait(waitObject, NULL, NULL);
if (!CancelIoEx(handle, &overlapped)) {
const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) {
@@ -103,8 +117,33 @@ void QWindowsPipeReader::stop()
handle);
}
}
- waitForNotification(-1);
+ readSequenceStarted = false;
}
+ mutex.unlock();
+
+ WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
+}
+
+/*!
+ \internal
+ Sets the size of internal read buffer.
+ */
+void QWindowsPipeReader::setMaxReadBufferSize(qint64 size)
+{
+ QMutexLocker locker(&mutex);
+ readBufferMaxSize = size;
+}
+
+/*!
+ \internal
+ Returns \c true if async operation is in progress, there is
+ pending data to read, or a read error is pending.
+ */
+bool QWindowsPipeReader::isReadOperationActive() const
+{
+ QMutexLocker locker(&mutex);
+ return readSequenceStarted || readyReadPending
+ || (lastError != ERROR_SUCCESS && !pipeBroken);
}
/*!
@@ -123,6 +162,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
if (pipeBroken && actualReadBufferSize == 0)
return 0; // signal EOF
+ mutex.lock();
qint64 readSoFar;
// If startAsyncRead() has read data, copy it to its destination.
if (maxlen == 1 && actualReadBufferSize > 0) {
@@ -133,9 +173,10 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
actualReadBufferSize -= readSoFar;
}
+ mutex.unlock();
if (!pipeBroken) {
- if (!readSequenceStarted && !stopped)
+ if (!stopped)
startAsyncRead();
if (readSoFar == 0)
return -2; // signal EWOULDBLOCK
@@ -144,131 +185,220 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
return readSoFar;
}
+/*!
+ \internal
+ Returns \c true if a complete line of data can be read from the buffer.
+ */
bool QWindowsPipeReader::canReadLine() const
{
+ QMutexLocker locker(&mutex);
return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
}
/*!
\internal
- Will be called whenever the read operation completes.
+ Starts an asynchronous read sequence on the pipe.
*/
-void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead)
+void QWindowsPipeReader::startAsyncRead()
{
- notifiedCalled = true;
- readSequenceStarted = false;
-
- switch (errorCode) {
- case ERROR_SUCCESS:
- break;
- case ERROR_MORE_DATA:
- // This is not an error. We're connected to a message mode
- // pipe and the message didn't fit into the pipe's system
- // buffer. We will read the remaining data in the next call.
- break;
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- pipeBroken = true;
- break;
- case ERROR_OPERATION_ABORTED:
- if (stopped)
- break;
- Q_FALLTHROUGH();
- default:
- emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified"));
- pipeBroken = true;
- break;
- }
+ QMutexLocker locker(&mutex);
- // After the reader was stopped, the only reason why this function can be called is the
- // completion of a cancellation. No signals should be emitted, and no new read sequence should
- // be started in this case.
- if (stopped)
+ if (readSequenceStarted || lastError != ERROR_SUCCESS)
return;
- if (pipeBroken) {
- emit pipeClosed();
+ stopped = false;
+ startAsyncReadLocked();
+
+ // Do not post the event, if the read operation will be completed asynchronously.
+ if (!readyReadPending && lastError == ERROR_SUCCESS)
return;
- }
- actualReadBufferSize += numberOfBytesRead;
- readBuffer.truncate(actualReadBufferSize);
- startAsyncRead();
- if (!readyReadPending) {
- readyReadPending = true;
- emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal());
+ if (!winEventActPosted) {
+ winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
}
}
/*!
\internal
- Reads data from the pipe into the readbuffer.
+ Starts a new read sequence. Thread-safety should be ensured by the caller.
*/
-void QWindowsPipeReader::startAsyncRead()
+void QWindowsPipeReader::startAsyncReadLocked()
{
const DWORD minReadBufferSize = 4096;
- qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
- if (pipeBroken)
- return;
-
- if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
- bytesToRead = readBufferMaxSize - readBuffer.size();
- if (bytesToRead <= 0) {
- // Buffer is full. User must read data from the buffer
- // before we can read more from the pipe.
+ forever {
+ qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
+ if (lastError != ERROR_SUCCESS)
return;
+
+ if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
+ bytesToRead = readBufferMaxSize - readBuffer.size();
+ if (bytesToRead <= 0) {
+ // Buffer is full. User must read data from the buffer
+ // before we can read more from the pipe.
+ return;
+ }
}
+
+ char *ptr = readBuffer.reserve(bytesToRead);
+
+ // ReadFile() returns true, if the read operation completes synchronously.
+ // We don't need to call GetOverlappedResult() additionally, because
+ // 'numberOfBytesRead' is valid in this case.
+ DWORD numberOfBytesRead;
+ if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped))
+ break;
+
+ readCompleted(ERROR_SUCCESS, numberOfBytesRead);
+ }
+
+ const DWORD dwError = GetLastError();
+ if (dwError == ERROR_IO_PENDING) {
+ // Operation has been queued and will complete in the future.
+ readSequenceStarted = true;
+ SetThreadpoolWait(waitObject, eventHandle, NULL);
+ } else {
+ // Any other errors are treated as EOF.
+ readCompleted(dwError, 0);
}
+}
- char *ptr = readBuffer.reserve(bytesToRead);
+/*!
+ \internal
+ Thread pool callback procedure.
+ */
+void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult)
+{
+ Q_UNUSED(instance);
+ Q_UNUSED(wait);
+ Q_UNUSED(waitResult);
+ QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context);
- stopped = false;
- readSequenceStarted = true;
- overlapped.clear();
- if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) {
- readSequenceStarted = false;
+ // Get the result of the asynchronous operation.
+ DWORD numberOfBytesTransfered = 0;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
+ &numberOfBytesTransfered, FALSE))
+ errorCode = GetLastError();
- const DWORD dwError = GetLastError();
- switch (dwError) {
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- // It may happen, that the other side closes the connection directly
- // after writing data. Then we must set the appropriate socket state.
- pipeBroken = true;
- emit pipeClosed();
- break;
- default:
- emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead"));
- break;
- }
+ QMutexLocker locker(&pipeReader->mutex);
+
+ // After the reader was stopped, the only reason why this function can be called is the
+ // completion of a cancellation. No signals should be emitted, and no new read sequence should
+ // be started in this case.
+ if (pipeReader->stopped)
+ return;
+
+ pipeReader->readSequenceStarted = false;
+
+ // Do not overwrite error code, if error has been detected by
+ // checkPipeState() in waitForPipeClosed().
+ if (pipeReader->lastError != ERROR_SUCCESS)
+ return;
+
+ pipeReader->readCompleted(errorCode, numberOfBytesTransfered);
+ if (pipeReader->lastError == ERROR_SUCCESS)
+ pipeReader->startAsyncReadLocked();
+
+ if (!pipeReader->winEventActPosted) {
+ pipeReader->winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
+ }
+ SetEvent(pipeReader->syncHandle);
+}
+
+/*!
+ \internal
+ Will be called whenever the read operation completes.
+ */
+void QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
+{
+ // ERROR_MORE_DATA is not an error. We're connected to a message mode
+ // pipe and the message didn't fit into the pipe's system
+ // buffer. We will read the remaining data in the next call.
+ if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
+ readyReadPending = true;
+ pendingReadBytes += numberOfBytesRead;
+ readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
+ } else {
+ lastError = errorCode;
+ }
+}
+
+/*!
+ \internal
+ Receives notification that the read operation has completed.
+ */
+bool QWindowsPipeReader::event(QEvent *e)
+{
+ if (e->type() == QEvent::WinEventAct) {
+ emitPendingSignals(true);
+ return true;
}
+ return QObject::event(e);
}
/*!
\internal
- Called when ReadFileEx finished the read operation.
+ Emits pending signals in the main thread. Returns \c true,
+ if readyRead() was emitted.
*/
-void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase)
+bool QWindowsPipeReader::emitPendingSignals(bool allowWinActPosting)
{
- Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
- overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered);
+ mutex.lock();
+
+ // Enable QEvent::WinEventAct posting.
+ if (allowWinActPosting)
+ winEventActPosted = false;
+
+ bool emitReadyRead = false;
+ if (readyReadPending) {
+ readyReadPending = false;
+ actualReadBufferSize += pendingReadBytes;
+ pendingReadBytes = 0;
+ emitReadyRead = true;
+ }
+ const DWORD dwError = lastError;
+
+ mutex.unlock();
+
+ // Disable any further processing, if the pipe was stopped.
+ if (stopped)
+ return false;
+
+ if (emitReadyRead && !inReadyRead) {
+ QScopedValueRollback<bool> guard(inReadyRead, true);
+ emit readyRead();
+ }
+
+ // Trigger 'pipeBroken' only once.
+ if (dwError != ERROR_SUCCESS && !pipeBroken) {
+ pipeBroken = true;
+ if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
+ emit winError(dwError, QLatin1String("QWindowsPipeReader::emitPendingSignals"));
+ emit pipeClosed();
+ }
+
+ return emitReadyRead;
}
/*!
\internal
Returns the number of available bytes in the pipe.
- Sets QWindowsPipeReader::pipeBroken to true if the connection is broken.
+ Sets QWindowsPipeReader::lastError if the connection is broken.
*/
DWORD QWindowsPipeReader::checkPipeState()
{
DWORD bytes;
if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr))
return bytes;
- if (!pipeBroken) {
- pipeBroken = true;
- emit pipeClosed();
- }
+
+ readCompleted(GetLastError(), 0);
return 0;
}
@@ -276,27 +406,21 @@ bool QWindowsPipeReader::waitForNotification(int timeout)
{
QElapsedTimer t;
t.start();
- notifiedCalled = false;
int msecs = timeout;
- while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
- if (notifiedCalled)
+ do {
+ DWORD waitRet = WaitForSingleObjectEx(syncHandle,
+ msecs == -1 ? INFINITE : msecs, TRUE);
+ if (waitRet == WAIT_OBJECT_0)
return true;
- // Some other I/O completion routine was called. Wait some more.
+ if (waitRet != WAIT_IO_COMPLETION)
+ return false;
+
+ // Some I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
- if (!msecs)
- break;
- }
- return notifiedCalled;
-}
+ } while (msecs != 0);
-void QWindowsPipeReader::emitPendingReadyRead()
-{
- if (readyReadPending) {
- readyReadPending = false;
- QScopedValueRollback<bool> guard(inReadyRead, true);
- emit readyRead();
- }
+ return false;
}
/*!
@@ -306,25 +430,21 @@ void QWindowsPipeReader::emitPendingReadyRead()
*/
bool QWindowsPipeReader::waitForReadyRead(int msecs)
{
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
- return true;
- }
-
- if (!readSequenceStarted)
+ if (readBufferMaxSize && actualReadBufferSize >= readBufferMaxSize)
return false;
- if (!waitForNotification(msecs))
- return false;
+ // Prepare handle for waiting.
+ ResetEvent(syncHandle);
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
+ // It is necessary to check if there is already data in the queue.
+ if (emitPendingSignals(false))
return true;
- }
- return false;
+ // Make sure that 'syncHandle' was triggered by the thread pool callback.
+ if (pipeBroken || !waitForNotification(msecs))
+ return false;
+
+ return emitPendingSignals(false);
}
/*!
@@ -337,9 +457,18 @@ bool QWindowsPipeReader::waitForPipeClosed(int msecs)
stopWatch.start();
forever {
waitForReadyRead(0);
+ if (pipeBroken)
+ return true;
+
+ // When the read buffer is full, the read sequence is not running.
+ // So, we should peek the pipe to detect disconnect.
+ mutex.lock();
checkPipeState();
+ mutex.unlock();
+ emitPendingSignals(false);
if (pipeBroken)
return true;
+
if (stopWatch.hasExpired(msecs - sleepTime))
return false;
Sleep(sleepTime);
diff --git a/src/corelib/io/qwindowspipereader_p.h b/src/corelib/io/qwindowspipereader_p.h
index 2842343597..5974833a86 100644
--- a/src/corelib/io/qwindowspipereader_p.h
+++ b/src/corelib/io/qwindowspipereader_p.h
@@ -1,6 +1,6 @@
/****************************************************************************
**
-** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2020 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -52,6 +52,7 @@
//
#include <qobject.h>
+#include <qmutex.h>
#include <private/qringbuffer_p.h>
#include <qt_windows.h>
@@ -69,7 +70,7 @@ public:
void startAsyncRead();
void stop();
- void setMaxReadBufferSize(qint64 size) { readBufferMaxSize = size; }
+ void setMaxReadBufferSize(qint64 size);
qint64 maxReadBufferSize() const { return readBufferMaxSize; }
bool isPipeClosed() const { return pipeBroken; }
@@ -79,41 +80,41 @@ public:
bool waitForReadyRead(int msecs);
bool waitForPipeClosed(int msecs);
- bool isReadOperationActive() const { return readSequenceStarted; }
+ bool isReadOperationActive() const;
Q_SIGNALS:
void winError(ulong, const QString &);
void readyRead();
void pipeClosed();
- void _q_queueReadyRead(QPrivateSignal);
+
+protected:
+ bool event(QEvent *e) override;
private:
- static void CALLBACK readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase);
- void notified(DWORD errorCode, DWORD numberOfBytesRead);
+ void startAsyncReadLocked();
+ static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult);
+ void readCompleted(DWORD errorCode, DWORD numberOfBytesRead);
DWORD checkPipeState();
bool waitForNotification(int timeout);
- void emitPendingReadyRead();
-
- class Overlapped : public OVERLAPPED
- {
- Q_DISABLE_COPY_MOVE(Overlapped)
- public:
- explicit Overlapped(QWindowsPipeReader *reader);
- void clear();
- QWindowsPipeReader *pipeReader;
- };
+ bool emitPendingSignals(bool allowWinActPosting);
HANDLE handle;
- Overlapped overlapped;
+ HANDLE eventHandle;
+ HANDLE syncHandle;
+ PTP_WAIT waitObject;
+ OVERLAPPED overlapped;
qint64 readBufferMaxSize;
QRingBuffer readBuffer;
qint64 actualReadBufferSize;
+ qint64 pendingReadBytes;
+ mutable QMutex mutex;
+ DWORD lastError;
bool stopped;
bool readSequenceStarted;
- bool notifiedCalled;
bool pipeBroken;
bool readyReadPending;
+ bool winEventActPosted;
bool inReadyRead;
};
diff --git a/src/corelib/io/qwindowspipewriter.cpp b/src/corelib/io/qwindowspipewriter.cpp
index e374034a06..6cea9f3a5e 100644
--- a/src/corelib/io/qwindowspipewriter.cpp
+++ b/src/corelib/io/qwindowspipewriter.cpp
@@ -1,6 +1,6 @@
/****************************************************************************
**
-** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2020 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -40,187 +40,306 @@
#include "qwindowspipewriter_p.h"
#include "qiodevice_p.h"
#include <qscopedvaluerollback.h>
+#include <qcoreapplication.h>
QT_BEGIN_NAMESPACE
-QWindowsPipeWriter::Overlapped::Overlapped(QWindowsPipeWriter *pipeWriter)
- : pipeWriter(pipeWriter)
-{
-}
-
-void QWindowsPipeWriter::Overlapped::clear()
-{
- ZeroMemory(this, sizeof(OVERLAPPED));
-}
-
-
QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent)
: QObject(parent),
handle(pipeWriteEnd),
- overlapped(this),
+ eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
+ waitObject(NULL),
pendingBytesWrittenValue(0),
+ lastError(ERROR_SUCCESS),
stopped(true),
writeSequenceStarted(false),
- notifiedCalled(false),
bytesWrittenPending(false),
+ winEventActPosted(false),
inBytesWritten(false)
{
- connect(this, &QWindowsPipeWriter::_q_queueBytesWritten,
- this, &QWindowsPipeWriter::emitPendingBytesWrittenValue, Qt::QueuedConnection);
+ ZeroMemory(&overlapped, sizeof(OVERLAPPED));
+ overlapped.hEvent = eventHandle;
+ waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
+ if (waitObject == NULL)
+ qErrnoWarning("QWindowsPipeWriter: CreateThreadpollWait failed.");
}
QWindowsPipeWriter::~QWindowsPipeWriter()
{
stop();
+ CloseThreadpoolWait(waitObject);
+ CloseHandle(eventHandle);
+ CloseHandle(syncHandle);
}
-bool QWindowsPipeWriter::waitForWrite(int msecs)
+/*!
+ \internal
+ Stops the asynchronous write sequence.
+ If the write sequence is running then the I/O operation is canceled.
+ */
+void QWindowsPipeWriter::stop()
{
- if (bytesWrittenPending) {
- emitPendingBytesWrittenValue();
- return true;
+ if (stopped)
+ return;
+
+ mutex.lock();
+ stopped = true;
+ if (writeSequenceStarted) {
+ // Trying to disable callback before canceling the operation.
+ // Callback invocation is unnecessary here.
+ SetThreadpoolWait(waitObject, NULL, NULL);
+ if (!CancelIoEx(handle, &overlapped)) {
+ const DWORD dwError = GetLastError();
+ if (dwError != ERROR_NOT_FOUND) {
+ qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.",
+ handle);
+ }
+ }
+ writeSequenceStarted = false;
}
+ mutex.unlock();
- if (!writeSequenceStarted)
- return false;
+ WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
+}
- if (!waitForNotification(msecs))
+/*!
+ \internal
+ Returns \c true if async operation is in progress or a bytesWritten
+ signal is pending.
+ */
+bool QWindowsPipeWriter::isWriteOperationActive() const
+{
+ QMutexLocker locker(&mutex);
+ return writeSequenceStarted || bytesWrittenPending;
+}
+
+/*!
+ \internal
+ Returns the number of bytes that are waiting to be written.
+ */
+qint64 QWindowsPipeWriter::bytesToWrite() const
+{
+ QMutexLocker locker(&mutex);
+ return writeBuffer.size() + pendingBytesWrittenValue;
+}
+
+/*!
+ \internal
+ Writes data to the pipe.
+ */
+bool QWindowsPipeWriter::write(const QByteArray &ba)
+{
+ QMutexLocker locker(&mutex);
+
+ if (lastError != ERROR_SUCCESS)
return false;
- if (bytesWrittenPending) {
- emitPendingBytesWrittenValue();
+ writeBuffer.append(ba);
+ if (writeSequenceStarted)
return true;
- }
- return false;
-}
+ stopped = false;
+ startAsyncWriteLocked();
-qint64 QWindowsPipeWriter::bytesToWrite() const
-{
- return buffer.size() + pendingBytesWrittenValue;
+ // Do not post the event, if the write operation will be completed asynchronously.
+ if (bytesWrittenPending && !winEventActPosted) {
+ winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
+ }
+ return true;
}
-void QWindowsPipeWriter::emitPendingBytesWrittenValue()
+/*!
+ \internal
+ Starts a new write sequence. Thread-safety should be ensured by the caller.
+ */
+void QWindowsPipeWriter::startAsyncWriteLocked()
{
- if (bytesWrittenPending) {
- // Reset the state even if we don't emit bytesWritten().
- // It's a defined behavior to not re-emit this signal recursively.
- bytesWrittenPending = false;
- const qint64 bytes = pendingBytesWrittenValue;
- pendingBytesWrittenValue = 0;
-
- emit canWrite();
- if (!inBytesWritten) {
- QScopedValueRollback<bool> guard(inBytesWritten, true);
- emit bytesWritten(bytes);
+ forever {
+ if (writeBuffer.isEmpty())
+ return;
+
+ // WriteFile() returns true, if the write operation completes synchronously.
+ // We don't need to call GetOverlappedResult() additionally, because
+ // 'numberOfBytesWritten' is valid in this case.
+ DWORD numberOfBytesWritten;
+ if (!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(),
+ &numberOfBytesWritten, &overlapped)) {
+ break;
}
+
+ writeCompleted(ERROR_SUCCESS, numberOfBytesWritten);
+ }
+
+ const DWORD dwError = GetLastError();
+ if (dwError == ERROR_IO_PENDING) {
+ // Operation has been queued and will complete in the future.
+ writeSequenceStarted = true;
+ SetThreadpoolWait(waitObject, eventHandle, NULL);
+ } else {
+ // Other return values are actual errors.
+ writeCompleted(dwError, 0);
}
}
-void QWindowsPipeWriter::writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase)
+/*!
+ \internal
+ Thread pool callback procedure.
+ */
+void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult)
{
- Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
- overlapped->pipeWriter->notified(errorCode, numberOfBytesTransfered);
+ Q_UNUSED(instance);
+ Q_UNUSED(wait);
+ Q_UNUSED(waitResult);
+ QWindowsPipeWriter *pipeWriter = reinterpret_cast<QWindowsPipeWriter *>(context);
+
+ // Get the result of the asynchronous operation.
+ DWORD numberOfBytesTransfered = 0;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped,
+ &numberOfBytesTransfered, FALSE))
+ errorCode = GetLastError();
+
+ QMutexLocker locker(&pipeWriter->mutex);
+
+ // After the writer was stopped, the only reason why this function can be called is the
+ // completion of a cancellation. No signals should be emitted, and no new write sequence
+ // should be started in this case.
+ if (pipeWriter->stopped)
+ return;
+
+ pipeWriter->writeSequenceStarted = false;
+ pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered);
+ if (pipeWriter->lastError != ERROR_SUCCESS)
+ return;
+
+ pipeWriter->startAsyncWriteLocked();
+
+ if (!pipeWriter->winEventActPosted) {
+ pipeWriter->winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(pipeWriter, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
+ }
+ SetEvent(pipeWriter->syncHandle);
}
/*!
\internal
Will be called whenever the write operation completes.
*/
-void QWindowsPipeWriter::notified(DWORD errorCode, DWORD numberOfBytesWritten)
+void QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten)
{
- notifiedCalled = true;
- writeSequenceStarted = false;
- Q_ASSERT(errorCode != ERROR_SUCCESS || numberOfBytesWritten == DWORD(buffer.size()));
- buffer.clear();
-
- switch (errorCode) {
- case ERROR_SUCCESS:
- break;
- case ERROR_OPERATION_ABORTED:
- if (stopped)
- break;
- Q_FALLTHROUGH();
- default:
- qErrnoWarning(errorCode, "QWindowsPipeWriter: asynchronous write failed.");
- break;
+ if (errorCode == ERROR_SUCCESS) {
+ Q_ASSERT(numberOfBytesWritten == DWORD(writeBuffer.nextDataBlockSize()));
+
+ bytesWrittenPending = true;
+ pendingBytesWrittenValue += numberOfBytesWritten;
+ writeBuffer.free(numberOfBytesWritten);
+ } else {
+ lastError = errorCode;
+ writeBuffer.clear();
+ // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
+ if (errorCode != ERROR_OPERATION_ABORTED && errorCode != ERROR_NO_DATA)
+ qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed.");
}
+}
- // After the writer was stopped, the only reason why this function can be called is the
- // completion of a cancellation. No signals should be emitted, and no new write sequence should
- // be started in this case.
+/*!
+ \internal
+ Receives notification that the write operation has completed.
+ */
+bool QWindowsPipeWriter::event(QEvent *e)
+{
+ if (e->type() == QEvent::WinEventAct) {
+ emitPendingSignals(true);
+ return true;
+ }
+ return QObject::event(e);
+}
+
+/*!
+ \internal
+ Emits pending signals in the main thread. Returns \c true,
+ if bytesWritten() was emitted.
+ */
+bool QWindowsPipeWriter::emitPendingSignals(bool allowWinActPosting)
+{
+ QMutexLocker locker(&mutex);
+
+ // Enable QEvent::WinEventAct posting.
+ if (allowWinActPosting)
+ winEventActPosted = false;
+
+ if (!bytesWrittenPending)
+ return false;
+
+ // Reset the state even if we don't emit bytesWritten().
+ // It's a defined behavior to not re-emit this signal recursively.
+ bytesWrittenPending = false;
+ qint64 numberOfBytesWritten = pendingBytesWrittenValue;
+ pendingBytesWrittenValue = 0;
+
+ locker.unlock();
+
+ // Disable any further processing, if the pipe was stopped.
if (stopped)
- return;
+ return false;
- pendingBytesWrittenValue += qint64(numberOfBytesWritten);
- if (!bytesWrittenPending) {
- bytesWrittenPending = true;
- emit _q_queueBytesWritten(QWindowsPipeWriter::QPrivateSignal());
+ emit canWrite();
+ if (!inBytesWritten) {
+ QScopedValueRollback<bool> guard(inBytesWritten, true);
+ emit bytesWritten(numberOfBytesWritten);
}
+
+ return true;
}
bool QWindowsPipeWriter::waitForNotification(int timeout)
{
QElapsedTimer t;
t.start();
- notifiedCalled = false;
int msecs = timeout;
- while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
- if (notifiedCalled)
+ do {
+ DWORD waitRet = WaitForSingleObjectEx(syncHandle,
+ msecs == -1 ? INFINITE : msecs, TRUE);
+ if (waitRet == WAIT_OBJECT_0)
return true;
- // Some other I/O completion routine was called. Wait some more.
+ if (waitRet != WAIT_IO_COMPLETION)
+ return false;
+
+ // Some I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
- if (!msecs)
- break;
- }
- return notifiedCalled;
+ } while (msecs != 0);
+
+ return false;
}
-bool QWindowsPipeWriter::write(const QByteArray &ba)
+/*!
+ \internal
+ Waits for the completion of the asynchronous write operation.
+ Returns \c true, if we've emitted the bytesWritten signal (non-recursive case)
+ or bytesWritten will be emitted by the event loop (recursive case).
+ */
+bool QWindowsPipeWriter::waitForWrite(int msecs)
{
- if (writeSequenceStarted)
- return false;
+ // Prepare handle for waiting.
+ ResetEvent(syncHandle);
- overlapped.clear();
- buffer = ba;
- stopped = false;
- writeSequenceStarted = true;
- if (!WriteFileEx(handle, buffer.constData(), buffer.size(),
- &overlapped, &writeFileCompleted)) {
- writeSequenceStarted = false;
- buffer.clear();
+ // It is necessary to check if there is already pending signal.
+ if (emitPendingSignals(false))
+ return true;
- const DWORD errorCode = GetLastError();
- switch (errorCode) {
- case ERROR_NO_DATA: // "The pipe is being closed."
- // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
- break;
- default:
- qErrnoWarning(errorCode, "QWindowsPipeWriter::write failed.");
- }
+ // Make sure that 'syncHandle' was triggered by the thread pool callback.
+ if (!isWriteOperationActive() || !waitForNotification(msecs))
return false;
- }
- return true;
-}
-
-void QWindowsPipeWriter::stop()
-{
- stopped = true;
- bytesWrittenPending = false;
- pendingBytesWrittenValue = 0;
- if (writeSequenceStarted) {
- if (!CancelIoEx(handle, &overlapped)) {
- const DWORD dwError = GetLastError();
- if (dwError != ERROR_NOT_FOUND) {
- qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.",
- handle);
- }
- }
- waitForNotification(-1);
- }
+ return emitPendingSignals(false);
}
QT_END_NAMESPACE
diff --git a/src/corelib/io/qwindowspipewriter_p.h b/src/corelib/io/qwindowspipewriter_p.h
index 39e8ffe40a..b5a48e926f 100644
--- a/src/corelib/io/qwindowspipewriter_p.h
+++ b/src/corelib/io/qwindowspipewriter_p.h
@@ -1,6 +1,6 @@
/****************************************************************************
**
-** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2020 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -54,7 +54,9 @@
#include <QtCore/private/qglobal_p.h>
#include <qelapsedtimer.h>
#include <qobject.h>
-#include <qbytearray.h>
+#include <qmutex.h>
+#include <private/qringbuffer_p.h>
+
#include <qt_windows.h>
QT_BEGIN_NAMESPACE
@@ -117,39 +119,37 @@ public:
bool write(const QByteArray &ba);
void stop();
bool waitForWrite(int msecs);
- bool isWriteOperationActive() const { return writeSequenceStarted; }
+ bool isWriteOperationActive() const;
qint64 bytesToWrite() const;
Q_SIGNALS:
void canWrite();
void bytesWritten(qint64 bytes);
- void _q_queueBytesWritten(QPrivateSignal);
+
+protected:
+ bool event(QEvent *e) override;
private:
- static void CALLBACK writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase);
- void notified(DWORD errorCode, DWORD numberOfBytesWritten);
+ void startAsyncWriteLocked();
+ static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult);
+ void writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten);
bool waitForNotification(int timeout);
- void emitPendingBytesWrittenValue();
-
- class Overlapped : public OVERLAPPED
- {
- Q_DISABLE_COPY_MOVE(Overlapped)
- public:
- explicit Overlapped(QWindowsPipeWriter *pipeWriter);
- void clear();
-
- QWindowsPipeWriter *pipeWriter;
- };
+ bool emitPendingSignals(bool allowWinActPosting);
HANDLE handle;
- Overlapped overlapped;
- QByteArray buffer;
+ HANDLE eventHandle;
+ HANDLE syncHandle;
+ PTP_WAIT waitObject;
+ OVERLAPPED overlapped;
+ QRingBuffer writeBuffer;
qint64 pendingBytesWrittenValue;
+ mutable QMutex mutex;
+ DWORD lastError;
bool stopped;
bool writeSequenceStarted;
- bool notifiedCalled;
bool bytesWrittenPending;
+ bool winEventActPosted;
bool inBytesWritten;
};
diff --git a/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp b/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp
index 3e19764618..ba0e03af4c 100644
--- a/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp
+++ b/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp
@@ -35,6 +35,8 @@
#include <QtGui/qrasterwindow.h>
#include <QtNetwork/qtcpserver.h>
#include <QtNetwork/qtcpsocket.h>
+#include <QtNetwork/qlocalserver.h>
+#include <QtNetwork/qlocalsocket.h>
#include <QtCore/qelapsedtimer.h>
#include <QtCore/qtimer.h>
#include <QtCore/qwineventnotifier.h>
@@ -51,6 +53,7 @@ class tst_NoQtEventLoop : public QObject
private slots:
void consumeMouseEvents();
void consumeSocketEvents();
+ void consumeLocalSocketEvents();
void consumeWinEvents_data();
void consumeWinEvents();
void deliverEventsInLivelock();
@@ -318,6 +321,44 @@ void tst_NoQtEventLoop::consumeSocketEvents()
QVERIFY(server.hasPendingConnections());
}
+void tst_NoQtEventLoop::consumeLocalSocketEvents()
+{
+ int argc = 1;
+ char *argv[] = { const_cast<char *>("test"), 0 };
+ QGuiApplication app(argc, argv);
+ QLocalServer server;
+ QLocalSocket client;
+ QSignalSpy readyReadSpy(&client, &QIODevice::readyRead);
+
+ QVERIFY(server.listen("consumeLocalSocketEvents"));
+ client.connectToServer("consumeLocalSocketEvents");
+ QVERIFY(client.waitForConnected(200));
+ QVERIFY(server.waitForNewConnection(200));
+ QLocalSocket *clientSocket = server.nextPendingConnection();
+ QVERIFY(clientSocket);
+ QSignalSpy bytesWrittenSpy(clientSocket, &QIODevice::bytesWritten);
+ server.close();
+
+ bool timeExpired = false;
+ QTimer::singleShot(3000, Qt::CoarseTimer, [&timeExpired]() {
+ timeExpired = true;
+ });
+ QVERIFY(clientSocket->putChar(0));
+
+ // Exec own message loop
+ MSG msg;
+ while (::GetMessage(&msg, NULL, 0, 0)) {
+ ::TranslateMessage(&msg);
+ ::DispatchMessage(&msg);
+
+ if (timeExpired || readyReadSpy.count() != 0)
+ break;
+ }
+ QVERIFY(!timeExpired);
+ QCOMPARE(bytesWrittenSpy.count(), 1);
+ QCOMPARE(readyReadSpy.count(), 1);
+}
+
void tst_NoQtEventLoop::consumeWinEvents_data()
{
QTest::addColumn<bool>("peeking");
diff --git a/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp b/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp
index e481db6644..c4f4fcc207 100644
--- a/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp
+++ b/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp
@@ -634,11 +634,12 @@ void tst_QLocalSocket::readBufferOverflow()
QCOMPARE(client.bytesAvailable(), 0);
#ifdef Q_OS_WIN
+ // ensure the previous write operation is finished
+ QVERIFY(serverSocket->waitForBytesWritten());
+
serverSocket->write(buffer, readBufferSize);
QVERIFY(serverSocket->waitForBytesWritten());
- // ensure the read completion routine is called
- SleepEx(100, true);
QVERIFY(client.waitForReadyRead());
QCOMPARE(client.read(buffer, readBufferSize), qint64(readBufferSize));
diff --git a/tests/benchmarks/network/socket/CMakeLists.txt b/tests/benchmarks/network/socket/CMakeLists.txt
index 6d54bc05f5..7c122a73ef 100644
--- a/tests/benchmarks/network/socket/CMakeLists.txt
+++ b/tests/benchmarks/network/socket/CMakeLists.txt
@@ -1,4 +1,5 @@
# Generated from socket.pro.
+add_subdirectory(qlocalsocket)
add_subdirectory(qtcpserver)
add_subdirectory(qudpsocket)
diff --git a/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt b/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt
new file mode 100644
index 0000000000..2ab340127e
--- /dev/null
+++ b/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt
@@ -0,0 +1,16 @@
+# Generated from qlocalsocket.pro.
+
+#####################################################################
+## tst_bench_qlocalsocket Binary:
+#####################################################################
+
+qt_add_benchmark(tst_bench_qlocalsocket
+ SOURCES
+ tst_qlocalsocket.cpp
+ PUBLIC_LIBRARIES
+ Qt::Network
+ Qt::Test
+)
+
+#### Keys ignored in scope 1:.:.:qlocalsocket.pro:<TRUE>:
+# TEMPLATE = "app"
diff --git a/tests/benchmarks/network/socket/qlocalsocket/qlocalsocket.pro b/tests/benchmarks/network/socket/qlocalsocket/qlocalsocket.pro
new file mode 100644
index 0000000000..5c6f25ea61
--- /dev/null
+++ b/tests/benchmarks/network/socket/qlocalsocket/qlocalsocket.pro
@@ -0,0 +1,8 @@
+TEMPLATE = app
+TARGET = tst_bench_qlocalsocket
+
+QT = network testlib
+
+CONFIG += release
+
+SOURCES += tst_qlocalsocket.cpp
diff --git a/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp b/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp
new file mode 100644
index 0000000000..adedc25dc2
--- /dev/null
+++ b/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp
@@ -0,0 +1,224 @@
+/****************************************************************************
+**
+** Copyright (C) 2020 The Qt Company Ltd.
+** Contact: https://www.qt.io/licensing/
+**
+** This file is part of the test suite of the Qt Toolkit.
+**
+** $QT_BEGIN_LICENSE:GPL-EXCEPT$
+** Commercial License Usage
+** Licensees holding valid commercial Qt licenses may use this file in
+** accordance with the commercial license agreement provided with the
+** Software or, alternatively, in accordance with the terms contained in
+** a written agreement between you and The Qt Company. For licensing terms
+** and conditions see https://www.qt.io/terms-conditions. For further
+** information use the contact form at https://www.qt.io/contact-us.
+**
+** GNU General Public License Usage
+** Alternatively, this file may be used under the terms of the GNU
+** General Public License version 3 as published by the Free Software
+** Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT
+** included in the packaging of this file. Please review the following
+** information to ensure the GNU General Public License requirements will
+** be met: https://www.gnu.org/licenses/gpl-3.0.html.
+**
+** $QT_END_LICENSE$
+**
+****************************************************************************/
+
+#include <QtTest/QtTest>
+#include <QtCore/qglobal.h>
+#include <QtCore/qthread.h>
+#include <QtCore/qsemaphore.h>
+#include <QtCore/qbytearray.h>
+#include <QtCore/qeventloop.h>
+#include <QtCore/qvector.h>
+#include <QtCore/qelapsedtimer.h>
+#include <QtNetwork/qlocalsocket.h>
+#include <QtNetwork/qlocalserver.h>
+
+class tst_QLocalSocket : public QObject
+{
+ Q_OBJECT
+
+private slots:
+ void pingPong_data();
+ void pingPong();
+ void dataExchange_data();
+ void dataExchange();
+};
+
+class ServerThread : public QThread
+{
+public:
+ QSemaphore running;
+
+ explicit ServerThread(int chunkSize)
+ {
+ buffer.resize(chunkSize);
+ }
+
+ void run() override
+ {
+ QLocalServer server;
+
+ connect(&server, &QLocalServer::newConnection, [this, &server]() {
+ auto socket = server.nextPendingConnection();
+
+ connect(socket, &QLocalSocket::readyRead, [this, socket]() {
+ const qint64 bytesAvailable = socket->bytesAvailable();
+ Q_ASSERT(bytesAvailable <= this->buffer.size());
+
+ QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable);
+ QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable);
+ });
+ });
+
+ QVERIFY(server.listen("foo"));
+ running.release();
+ exec();
+ }
+
+protected:
+ QByteArray buffer;
+};
+
+class SocketFactory : public QObject
+{
+ Q_OBJECT
+
+public:
+ bool stopped = false;
+
+ explicit SocketFactory(int chunkSize, int connections)
+ {
+ buffer.resize(chunkSize);
+ for (int i = 0; i < connections; ++i) {
+ QLocalSocket *socket = new QLocalSocket(this);
+ Q_CHECK_PTR(socket);
+
+ connect(this, &SocketFactory::start, [this, socket]() {
+ QCOMPARE(socket->write(this->buffer), this->buffer.size());
+ });
+
+ connect(socket, &QLocalSocket::readyRead, [i, this, socket]() {
+ const qint64 bytesAvailable = socket->bytesAvailable();
+ Q_ASSERT(bytesAvailable <= this->buffer.size());
+
+ QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable);
+ emit this->bytesReceived(i, bytesAvailable);
+
+ if (!this->stopped)
+ QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable);
+ });
+
+ socket->connectToServer("foo");
+ QCOMPARE(socket->state(), QLocalSocket::ConnectedState);
+ }
+ }
+
+signals:
+ void start();
+ void bytesReceived(int channel, qint64 bytes);
+
+protected:
+ QByteArray buffer;
+};
+
+void tst_QLocalSocket::pingPong_data()
+{
+ QTest::addColumn<int>("connections");
+ for (int value : {10, 50, 100, 1000, 5000})
+ QTest::addRow("connections: %d", value) << value;
+}
+
+void tst_QLocalSocket::pingPong()
+{
+ QFETCH(int, connections);
+
+ const int iterations = 100000;
+ Q_ASSERT(iterations >= connections && connections > 0);
+
+ ServerThread serverThread(1);
+ serverThread.start();
+ // Wait for server to start.
+ QVERIFY(serverThread.running.tryAcquire(1, 3000));
+
+ SocketFactory factory(1, connections);
+ QEventLoop eventLoop;
+ QVector<qint64> bytesToRead;
+ QElapsedTimer timer;
+
+ bytesToRead.fill(iterations / connections, connections);
+ connect(&factory, &SocketFactory::bytesReceived,
+ [&bytesToRead, &connections, &factory, &eventLoop](int channel, qint64 bytes) {
+ Q_UNUSED(bytes);
+
+ if (--bytesToRead[channel] == 0 && --connections == 0) {
+ factory.stopped = true;
+ eventLoop.quit();
+ }
+ });
+
+ timer.start();
+ emit factory.start();
+ eventLoop.exec();
+
+ qDebug("Elapsed time: %.1f s", timer.elapsed() / 1000.0);
+ serverThread.quit();
+ serverThread.wait();
+}
+
+void tst_QLocalSocket::dataExchange_data()
+{
+ QTest::addColumn<int>("connections");
+ QTest::addColumn<int>("chunkSize");
+ for (int connections : {1, 5, 10}) {
+ for (int chunkSize : {100, 1000, 10000, 100000}) {
+ QTest::addRow("connections: %d, chunk size: %d",
+ connections, chunkSize) << connections << chunkSize;
+ }
+ }
+}
+
+void tst_QLocalSocket::dataExchange()
+{
+ QFETCH(int, connections);
+ QFETCH(int, chunkSize);
+
+ Q_ASSERT(chunkSize > 0 && connections > 0);
+ const qint64 timeToTest = 5000;
+
+ ServerThread serverThread(chunkSize);
+ serverThread.start();
+ // Wait for server to start.
+ QVERIFY(serverThread.running.tryAcquire(1, 3000));
+
+ SocketFactory factory(chunkSize, connections);
+ QEventLoop eventLoop;
+ qint64 totalReceived = 0;
+ QElapsedTimer timer;
+
+ connect(&factory, &SocketFactory::bytesReceived,
+ [&totalReceived, &timer, timeToTest, &factory, &eventLoop](int channel, qint64 bytes) {
+ Q_UNUSED(channel);
+
+ totalReceived += bytes;
+ if (timer.elapsed() >= timeToTest) {
+ factory.stopped = true;
+ eventLoop.quit();
+ }
+ });
+
+ timer.start();
+ emit factory.start();
+ eventLoop.exec();
+
+ qDebug("Transfer rate: %.1f MB/s", totalReceived / 1048.576 / timer.elapsed());
+ serverThread.quit();
+ serverThread.wait();
+}
+
+QTEST_MAIN(tst_QLocalSocket)
+
+#include "tst_qlocalsocket.moc"
diff --git a/tests/benchmarks/network/socket/socket.pro b/tests/benchmarks/network/socket/socket.pro
index d428a4d973..1cc67cffd7 100644
--- a/tests/benchmarks/network/socket/socket.pro
+++ b/tests/benchmarks/network/socket/socket.pro
@@ -1,4 +1,5 @@
TEMPLATE = subdirs
SUBDIRS = \
+ qlocalsocket \
qtcpserver \
qudpsocket