diff options
Diffstat (limited to 'src/network/socket/qnativesocketengine_winrt.cpp')
-rw-r--r-- | src/network/socket/qnativesocketengine_winrt.cpp | 456 |
1 files changed, 274 insertions, 182 deletions
diff --git a/src/network/socket/qnativesocketengine_winrt.cpp b/src/network/socket/qnativesocketengine_winrt.cpp index 8257eec9ea..8b36406c67 100644 --- a/src/network/socket/qnativesocketengine_winrt.cpp +++ b/src/network/socket/qnativesocketengine_winrt.cpp @@ -126,6 +126,33 @@ static HRESULT qt_winrt_try_create_thread_network_context(QString host, ComPtr<I } #endif // _MSC_VER >= 1900 +typedef QHash<qintptr, IStreamSocket *> TcpSocketHash; + +struct SocketHandler +{ + SocketHandler() : socketCount(0) {} + qintptr socketCount; + TcpSocketHash pendingTcpSockets; +}; + +Q_GLOBAL_STATIC(SocketHandler, gSocketHandler) + +struct SocketGlobal +{ + SocketGlobal() + { + HRESULT hr; + hr = GetActivationFactory(HString::MakeReference(RuntimeClass_Windows_Storage_Streams_Buffer).Get(), + &bufferFactory); + Q_ASSERT_SUCCEEDED(hr); + } + + ComPtr<IBufferFactory> bufferFactory; +}; +Q_GLOBAL_STATIC(SocketGlobal, g) + +#define READ_BUFFER_SIZE 65536 + static inline QString qt_QStringFromHString(const HString &string) { UINT32 length; @@ -136,8 +163,43 @@ static inline QString qt_QStringFromHString(const HString &string) class SocketEngineWorker : public QObject { Q_OBJECT +public: + SocketEngineWorker(QNativeSocketEnginePrivate *engine) + : enginePrivate(engine) + { + } + + ~SocketEngineWorker() + { + if (Q_UNLIKELY(initialReadOp)) { + ComPtr<IAsyncInfo> info; + HRESULT hr = initialReadOp.As(&info); + Q_ASSERT_SUCCEEDED(hr); + if (info) { + hr = info->Cancel(); + Q_ASSERT_SUCCEEDED(hr); + hr = info->Close(); + Q_ASSERT_SUCCEEDED(hr); + } + } + + if (readOp) { + ComPtr<IAsyncInfo> info; + HRESULT hr = readOp.As(&info); + Q_ASSERT_SUCCEEDED(hr); + if (info) { + hr = info->Cancel(); + Q_ASSERT_SUCCEEDED(hr); + hr = info->Close(); + Q_ASSERT_SUCCEEDED(hr); + } + } + } + signals: void newDatagramsReceived(const QList<WinRtDatagram> &datagram); + void newDataReceived(const QVector<QByteArray> &data); + void socketErrorOccured(QAbstractSocket::SocketError error); public slots: Q_INVOKABLE void notifyAboutNewDatagrams() @@ -148,7 +210,30 @@ public slots: emit newDatagramsReceived(datagrams); } + Q_INVOKABLE void notifyAboutNewData() + { + QMutexLocker locker(&mutex); + const QVector<QByteArray> newData = std::move(pendingData); + pendingData.clear(); + emit newDataReceived(newData); + } + public: + void startReading() + { + ComPtr<IBuffer> buffer; + HRESULT hr = g->bufferFactory->Create(READ_BUFFER_SIZE, &buffer); + Q_ASSERT_SUCCEEDED(hr); + ComPtr<IInputStream> stream; + hr = tcpSocket->get_InputStream(&stream); + Q_ASSERT_SUCCEEDED(hr); + hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, initialReadOp.GetAddressOf()); + Q_ASSERT_SUCCEEDED(hr); + enginePrivate->socketState = QAbstractSocket::ConnectedState; + hr = initialReadOp->put_Completed(Callback<SocketReadCompletedHandler>(this, &SocketEngineWorker::onReadyRead).Get()); + Q_ASSERT_SUCCEEDED(hr); + } + HRESULT OnNewDatagramReceived(IDatagramSocket *, IDatagramSocketMessageReceivedEventArgs *args) { WinRtDatagram datagram; @@ -184,9 +269,127 @@ public: return S_OK; } + HRESULT onReadyRead(IAsyncBufferOperation *asyncInfo, AsyncStatus status) + { + if (asyncInfo == initialReadOp.Get()) { + initialReadOp.Reset(); + } else if (asyncInfo == readOp.Get()) { + readOp.Reset(); + } else { + Q_ASSERT(false); + } + + // A read in UnconnectedState will close the socket and return -1 and thus tell the caller, + // that the connection was closed. The socket cannot be closed here, as the subsequent read + // might fail then. + if (status == Error || status == Canceled) { + emit socketErrorOccured(QAbstractSocket::RemoteHostClosedError); + return S_OK; + } + + ComPtr<IBuffer> buffer; + HRESULT hr = asyncInfo->GetResults(&buffer); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to get read results buffer"); + emit socketErrorOccured(QAbstractSocket::UnknownSocketError); + return S_OK; + } + + UINT32 bufferLength; + hr = buffer->get_Length(&bufferLength); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to get buffer length"); + emit socketErrorOccured(QAbstractSocket::UnknownSocketError); + return S_OK; + } + // A zero sized buffer length signals, that the remote host closed the connection. The socket + // cannot be closed though, as the following read might have socket descriptor -1 and thus and + // the closing of the socket won't be communicated to the caller. So only the error is set. The + // actual socket close happens inside of read. + if (!bufferLength) { + emit socketErrorOccured(QAbstractSocket::RemoteHostClosedError); + return S_OK; + } + + ComPtr<Windows::Storage::Streams::IBufferByteAccess> byteArrayAccess; + hr = buffer.As(&byteArrayAccess); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to get cast buffer"); + emit socketErrorOccured(QAbstractSocket::UnknownSocketError); + return S_OK; + } + byte *data; + hr = byteArrayAccess->Buffer(&data); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to access buffer data"); + emit socketErrorOccured(QAbstractSocket::UnknownSocketError); + return S_OK; + } + + QByteArray newData(reinterpret_cast<const char*>(data), qint64(bufferLength)); + QMutexLocker readLocker(&mutex); + if (pendingData.isEmpty()) + QMetaObject::invokeMethod(this, "notifyAboutNewData", Qt::QueuedConnection); + pendingData << newData; + readLocker.unlock(); + + hr = QEventDispatcherWinRT::runOnXamlThread([buffer, this]() { + UINT32 readBufferLength; + ComPtr<IInputStream> stream; + HRESULT hr = tcpSocket->get_InputStream(&stream); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to obtain input stream"); + emit socketErrorOccured(QAbstractSocket::UnknownSocketError); + return S_OK; + } + + // Reuse the stream buffer + hr = buffer->get_Capacity(&readBufferLength); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to get buffer capacity"); + emit socketErrorOccured(QAbstractSocket::UnknownSocketError); + return S_OK; + } + hr = buffer->put_Length(0); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to set buffer length"); + emit socketErrorOccured(QAbstractSocket::UnknownSocketError); + return S_OK; + } + + hr = stream->ReadAsync(buffer.Get(), readBufferLength, InputStreamOptions_Partial, &readOp); + if (FAILED(hr)) { + qErrnoWarning(hr, "onReadyRead(): Could not read into socket stream buffer."); + emit socketErrorOccured(QAbstractSocket::UnknownSocketError); + return S_OK; + } + hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(this, &SocketEngineWorker::onReadyRead).Get()); + if (FAILED(hr)) { + qErrnoWarning(hr, "onReadyRead(): Failed to set socket read callback."); + emit socketErrorOccured(QAbstractSocket::UnknownSocketError); + return S_OK; + } + return S_OK; + }); + Q_ASSERT_SUCCEEDED(hr); + return S_OK; + } + + void setTcpSocket(ComPtr<IStreamSocket> socket) { tcpSocket = socket; } + private: + ComPtr<IStreamSocket> tcpSocket; + QList<WinRtDatagram> pendingDatagrams; + QVector<QByteArray> pendingData; + + // Protects pendingData/pendingDatagrams which are accessed from native callbacks QMutex mutex; + + ComPtr<IAsyncOperationWithProgress<IBuffer *, UINT32>> initialReadOp; + ComPtr<IAsyncOperationWithProgress<IBuffer *, UINT32>> readOp; + + QNativeSocketEnginePrivate *enginePrivate; }; static QByteArray socketDescription(const QAbstractSocketEngine *s) @@ -239,33 +442,6 @@ static QByteArray socketDescription(const QAbstractSocketEngine *s) } } while (0) #define Q_TR(a) QT_TRANSLATE_NOOP(QNativeSocketEngine, a) -typedef QHash<qintptr, IStreamSocket *> TcpSocketHash; - -struct SocketHandler -{ - SocketHandler() : socketCount(0) {} - qintptr socketCount; - TcpSocketHash pendingTcpSockets; -}; - -Q_GLOBAL_STATIC(SocketHandler, gSocketHandler) - -struct SocketGlobal -{ - SocketGlobal() - { - HRESULT hr; - hr = GetActivationFactory(HString::MakeReference(RuntimeClass_Windows_Storage_Streams_Buffer).Get(), - &bufferFactory); - Q_ASSERT_SUCCEEDED(hr); - } - - ComPtr<IBufferFactory> bufferFactory; -}; -Q_GLOBAL_STATIC(SocketGlobal, g) - -#define READ_BUFFER_SIZE 65536 - template <typename T> static AsyncStatus opStatus(const ComPtr<T> &op) { @@ -315,6 +491,10 @@ QNativeSocketEngine::QNativeSocketEngine(QObject *parent) connect(this, SIGNAL(readReady()), SLOT(readNotification()), Qt::QueuedConnection); connect(this, SIGNAL(writeReady()), SLOT(writeNotification()), Qt::QueuedConnection); connect(d->worker, &SocketEngineWorker::newDatagramsReceived, this, &QNativeSocketEngine::handleNewDatagrams, Qt::QueuedConnection); + connect(d->worker, &SocketEngineWorker::newDataReceived, + this, &QNativeSocketEngine::handleNewData, Qt::QueuedConnection); + connect(d->worker, &SocketEngineWorker::socketErrorOccured, + this, &QNativeSocketEngine::handleTcpError, Qt::QueuedConnection); } QNativeSocketEngine::~QNativeSocketEngine() @@ -358,23 +538,9 @@ bool QNativeSocketEngine::initialize(qintptr socketDescriptor, QAbstractSocket:: // Start processing incoming data if (d->socketType == QAbstractSocket::TcpSocket) { - HRESULT hr = QEventDispatcherWinRT::runOnXamlThread([d, socket, socketState, this]() { - ComPtr<IBuffer> buffer; - HRESULT hr = g->bufferFactory->Create(READ_BUFFER_SIZE, &buffer); - RETURN_HR_IF_FAILED("initialize(): Could not create buffer"); - ComPtr<IInputStream> stream; - hr = socket->get_InputStream(&stream); - RETURN_HR_IF_FAILED("initialize(): Could not obtain input stream"); - ComPtr<IAsyncBufferOperation> readOp; - hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, readOp.GetAddressOf()); - RETURN_HR_IF_FAILED_WITH_ARGS("initialize(): Failed to read from the socket buffer (%s).", - socketDescription(this).constData()); - QMutexLocker locker(&d->readOperationsMutex); - d->pendingReadOps.append(readOp); - d->socketState = socketState; - hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get()); - RETURN_HR_IF_FAILED_WITH_ARGS("initialize(): Failed to set socket read callback (%s).", - socketDescription(this).constData()); + HRESULT hr = QEventDispatcherWinRT::runOnXamlThread([d, socket, this]() { + d->worker->setTcpSocket(socket); + d->worker->startReading(); return S_OK; }); if (FAILED(hr)) @@ -639,20 +805,6 @@ void QNativeSocketEngine::close() } #endif // _MSC_VER >= 1900 - QMutexLocker locker(&d->readOperationsMutex); - for (ComPtr<IAsyncBufferOperation> readOp : d->pendingReadOps) { - ComPtr<IAsyncInfo> info; - hr = readOp.As(&info); - Q_ASSERT_SUCCEEDED(hr); - if (info) { - hr = info->Cancel(); - Q_ASSERT_SUCCEEDED(hr); - hr = info->Close(); - Q_ASSERT_SUCCEEDED(hr); - } - } - locker.unlock(); - if (d->socketDescriptor != -1) { ComPtr<IClosable> socket; if (d->socketType == QAbstractSocket::TcpSocket) { @@ -730,14 +882,32 @@ qint64 QNativeSocketEngine::read(char *data, qint64 maxlen) // happens and there isn't anything left in the buffer, we have to return -1 in order to signal // the closing of the socket. QMutexLocker mutexLocker(&d->readMutex); - if (d->readBytes.pos() == d->readBytes.size() && d->socketState != QAbstractSocket::ConnectedState) { + if (d->pendingData.isEmpty() && d->socketState != QAbstractSocket::ConnectedState) { close(); return -1; } - qint64 b = d->readBytes.read(data, maxlen); - d->bytesAvailable = d->readBytes.size() - d->readBytes.pos(); - return b; + QByteArray readData; + qint64 leftToMaxLen = maxlen; + while (leftToMaxLen > 0 && !d->pendingData.isEmpty()) { + QByteArray pendingData = d->pendingData.takeFirst(); + // Do not read the whole data. Put the rest of it back into the "queue" + if (leftToMaxLen < pendingData.length()) { + readData += pendingData.left(leftToMaxLen); + pendingData = pendingData.remove(0, maxlen); + d->pendingData.prepend(pendingData); + break; + } else { + readData += pendingData; + leftToMaxLen -= pendingData.length(); + } + } + const int copyLength = qMin(maxlen, qint64(readData.length())); + d->bytesAvailable -= copyLength; + mutexLocker.unlock(); + + memcpy(data, readData, copyLength); + return copyLength; } qint64 QNativeSocketEngine::write(const char *data, qint64 len) @@ -913,7 +1083,7 @@ bool QNativeSocketEngine::waitForRead(int msecs, bool *timedOut) // If we are a client, we are ready to read if our buffer has data QMutexLocker locker(&d->readMutex); - if (!d->readBytes.atEnd()) + if (!d->pendingData.isEmpty()) return true; // Nothing to do, wait for more events @@ -1001,21 +1171,8 @@ void QNativeSocketEngine::establishRead() HRESULT hr; hr = QEventDispatcherWinRT::runOnXamlThread([d]() { - ComPtr<IInputStream> stream; - HRESULT hr = d->tcpSocket()->get_InputStream(&stream); - RETURN_HR_IF_FAILED("establishRead(): Failed to get socket input stream"); - - ComPtr<IBuffer> buffer; - hr = g->bufferFactory->Create(READ_BUFFER_SIZE, &buffer); - RETURN_HR_IF_FAILED("establishRead(): Failed to create buffer"); - - ComPtr<IAsyncBufferOperation> readOp; - hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, readOp.GetAddressOf()); - RETURN_HR_IF_FAILED("establishRead(): Failed to initiate socket read"); - QMutexLocker locker(&d->readOperationsMutex); - d->pendingReadOps.append(readOp); - hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get()); - RETURN_HR_IF_FAILED("establishRead(): Failed to register read callback"); + d->worker->setTcpSocket(d->tcpSocket()); + d->worker->startReading(); return S_OK; }); Q_ASSERT_SUCCEEDED(hr); @@ -1032,6 +1189,32 @@ void QNativeSocketEngine::handleNewDatagrams(const QList<WinRtDatagram> &datagra emit readReady(); } +void QNativeSocketEngine::handleNewData(const QVector<QByteArray> &data) +{ + // Defer putting the data into the list until the next event loop iteration + // (where the readyRead signal is emitted as well) + QMetaObject::invokeMethod(this, "putIntoPendingData", Qt::QueuedConnection, + Q_ARG(QVector<QByteArray>, data)); +} + +void QNativeSocketEngine::handleTcpError(QAbstractSocket::SocketError error) +{ + Q_D(QNativeSocketEngine); + QNativeSocketEnginePrivate::ErrorString errorString; + switch (error) { + case QAbstractSocket::RemoteHostClosedError: + errorString = QNativeSocketEnginePrivate::RemoteHostClosedErrorString; + break; + default: + errorString = QNativeSocketEnginePrivate::UnknownSocketErrorString; + } + + d->setError(error, errorString); + d->socketState = QAbstractSocket::UnconnectedState; + if (d->notifyOnRead) + emit readReady(); +} + void QNativeSocketEngine::putIntoPendingDatagramsList(const QList<WinRtDatagram> &datagrams) { Q_D(QNativeSocketEngine); @@ -1039,6 +1222,18 @@ void QNativeSocketEngine::putIntoPendingDatagramsList(const QList<WinRtDatagram> d->pendingDatagrams.append(datagrams); } +void QNativeSocketEngine::putIntoPendingData(const QVector<QByteArray> &data) +{ + Q_D(QNativeSocketEngine); + QMutexLocker locker(&d->readMutex); + d->pendingData.append(data); + for (const QByteArray &newData : data) + d->bytesAvailable += newData.length(); + locker.unlock(); + if (d->notifyOnRead) + readNotification(); +} + bool QNativeSocketEnginePrivate::createNewSocket(QAbstractSocket::SocketType socketType, QAbstractSocket::NetworkLayerProtocol &socketProtocol) { Q_UNUSED(socketProtocol); @@ -1093,7 +1288,7 @@ QNativeSocketEnginePrivate::QNativeSocketEnginePrivate() , notifyOnException(false) , closingDown(false) , socketDescriptor(-1) - , worker(new SocketEngineWorker) + , worker(new SocketEngineWorker(this)) , sslSocket(Q_NULLPTR) , connectionToken( { -1 } ) { @@ -1481,109 +1676,6 @@ HRESULT QNativeSocketEnginePrivate::handleConnectOpFinished(IAsyncAction *action return S_OK; } -HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncBufferOperation *asyncInfo, AsyncStatus status) -{ - if (closingDown || wasDeleted || isDeletingChildren - || socketState == QAbstractSocket::UnconnectedState) { - return S_OK; - } - - Q_Q(QNativeSocketEngine); - QMutexLocker locker(&readOperationsMutex); - for (int i = 0; i < pendingReadOps.count(); ++i) { - if (pendingReadOps.at(i).Get() == asyncInfo) { - pendingReadOps.takeAt(i); - break; - } - } - locker.unlock(); - - // A read in UnconnectedState will close the socket and return -1 and thus tell the caller, - // that the connection was closed. The socket cannot be closed here, as the subsequent read - // might fail then. - if (status == Error || status == Canceled) { - setError(QAbstractSocket::RemoteHostClosedError, RemoteHostClosedErrorString); - socketState = QAbstractSocket::UnconnectedState; - if (notifyOnRead) - emit q->readReady(); - return S_OK; - } - - ComPtr<IBuffer> buffer; - HRESULT hr = asyncInfo->GetResults(&buffer); - RETURN_OK_IF_FAILED("Failed to get read results buffer"); - - UINT32 bufferLength; - hr = buffer->get_Length(&bufferLength); - Q_ASSERT_SUCCEEDED(hr); - // A zero sized buffer length signals, that the remote host closed the connection. The socket - // cannot be closed though, as the following read might have socket descriptor -1 and thus and - // the closing of the socket won't be communicated to the caller. So only the error is set. The - // actual socket close happens inside of read. - if (!bufferLength) { - setError(QAbstractSocket::RemoteHostClosedError, RemoteHostClosedErrorString); - socketState = QAbstractSocket::UnconnectedState; - if (notifyOnRead) - emit q->readReady(); - return S_OK; - } - - ComPtr<Windows::Storage::Streams::IBufferByteAccess> byteArrayAccess; - hr = buffer.As(&byteArrayAccess); - Q_ASSERT_SUCCEEDED(hr); - byte *data; - hr = byteArrayAccess->Buffer(&data); - Q_ASSERT_SUCCEEDED(hr); - - QMutexLocker readLocker(&readMutex); - if (readBytes.atEnd()) // Everything has been read; the buffer is safe to reset - readBytes.close(); - if (!readBytes.isOpen()) - readBytes.open(QBuffer::ReadWrite|QBuffer::Truncate); - qint64 readPos = readBytes.pos(); - readBytes.seek(readBytes.size()); - Q_ASSERT(readBytes.atEnd()); - readBytes.write(reinterpret_cast<const char*>(data), qint64(bufferLength)); - readBytes.seek(readPos); - bytesAvailable = readBytes.size() - readBytes.pos(); - readLocker.unlock(); - - if (notifyOnRead) - emit q->readReady(); - - hr = QEventDispatcherWinRT::runOnXamlThread([buffer, q, this]() { - UINT32 readBufferLength; - ComPtr<IInputStream> stream; - HRESULT hr = tcpSocket()->get_InputStream(&stream); - RETURN_HR_IF_FAILED("handleReadyRead(): Could not obtain input stream"); - - // Reuse the stream buffer - hr = buffer->get_Capacity(&readBufferLength); - RETURN_HR_IF_FAILED("handleReadyRead(): Could not obtain buffer capacity"); - hr = buffer->put_Length(0); - RETURN_HR_IF_FAILED("handleReadyRead(): Could not set buffer length"); - - ComPtr<IAsyncBufferOperation> readOp; - hr = stream->ReadAsync(buffer.Get(), readBufferLength, InputStreamOptions_Partial, &readOp); - if (FAILED(hr)) { - qErrnoWarning(hr, "handleReadyRead(): Could not read into socket stream buffer (%s).", - socketDescription(q).constData()); - return S_OK; - } - QMutexLocker locker(&readOperationsMutex); - pendingReadOps.append(readOp); - hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(this, &QNativeSocketEnginePrivate::handleReadyRead).Get()); - if (FAILED(hr)) { - qErrnoWarning(hr, "handleReadyRead(): Failed to set socket read callback (%s).", - socketDescription(q).constData()); - return S_OK; - } - return S_OK; - }); - Q_ASSERT_SUCCEEDED(hr); - return S_OK; -} - HRESULT QNativeSocketEnginePrivate::handleNewDatagram(IDatagramSocket *socket, IDatagramSocketMessageReceivedEventArgs *args) { Q_Q(QNativeSocketEngine); |