From f265c87e015c26225b80297f5c9f430ea7030214 Mon Sep 17 00:00:00 2001 From: Alex Trotsenko Date: Sat, 23 Jan 2021 13:00:22 +0200 Subject: Allow QWindowsPipe{Reader|Writer} to work with foreign event loops, take 2 When a foreign event loop that does not enter an alertable wait state is running (which is also the case when a native dialog window is modal), pipe handlers would freeze temporarily due to their APC callbacks not being invoked. We address this problem by moving the I/O callbacks to the Windows thread pool, and only posting completion events to the main loop from there. That makes the actual I/O completely independent from any main loop, while the signal delivery works also with foreign loops (because Qt event delivery uses Windows messages, which foreign loops typically handle correctly). As a nice side effect, performance (and in particular scalability) is improved. Several other approaches have been tried: 1) Using QWinEventNotifier was about a quarter slower and scaled much worse. Additionally, it also required a rather egregious hack to handle the (pathological) case of a single thread talking to both ends of a QLocalSocket synchronously. 2) Queuing APCs from the thread pool to the main thread and also posting wake-up events to its event loop, and handling I/O on the main thread; this performed roughly like this solution, but scaled half as well, and the separate wake-up path was still deemed hacky. 3) Only posting wake-up events to the main thread from the thread pool, and still handling I/O on the main thread; this still performed comparably to 2), and the pathological case was not handled at all. 4) Using this approach for reads and that of 3) for writes was slightly faster with big amounts of data, but scaled slightly worse, and the diverging implementations were deemed not desirable. Fixes: QTBUG-64443 Change-Id: I66443c3021d6ba98639a214c3e768be97d2cf14b Reviewed-by: Oswald Buddenhagen --- .../gui/kernel/noqteventloop/tst_noqteventloop.cpp | 42 ++++ .../socket/qlocalsocket/tst_qlocalsocket.cpp | 20 -- tests/benchmarks/network/socket/CMakeLists.txt | 1 + .../network/socket/qlocalsocket/CMakeLists.txt | 14 ++ .../socket/qlocalsocket/tst_qlocalsocket.cpp | 225 +++++++++++++++++++++ 5 files changed, 282 insertions(+), 20 deletions(-) create mode 100644 tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt create mode 100644 tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp (limited to 'tests') diff --git a/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp b/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp index eba4aa4790..65f2329e3d 100644 --- a/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp +++ b/tests/auto/gui/kernel/noqteventloop/tst_noqteventloop.cpp @@ -29,12 +29,15 @@ #include #include +#include #include #include #include #include #include #include +#include +#include #include #include #include @@ -51,6 +54,7 @@ class tst_NoQtEventLoop : public QObject private slots: void consumeMouseEvents(); void consumeSocketEvents(); + void consumeLocalSocketEvents(); void consumeWinEvents_data(); void consumeWinEvents(); void deliverEventsInLivelock(); @@ -318,6 +322,44 @@ void tst_NoQtEventLoop::consumeSocketEvents() QVERIFY(server.hasPendingConnections()); } +void tst_NoQtEventLoop::consumeLocalSocketEvents() +{ + int argc = 1; + char *argv[] = { const_cast("test"), 0 }; + QGuiApplication app(argc, argv); + QLocalServer server; + QLocalSocket client; + QSignalSpy readyReadSpy(&client, &QIODevice::readyRead); + + QVERIFY(server.listen("consumeLocalSocketEvents")); + client.connectToServer("consumeLocalSocketEvents"); + QVERIFY(client.waitForConnected(200)); + QVERIFY(server.waitForNewConnection(200)); + QLocalSocket *clientSocket = server.nextPendingConnection(); + QVERIFY(clientSocket); + QSignalSpy bytesWrittenSpy(clientSocket, &QIODevice::bytesWritten); + server.close(); + + bool timeExpired = false; + QTimer::singleShot(3000, Qt::CoarseTimer, [&timeExpired]() { + timeExpired = true; + }); + QVERIFY(clientSocket->putChar(0)); + + // Exec own message loop + MSG msg; + while (::GetMessage(&msg, NULL, 0, 0)) { + ::TranslateMessage(&msg); + ::DispatchMessage(&msg); + + if (timeExpired || readyReadSpy.count() != 0) + break; + } + QVERIFY(!timeExpired); + QCOMPARE(bytesWrittenSpy.count(), 1); + QCOMPARE(readyReadSpy.count(), 1); +} + void tst_NoQtEventLoop::consumeWinEvents_data() { QTest::addColumn("peeking"); diff --git a/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp b/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp index c99ca990da..8b2b4ea4da 100644 --- a/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp +++ b/tests/auto/network/socket/qlocalsocket/tst_qlocalsocket.cpp @@ -639,26 +639,6 @@ void tst_QLocalSocket::readBufferOverflow() QCOMPARE(client.read(buffer, readBufferSize), qint64(readBufferSize)); // no more bytes available QCOMPARE(client.bytesAvailable(), 0); - -#ifdef Q_OS_WIN - serverSocket->write(buffer, readBufferSize); - QVERIFY(serverSocket->waitForBytesWritten()); - - // ensure the read completion routine is called - SleepEx(100, true); - QVERIFY(client.waitForReadyRead()); - QCOMPARE(client.read(buffer, readBufferSize), qint64(readBufferSize)); - - // Test overflow caused by an asynchronous pipe operation. - client.setReadBufferSize(1); - serverSocket->write(buffer, 2); - - QVERIFY(client.waitForReadyRead()); - // socket disconnects, if there any error on pipe - QCOMPARE(client.state(), QLocalSocket::ConnectedState); - QCOMPARE(client.bytesAvailable(), qint64(2)); - QCOMPARE(client.read(buffer, 2), qint64(2)); -#endif } static qint64 writeCommand(const QVariant &command, QIODevice *device, int commandCounter) diff --git a/tests/benchmarks/network/socket/CMakeLists.txt b/tests/benchmarks/network/socket/CMakeLists.txt index 6d54bc05f5..7c122a73ef 100644 --- a/tests/benchmarks/network/socket/CMakeLists.txt +++ b/tests/benchmarks/network/socket/CMakeLists.txt @@ -1,4 +1,5 @@ # Generated from socket.pro. +add_subdirectory(qlocalsocket) add_subdirectory(qtcpserver) add_subdirectory(qudpsocket) diff --git a/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt b/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt new file mode 100644 index 0000000000..7c56b0b946 --- /dev/null +++ b/tests/benchmarks/network/socket/qlocalsocket/CMakeLists.txt @@ -0,0 +1,14 @@ +##################################################################### +## tst_bench_qlocalsocket Binary: +##################################################################### + +qt_internal_add_benchmark(tst_bench_qlocalsocket + SOURCES + tst_qlocalsocket.cpp + PUBLIC_LIBRARIES + Qt::Network + Qt::Test +) + +#### Keys ignored in scope 1:.:.:qlocalsocket.pro:: +# TEMPLATE = "app" diff --git a/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp b/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp new file mode 100644 index 0000000000..86112f442d --- /dev/null +++ b/tests/benchmarks/network/socket/qlocalsocket/tst_qlocalsocket.cpp @@ -0,0 +1,225 @@ +/**************************************************************************** +** +** Copyright (C) 2021 The Qt Company Ltd. +** Copyright (C) 2021 Alex Trotsenko +** Contact: https://www.qt.io/licensing/ +** +** This file is part of the test suite of the Qt Toolkit. +** +** $QT_BEGIN_LICENSE:GPL-EXCEPT$ +** Commercial License Usage +** Licensees holding valid commercial Qt licenses may use this file in +** accordance with the commercial license agreement provided with the +** Software or, alternatively, in accordance with the terms contained in +** a written agreement between you and The Qt Company. For licensing terms +** and conditions see https://www.qt.io/terms-conditions. For further +** information use the contact form at https://www.qt.io/contact-us. +** +** GNU General Public License Usage +** Alternatively, this file may be used under the terms of the GNU +** General Public License version 3 as published by the Free Software +** Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT +** included in the packaging of this file. Please review the following +** information to ensure the GNU General Public License requirements will +** be met: https://www.gnu.org/licenses/gpl-3.0.html. +** +** $QT_END_LICENSE$ +** +****************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +class tst_QLocalSocket : public QObject +{ + Q_OBJECT + +private slots: + void pingPong_data(); + void pingPong(); + void dataExchange_data(); + void dataExchange(); +}; + +class ServerThread : public QThread +{ +public: + QSemaphore running; + + explicit ServerThread(int chunkSize) + { + buffer.resize(chunkSize); + } + + void run() override + { + QLocalServer server; + + connect(&server, &QLocalServer::newConnection, [this, &server]() { + auto socket = server.nextPendingConnection(); + + connect(socket, &QLocalSocket::readyRead, [this, socket]() { + const qint64 bytesAvailable = socket->bytesAvailable(); + Q_ASSERT(bytesAvailable <= this->buffer.size()); + + QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable); + QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable); + }); + }); + + QVERIFY(server.listen("foo")); + running.release(); + exec(); + } + +protected: + QByteArray buffer; +}; + +class SocketFactory : public QObject +{ + Q_OBJECT + +public: + bool stopped = false; + + explicit SocketFactory(int chunkSize, int connections) + { + buffer.resize(chunkSize); + for (int i = 0; i < connections; ++i) { + QLocalSocket *socket = new QLocalSocket(this); + Q_CHECK_PTR(socket); + + connect(this, &SocketFactory::start, [this, socket]() { + QCOMPARE(socket->write(this->buffer), this->buffer.size()); + }); + + connect(socket, &QLocalSocket::readyRead, [i, this, socket]() { + const qint64 bytesAvailable = socket->bytesAvailable(); + Q_ASSERT(bytesAvailable <= this->buffer.size()); + + QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable); + emit this->bytesReceived(i, bytesAvailable); + + if (!this->stopped) + QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable); + }); + + socket->connectToServer("foo"); + QCOMPARE(socket->state(), QLocalSocket::ConnectedState); + } + } + +signals: + void start(); + void bytesReceived(int channel, qint64 bytes); + +protected: + QByteArray buffer; +}; + +void tst_QLocalSocket::pingPong_data() +{ + QTest::addColumn("connections"); + for (int value : {10, 50, 100, 1000, 5000}) + QTest::addRow("connections: %d", value) << value; +} + +void tst_QLocalSocket::pingPong() +{ + QFETCH(int, connections); + + const int iterations = 100000; + Q_ASSERT(iterations >= connections && connections > 0); + + ServerThread serverThread(1); + serverThread.start(); + // Wait for server to start. + QVERIFY(serverThread.running.tryAcquire(1, 3000)); + + SocketFactory factory(1, connections); + QEventLoop eventLoop; + QVector bytesToRead; + QElapsedTimer timer; + + bytesToRead.fill(iterations / connections, connections); + connect(&factory, &SocketFactory::bytesReceived, + [&bytesToRead, &connections, &factory, &eventLoop](int channel, qint64 bytes) { + Q_UNUSED(bytes); + + if (--bytesToRead[channel] == 0 && --connections == 0) { + factory.stopped = true; + eventLoop.quit(); + } + }); + + timer.start(); + emit factory.start(); + eventLoop.exec(); + + qDebug("Elapsed time: %.1f s", timer.elapsed() / 1000.0); + serverThread.quit(); + serverThread.wait(); +} + +void tst_QLocalSocket::dataExchange_data() +{ + QTest::addColumn("connections"); + QTest::addColumn("chunkSize"); + for (int connections : {1, 5, 10}) { + for (int chunkSize : {100, 1000, 10000, 100000}) { + QTest::addRow("connections: %d, chunk size: %d", + connections, chunkSize) << connections << chunkSize; + } + } +} + +void tst_QLocalSocket::dataExchange() +{ + QFETCH(int, connections); + QFETCH(int, chunkSize); + + Q_ASSERT(chunkSize > 0 && connections > 0); + const qint64 timeToTest = 5000; + + ServerThread serverThread(chunkSize); + serverThread.start(); + // Wait for server to start. + QVERIFY(serverThread.running.tryAcquire(1, 3000)); + + SocketFactory factory(chunkSize, connections); + QEventLoop eventLoop; + qint64 totalReceived = 0; + QElapsedTimer timer; + + connect(&factory, &SocketFactory::bytesReceived, + [&totalReceived, &timer, timeToTest, &factory, &eventLoop](int channel, qint64 bytes) { + Q_UNUSED(channel); + + totalReceived += bytes; + if (timer.elapsed() >= timeToTest) { + factory.stopped = true; + eventLoop.quit(); + } + }); + + timer.start(); + emit factory.start(); + eventLoop.exec(); + + qDebug("Transfer rate: %.1f MB/s", totalReceived / 1048.576 / timer.elapsed()); + serverThread.quit(); + serverThread.wait(); +} + +QTEST_MAIN(tst_QLocalSocket) + +#include "tst_qlocalsocket.moc" -- cgit v1.2.3