summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/corelib/io/qwindowspipereader.cpp468
-rw-r--r--src/corelib/io/qwindowspipereader_p.h51
-rw-r--r--src/corelib/io/qwindowspipewriter.cpp348
-rw-r--r--src/corelib/io/qwindowspipewriter_p.h44
-rw-r--r--tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp42
-rw-r--r--tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp20
-rw-r--r--tests/benchmarks/network/socket/CMakeLists.txt1
-rw-r--r--tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt14
-rw-r--r--tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp225
9 files changed, 852 insertions, 361 deletions
diff --git a/src/corelib/io/qwindowspipereader.cpp b/src/corelib/io/qwindowspipereader.cpp
index b525e88282..bf03737c39 100644
--- a/src/corelib/io/qwindowspipereader.cpp
+++ b/src/corelib/io/qwindowspipereader.cpp
@@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -38,58 +39,63 @@
****************************************************************************/
#include "qwindowspipereader_p.h"
-#include "qiodevice_p.h"
-#include <qelapsedtimer.h>
#include <qscopedvaluerollback.h>
+#include <qcoreapplication.h>
+#include <QMutexLocker>
QT_BEGIN_NAMESPACE
static const DWORD minReadBufferSize = 4096;
-QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader)
- : pipeReader(reader)
-{
-}
-
-void QWindowsPipeReader::Overlapped::clear()
-{
- ZeroMemory(this, sizeof(OVERLAPPED));
-}
-
-
QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
: QObject(parent),
handle(INVALID_HANDLE_VALUE),
- overlapped(this),
+ eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ syncHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ waitObject(NULL),
readBufferMaxSize(0),
actualReadBufferSize(0),
- bytesPending(0),
+ pendingReadBytes(0),
+ lastError(ERROR_SUCCESS),
state(Stopped),
readSequenceStarted(false),
- notifiedCalled(false),
pipeBroken(false),
readyReadPending(false),
+ winEventActPosted(false),
inReadyRead(false)
{
- connect(this, &QWindowsPipeReader::_q_queueReadyRead,
- this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection);
+ ZeroMemory(&overlapped, sizeof(OVERLAPPED));
+ overlapped.hEvent = eventHandle;
+ waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
+ if (waitObject == NULL)
+ qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");
}
QWindowsPipeReader::~QWindowsPipeReader()
{
stop();
+
+ // Wait for thread pool callback to complete, as it can be still
+ // executing some completion code.
+ WaitForThreadpoolWaitCallbacks(waitObject, FALSE);
+ CloseThreadpoolWait(waitObject);
+ CloseHandle(eventHandle);
+ CloseHandle(syncHandle);
}
/*!
Sets the handle to read from. The handle must be valid.
+ Do not call this function while the pipe is running.
*/
void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
{
readBuffer.clear();
actualReadBufferSize = 0;
- bytesPending = 0;
+ readyReadPending = false;
+ pendingReadBytes = 0;
handle = hPipeReadEnd;
pipeBroken = false;
+ lastError = ERROR_SUCCESS;
}
/*!
@@ -98,8 +104,7 @@ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
*/
void QWindowsPipeReader::stop()
{
- state = Stopped;
- cancelAsyncRead();
+ cancelAsyncRead(Stopped);
}
/*!
@@ -108,16 +113,27 @@ void QWindowsPipeReader::stop()
*/
void QWindowsPipeReader::drainAndStop()
{
- state = Draining;
- cancelAsyncRead();
+ cancelAsyncRead(Draining);
+
+ // Note that signals are not emitted in the call below, as the caller
+ // is expected to do that synchronously.
+ consumePending();
}
/*!
Stops the asynchronous read sequence.
*/
-void QWindowsPipeReader::cancelAsyncRead()
+void QWindowsPipeReader::cancelAsyncRead(State newState)
{
+ if (state != Running)
+ return;
+
+ QMutexLocker locker(&mutex);
+ state = newState;
if (readSequenceStarted) {
+ // This can legitimately fail due to the GetOverlappedResult()
+ // in the callback not being locked. We ignore ERROR_NOT_FOUND
+ // in this case.
if (!CancelIoEx(handle, &overlapped)) {
const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) {
@@ -125,11 +141,37 @@ void QWindowsPipeReader::cancelAsyncRead()
handle);
}
}
- waitForNotification(-1);
+
+ // Wait for callback to complete.
+ do {
+ locker.unlock();
+ waitForNotification(QDeadlineTimer(-1));
+ locker.relock();
+ } while (readSequenceStarted);
}
}
/*!
+ Sets the size of internal read buffer.
+ */
+void QWindowsPipeReader::setMaxReadBufferSize(qint64 size)
+{
+ QMutexLocker locker(&mutex);
+ readBufferMaxSize = size;
+}
+
+/*!
+ Returns \c true if async operation is in progress, there is
+ pending data to read, or a read error is pending.
+*/
+bool QWindowsPipeReader::isReadOperationActive() const
+{
+ QMutexLocker locker(&mutex);
+ return readSequenceStarted || readyReadPending
+ || (lastError != ERROR_SUCCESS && !pipeBroken);
+}
+
+/*!
Returns the number of bytes we've read so far.
*/
qint64 QWindowsPipeReader::bytesAvailable() const
@@ -145,6 +187,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
if (pipeBroken && actualReadBufferSize == 0)
return 0; // signal EOF
+ mutex.lock();
qint64 readSoFar;
// If startAsyncRead() has read data, copy it to its destination.
if (maxlen == 1 && actualReadBufferSize > 0) {
@@ -155,6 +198,7 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
actualReadBufferSize -= readSoFar;
}
+ mutex.unlock();
if (!pipeBroken) {
if (state == Running)
@@ -166,197 +210,268 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
return readSoFar;
}
+/*!
+ Returns \c true if a complete line of data can be read from the buffer.
+ */
bool QWindowsPipeReader::canReadLine() const
{
+ QMutexLocker locker(&mutex);
return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
}
/*!
- \internal
- Will be called whenever the read operation completes.
+ Starts an asynchronous read sequence on the pipe.
*/
-void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead)
+void QWindowsPipeReader::startAsyncRead()
{
- notifiedCalled = true;
- readSequenceStarted = false;
-
- switch (errorCode) {
- case ERROR_SUCCESS:
- break;
- case ERROR_MORE_DATA:
- // This is not an error. We're connected to a message mode
- // pipe and the message didn't fit into the pipe's system
- // buffer. We will read the remaining data in the next call.
- break;
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- pipeBroken = true;
- break;
- case ERROR_OPERATION_ABORTED:
- if (state != Running)
- break;
- Q_FALLTHROUGH();
- default:
- emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified"));
- pipeBroken = true;
- break;
- }
+ QMutexLocker locker(&mutex);
- // After the reader was stopped, the only reason why this function can be called is the
- // completion of a cancellation. No signals should be emitted, and no new read sequence should
- // be started in this case.
- if (state == Stopped)
+ if (readSequenceStarted || lastError != ERROR_SUCCESS)
return;
- if (pipeBroken) {
- emitPipeClosed();
+ state = Running;
+ startAsyncReadLocked();
+
+ // Do not post the event, if the read operation will be completed asynchronously.
+ if (!readyReadPending && lastError == ERROR_SUCCESS)
return;
- }
- actualReadBufferSize += numberOfBytesRead;
- readBuffer.truncate(actualReadBufferSize);
+ if (!winEventActPosted) {
+ winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
+ }
- // Read all pending data from the pipe's buffer in 'Draining' state.
- if (state == Draining) {
- // Determine the number of pending bytes on the first iteration.
- if (bytesPending == 0)
- bytesPending = checkPipeState();
- else
- bytesPending -= numberOfBytesRead;
+ SetEvent(syncHandle);
+}
- if (bytesPending == 0) // all data received
- return; // unblock waitForNotification() in cancelAsyncRead()
+/*!
+ Starts a new read sequence. Thread-safety should be ensured
+ by the caller.
+ */
+void QWindowsPipeReader::startAsyncReadLocked()
+{
+ // Determine the number of bytes to read.
+ qint64 bytesToRead = qMax(checkPipeState(), state == Running ? minReadBufferSize : 0);
- startAsyncReadHelper(bytesPending);
- if (readSequenceStarted)
- notifiedCalled = false; // wait for more data
+ // This can happen only while draining; just do nothing in this case.
+ if (bytesToRead == 0)
return;
+
+ while (lastError == ERROR_SUCCESS) {
+ if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
+ bytesToRead = readBufferMaxSize - readBuffer.size();
+ if (bytesToRead <= 0) {
+ // Buffer is full. User must read data from the buffer
+ // before we can read more from the pipe.
+ return;
+ }
+ }
+
+ char *ptr = readBuffer.reserve(bytesToRead);
+
+ // ReadFile() returns true, if the read operation completes synchronously.
+ // We don't need to call GetOverlappedResult() additionally, because
+ // 'numberOfBytesRead' is valid in this case.
+ DWORD numberOfBytesRead;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped)) {
+ errorCode = GetLastError();
+ if (errorCode == ERROR_IO_PENDING) {
+ Q_ASSERT(state == Running);
+ // Operation has been queued and will complete in the future.
+ readSequenceStarted = true;
+ SetThreadpoolWait(waitObject, eventHandle, NULL);
+ return;
+ }
+ }
+
+ if (!readCompleted(errorCode, numberOfBytesRead))
+ return;
+
+ // In the 'Draining' state, we have to get all the data with one call
+ // to ReadFile(). Note that message mode pipes are not supported here.
+ if (state == Draining) {
+ Q_ASSERT(bytesToRead == qint64(numberOfBytesRead));
+ return;
+ }
+
+ // We need to loop until all pending data has been read and an
+ // operation is queued for asynchronous completion.
+ // If the pipe is configured to work in message mode, we read
+ // the data in chunks.
+ bytesToRead = qMax(checkPipeState(), minReadBufferSize);
}
+}
- startAsyncRead();
- if (!readyReadPending) {
- readyReadPending = true;
- emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal());
+/*!
+ Thread pool callback procedure.
+ */
+void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult)
+{
+ Q_UNUSED(instance);
+ Q_UNUSED(wait);
+ Q_UNUSED(waitResult);
+ QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context);
+
+ // Get the result of the asynchronous operation.
+ DWORD numberOfBytesTransfered = 0;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
+ &numberOfBytesTransfered, FALSE))
+ errorCode = GetLastError();
+
+ pipeReader->mutex.lock();
+
+ pipeReader->readSequenceStarted = false;
+
+ // Do not overwrite error code, if error has been detected by
+ // checkPipeState() in waitForPipeClosed(). Also, if the reader was
+ // stopped, the only reason why this function can be called is the
+ // completion of a cancellation. No signals should be emitted, and
+ // no new read sequence should be started in this case.
+ if (pipeReader->lastError == ERROR_SUCCESS && pipeReader->state != Stopped) {
+ // Ignore ERROR_OPERATION_ABORTED. We have canceled the I/O operation
+ // specifically for flushing the pipe.
+ if (pipeReader->state == Draining && errorCode == ERROR_OPERATION_ABORTED)
+ errorCode = ERROR_SUCCESS;
+
+ if (pipeReader->readCompleted(errorCode, numberOfBytesTransfered))
+ pipeReader->startAsyncReadLocked();
+
+ if (pipeReader->state == Running && !pipeReader->winEventActPosted) {
+ pipeReader->winEventActPosted = true;
+ pipeReader->mutex.unlock();
+ QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct));
+ } else {
+ pipeReader->mutex.unlock();
+ }
+ } else {
+ pipeReader->mutex.unlock();
}
+
+ // We set the event only after unlocking to avoid additional context
+ // switches due to the released thread immediately running into the lock.
+ SetEvent(pipeReader->syncHandle);
}
/*!
- \internal
- Starts an asynchronous read sequence on the pipe.
+ Will be called whenever the read operation completes. Returns \c true if
+ no error occurred; otherwise returns \c false.
*/
-void QWindowsPipeReader::startAsyncRead()
+bool QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
{
- if (readSequenceStarted)
- return;
+ // ERROR_MORE_DATA is not an error. We're connected to a message mode
+ // pipe and the message didn't fit into the pipe's system
+ // buffer. We will read the remaining data in the next call.
+ if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
+ readyReadPending = true;
+ pendingReadBytes += numberOfBytesRead;
+ readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
+ return true;
+ }
- state = Running;
- startAsyncReadHelper(qMax(checkPipeState(), minReadBufferSize));
+ lastError = errorCode;
+ return false;
}
/*!
- \internal
- Starts a new read sequence.
+ Receives notification that the read operation has completed.
*/
-void QWindowsPipeReader::startAsyncReadHelper(qint64 bytesToRead)
+bool QWindowsPipeReader::event(QEvent *e)
{
- Q_ASSERT(bytesToRead != 0);
+ if (e->type() == QEvent::WinEventAct) {
+ consumePendingAndEmit(true);
+ return true;
+ }
+ return QObject::event(e);
+}
- if (pipeBroken)
- return;
+/*!
+ Updates the read buffer size and emits pending signals in the main thread.
+ Returns \c true, if readyRead() was emitted.
+ */
+bool QWindowsPipeReader::consumePendingAndEmit(bool allowWinActPosting)
+{
+ mutex.lock();
- if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
- bytesToRead = readBufferMaxSize - readBuffer.size();
- if (bytesToRead <= 0) {
- // Buffer is full. User must read data from the buffer
- // before we can read more from the pipe.
- return;
- }
+ // Enable QEvent::WinEventAct posting.
+ if (allowWinActPosting)
+ winEventActPosted = false;
+
+ const bool emitReadyRead = consumePending();
+ const DWORD dwError = lastError;
+
+ mutex.unlock();
+
+ // Disable any further processing, if the pipe was stopped.
+ // We are not allowed to emit signals in either 'Stopped'
+ // or 'Draining' state.
+ if (state != Running)
+ return false;
+
+ if (emitReadyRead && !inReadyRead) {
+ QScopedValueRollback<bool> guard(inReadyRead, true);
+ emit readyRead();
}
- char *ptr = readBuffer.reserve(bytesToRead);
-
- readSequenceStarted = true;
- overlapped.clear();
- if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) {
- readSequenceStarted = false;
-
- const DWORD dwError = GetLastError();
- switch (dwError) {
- case ERROR_BROKEN_PIPE:
- case ERROR_PIPE_NOT_CONNECTED:
- // It may happen, that the other side closes the connection directly
- // after writing data. Then we must set the appropriate socket state.
- pipeBroken = true;
- emit pipeClosed();
- break;
- default:
- emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead"));
- break;
- }
+ // Trigger 'pipeBroken' only once.
+ if (dwError != ERROR_SUCCESS && !pipeBroken) {
+ pipeBroken = true;
+ if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
+ emit winError(dwError, QLatin1String("QWindowsPipeReader::consumePendingAndEmit"));
+ emit pipeClosed();
}
+
+ return emitReadyRead;
}
/*!
- \internal
- Called when ReadFileEx finished the read operation.
+ Updates the read buffer size. Returns \c true, if readyRead()
+ should be emitted. Thread-safety should be ensured by the caller.
*/
-void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase)
+bool QWindowsPipeReader::consumePending()
{
- Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
- overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered);
+ if (readyReadPending) {
+ readyReadPending = false;
+ actualReadBufferSize += pendingReadBytes;
+ pendingReadBytes = 0;
+ return true;
+ }
+
+ return false;
}
/*!
- \internal
Returns the number of available bytes in the pipe.
- Sets QWindowsPipeReader::pipeBroken to true if the connection is broken.
*/
DWORD QWindowsPipeReader::checkPipeState()
{
DWORD bytes;
if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr))
return bytes;
- if (!pipeBroken) {
- pipeBroken = true;
- emitPipeClosed();
- }
+
+ lastError = GetLastError();
return 0;
}
-bool QWindowsPipeReader::waitForNotification(int timeout)
+bool QWindowsPipeReader::waitForNotification(const QDeadlineTimer &deadline)
{
- QElapsedTimer t;
- t.start();
- notifiedCalled = false;
- int msecs = timeout;
- while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
- if (notifiedCalled)
+ do {
+ DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE);
+ if (waitRet == WAIT_OBJECT_0)
return true;
- // Some other I/O completion routine was called. Wait some more.
- msecs = qt_subtract_from_timeout(timeout, t.elapsed());
- if (!msecs)
- break;
- }
- return notifiedCalled;
-}
+ if (waitRet != WAIT_IO_COMPLETION)
+ return false;
-void QWindowsPipeReader::emitPendingReadyRead()
-{
- if (readyReadPending) {
- readyReadPending = false;
- QScopedValueRollback<bool> guard(inReadyRead, true);
- emit readyRead();
- }
-}
+ // Some I/O completion routine was called. Wait some more.
+ } while (!deadline.hasExpired());
-void QWindowsPipeReader::emitPipeClosed()
-{
- // We are not allowed to emit signals in either 'Stopped'
- // or 'Draining' state.
- if (state == Running)
- emit pipeClosed();
+ return false;
}
/*!
@@ -366,22 +481,12 @@ void QWindowsPipeReader::emitPipeClosed()
*/
bool QWindowsPipeReader::waitForReadyRead(int msecs)
{
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
- return true;
- }
-
- if (!readSequenceStarted)
- return false;
-
- if (!waitForNotification(msecs))
- return false;
+ QDeadlineTimer timer(msecs);
- if (readyReadPending) {
- if (!inReadyRead)
- emitPendingReadyRead();
- return true;
+ // Make sure that 'syncHandle' was triggered by the thread pool callback.
+ while (isReadOperationActive() && waitForNotification(timer)) {
+ if (consumePendingAndEmit(false))
+ return true;
}
return false;
@@ -393,15 +498,26 @@ bool QWindowsPipeReader::waitForReadyRead(int msecs)
bool QWindowsPipeReader::waitForPipeClosed(int msecs)
{
const int sleepTime = 10;
- QElapsedTimer stopWatch;
- stopWatch.start();
+ QDeadlineTimer timer(msecs);
+
+ while (waitForReadyRead(timer.remainingTime())) {}
+ if (pipeBroken)
+ return true;
+
+ if (timer.hasExpired())
+ return false;
+
+ // When the read buffer is full, the read sequence is not running,
+ // so we need to peek the pipe to detect disconnection.
forever {
- waitForReadyRead(0);
checkPipeState();
+ consumePendingAndEmit(false);
if (pipeBroken)
return true;
- if (stopWatch.hasExpired(msecs - sleepTime))
+
+ if (timer.hasExpired())
return false;
+
Sleep(sleepTime);
}
}
diff --git a/src/corelib/io/qwindowspipereader_p.h b/src/corelib/io/qwindowspipereader_p.h
index c61018d87d..a284f55b3b 100644
--- a/src/corelib/io/qwindowspipereader_p.h
+++ b/src/corelib/io/qwindowspipereader_p.h
@@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -52,6 +53,8 @@
//
#include <qobject.h>
+#include <qdeadlinetimer.h>
+#include <qmutex.h>
#include <private/qringbuffer_p.h>
#include <qt_windows.h>
@@ -70,7 +73,7 @@ public:
void stop();
void drainAndStop();
- void setMaxReadBufferSize(qint64 size) { readBufferMaxSize = size; }
+ void setMaxReadBufferSize(qint64 size);
qint64 maxReadBufferSize() const { return readBufferMaxSize; }
bool isPipeClosed() const { return pipeBroken; }
@@ -80,46 +83,46 @@ public:
bool waitForReadyRead(int msecs);
bool waitForPipeClosed(int msecs);
- bool isReadOperationActive() const { return readSequenceStarted; }
+ bool isReadOperationActive() const;
Q_SIGNALS:
void winError(ulong, const QString &);
void readyRead();
void pipeClosed();
- void _q_queueReadyRead(QPrivateSignal);
+
+protected:
+ bool event(QEvent *e) override;
private:
- void startAsyncReadHelper(qint64 bytesToRead);
- void cancelAsyncRead();
- static void CALLBACK readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase);
- void notified(DWORD errorCode, DWORD numberOfBytesRead);
+ enum State { Stopped, Running, Draining };
+
+ void startAsyncReadLocked();
+ void cancelAsyncRead(State newState);
+ static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult);
+ bool readCompleted(DWORD errorCode, DWORD numberOfBytesRead);
DWORD checkPipeState();
- bool waitForNotification(int timeout);
- void emitPendingReadyRead();
- void emitPipeClosed();
-
- class Overlapped : public OVERLAPPED
- {
- Q_DISABLE_COPY_MOVE(Overlapped)
- public:
- explicit Overlapped(QWindowsPipeReader *reader);
- void clear();
- QWindowsPipeReader *pipeReader;
- };
+ bool waitForNotification(const QDeadlineTimer &deadline);
+ bool consumePendingAndEmit(bool allowWinActPosting);
+ bool consumePending();
HANDLE handle;
- Overlapped overlapped;
+ HANDLE eventHandle;
+ HANDLE syncHandle;
+ PTP_WAIT waitObject;
+ OVERLAPPED overlapped;
qint64 readBufferMaxSize;
QRingBuffer readBuffer;
qint64 actualReadBufferSize;
- qint64 bytesPending;
+ qint64 pendingReadBytes;
+ mutable QMutex mutex;
+ DWORD lastError;
- enum State { Stopped, Running, Draining } state;
+ State state;
bool readSequenceStarted;
- bool notifiedCalled;
bool pipeBroken;
bool readyReadPending;
+ bool winEventActPosted;
bool inReadyRead;
};
diff --git a/src/corelib/io/qwindowspipewriter.cpp b/src/corelib/io/qwindowspipewriter.cpp
index e374034a06..5ed584c6e3 100644
--- a/src/corelib/io/qwindowspipewriter.cpp
+++ b/src/corelib/io/qwindowspipewriter.cpp
@@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -38,189 +39,296 @@
****************************************************************************/
#include "qwindowspipewriter_p.h"
-#include "qiodevice_p.h"
#include <qscopedvaluerollback.h>
+#include <qcoreapplication.h>
+#include <QMutexLocker>
QT_BEGIN_NAMESPACE
-QWindowsPipeWriter::Overlapped::Overlapped(QWindowsPipeWriter *pipeWriter)
- : pipeWriter(pipeWriter)
-{
-}
-
-void QWindowsPipeWriter::Overlapped::clear()
-{
- ZeroMemory(this, sizeof(OVERLAPPED));
-}
-
-
QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent)
: QObject(parent),
handle(pipeWriteEnd),
- overlapped(this),
+ eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ syncHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
+ waitObject(NULL),
pendingBytesWrittenValue(0),
+ lastError(ERROR_SUCCESS),
stopped(true),
writeSequenceStarted(false),
- notifiedCalled(false),
bytesWrittenPending(false),
+ winEventActPosted(false),
inBytesWritten(false)
{
- connect(this, &QWindowsPipeWriter::_q_queueBytesWritten,
- this, &QWindowsPipeWriter::emitPendingBytesWrittenValue, Qt::QueuedConnection);
+ ZeroMemory(&overlapped, sizeof(OVERLAPPED));
+ overlapped.hEvent = eventHandle;
+ waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
+ if (waitObject == NULL)
+ qErrnoWarning("QWindowsPipeWriter: CreateThreadpollWait failed.");
}
QWindowsPipeWriter::~QWindowsPipeWriter()
{
stop();
+ CloseThreadpoolWait(waitObject);
+ CloseHandle(eventHandle);
+ CloseHandle(syncHandle);
}
-bool QWindowsPipeWriter::waitForWrite(int msecs)
+/*!
+ Stops the asynchronous write sequence.
+ If the write sequence is running then the I/O operation is canceled.
+ */
+void QWindowsPipeWriter::stop()
{
- if (bytesWrittenPending) {
- emitPendingBytesWrittenValue();
- return true;
- }
-
- if (!writeSequenceStarted)
- return false;
-
- if (!waitForNotification(msecs))
- return false;
+ if (stopped)
+ return;
- if (bytesWrittenPending) {
- emitPendingBytesWrittenValue();
- return true;
+ mutex.lock();
+ stopped = true;
+ if (writeSequenceStarted) {
+ // Trying to disable callback before canceling the operation.
+ // Callback invocation is unnecessary here.
+ SetThreadpoolWait(waitObject, NULL, NULL);
+ if (!CancelIoEx(handle, &overlapped)) {
+ const DWORD dwError = GetLastError();
+ if (dwError != ERROR_NOT_FOUND) {
+ qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.",
+ handle);
+ }
+ }
+ writeSequenceStarted = false;
}
+ mutex.unlock();
- return false;
+ WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
}
+/*!
+ Returns \c true if async operation is in progress or a bytesWritten
+ signal is pending.
+ */
+bool QWindowsPipeWriter::isWriteOperationActive() const
+{
+ QMutexLocker locker(&mutex);
+ return writeSequenceStarted || bytesWrittenPending;
+}
+
+/*!
+ Returns the number of bytes that are waiting to be written.
+ */
qint64 QWindowsPipeWriter::bytesToWrite() const
{
- return buffer.size() + pendingBytesWrittenValue;
+ QMutexLocker locker(&mutex);
+ return writeBuffer.size() + pendingBytesWrittenValue;
}
-void QWindowsPipeWriter::emitPendingBytesWrittenValue()
+/*!
+ Writes data to the pipe.
+ */
+bool QWindowsPipeWriter::write(const QByteArray &ba)
{
- if (bytesWrittenPending) {
- // Reset the state even if we don't emit bytesWritten().
- // It's a defined behavior to not re-emit this signal recursively.
- bytesWrittenPending = false;
- const qint64 bytes = pendingBytesWrittenValue;
- pendingBytesWrittenValue = 0;
-
- emit canWrite();
- if (!inBytesWritten) {
- QScopedValueRollback<bool> guard(inBytesWritten, true);
- emit bytesWritten(bytes);
- }
+ QMutexLocker locker(&mutex);
+
+ if (lastError != ERROR_SUCCESS)
+ return false;
+
+ writeBuffer.append(ba);
+ if (writeSequenceStarted)
+ return true;
+
+ stopped = false;
+ startAsyncWriteLocked();
+
+ // Do not post the event, if the write operation will be completed asynchronously.
+ if (!bytesWrittenPending)
+ return true;
+
+ if (!winEventActPosted) {
+ winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
}
+
+ SetEvent(syncHandle);
+ return true;
}
-void QWindowsPipeWriter::writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase)
+/*!
+ Starts a new write sequence. Thread-safety should be ensured by the caller.
+ */
+void QWindowsPipeWriter::startAsyncWriteLocked()
{
- Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
- overlapped->pipeWriter->notified(errorCode, numberOfBytesTransfered);
+ while (!writeBuffer.isEmpty()) {
+ // WriteFile() returns true, if the write operation completes synchronously.
+ // We don't need to call GetOverlappedResult() additionally, because
+ // 'numberOfBytesWritten' is valid in this case.
+ DWORD numberOfBytesWritten;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(),
+ &numberOfBytesWritten, &overlapped)) {
+ errorCode = GetLastError();
+ if (errorCode == ERROR_IO_PENDING) {
+ // Operation has been queued and will complete in the future.
+ writeSequenceStarted = true;
+ SetThreadpoolWait(waitObject, eventHandle, NULL);
+ return;
+ }
+ }
+
+ if (!writeCompleted(errorCode, numberOfBytesWritten))
+ return;
+ }
}
/*!
- \internal
- Will be called whenever the write operation completes.
+ Thread pool callback procedure.
*/
-void QWindowsPipeWriter::notified(DWORD errorCode, DWORD numberOfBytesWritten)
+void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult)
{
- notifiedCalled = true;
- writeSequenceStarted = false;
- Q_ASSERT(errorCode != ERROR_SUCCESS || numberOfBytesWritten == DWORD(buffer.size()));
- buffer.clear();
-
- switch (errorCode) {
- case ERROR_SUCCESS:
- break;
- case ERROR_OPERATION_ABORTED:
- if (stopped)
- break;
- Q_FALLTHROUGH();
- default:
- qErrnoWarning(errorCode, "QWindowsPipeWriter: asynchronous write failed.");
- break;
- }
+ Q_UNUSED(instance);
+ Q_UNUSED(wait);
+ Q_UNUSED(waitResult);
+ QWindowsPipeWriter *pipeWriter = reinterpret_cast<QWindowsPipeWriter *>(context);
+
+ // Get the result of the asynchronous operation.
+ DWORD numberOfBytesTransfered = 0;
+ DWORD errorCode = ERROR_SUCCESS;
+ if (!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped,
+ &numberOfBytesTransfered, FALSE))
+ errorCode = GetLastError();
+
+ QMutexLocker locker(&pipeWriter->mutex);
// After the writer was stopped, the only reason why this function can be called is the
- // completion of a cancellation. No signals should be emitted, and no new write sequence should
- // be started in this case.
- if (stopped)
+ // completion of a cancellation. No signals should be emitted, and no new write sequence
+ // should be started in this case.
+ if (pipeWriter->stopped)
return;
- pendingBytesWrittenValue += qint64(numberOfBytesWritten);
- if (!bytesWrittenPending) {
- bytesWrittenPending = true;
- emit _q_queueBytesWritten(QWindowsPipeWriter::QPrivateSignal());
+ pipeWriter->writeSequenceStarted = false;
+
+ if (pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered))
+ pipeWriter->startAsyncWriteLocked();
+
+ if (pipeWriter->lastError == ERROR_SUCCESS && !pipeWriter->winEventActPosted) {
+ pipeWriter->winEventActPosted = true;
+ locker.unlock();
+ QCoreApplication::postEvent(pipeWriter, new QEvent(QEvent::WinEventAct));
+ } else {
+ locker.unlock();
}
+
+ // We set the event only after unlocking to avoid additional context
+ // switches due to the released thread immediately running into the lock.
+ SetEvent(pipeWriter->syncHandle);
}
-bool QWindowsPipeWriter::waitForNotification(int timeout)
+/*!
+ Will be called whenever the write operation completes. Returns \c true if
+ no error occurred; otherwise returns \c false.
+ */
+bool QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten)
{
- QElapsedTimer t;
- t.start();
- notifiedCalled = false;
- int msecs = timeout;
- while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
- if (notifiedCalled)
- return true;
+ if (errorCode == ERROR_SUCCESS) {
+ Q_ASSERT(numberOfBytesWritten == DWORD(writeBuffer.nextDataBlockSize()));
- // Some other I/O completion routine was called. Wait some more.
- msecs = qt_subtract_from_timeout(timeout, t.elapsed());
- if (!msecs)
- break;
+ bytesWrittenPending = true;
+ pendingBytesWrittenValue += numberOfBytesWritten;
+ writeBuffer.free(numberOfBytesWritten);
+ return true;
}
- return notifiedCalled;
+
+ lastError = errorCode;
+ writeBuffer.clear();
+ // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
+ if (errorCode != ERROR_OPERATION_ABORTED && errorCode != ERROR_NO_DATA)
+ qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed.");
+ return false;
}
-bool QWindowsPipeWriter::write(const QByteArray &ba)
+/*!
+ Receives notification that the write operation has completed.
+ */
+bool QWindowsPipeWriter::event(QEvent *e)
{
- if (writeSequenceStarted)
+ if (e->type() == QEvent::WinEventAct) {
+ consumePendingAndEmit(true);
+ return true;
+ }
+ return QObject::event(e);
+}
+
+/*!
+ Updates the state and emits pending signals in the main thread.
+ Returns \c true, if bytesWritten() was emitted.
+ */
+bool QWindowsPipeWriter::consumePendingAndEmit(bool allowWinActPosting)
+{
+ QMutexLocker locker(&mutex);
+
+ // Enable QEvent::WinEventAct posting.
+ if (allowWinActPosting)
+ winEventActPosted = false;
+
+ if (!bytesWrittenPending)
return false;
- overlapped.clear();
- buffer = ba;
- stopped = false;
- writeSequenceStarted = true;
- if (!WriteFileEx(handle, buffer.constData(), buffer.size(),
- &overlapped, &writeFileCompleted)) {
- writeSequenceStarted = false;
- buffer.clear();
-
- const DWORD errorCode = GetLastError();
- switch (errorCode) {
- case ERROR_NO_DATA: // "The pipe is being closed."
- // The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
- break;
- default:
- qErrnoWarning(errorCode, "QWindowsPipeWriter::write failed.");
- }
+ // Reset the state even if we don't emit bytesWritten().
+ // It's a defined behavior to not re-emit this signal recursively.
+ bytesWrittenPending = false;
+ qint64 numberOfBytesWritten = pendingBytesWrittenValue;
+ pendingBytesWrittenValue = 0;
+
+ locker.unlock();
+
+ // Disable any further processing, if the pipe was stopped.
+ if (stopped)
return false;
+
+ emit canWrite();
+ if (!inBytesWritten) {
+ QScopedValueRollback<bool> guard(inBytesWritten, true);
+ emit bytesWritten(numberOfBytesWritten);
}
return true;
}
-void QWindowsPipeWriter::stop()
+bool QWindowsPipeWriter::waitForNotification(const QDeadlineTimer &deadline)
{
- stopped = true;
- bytesWrittenPending = false;
- pendingBytesWrittenValue = 0;
- if (writeSequenceStarted) {
- if (!CancelIoEx(handle, &overlapped)) {
- const DWORD dwError = GetLastError();
- if (dwError != ERROR_NOT_FOUND) {
- qErrnoWarning(dwError, "QWindowsPipeWriter: CancelIoEx on handle %p failed.",
- handle);
- }
- }
- waitForNotification(-1);
+ do {
+ DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE);
+ if (waitRet == WAIT_OBJECT_0)
+ return true;
+
+ if (waitRet != WAIT_IO_COMPLETION)
+ return false;
+
+ // Some I/O completion routine was called. Wait some more.
+ } while (!deadline.hasExpired());
+
+ return false;
+}
+
+/*!
+ Waits for the completion of the asynchronous write operation.
+ Returns \c true, if we've emitted the bytesWritten signal (non-recursive case)
+ or bytesWritten will be emitted by the event loop (recursive case).
+ */
+bool QWindowsPipeWriter::waitForWrite(int msecs)
+{
+ QDeadlineTimer timer(msecs);
+
+ // Make sure that 'syncHandle' was triggered by the thread pool callback.
+ while (isWriteOperationActive() && waitForNotification(timer)) {
+ if (consumePendingAndEmit(false))
+ return true;
}
+
+ return false;
}
QT_END_NAMESPACE
diff --git a/src/corelib/io/qwindowspipewriter_p.h b/src/corelib/io/qwindowspipewriter_p.h
index 39e8ffe40a..b648d7b846 100644
--- a/src/corelib/io/qwindowspipewriter_p.h
+++ b/src/corelib/io/qwindowspipewriter_p.h
@@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
+** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@@ -54,7 +55,10 @@
#include <QtCore/private/qglobal_p.h>
#include <qelapsedtimer.h>
#include <qobject.h>
-#include <qbytearray.h>
+#include <qdeadlinetimer.h>
+#include <qmutex.h>
+#include <private/qringbuffer_p.h>
+
#include <qt_windows.h>
QT_BEGIN_NAMESPACE
@@ -117,39 +121,37 @@ public:
bool write(const QByteArray &ba);
void stop();
bool waitForWrite(int msecs);
- bool isWriteOperationActive() const { return writeSequenceStarted; }
+ bool isWriteOperationActive() const;
qint64 bytesToWrite() const;
Q_SIGNALS:
void canWrite();
void bytesWritten(qint64 bytes);
- void _q_queueBytesWritten(QPrivateSignal);
-private:
- static void CALLBACK writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
- OVERLAPPED *overlappedBase);
- void notified(DWORD errorCode, DWORD numberOfBytesWritten);
- bool waitForNotification(int timeout);
- void emitPendingBytesWrittenValue();
+protected:
+ bool event(QEvent *e) override;
- class Overlapped : public OVERLAPPED
- {
- Q_DISABLE_COPY_MOVE(Overlapped)
- public:
- explicit Overlapped(QWindowsPipeWriter *pipeWriter);
- void clear();
-
- QWindowsPipeWriter *pipeWriter;
- };
+private:
+ void startAsyncWriteLocked();
+ static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
+ PTP_WAIT wait, TP_WAIT_RESULT waitResult);
+ bool writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten);
+ bool waitForNotification(const QDeadlineTimer &deadline);
+ bool consumePendingAndEmit(bool allowWinActPosting);
HANDLE handle;
- Overlapped overlapped;
- QByteArray buffer;
+ HANDLE eventHandle;
+ HANDLE syncHandle;
+ PTP_WAIT waitObject;
+ OVERLAPPED overlapped;
+ QRingBuffer writeBuffer;
qint64 pendingBytesWrittenValue;
+ mutable QMutex mutex;
+ DWORD lastError;
bool stopped;
bool writeSequenceStarted;
- bool notifiedCalled;
bool bytesWrittenPending;
+ bool winEventActPosted;
bool inBytesWritten;
};
diff --git a/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp b/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp
index eba4aa4790..65f2329e3d 100644
--- a/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp
+++ b/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp
@@ -29,12 +29,15 @@
#include <QTest>
#include <QEvent>
+#include <QtTest/QSignalSpy>
#include <QtCore/qthread.h>
#include <QtGui/qguiapplication.h>
#include <QtGui/qpainter.h>
#include <QtGui/qrasterwindow.h>
#include <QtNetwork/qtcpserver.h>
#include <QtNetwork/qtcpsocket.h>
+#include <QtNetwork/qlocalserver.h>
+#include <QtNetwork/qlocalsocket.h>
#include <QtCore/qelapsedtimer.h>
#include <QtCore/qtimer.h>
#include <QtCore/qwineventnotifier.h>
@@ -51,6 +54,7 @@ class tst_NoQtEventLoop : public QObject
private slots:
void consumeMouseEvents();
void consumeSocketEvents();
+ void consumeLocalSocketEvents();
void consumeWinEvents_data();
void consumeWinEvents();
void deliverEventsInLivelock();
@@ -318,6 +322,44 @@ void tst_NoQtEventLoop::consumeSocketEvents()
QVERIFY(server.hasPendingConnections());
}
+void tst_NoQtEventLoop::consumeLocalSocketEvents()
+{
+ int argc = 1;
+ char *argv[] = { const_cast<char *>("test"), 0 };
+ QGuiApplication app(argc, argv);
+ QLocalServer server;
+ QLocalSocket client;
+ QSignalSpy readyReadSpy(&client, &QIODevice::readyRead);
+
+ QVERIFY(server.listen("consumeLocalSocketEvents"));
+ client.connectToServer("consumeLocalSocketEvents");
+ QVERIFY(client.waitForConnected(200));
+ QVERIFY(server.waitForNewConnection(200));
+ QLocalSocket *clientSocket = server.nextPendingConnection();
+ QVERIFY(clientSocket);
+ QSignalSpy bytesWrittenSpy(clientSocket, &QIODevice::bytesWritten);
+ server.close();
+
+ bool timeExpired = false;
+ QTimer::singleShot(3000, Qt::CoarseTimer, [&timeExpired]() {
+ timeExpired = true;
+ });
+ QVERIFY(clientSocket->putChar(0));
+
+ // Exec own message loop
+ MSG msg;
+ while (::GetMessage(&msg, NULL, 0, 0)) {
+ ::TranslateMessage(&msg);
+ ::DispatchMessage(&msg);
+
+ if (timeExpired || readyReadSpy.count() != 0)
+ break;
+ }
+ QVERIFY(!timeExpired);
+ QCOMPARE(bytesWrittenSpy.count(), 1);
+ QCOMPARE(readyReadSpy.count(), 1);
+}
+
void tst_NoQtEventLoop::consumeWinEvents_data()
{
QTest::addColumn<bool>("peeking");
diff --git a/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp b/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp
index c99ca990da..8b2b4ea4da 100644
--- a/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp
+++ b/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp
@@ -639,26 +639,6 @@ void tst_QLocalSocket::readBufferOverflow()
QCOMPARE(client.read(buffer, readBufferSize), qint64(readBufferSize));
// no more bytes available
QCOMPARE(client.bytesAvailable(), 0);
-
-#ifdef Q_OS_WIN
- serverSocket->write(buffer, readBufferSize);
- QVERIFY(serverSocket->waitForBytesWritten());
-
- // ensure the read completion routine is called
- SleepEx(100, true);
- QVERIFY(client.waitForReadyRead());
- QCOMPARE(client.read(buffer, readBufferSize), qint64(readBufferSize));
-
- // Test overflow caused by an asynchronous pipe operation.
- client.setReadBufferSize(1);
- serverSocket->write(buffer, 2);
-
- QVERIFY(client.waitForReadyRead());
- // socket disconnects, if there any error on pipe
- QCOMPARE(client.state(), QLocalSocket::ConnectedState);
- QCOMPARE(client.bytesAvailable(), qint64(2));
- QCOMPARE(client.read(buffer, 2), qint64(2));
-#endif
}
static qint64 writeCommand(const QVariant &command, QIODevice *device, int commandCounter)
diff --git a/tests/benchmarks/network/socket/CMakeLists.txt b/tests/benchmarks/network/socket/CMakeLists.txt
index 6d54bc05f5..7c122a73ef 100644
--- a/tests/benchmarks/network/socket/CMakeLists.txt
+++ b/tests/benchmarks/network/socket/CMakeLists.txt
@@ -1,4 +1,5 @@
# Generated from socket.pro.
+add_subdirectory(qlocalsocket)
add_subdirectory(qtcpserver)
add_subdirectory(qudpsocket)
diff --git a/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt b/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt
new file mode 100644
index 0000000000..7c56b0b946
--- /dev/null
+++ b/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt
@@ -0,0 +1,14 @@
+#####################################################################
+## tst_bench_qlocalsocket Binary:
+#####################################################################
+
+qt_internal_add_benchmark(tst_bench_qlocalsocket
+ SOURCES
+ tst_qlocalsocket.cpp
+ PUBLIC_LIBRARIES
+ Qt::Network
+ Qt::Test
+)
+
+#### Keys ignored in scope 1:.:.:qlocalsocket.pro:<TRUE>:
+# TEMPLATE = "app"
diff --git a/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp b/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp
new file mode 100644
index 0000000000..86112f442d
--- /dev/null
+++ b/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp
@@ -0,0 +1,225 @@
+/****************************************************************************
+**
+** Copyright (C) 2021 The Qt Company Ltd.
+** Copyright (C) 2021 Alex Trotsenko <alex1973tr@gmail.com>
+** Contact: https://www.qt.io/licensing/
+**
+** This file is part of the test suite of the Qt Toolkit.
+**
+** $QT_BEGIN_LICENSE:GPL-EXCEPT$
+** Commercial License Usage
+** Licensees holding valid commercial Qt licenses may use this file in
+** accordance with the commercial license agreement provided with the
+** Software or, alternatively, in accordance with the terms contained in
+** a written agreement between you and The Qt Company. For licensing terms
+** and conditions see https://www.qt.io/terms-conditions. For further
+** information use the contact form at https://www.qt.io/contact-us.
+**
+** GNU General Public License Usage
+** Alternatively, this file may be used under the terms of the GNU
+** General Public License version 3 as published by the Free Software
+** Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT
+** included in the packaging of this file. Please review the following
+** information to ensure the GNU General Public License requirements will
+** be met: https://www.gnu.org/licenses/gpl-3.0.html.
+**
+** $QT_END_LICENSE$
+**
+****************************************************************************/
+
+#include <QTest>
+#include <QtCore/qglobal.h>
+#include <QtCore/qthread.h>
+#include <QtCore/qsemaphore.h>
+#include <QtCore/qbytearray.h>
+#include <QtCore/qeventloop.h>
+#include <QtCore/qvector.h>
+#include <QtCore/qelapsedtimer.h>
+#include <QtNetwork/qlocalsocket.h>
+#include <QtNetwork/qlocalserver.h>
+
+class tst_QLocalSocket : public QObject
+{
+ Q_OBJECT
+
+private slots:
+ void pingPong_data();
+ void pingPong();
+ void dataExchange_data();
+ void dataExchange();
+};
+
+class ServerThread : public QThread
+{
+public:
+ QSemaphore running;
+
+ explicit ServerThread(int chunkSize)
+ {
+ buffer.resize(chunkSize);
+ }
+
+ void run() override
+ {
+ QLocalServer server;
+
+ connect(&server, &QLocalServer::newConnection, [this, &server]() {
+ auto socket = server.nextPendingConnection();
+
+ connect(socket, &QLocalSocket::readyRead, [this, socket]() {
+ const qint64 bytesAvailable = socket->bytesAvailable();
+ Q_ASSERT(bytesAvailable <= this->buffer.size());
+
+ QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable);
+ QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable);
+ });
+ });
+
+ QVERIFY(server.listen("foo"));
+ running.release();
+ exec();
+ }
+
+protected:
+ QByteArray buffer;
+};
+
+class SocketFactory : public QObject
+{
+ Q_OBJECT
+
+public:
+ bool stopped = false;
+
+ explicit SocketFactory(int chunkSize, int connections)
+ {
+ buffer.resize(chunkSize);
+ for (int i = 0; i < connections; ++i) {
+ QLocalSocket *socket = new QLocalSocket(this);
+ Q_CHECK_PTR(socket);
+
+ connect(this, &SocketFactory::start, [this, socket]() {
+ QCOMPARE(socket->write(this->buffer), this->buffer.size());
+ });
+
+ connect(socket, &QLocalSocket::readyRead, [i, this, socket]() {
+ const qint64 bytesAvailable = socket->bytesAvailable();
+ Q_ASSERT(bytesAvailable <= this->buffer.size());
+
+ QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable);
+ emit this->bytesReceived(i, bytesAvailable);
+
+ if (!this->stopped)
+ QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable);
+ });
+
+ socket->connectToServer("foo");
+ QCOMPARE(socket->state(), QLocalSocket::ConnectedState);
+ }
+ }
+
+signals:
+ void start();
+ void bytesReceived(int channel, qint64 bytes);
+
+protected:
+ QByteArray buffer;
+};
+
+void tst_QLocalSocket::pingPong_data()
+{
+ QTest::addColumn<int>("connections");
+ for (int value : {10, 50, 100, 1000, 5000})
+ QTest::addRow("connections: %d", value) << value;
+}
+
+void tst_QLocalSocket::pingPong()
+{
+ QFETCH(int, connections);
+
+ const int iterations = 100000;
+ Q_ASSERT(iterations >= connections && connections > 0);
+
+ ServerThread serverThread(1);
+ serverThread.start();
+ // Wait for server to start.
+ QVERIFY(serverThread.running.tryAcquire(1, 3000));
+
+ SocketFactory factory(1, connections);
+ QEventLoop eventLoop;
+ QVector<qint64> bytesToRead;
+ QElapsedTimer timer;
+
+ bytesToRead.fill(iterations / connections, connections);
+ connect(&factory, &SocketFactory::bytesReceived,
+ [&bytesToRead, &connections, &factory, &eventLoop](int channel, qint64 bytes) {
+ Q_UNUSED(bytes);
+
+ if (--bytesToRead[channel] == 0 && --connections == 0) {
+ factory.stopped = true;
+ eventLoop.quit();
+ }
+ });
+
+ timer.start();
+ emit factory.start();
+ eventLoop.exec();
+
+ qDebug("Elapsed time: %.1f s", timer.elapsed() / 1000.0);
+ serverThread.quit();
+ serverThread.wait();
+}
+
+void tst_QLocalSocket::dataExchange_data()
+{
+ QTest::addColumn<int>("connections");
+ QTest::addColumn<int>("chunkSize");
+ for (int connections : {1, 5, 10}) {
+ for (int chunkSize : {100, 1000, 10000, 100000}) {
+ QTest::addRow("connections: %d, chunk size: %d",
+ connections, chunkSize) << connections << chunkSize;
+ }
+ }
+}
+
+void tst_QLocalSocket::dataExchange()
+{
+ QFETCH(int, connections);
+ QFETCH(int, chunkSize);
+
+ Q_ASSERT(chunkSize > 0 && connections > 0);
+ const qint64 timeToTest = 5000;
+
+ ServerThread serverThread(chunkSize);
+ serverThread.start();
+ // Wait for server to start.
+ QVERIFY(serverThread.running.tryAcquire(1, 3000));
+
+ SocketFactory factory(chunkSize, connections);
+ QEventLoop eventLoop;
+ qint64 totalReceived = 0;
+ QElapsedTimer timer;
+
+ connect(&factory, &SocketFactory::bytesReceived,
+ [&totalReceived, &timer, timeToTest, &factory, &eventLoop](int channel, qint64 bytes) {
+ Q_UNUSED(channel);
+
+ totalReceived += bytes;
+ if (timer.elapsed() >= timeToTest) {
+ factory.stopped = true;
+ eventLoop.quit();
+ }
+ });
+
+ timer.start();
+ emit factory.start();
+ eventLoop.exec();
+
+ qDebug("Transfer rate: %.1f MB/s", totalReceived / 1048.576 / timer.elapsed());
+ serverThread.quit();
+ serverThread.wait();
+}
+
+QTEST_MAIN(tst_QLocalSocket)
+
+#include "tst_qlocalsocket.moc"