diff options
Diffstat (limited to 'src/network/socket/qnativesocketengine_winrt.cpp')
-rw-r--r-- | src/network/socket/qnativesocketengine_winrt.cpp | 167 |
1 files changed, 145 insertions, 22 deletions
diff --git a/src/network/socket/qnativesocketengine_winrt.cpp b/src/network/socket/qnativesocketengine_winrt.cpp index 7c273f957b..f8e92d3f50 100644 --- a/src/network/socket/qnativesocketengine_winrt.cpp +++ b/src/network/socket/qnativesocketengine_winrt.cpp @@ -84,6 +84,69 @@ typedef IAsyncOperationWithProgress<IBuffer *, UINT32> IAsyncBufferOperation; QT_BEGIN_NAMESPACE +static inline QString qt_QStringFromHString(const HString &string) +{ + UINT32 length; + PCWSTR rawString = string.GetRawBuffer(&length); + return QString::fromWCharArray(rawString, length); +} + +class SocketEngineWorker : public QObject +{ + Q_OBJECT +signals: + void newDatagramsReceived(const QList<WinRtDatagram> &datagram); + +public slots: + Q_INVOKABLE void notifyAboutNewDatagrams() + { + QMutexLocker locker(&mutex); + QList<WinRtDatagram> datagrams = pendingDatagrams; + pendingDatagrams.clear(); + emit newDatagramsReceived(datagrams); + } + +public: + HRESULT OnNewDatagramReceived(IDatagramSocket *, IDatagramSocketMessageReceivedEventArgs *args) + { + WinRtDatagram datagram; + QHostAddress returnAddress; + ComPtr<IHostName> remoteHost; + HRESULT hr = args->get_RemoteAddress(&remoteHost); + RETURN_OK_IF_FAILED("Could not obtain remote host"); + HString remoteHostString; + hr = remoteHost->get_CanonicalName(remoteHostString.GetAddressOf()); + RETURN_OK_IF_FAILED("Could not obtain remote host's canonical name"); + returnAddress.setAddress(qt_QStringFromHString(remoteHostString)); + datagram.header.senderAddress = returnAddress; + HString remotePort; + hr = args->get_RemotePort(remotePort.GetAddressOf()); + RETURN_OK_IF_FAILED("Could not obtain remote port"); + datagram.header.senderPort = qt_QStringFromHString(remotePort).toInt(); + + ComPtr<IDataReader> reader; + hr = args->GetDataReader(&reader); + RETURN_OK_IF_FAILED("Could not obtain data reader"); + quint32 length; + hr = reader->get_UnconsumedBufferLength(&length); + RETURN_OK_IF_FAILED("Could not obtain unconsumed buffer length"); + datagram.data.resize(length); + hr = reader->ReadBytes(length, reinterpret_cast<BYTE *>(datagram.data.data())); + RETURN_OK_IF_FAILED("Could not read datagram"); + QMutexLocker locker(&mutex); + // Notify the engine about new datagrams being present at the next event loop iteration + if (pendingDatagrams.isEmpty()) + QMetaObject::invokeMethod(this, "notifyAboutNewDatagrams", Qt::QueuedConnection); + pendingDatagrams << datagram; + + return S_OK; + } + +private: + QList<WinRtDatagram> pendingDatagrams; + QMutex mutex; +}; + static QByteArray socketDescription(const QAbstractSocketEngine *s) { QByteArray result; @@ -159,13 +222,6 @@ struct SocketGlobal }; Q_GLOBAL_STATIC(SocketGlobal, g) -static inline QString qt_QStringFromHString(const HString &string) -{ - UINT32 length; - PCWSTR rawString = string.GetRawBuffer(&length); - return QString::fromWCharArray(rawString, length); -} - #define READ_BUFFER_SIZE 65536 template <typename T> @@ -206,6 +262,7 @@ static qint64 writeIOStream(ComPtr<IOutputStream> stream, const char *data, qint QNativeSocketEngine::QNativeSocketEngine(QObject *parent) : QAbstractSocketEngine(*new QNativeSocketEnginePrivate(), parent) { + qRegisterMetaType<WinRtDatagram>(); #ifndef QT_NO_SSL Q_D(QNativeSocketEngine); if (parent) @@ -215,6 +272,7 @@ QNativeSocketEngine::QNativeSocketEngine(QObject *parent) connect(this, SIGNAL(connectionReady()), SLOT(connectionNotification()), Qt::QueuedConnection); connect(this, SIGNAL(readReady()), SLOT(readNotification()), Qt::QueuedConnection); connect(this, SIGNAL(writeReady()), SLOT(writeNotification()), Qt::QueuedConnection); + connect(d->worker, &SocketEngineWorker::newDatagramsReceived, this, &QNativeSocketEngine::handleNewDatagrams, Qt::QueuedConnection); } QNativeSocketEngine::~QNativeSocketEngine() @@ -259,26 +317,31 @@ bool QNativeSocketEngine::initialize(qintptr socketDescriptor, QAbstractSocket:: // Start processing incoming data if (d->socketType == QAbstractSocket::TcpSocket) { HRESULT hr; - QEventDispatcherWinRT::runOnXamlThread([d, &hr, socket, this]() { + QEventDispatcherWinRT::runOnXamlThread([&hr, socket, socketState, this]() { + Q_D(QNativeSocketEngine); ComPtr<IBuffer> buffer; HRESULT hr = g->bufferFactory->Create(READ_BUFFER_SIZE, &buffer); RETURN_OK_IF_FAILED("initialize(): Could not create buffer"); ComPtr<IInputStream> stream; hr = socket->get_InputStream(&stream); RETURN_OK_IF_FAILED("initialize(): Could not obtain input stream"); - hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, d->readOp.GetAddressOf()); + ComPtr<IAsyncBufferOperation> readOp; + hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, readOp.GetAddressOf()); RETURN_OK_IF_FAILED_WITH_ARGS("initialize(): Failed to read from the socket buffer (%s).", socketDescription(this).constData()); - hr = d->readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get()); + d->pendingReadOps.append(readOp); + d->socketState = socketState; + hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get()); RETURN_OK_IF_FAILED_WITH_ARGS("initialize(): Failed to set socket read callback (%s).", socketDescription(this).constData()); return S_OK; }); if (FAILED(hr)) return false; + } else { + d->socketState = socketState; } - d->socketState = socketState; return true; } @@ -511,9 +574,9 @@ void QNativeSocketEngine::close() } #endif // _MSC_VER >= 1900 - if (d->readOp) { + for (ComPtr<IAsyncBufferOperation> readOp : d->pendingReadOps) { ComPtr<IAsyncInfo> info; - hr = d->readOp.As(&info); + hr = readOp.As(&info); Q_ASSERT_SUCCEEDED(hr); if (info) { hr = info->Cancel(); @@ -587,7 +650,7 @@ qint64 QNativeSocketEngine::bytesAvailable() const if (d->socketType != QAbstractSocket::TcpSocket) return -1; - return d->readBytes.size() - d->readBytes.pos(); + return d->bytesAvailable; } qint64 QNativeSocketEngine::read(char *data, qint64 maxlen) @@ -605,7 +668,9 @@ qint64 QNativeSocketEngine::read(char *data, qint64 maxlen) } QMutexLocker mutexLocker(&d->readMutex); - return d->readBytes.read(data, maxlen); + qint64 b = d->readBytes.read(data, maxlen); + d->bytesAvailable = d->readBytes.size() - d->readBytes.pos(); + return b; } qint64 QNativeSocketEngine::write(const char *data, qint64 len) @@ -634,7 +699,9 @@ qint64 QNativeSocketEngine::write(const char *data, qint64 len) qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QIpPacketHeader *header, PacketHeaderOptions) { +#ifndef QT_NO_UDPSOCKET Q_D(QNativeSocketEngine); + d->readMutex.lock(); if (d->socketType != QAbstractSocket::UdpSocket || d->pendingDatagrams.isEmpty()) { if (header) header->clear(); @@ -654,12 +721,20 @@ qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QIpPacketHea } else { readOrigin = datagram.data; } + d->readMutex.unlock(); memcpy(data, readOrigin, qMin(maxlen, qint64(datagram.data.length()))); return readOrigin.length(); +#else + Q_UNUSED(data) + Q_UNUSED(maxlen) + Q_UNUSED(header) + return -1; +#endif // QT_NO_UDPSOCKET } qint64 QNativeSocketEngine::writeDatagram(const char *data, qint64 len, const QIpPacketHeader &header) { +#ifndef QT_NO_UDPSOCKET Q_D(QNativeSocketEngine); if (d->socketType != QAbstractSocket::UdpSocket) return -1; @@ -686,6 +761,12 @@ qint64 QNativeSocketEngine::writeDatagram(const char *data, qint64 len, const QI Q_ASSERT_SUCCEEDED(hr); return writeIOStream(stream, data, len); +#else + Q_UNUSED(data) + Q_UNUSED(len) + Q_UNUSED(header) + return -1; +#endif // QT_NO_UDPSOCKET } bool QNativeSocketEngine::hasPendingDatagrams() const @@ -859,15 +940,35 @@ void QNativeSocketEngine::establishRead() hr = g->bufferFactory->Create(READ_BUFFER_SIZE, &buffer); RETURN_HR_IF_FAILED("establishRead(): Failed to create buffer"); - hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, &d->readOp); + ComPtr<IAsyncBufferOperation> readOp; + hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, readOp.GetAddressOf()); RETURN_HR_IF_FAILED("establishRead(): Failed to initiate socket read"); - hr = d->readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get()); + d->pendingReadOps.append(readOp); + hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get()); RETURN_HR_IF_FAILED("establishRead(): Failed to register read callback"); return S_OK; }); Q_ASSERT_SUCCEEDED(hr); } +void QNativeSocketEngine::handleNewDatagrams(const QList<WinRtDatagram> &datagrams) +{ + Q_D(QNativeSocketEngine); + // Defer putting the datagrams into the list until the next event loop iteration + // (where the readyRead signal is emitted as well) + QMetaObject::invokeMethod(this, "putIntoPendingDatagramsList", Qt::QueuedConnection, + Q_ARG(QList<WinRtDatagram>, datagrams)); + if (d->notifyOnRead) + emit readReady(); +} + +void QNativeSocketEngine::putIntoPendingDatagramsList(const QList<WinRtDatagram> &datagrams) +{ + Q_D(QNativeSocketEngine); + QMutexLocker locker(&d->readMutex); + d->pendingDatagrams.append(datagrams); +} + bool QNativeSocketEnginePrivate::createNewSocket(QAbstractSocket::SocketType socketType, QAbstractSocket::NetworkLayerProtocol &socketProtocol) { Q_UNUSED(socketProtocol); @@ -887,8 +988,11 @@ bool QNativeSocketEnginePrivate::createNewSocket(QAbstractSocket::SocketType soc RETURN_FALSE_IF_FAILED("createNewSocket: Could not create socket instance"); socketDescriptor = qintptr(socket.Detach()); QEventDispatcherWinRT::runOnXamlThread([&hr, this]() { - hr = udpSocket()->add_MessageReceived(Callback<DatagramReceivedHandler>(this, &QNativeSocketEnginePrivate::handleNewDatagram).Get(), &connectionToken); - RETURN_OK_IF_FAILED("createNewSocket: Could not add \"message received\" callback") + hr = udpSocket()->add_MessageReceived(Callback<DatagramReceivedHandler>(worker, &SocketEngineWorker::OnNewDatagramReceived).Get(), &connectionToken); + if (FAILED(hr)) { + qErrnoWarning(hr, "createNewSocket: Could not add \"message received\" callback"); + return hr; + } return S_OK; }); if (FAILED(hr)) @@ -919,6 +1023,7 @@ QNativeSocketEnginePrivate::QNativeSocketEnginePrivate() , notifyOnException(false) , closingDown(false) , socketDescriptor(-1) + , worker(new SocketEngineWorker) , sslSocket(Q_NULLPTR) , connectionToken( { -1 } ) { @@ -935,6 +1040,8 @@ QNativeSocketEnginePrivate::~QNativeSocketEnginePrivate() else if (socketType == QAbstractSocket::TcpSocket) hr = tcpListener->remove_ConnectionReceived(connectionToken); Q_ASSERT_SUCCEEDED(hr); + + worker->deleteLater(); } void QNativeSocketEnginePrivate::setError(QAbstractSocket::SocketError error, ErrorString errorString) const @@ -1090,6 +1197,7 @@ int QNativeSocketEnginePrivate::option(QAbstractSocketEngine::SocketOption opt) case QAbstractSocketEngine::MulticastTtlOption: case QAbstractSocketEngine::MulticastLoopbackOption: case QAbstractSocketEngine::TypeOfServiceOption: + case QAbstractSocketEngine::MaxStreamsSocketOption: default: return -1; } @@ -1148,6 +1256,7 @@ bool QNativeSocketEnginePrivate::setOption(QAbstractSocketEngine::SocketOption o case QAbstractSocketEngine::MulticastTtlOption: case QAbstractSocketEngine::MulticastLoopbackOption: case QAbstractSocketEngine::TypeOfServiceOption: + case QAbstractSocketEngine::MaxStreamsSocketOption: default: return false; } @@ -1292,10 +1401,12 @@ HRESULT QNativeSocketEnginePrivate::handleConnectOpFinished(IAsyncAction *action if (socketType != QAbstractSocket::TcpSocket) return S_OK; +#ifndef QT_NO_SSL // Delay the reader so that the SSL socket can upgrade if (sslSocket) QObject::connect(qobject_cast<QSslSocket *>(sslSocket), &QSslSocket::encrypted, q, &QNativeSocketEngine::establishRead); else +#endif q->establishRead(); return S_OK; } @@ -1308,7 +1419,15 @@ HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncBufferOperation *async } Q_Q(QNativeSocketEngine); + for (int i = 0; i < pendingReadOps.count(); ++i) { + if (pendingReadOps.at(i).Get() == asyncInfo) { + pendingReadOps.takeAt(i); + break; + } + } + static QMutex mutex; + mutex.lock(); // A read in UnconnectedState will close the socket and return -1 and thus tell the caller, // that the connection was closed. The socket cannot be closed here, as the subsequent read // might fail then. @@ -1356,10 +1475,12 @@ HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncBufferOperation *async Q_ASSERT(readBytes.atEnd()); readBytes.write(reinterpret_cast<const char*>(data), qint64(bufferLength)); readBytes.seek(readPos); + bytesAvailable = readBytes.size() - readBytes.pos(); readMutex.unlock(); if (notifyOnRead) emit q->readReady(); + mutex.unlock(); hr = QEventDispatcherWinRT::runOnXamlThread([buffer, q, this]() { UINT32 readBufferLength; @@ -1373,12 +1494,14 @@ HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncBufferOperation *async hr = buffer->put_Length(0); RETURN_HR_IF_FAILED("handleReadyRead(): Could not set buffer length"); + ComPtr<IAsyncBufferOperation> readOp; hr = stream->ReadAsync(buffer.Get(), readBufferLength, InputStreamOptions_Partial, &readOp); if (FAILED(hr)) { qErrnoWarning(hr, "handleReadyRead(): Could not read into socket stream buffer (%s).", socketDescription(q).constData()); return S_OK; } + pendingReadOps.append(readOp); hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(this, &QNativeSocketEnginePrivate::handleReadyRead).Get()); if (FAILED(hr)) { qErrnoWarning(hr, "handleReadyRead(): Failed to set socket read callback (%s).", @@ -1420,11 +1543,11 @@ HRESULT QNativeSocketEnginePrivate::handleNewDatagram(IDatagramSocket *socket, I datagram.data.resize(length); hr = reader->ReadBytes(length, reinterpret_cast<BYTE *>(datagram.data.data())); RETURN_OK_IF_FAILED("Could not read datagram"); - pendingDatagrams.append(datagram); - if (notifyOnRead) - emit q->readReady(); + emit q->newDatagramReceived(datagram); return S_OK; } QT_END_NAMESPACE + +#include "qnativesocketengine_winrt.moc" |