From fc35f9496439d7a236f7be1eecae53ad6ddd9112 Mon Sep 17 00:00:00 2001 From: Oliver Wolff Date: Thu, 8 Sep 2016 12:27:53 +0200 Subject: winrt: Rework handling of udp datagrams We may only emit the readReady signal once for every event loop iteration. The previous implementation lead to the situation that the socket engine stopped reading socket data when bursts of data was received. In this case several readReady signals were fired. The socket engine obtained the pending datagrams (not only the first one) and for the following readReady signal no datagram was present. In this case the socket engine stops reading and the engine stalls. The new approach emits the readyRead signal at the most once every event loop iteration. The list of new pending datagrams is queued to be added to the "real" pending datagram list at the same time as the readyRead signal. Thereby we avoid the situation that a client can read all the datagrams before readyRead is emitted. One more advantage of having the worker handle the pending datagrams is that we no longer have to access the socket engine's members inside the callback. Thus we avoid the situation, where a late callback can make the application crash when the socket engine has already been deleted. Task-number: QTBUG-53472 Task-number: QTBUG-53471 Task-number: QTBUG-55895 Change-Id: Ia6d21cb635a40e7bd9e0213bb3a5c54ebc1220eb Reviewed-by: Maurice Kalinowski --- src/network/socket/qnativesocketengine_winrt.cpp | 108 ++++++++++++++++++++--- src/network/socket/qnativesocketengine_winrt_p.h | 9 ++ 2 files changed, 105 insertions(+), 12 deletions(-) (limited to 'src/network') diff --git a/src/network/socket/qnativesocketengine_winrt.cpp b/src/network/socket/qnativesocketengine_winrt.cpp index 6b71912838..69ebfdfceb 100644 --- a/src/network/socket/qnativesocketengine_winrt.cpp +++ b/src/network/socket/qnativesocketengine_winrt.cpp @@ -84,6 +84,69 @@ typedef IAsyncOperationWithProgress IAsyncBufferOperation; QT_BEGIN_NAMESPACE +static inline QString qt_QStringFromHString(const HString &string) +{ + UINT32 length; + PCWSTR rawString = string.GetRawBuffer(&length); + return QString::fromWCharArray(rawString, length); +} + +class SocketEngineWorker : public QObject +{ + Q_OBJECT +signals: + void newDatagramsReceived(const QList &datagram); + +public slots: + Q_INVOKABLE void notifyAboutNewDatagrams() + { + QMutexLocker locker(&mutex); + QList datagrams = pendingDatagrams; + pendingDatagrams.clear(); + emit newDatagramsReceived(datagrams); + } + +public: + HRESULT OnNewDatagramReceived(IDatagramSocket *, IDatagramSocketMessageReceivedEventArgs *args) + { + WinRtDatagram datagram; + QHostAddress returnAddress; + ComPtr remoteHost; + HRESULT hr = args->get_RemoteAddress(&remoteHost); + RETURN_OK_IF_FAILED("Could not obtain remote host"); + HString remoteHostString; + hr = remoteHost->get_CanonicalName(remoteHostString.GetAddressOf()); + RETURN_OK_IF_FAILED("Could not obtain remote host's canonical name"); + returnAddress.setAddress(qt_QStringFromHString(remoteHostString)); + datagram.header.senderAddress = returnAddress; + HString remotePort; + hr = args->get_RemotePort(remotePort.GetAddressOf()); + RETURN_OK_IF_FAILED("Could not obtain remote port"); + datagram.header.senderPort = qt_QStringFromHString(remotePort).toInt(); + + ComPtr reader; + hr = args->GetDataReader(&reader); + RETURN_OK_IF_FAILED("Could not obtain data reader"); + quint32 length; + hr = reader->get_UnconsumedBufferLength(&length); + RETURN_OK_IF_FAILED("Could not obtain unconsumed buffer length"); + datagram.data.resize(length); + hr = reader->ReadBytes(length, reinterpret_cast(datagram.data.data())); + RETURN_OK_IF_FAILED("Could not read datagram"); + QMutexLocker locker(&mutex); + // Notify the engine about new datagrams being present at the next event loop iteration + if (pendingDatagrams.isEmpty()) + QMetaObject::invokeMethod(this, "notifyAboutNewDatagrams", Qt::QueuedConnection); + pendingDatagrams << datagram; + + return S_OK; + } + +private: + QList pendingDatagrams; + QMutex mutex; +}; + static QByteArray socketDescription(const QAbstractSocketEngine *s) { QByteArray result; @@ -159,13 +222,6 @@ struct SocketGlobal }; Q_GLOBAL_STATIC(SocketGlobal, g) -static inline QString qt_QStringFromHString(const HString &string) -{ - UINT32 length; - PCWSTR rawString = string.GetRawBuffer(&length); - return QString::fromWCharArray(rawString, length); -} - #define READ_BUFFER_SIZE 65536 template @@ -206,6 +262,7 @@ static qint64 writeIOStream(ComPtr stream, const char *data, qint QNativeSocketEngine::QNativeSocketEngine(QObject *parent) : QAbstractSocketEngine(*new QNativeSocketEnginePrivate(), parent) { + qRegisterMetaType(); #ifndef QT_NO_SSL Q_D(QNativeSocketEngine); if (parent) @@ -215,6 +272,7 @@ QNativeSocketEngine::QNativeSocketEngine(QObject *parent) connect(this, SIGNAL(connectionReady()), SLOT(connectionNotification()), Qt::QueuedConnection); connect(this, SIGNAL(readReady()), SLOT(readNotification()), Qt::QueuedConnection); connect(this, SIGNAL(writeReady()), SLOT(writeNotification()), Qt::QueuedConnection); + connect(d->worker, &SocketEngineWorker::newDatagramsReceived, this, &QNativeSocketEngine::handleNewDatagrams, Qt::QueuedConnection); } QNativeSocketEngine::~QNativeSocketEngine() @@ -634,6 +692,7 @@ qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QIpPacketHea { #ifndef QT_NO_UDPSOCKET Q_D(QNativeSocketEngine); + d->readMutex.lock(); if (d->socketType != QAbstractSocket::UdpSocket || d->pendingDatagrams.isEmpty()) { if (header) header->clear(); @@ -653,6 +712,7 @@ qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QIpPacketHea } else { readOrigin = datagram.data; } + d->readMutex.unlock(); memcpy(data, readOrigin, qMin(maxlen, qint64(datagram.data.length()))); return readOrigin.length(); #else @@ -880,6 +940,24 @@ void QNativeSocketEngine::establishRead() Q_ASSERT_SUCCEEDED(hr); } +void QNativeSocketEngine::handleNewDatagrams(const QList &datagrams) +{ + Q_D(QNativeSocketEngine); + // Defer putting the datagrams into the list until the next event loop iteration + // (where the readyRead signal is emitted as well) + QMetaObject::invokeMethod(this, "putIntoPendingDatagramsList", Qt::QueuedConnection, + Q_ARG(QList, datagrams)); + if (d->notifyOnRead) + emit readReady(); +} + +void QNativeSocketEngine::putIntoPendingDatagramsList(const QList &datagrams) +{ + Q_D(QNativeSocketEngine); + QMutexLocker locker(&d->readMutex); + d->pendingDatagrams.append(datagrams); +} + bool QNativeSocketEnginePrivate::createNewSocket(QAbstractSocket::SocketType socketType, QAbstractSocket::NetworkLayerProtocol &socketProtocol) { Q_UNUSED(socketProtocol); @@ -899,8 +977,11 @@ bool QNativeSocketEnginePrivate::createNewSocket(QAbstractSocket::SocketType soc RETURN_FALSE_IF_FAILED("createNewSocket: Could not create socket instance"); socketDescriptor = qintptr(socket.Detach()); QEventDispatcherWinRT::runOnXamlThread([&hr, this]() { - hr = udpSocket()->add_MessageReceived(Callback(this, &QNativeSocketEnginePrivate::handleNewDatagram).Get(), &connectionToken); - RETURN_OK_IF_FAILED("createNewSocket: Could not add \"message received\" callback") + hr = udpSocket()->add_MessageReceived(Callback(worker, &SocketEngineWorker::OnNewDatagramReceived).Get(), &connectionToken); + if (FAILED(hr)) { + qErrnoWarning(hr, "createNewSocket: Could not add \"message received\" callback"); + return hr; + } return S_OK; }); if (FAILED(hr)) @@ -931,6 +1012,7 @@ QNativeSocketEnginePrivate::QNativeSocketEnginePrivate() , notifyOnException(false) , closingDown(false) , socketDescriptor(-1) + , worker(new SocketEngineWorker) , sslSocket(Q_NULLPTR) , connectionToken( { -1 } ) { @@ -947,6 +1029,8 @@ QNativeSocketEnginePrivate::~QNativeSocketEnginePrivate() else if (socketType == QAbstractSocket::TcpSocket) hr = tcpListener->remove_ConnectionReceived(connectionToken); Q_ASSERT_SUCCEEDED(hr); + + worker->deleteLater(); } void QNativeSocketEnginePrivate::setError(QAbstractSocket::SocketError error, ErrorString errorString) const @@ -1436,11 +1520,11 @@ HRESULT QNativeSocketEnginePrivate::handleNewDatagram(IDatagramSocket *socket, I datagram.data.resize(length); hr = reader->ReadBytes(length, reinterpret_cast(datagram.data.data())); RETURN_OK_IF_FAILED("Could not read datagram"); - pendingDatagrams.append(datagram); - if (notifyOnRead) - emit q->readReady(); + emit q->newDatagramReceived(datagram); return S_OK; } QT_END_NAMESPACE + +#include "qnativesocketengine_winrt.moc" diff --git a/src/network/socket/qnativesocketengine_winrt_p.h b/src/network/socket/qnativesocketengine_winrt_p.h index ef219e61df..6bc77c1529 100644 --- a/src/network/socket/qnativesocketengine_winrt_p.h +++ b/src/network/socket/qnativesocketengine_winrt_p.h @@ -63,6 +63,7 @@ QT_BEGIN_NAMESPACE class QNativeSocketEnginePrivate; +class SocketEngineWorker; struct WinRtDatagram { QByteArray data; @@ -137,11 +138,15 @@ signals: void connectionReady(); void readReady(); void writeReady(); + void newDatagramReceived(const WinRtDatagram &datagram); private slots: void establishRead(); + void handleNewDatagrams(const QList &datagram); private: + Q_INVOKABLE void putIntoPendingDatagramsList(const QList &datagrams); + Q_DECLARE_PRIVATE(QNativeSocketEngine) Q_DISABLE_COPY(QNativeSocketEngine) }; @@ -154,6 +159,7 @@ public: ~QNativeSocketEnginePrivate(); qintptr socketDescriptor; + SocketEngineWorker *worker; bool notifyOnRead, notifyOnWrite, notifyOnException; QAtomicInt closingDown; @@ -210,6 +216,7 @@ private: Microsoft::WRL::ComPtr> readOp; QBuffer readBytes; QMutex readMutex; + bool emitOnNewDatagram; QList pendingDatagrams; QList pendingConnections; @@ -228,4 +235,6 @@ private: QT_END_NAMESPACE +Q_DECLARE_METATYPE(WinRtDatagram) + #endif // QNATIVESOCKETENGINE_WINRT_P_H -- cgit v1.2.3