From 078c71ac8f0c74e06e0d81ea1a5a44126abc27b6 Mon Sep 17 00:00:00 2001 From: Andrew Knight Date: Tue, 22 Apr 2014 12:43:28 +0300 Subject: WinRT: Fix TCP socket reads All read calls are now pulled from an intermediate buffer which is populated from the asynchronous callback (this was a TODO previously, and was breaking downloads of large requests). As a side-benefit, the use of only async callbacks ensures fewer first-chance exceptions appear in the debug output. Task-number: QTBUG-30196 Change-Id: I5653742d8d94934a4b4a4227298865d20518bc4c Reviewed-by: Oliver Wolff --- src/network/socket/qnativesocketengine_winrt.cpp | 319 ++++++++++++++--------- 1 file changed, 200 insertions(+), 119 deletions(-) (limited to 'src/network/socket/qnativesocketengine_winrt.cpp') diff --git a/src/network/socket/qnativesocketengine_winrt.cpp b/src/network/socket/qnativesocketengine_winrt.cpp index 4b2d1c372e..db1d3dc96b 100644 --- a/src/network/socket/qnativesocketengine_winrt.cpp +++ b/src/network/socket/qnativesocketengine_winrt.cpp @@ -74,6 +74,7 @@ typedef ITypedEventHandler DatagramReceivedHandler; typedef IAsyncOperationWithProgressCompletedHandler SocketReadCompletedHandler; typedef IAsyncOperationWithProgressCompletedHandler SocketWriteCompletedHandler; +typedef IAsyncOperationWithProgress IAsyncBufferOperation; QT_BEGIN_NAMESPACE @@ -130,6 +131,8 @@ QString qt_QStringFromHSTRING(HSTRING string) return QString::fromWCharArray(rawString, length); } +#define READ_BUFFER_SIZE 8192 + class ByteArrayBuffer : public Microsoft::WRL::RuntimeClass, IBuffer, Windows::Storage::Streams::IBufferByteAccess> { @@ -167,16 +170,6 @@ public: return S_OK; } - QNativeSocketEngine *engine() const - { - return m_engine; - } - - void setEngine(QNativeSocketEngine *engine) - { - m_engine = engine; - } - ComPtr inputStream() const { return m_stream; @@ -190,13 +183,33 @@ public: private: QByteArray m_bytes; UINT32 m_length; - QPointer m_engine; ComPtr m_stream; }; +template +static AsyncStatus opStatus(const ComPtr &op) +{ + ComPtr info; + HRESULT hr = op.As(&info); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to cast op to IAsyncInfo."); + return Error; + } + AsyncStatus status; + hr = info->get_Status(&status); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to get AsyncStatus."); + return Error; + } + return status; +} + QNativeSocketEngine::QNativeSocketEngine(QObject *parent) : QAbstractSocketEngine(*new QNativeSocketEnginePrivate(), parent) { + connect(this, SIGNAL(connectionReady()), SLOT(connectionNotification()), Qt::QueuedConnection); + connect(this, SIGNAL(readReady()), SLOT(readNotification()), Qt::QueuedConnection); + connect(this, SIGNAL(writeReady()), SLOT(writeNotification()), Qt::QueuedConnection); } QNativeSocketEngine::~QNativeSocketEngine() @@ -230,7 +243,7 @@ bool QNativeSocketEngine::initialize(qintptr socketDescriptor, QAbstractSocket:: // Currently, only TCP sockets are initialized this way. SocketHandler *handler = gSocketHandler(); - d->tcp = handler->pendingTcpSockets.value(socketDescriptor, Q_NULLPTR); + d->tcp = handler->pendingTcpSockets.take(socketDescriptor); d->socketType = QAbstractSocket::TcpSocket; if (!d->tcp || !d->fetchConnectionParameters()) @@ -271,23 +284,33 @@ bool QNativeSocketEngine::connectToHostByName(const QString &name, quint16 port) return false; } + ComPtr op; const QString portString = QString::number(port); HStringReference portReference(reinterpret_cast(portString.utf16())); - ComPtr action; HRESULT hr = E_FAIL; if (d->socketType == QAbstractSocket::TcpSocket) - hr = d->tcp->ConnectAsync(remoteHost.Get(), portReference.Get(), &action); + hr = d->tcp->ConnectAsync(remoteHost.Get(), portReference.Get(), &op); else if (d->socketType == QAbstractSocket::UdpSocket) - hr = d->udp->ConnectAsync(remoteHost.Get(), portReference.Get(), &action); + hr = d->udp->ConnectAsync(remoteHost.Get(), portReference.Get(), &op); if (FAILED(hr)) { qWarning("QNativeSocketEnginePrivate::nativeConnect:: Could not obtain connect action"); return false; } - action->put_Completed(Callback(&QNativeSocketEnginePrivate::interruptEventDispatcher).Get()); - hr = action->GetResults(); - while ((hr = action->GetResults()) == E_ILLEGAL_METHOD_CALL) - QCoreApplication::processEvents(QEventLoop::ExcludeUserInputEvents | QEventLoop::WaitForMoreEvents); + hr = op->put_Completed(Callback( + d, &QNativeSocketEnginePrivate::handleConnectToHost).Get()); + if (FAILED(hr)) { + qErrnoWarning(hr, "Unable to set host connection callback."); + return false; + } + d->socketState = QAbstractSocket::ConnectingState; + while (opStatus(op) == Started) + d->eventLoop.processEvents(); + + AsyncStatus status = opStatus(op); + if (status == Error || status == Canceled) + return false; + if (hr == 0x8007274c) { // A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond. d->setError(QAbstractSocket::NetworkError, d->ConnectionTimeOutErrorString); d->socketState = QAbstractSocket::UnconnectedState; @@ -305,24 +328,21 @@ bool QNativeSocketEngine::connectToHostByName(const QString &name, quint16 port) } if (d->socketType == QAbstractSocket::TcpSocket) { - UINT32 capacity; - hr = d->inputBuffer->get_Capacity(&capacity); - if (FAILED(hr)) - return false; IInputStream *stream; hr = d->tcp->get_InputStream(&stream); if (FAILED(hr)) return false; - ByteArrayBuffer *buffer = static_cast(d->inputBuffer.Get()); - buffer->setEngine(this); + ByteArrayBuffer *buffer = static_cast(d->readBuffer.Get()); buffer->setInputStream(stream); - ComPtr> op; - hr = stream->ReadAsync(buffer, capacity, InputStreamOptions_Partial, &op); + ComPtr op; + hr = stream->ReadAsync(buffer, READ_BUFFER_SIZE, InputStreamOptions_Partial, &op); if (FAILED(hr)) return false; - hr = op->put_Completed(Callback(&QNativeSocketEnginePrivate::handleReadyRead).Get()); - if (FAILED(hr)) + hr = op->put_Completed(Callback(d, &QNativeSocketEnginePrivate::handleReadyRead).Get()); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to set socket read callback."); return false; + } } d->socketState = QAbstractSocket::ConnectedState; return true; @@ -358,21 +378,35 @@ bool QNativeSocketEngine::bind(const QHostAddress &address, quint16 port) d->tcpListener->add_ConnectionReceived(Callback(d, &QNativeSocketEnginePrivate::handleClientConnection).Get(), &token); hr = d->tcpListener->BindEndpointAsync(hostAddress.Get(), portString.Get(), &op); if (FAILED(hr)) { - qWarning("Unable to bind"); // ### Set error message + qErrnoWarning(hr, "Unable to bind socket."); // ### Set error message return false; } } else if (d->socketType == QAbstractSocket::UdpSocket) { hr = d->udp->BindEndpointAsync(hostAddress.Get(), portString.Get(), &op); if (FAILED(hr)) { - qWarning("unable to bind"); // ### Set error message + qErrnoWarning(hr, "Unable to bind socket."); // ### Set error message + return false; + } + hr = op->put_Completed(Callback(d, &QNativeSocketEnginePrivate::handleBindCompleted).Get()); + if (FAILED(hr)) { + qErrnoWarning(hr, "Unable to set bind callback."); return false; } } if (op) { - // Wait for connection to enter bound state - TODO: timeout, check result - while ((hr = op->GetResults()) == E_ILLEGAL_METHOD_CALL) - QCoreApplication::processEvents(); + while (opStatus(op) == Started) + d->eventLoop.processEvents(); + + AsyncStatus status = opStatus(op); + if (status == Error || status == Canceled) + return false; + + hr = op->GetResults(); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to bind socket"); + return false; + } d->socketState = QAbstractSocket::BoundState; d->fetchConnectionParameters(); @@ -410,17 +444,22 @@ int QNativeSocketEngine::accept() if (d->socketType == QAbstractSocket::TcpSocket) { IStreamSocket *socket = d->pendingConnections.takeFirst(); - UINT32 capacity; - d->inputBuffer->get_Capacity(&capacity); IInputStream *stream; socket->get_InputStream(&stream); // TODO: delete buffer and stream on socket close - ByteArrayBuffer *buffer = static_cast(d->inputBuffer.Get()); - buffer->setEngine(this); + ByteArrayBuffer *buffer = static_cast(d->readBuffer.Get()); buffer->setInputStream(stream); - ComPtr> op; - stream->ReadAsync(buffer, capacity, InputStreamOptions_Partial, &op); - op->put_Completed(Callback(&QNativeSocketEnginePrivate::handleReadyRead).Get()); + ComPtr op; + HRESULT hr = stream->ReadAsync(buffer, READ_BUFFER_SIZE, InputStreamOptions_Partial, &op); + if (FAILED(hr)) { + qErrnoWarning(hr, "Faild to read from the socket buffer."); + return -1; + } + hr = op->put_Completed(Callback(d, &QNativeSocketEnginePrivate::handleReadyRead).Get()); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to set socket read callback."); + return -1; + } d->currentConnections.append(socket); SocketHandler *handler = gSocketHandler(); @@ -445,7 +484,6 @@ void QNativeSocketEngine::close() d->closingDown = true; socket->Close(); socket->Release(); - closeNotification(); d->socketDescriptor = -1; } d->socketDescriptor = -1; @@ -493,13 +531,7 @@ qint64 QNativeSocketEngine::bytesAvailable() const if (d->socketType != QAbstractSocket::TcpSocket) return -1; - if (d->inputBuffer) { - UINT32 len; - d->inputBuffer->get_Length(&len); - return len; - } - - return -1; + return d->readBytes.size() - d->readBytes.pos(); } qint64 QNativeSocketEngine::read(char *data, qint64 maxlen) @@ -508,54 +540,56 @@ qint64 QNativeSocketEngine::read(char *data, qint64 maxlen) if (d->socketType != QAbstractSocket::TcpSocket) return -1; - ComPtr dataReaderStatics; - GetActivationFactory(HString::MakeReference(RuntimeClass_Windows_Storage_Streams_DataReader).Get(), &dataReaderStatics); - ComPtr reader; - - dataReaderStatics->FromBuffer(d->inputBuffer.Get(), &reader); - - UINT32 bufferCapacity; - d->inputBuffer->get_Capacity(&bufferCapacity); - qint64 lengthToRead = maxlen < bufferCapacity ? maxlen : bufferCapacity; - - UINT32 bufferLength; - d->inputBuffer->get_Length(&bufferLength); - - lengthToRead = bufferLength < lengthToRead ? bufferLength : lengthToRead; - reader->ReadBytes(lengthToRead, (unsigned char*)data); - return lengthToRead; + QMutexLocker mutexLocker(&d->readMutex); + return d->readBytes.read(data, maxlen); } -template -static qint64 nativeWrite(T *socket, const char *data, qint64 len) +qint64 QNativeSocketEngine::write(const char *data, qint64 len) { + Q_D(QNativeSocketEngine); + HRESULT hr = E_FAIL; ComPtr stream; - HRESULT hr = socket->get_OutputStream(&stream); - if (FAILED(hr)) + if (d->socketType == QAbstractSocket::TcpSocket) + hr = d->tcp->get_OutputStream(&stream); + else if (d->socketType == QAbstractSocket::UdpSocket) + hr = d->udp->get_OutputStream(&stream); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to get output stream to socket."); return -1; + } + ComPtr buffer = Make(data, len); ComPtr> op; hr = stream->WriteAsync(buffer.Get(), &op); - if (FAILED(hr)) + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to write to socket."); + return -1; + } + hr = op->put_Completed(Callback>( + d, &QNativeSocketEnginePrivate::handleWriteCompleted).Get()); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to set socket write callback."); return -1; + } + + while (opStatus(op) == Started) + d->eventLoop.processEvents(); + + AsyncStatus status = opStatus(op); + if (status == Error || status == Canceled) + return -1; + UINT32 bytesWritten; - while ((hr = op->GetResults(&bytesWritten)) == E_ILLEGAL_METHOD_CALL) - QCoreApplication::processEvents(QEventLoop::ExcludeUserInputEvents); - return bytesWritten; -} + hr = op->GetResults(&bytesWritten); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to get written socket length."); + return -1; + } -qint64 QNativeSocketEngine::write(const char *data, qint64 len) -{ - Q_D(QNativeSocketEngine); - qint64 bytesWritten = -1; - if (d->socketType == QAbstractSocket::TcpSocket) - bytesWritten = nativeWrite(d->tcp, data, len); - else if (d->socketType == QAbstractSocket::UdpSocket) - bytesWritten = nativeWrite(d->udp, data, len); - if (bytesWritten != -1 && d->notifyOnWrite) - writeNotification(); - return bytesWritten; + if (bytesWritten && d->notifyOnWrite) + emit writeReady(); + return bytesWritten; } qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QHostAddress *addr, quint16 *port) @@ -698,7 +732,7 @@ bool QNativeSocketEngine::setOption(QAbstractSocketEngine::SocketOption option, bool QNativeSocketEngine::waitForRead(int msecs, bool *timedOut) { - Q_D(const QNativeSocketEngine); + Q_D(QNativeSocketEngine); Q_CHECK_VALID_SOCKETLAYER(QNativeSocketEngine::waitForRead(), false); Q_CHECK_NOT_STATE(QNativeSocketEngine::waitForRead(), QAbstractSocket::UnconnectedState, false); @@ -714,14 +748,12 @@ bool QNativeSocketEngine::waitForRead(int msecs, bool *timedOut) return true; // If we are a client, we are ready to read if our buffer has data - UINT32 length; - if (FAILED(d->inputBuffer->get_Length(&length))) - return false; - if (length) + QMutexLocker locker(&d->readMutex); + if (!d->readBytes.atEnd()) return true; // Nothing to do, wait for more events - QCoreApplication::processEvents(QEventLoop::ExcludeUserInputEvents|QEventLoop::WaitForMoreEvents); + d->eventLoop.processEvents(); } d->setError(QAbstractSocket::SocketTimeoutError, @@ -832,8 +864,8 @@ QNativeSocketEnginePrivate::QNativeSocketEnginePrivate() , closingDown(false) , socketDescriptor(-1) { - ComPtr buffer = Make(8192); - inputBuffer = buffer; + ComPtr buffer = Make(READ_BUFFER_SIZE); + readBuffer = buffer; } QNativeSocketEnginePrivate::~QNativeSocketEnginePrivate() @@ -1119,6 +1151,11 @@ bool QNativeSocketEnginePrivate::fetchConnectionParameters() return true; } +HRESULT QNativeSocketEnginePrivate::handleBindCompleted(IAsyncAction *, AsyncStatus) +{ + return S_OK; +} + HRESULT QNativeSocketEnginePrivate::handleClientConnection(IStreamSocketListener *listener, IStreamSocketListenerConnectionReceivedEventArgs *args) { Q_Q(QNativeSocketEngine); @@ -1126,47 +1163,91 @@ HRESULT QNativeSocketEnginePrivate::handleClientConnection(IStreamSocketListener IStreamSocket *socket; args->get_Socket(&socket); pendingConnections.append(socket); - q->connectionNotification(); - q->readNotification(); - return interruptEventDispatcher(0, Completed); + emit q->connectionReady(); + emit q->readReady(); + return S_OK; } -HRESULT QNativeSocketEnginePrivate::interruptEventDispatcher(IAsyncAction *, AsyncStatus) +HRESULT QNativeSocketEnginePrivate::handleConnectToHost(ABI::Windows::Foundation::IAsyncAction *, ABI::Windows::Foundation::AsyncStatus) { - if (QThread *thread = QThread::currentThread()) { - if (QAbstractEventDispatcher *dispatcher = thread->eventDispatcher()) - dispatcher->interrupt(); - } return S_OK; } -HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncOperationWithProgress *asyncInfo, AsyncStatus) +HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncBufferOperation *asyncInfo, AsyncStatus status) { + Q_Q(QNativeSocketEngine); + if (wasDeleted || isDeletingChildren) + return S_OK; + + if (status == Error || status == Canceled) + return S_OK; + ByteArrayBuffer *buffer = 0; HRESULT hr = asyncInfo->GetResults((IBuffer **)&buffer); - if (FAILED(hr)) - return hr; + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to get ready read results."); + return S_OK; + } UINT32 len; buffer->get_Length(&len); - QNativeSocketEngine *q = buffer->engine(); - if (!q) + if (!len) { + if (q->isReadNotificationEnabled()) + emit q->readReady(); + return S_OK; + } + + byte *data; + buffer->Buffer(&data); + + readMutex.lock(); + if (readBytes.atEnd()) // Everything has been read; the buffer is safe to reset + readBytes.close(); + if (!readBytes.isOpen()) + readBytes.open(QBuffer::ReadWrite|QBuffer::Truncate); + qint64 readPos = readBytes.pos(); + readBytes.seek(readBytes.size()); + Q_ASSERT(readBytes.atEnd()); + readBytes.write(reinterpret_cast(data), qint64(len)); + readBytes.seek(readPos); + readMutex.unlock(); + + if (q->isReadNotificationEnabled()) + emit q->readReady(); + + ComPtr op; + hr = buffer->inputStream()->ReadAsync(buffer, READ_BUFFER_SIZE, InputStreamOptions_Partial, &op); + if (FAILED(hr)) { + qErrnoWarning(hr, "Could not read into socket stream buffer."); return S_OK; - if (len > 0 && q->isReadNotificationEnabled()) { - q->readNotification(); } + hr = op->put_Completed(Callback(this, &QNativeSocketEnginePrivate::handleReadyRead).Get()); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to set socket read callback."); + return S_OK; + } + return S_OK; +} - // Continue reading ### TODO: read into offset!!! - UINT32 capacity; - buffer->get_Capacity(&capacity); - ComPtr> op; - if (SUCCEEDED(buffer->inputStream()->ReadAsync(buffer, capacity, InputStreamOptions_Partial, &op))) { - if (q) - return op->put_Completed(Callback(&QNativeSocketEnginePrivate::handleReadyRead).Get()); - else - return op->put_Completed(nullptr); +HRESULT QNativeSocketEnginePrivate::handleWriteCompleted(IAsyncOperationWithProgress *op, AsyncStatus status) +{ + if (status == Error) { + ComPtr info; + HRESULT hr = op->QueryInterface(IID_PPV_ARGS(&info)); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to cast operation."); + return S_OK; + } + HRESULT errorCode; + hr = info->get_ErrorCode(&errorCode); + if (FAILED(hr)) { + qErrnoWarning(hr, "Failed to get error code."); + return S_OK; + } + qErrnoWarning(errorCode, "A socket error occurred."); + return S_OK; } - return E_FAIL; + return S_OK; } HRESULT QNativeSocketEnginePrivate::handleNewDatagram(IDatagramSocket *socket, IDatagramSocketMessageReceivedEventArgs *args) @@ -1174,7 +1255,7 @@ HRESULT QNativeSocketEnginePrivate::handleNewDatagram(IDatagramSocket *socket, I Q_Q(QNativeSocketEngine); Q_ASSERT(udp == socket); pendingDatagrams.append(args); - q->readNotification(); + emit q->readReady(); return S_OK; } -- cgit v1.2.3