summaryrefslogtreecommitdiffstats
path: root/src/network/socket/qnativesocketengine_winrt.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network/socket/qnativesocketengine_winrt.cpp')
-rw-r--r--src/network/socket/qnativesocketengine_winrt.cpp167
1 files changed, 145 insertions, 22 deletions
diff --git a/src/network/socket/qnativesocketengine_winrt.cpp b/src/network/socket/qnativesocketengine_winrt.cpp
index 7c273f957b..f8e92d3f50 100644
--- a/src/network/socket/qnativesocketengine_winrt.cpp
+++ b/src/network/socket/qnativesocketengine_winrt.cpp
@@ -84,6 +84,69 @@ typedef IAsyncOperationWithProgress<IBuffer *, UINT32> IAsyncBufferOperation;
QT_BEGIN_NAMESPACE
+static inline QString qt_QStringFromHString(const HString &string)
+{
+ UINT32 length;
+ PCWSTR rawString = string.GetRawBuffer(&length);
+ return QString::fromWCharArray(rawString, length);
+}
+
+class SocketEngineWorker : public QObject
+{
+ Q_OBJECT
+signals:
+ void newDatagramsReceived(const QList<WinRtDatagram> &datagram);
+
+public slots:
+ Q_INVOKABLE void notifyAboutNewDatagrams()
+ {
+ QMutexLocker locker(&mutex);
+ QList<WinRtDatagram> datagrams = pendingDatagrams;
+ pendingDatagrams.clear();
+ emit newDatagramsReceived(datagrams);
+ }
+
+public:
+ HRESULT OnNewDatagramReceived(IDatagramSocket *, IDatagramSocketMessageReceivedEventArgs *args)
+ {
+ WinRtDatagram datagram;
+ QHostAddress returnAddress;
+ ComPtr<IHostName> remoteHost;
+ HRESULT hr = args->get_RemoteAddress(&remoteHost);
+ RETURN_OK_IF_FAILED("Could not obtain remote host");
+ HString remoteHostString;
+ hr = remoteHost->get_CanonicalName(remoteHostString.GetAddressOf());
+ RETURN_OK_IF_FAILED("Could not obtain remote host's canonical name");
+ returnAddress.setAddress(qt_QStringFromHString(remoteHostString));
+ datagram.header.senderAddress = returnAddress;
+ HString remotePort;
+ hr = args->get_RemotePort(remotePort.GetAddressOf());
+ RETURN_OK_IF_FAILED("Could not obtain remote port");
+ datagram.header.senderPort = qt_QStringFromHString(remotePort).toInt();
+
+ ComPtr<IDataReader> reader;
+ hr = args->GetDataReader(&reader);
+ RETURN_OK_IF_FAILED("Could not obtain data reader");
+ quint32 length;
+ hr = reader->get_UnconsumedBufferLength(&length);
+ RETURN_OK_IF_FAILED("Could not obtain unconsumed buffer length");
+ datagram.data.resize(length);
+ hr = reader->ReadBytes(length, reinterpret_cast<BYTE *>(datagram.data.data()));
+ RETURN_OK_IF_FAILED("Could not read datagram");
+ QMutexLocker locker(&mutex);
+ // Notify the engine about new datagrams being present at the next event loop iteration
+ if (pendingDatagrams.isEmpty())
+ QMetaObject::invokeMethod(this, "notifyAboutNewDatagrams", Qt::QueuedConnection);
+ pendingDatagrams << datagram;
+
+ return S_OK;
+ }
+
+private:
+ QList<WinRtDatagram> pendingDatagrams;
+ QMutex mutex;
+};
+
static QByteArray socketDescription(const QAbstractSocketEngine *s)
{
QByteArray result;
@@ -159,13 +222,6 @@ struct SocketGlobal
};
Q_GLOBAL_STATIC(SocketGlobal, g)
-static inline QString qt_QStringFromHString(const HString &string)
-{
- UINT32 length;
- PCWSTR rawString = string.GetRawBuffer(&length);
- return QString::fromWCharArray(rawString, length);
-}
-
#define READ_BUFFER_SIZE 65536
template <typename T>
@@ -206,6 +262,7 @@ static qint64 writeIOStream(ComPtr<IOutputStream> stream, const char *data, qint
QNativeSocketEngine::QNativeSocketEngine(QObject *parent)
: QAbstractSocketEngine(*new QNativeSocketEnginePrivate(), parent)
{
+ qRegisterMetaType<WinRtDatagram>();
#ifndef QT_NO_SSL
Q_D(QNativeSocketEngine);
if (parent)
@@ -215,6 +272,7 @@ QNativeSocketEngine::QNativeSocketEngine(QObject *parent)
connect(this, SIGNAL(connectionReady()), SLOT(connectionNotification()), Qt::QueuedConnection);
connect(this, SIGNAL(readReady()), SLOT(readNotification()), Qt::QueuedConnection);
connect(this, SIGNAL(writeReady()), SLOT(writeNotification()), Qt::QueuedConnection);
+ connect(d->worker, &SocketEngineWorker::newDatagramsReceived, this, &QNativeSocketEngine::handleNewDatagrams, Qt::QueuedConnection);
}
QNativeSocketEngine::~QNativeSocketEngine()
@@ -259,26 +317,31 @@ bool QNativeSocketEngine::initialize(qintptr socketDescriptor, QAbstractSocket::
// Start processing incoming data
if (d->socketType == QAbstractSocket::TcpSocket) {
HRESULT hr;
- QEventDispatcherWinRT::runOnXamlThread([d, &hr, socket, this]() {
+ QEventDispatcherWinRT::runOnXamlThread([&hr, socket, socketState, this]() {
+ Q_D(QNativeSocketEngine);
ComPtr<IBuffer> buffer;
HRESULT hr = g->bufferFactory->Create(READ_BUFFER_SIZE, &buffer);
RETURN_OK_IF_FAILED("initialize(): Could not create buffer");
ComPtr<IInputStream> stream;
hr = socket->get_InputStream(&stream);
RETURN_OK_IF_FAILED("initialize(): Could not obtain input stream");
- hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, d->readOp.GetAddressOf());
+ ComPtr<IAsyncBufferOperation> readOp;
+ hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, readOp.GetAddressOf());
RETURN_OK_IF_FAILED_WITH_ARGS("initialize(): Failed to read from the socket buffer (%s).",
socketDescription(this).constData());
- hr = d->readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get());
+ d->pendingReadOps.append(readOp);
+ d->socketState = socketState;
+ hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get());
RETURN_OK_IF_FAILED_WITH_ARGS("initialize(): Failed to set socket read callback (%s).",
socketDescription(this).constData());
return S_OK;
});
if (FAILED(hr))
return false;
+ } else {
+ d->socketState = socketState;
}
- d->socketState = socketState;
return true;
}
@@ -511,9 +574,9 @@ void QNativeSocketEngine::close()
}
#endif // _MSC_VER >= 1900
- if (d->readOp) {
+ for (ComPtr<IAsyncBufferOperation> readOp : d->pendingReadOps) {
ComPtr<IAsyncInfo> info;
- hr = d->readOp.As(&info);
+ hr = readOp.As(&info);
Q_ASSERT_SUCCEEDED(hr);
if (info) {
hr = info->Cancel();
@@ -587,7 +650,7 @@ qint64 QNativeSocketEngine::bytesAvailable() const
if (d->socketType != QAbstractSocket::TcpSocket)
return -1;
- return d->readBytes.size() - d->readBytes.pos();
+ return d->bytesAvailable;
}
qint64 QNativeSocketEngine::read(char *data, qint64 maxlen)
@@ -605,7 +668,9 @@ qint64 QNativeSocketEngine::read(char *data, qint64 maxlen)
}
QMutexLocker mutexLocker(&d->readMutex);
- return d->readBytes.read(data, maxlen);
+ qint64 b = d->readBytes.read(data, maxlen);
+ d->bytesAvailable = d->readBytes.size() - d->readBytes.pos();
+ return b;
}
qint64 QNativeSocketEngine::write(const char *data, qint64 len)
@@ -634,7 +699,9 @@ qint64 QNativeSocketEngine::write(const char *data, qint64 len)
qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QIpPacketHeader *header,
PacketHeaderOptions)
{
+#ifndef QT_NO_UDPSOCKET
Q_D(QNativeSocketEngine);
+ d->readMutex.lock();
if (d->socketType != QAbstractSocket::UdpSocket || d->pendingDatagrams.isEmpty()) {
if (header)
header->clear();
@@ -654,12 +721,20 @@ qint64 QNativeSocketEngine::readDatagram(char *data, qint64 maxlen, QIpPacketHea
} else {
readOrigin = datagram.data;
}
+ d->readMutex.unlock();
memcpy(data, readOrigin, qMin(maxlen, qint64(datagram.data.length())));
return readOrigin.length();
+#else
+ Q_UNUSED(data)
+ Q_UNUSED(maxlen)
+ Q_UNUSED(header)
+ return -1;
+#endif // QT_NO_UDPSOCKET
}
qint64 QNativeSocketEngine::writeDatagram(const char *data, qint64 len, const QIpPacketHeader &header)
{
+#ifndef QT_NO_UDPSOCKET
Q_D(QNativeSocketEngine);
if (d->socketType != QAbstractSocket::UdpSocket)
return -1;
@@ -686,6 +761,12 @@ qint64 QNativeSocketEngine::writeDatagram(const char *data, qint64 len, const QI
Q_ASSERT_SUCCEEDED(hr);
return writeIOStream(stream, data, len);
+#else
+ Q_UNUSED(data)
+ Q_UNUSED(len)
+ Q_UNUSED(header)
+ return -1;
+#endif // QT_NO_UDPSOCKET
}
bool QNativeSocketEngine::hasPendingDatagrams() const
@@ -859,15 +940,35 @@ void QNativeSocketEngine::establishRead()
hr = g->bufferFactory->Create(READ_BUFFER_SIZE, &buffer);
RETURN_HR_IF_FAILED("establishRead(): Failed to create buffer");
- hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, &d->readOp);
+ ComPtr<IAsyncBufferOperation> readOp;
+ hr = stream->ReadAsync(buffer.Get(), READ_BUFFER_SIZE, InputStreamOptions_Partial, readOp.GetAddressOf());
RETURN_HR_IF_FAILED("establishRead(): Failed to initiate socket read");
- hr = d->readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get());
+ d->pendingReadOps.append(readOp);
+ hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(d, &QNativeSocketEnginePrivate::handleReadyRead).Get());
RETURN_HR_IF_FAILED("establishRead(): Failed to register read callback");
return S_OK;
});
Q_ASSERT_SUCCEEDED(hr);
}
+void QNativeSocketEngine::handleNewDatagrams(const QList<WinRtDatagram> &datagrams)
+{
+ Q_D(QNativeSocketEngine);
+ // Defer putting the datagrams into the list until the next event loop iteration
+ // (where the readyRead signal is emitted as well)
+ QMetaObject::invokeMethod(this, "putIntoPendingDatagramsList", Qt::QueuedConnection,
+ Q_ARG(QList<WinRtDatagram>, datagrams));
+ if (d->notifyOnRead)
+ emit readReady();
+}
+
+void QNativeSocketEngine::putIntoPendingDatagramsList(const QList<WinRtDatagram> &datagrams)
+{
+ Q_D(QNativeSocketEngine);
+ QMutexLocker locker(&d->readMutex);
+ d->pendingDatagrams.append(datagrams);
+}
+
bool QNativeSocketEnginePrivate::createNewSocket(QAbstractSocket::SocketType socketType, QAbstractSocket::NetworkLayerProtocol &socketProtocol)
{
Q_UNUSED(socketProtocol);
@@ -887,8 +988,11 @@ bool QNativeSocketEnginePrivate::createNewSocket(QAbstractSocket::SocketType soc
RETURN_FALSE_IF_FAILED("createNewSocket: Could not create socket instance");
socketDescriptor = qintptr(socket.Detach());
QEventDispatcherWinRT::runOnXamlThread([&hr, this]() {
- hr = udpSocket()->add_MessageReceived(Callback<DatagramReceivedHandler>(this, &QNativeSocketEnginePrivate::handleNewDatagram).Get(), &connectionToken);
- RETURN_OK_IF_FAILED("createNewSocket: Could not add \"message received\" callback")
+ hr = udpSocket()->add_MessageReceived(Callback<DatagramReceivedHandler>(worker, &SocketEngineWorker::OnNewDatagramReceived).Get(), &connectionToken);
+ if (FAILED(hr)) {
+ qErrnoWarning(hr, "createNewSocket: Could not add \"message received\" callback");
+ return hr;
+ }
return S_OK;
});
if (FAILED(hr))
@@ -919,6 +1023,7 @@ QNativeSocketEnginePrivate::QNativeSocketEnginePrivate()
, notifyOnException(false)
, closingDown(false)
, socketDescriptor(-1)
+ , worker(new SocketEngineWorker)
, sslSocket(Q_NULLPTR)
, connectionToken( { -1 } )
{
@@ -935,6 +1040,8 @@ QNativeSocketEnginePrivate::~QNativeSocketEnginePrivate()
else if (socketType == QAbstractSocket::TcpSocket)
hr = tcpListener->remove_ConnectionReceived(connectionToken);
Q_ASSERT_SUCCEEDED(hr);
+
+ worker->deleteLater();
}
void QNativeSocketEnginePrivate::setError(QAbstractSocket::SocketError error, ErrorString errorString) const
@@ -1090,6 +1197,7 @@ int QNativeSocketEnginePrivate::option(QAbstractSocketEngine::SocketOption opt)
case QAbstractSocketEngine::MulticastTtlOption:
case QAbstractSocketEngine::MulticastLoopbackOption:
case QAbstractSocketEngine::TypeOfServiceOption:
+ case QAbstractSocketEngine::MaxStreamsSocketOption:
default:
return -1;
}
@@ -1148,6 +1256,7 @@ bool QNativeSocketEnginePrivate::setOption(QAbstractSocketEngine::SocketOption o
case QAbstractSocketEngine::MulticastTtlOption:
case QAbstractSocketEngine::MulticastLoopbackOption:
case QAbstractSocketEngine::TypeOfServiceOption:
+ case QAbstractSocketEngine::MaxStreamsSocketOption:
default:
return false;
}
@@ -1292,10 +1401,12 @@ HRESULT QNativeSocketEnginePrivate::handleConnectOpFinished(IAsyncAction *action
if (socketType != QAbstractSocket::TcpSocket)
return S_OK;
+#ifndef QT_NO_SSL
// Delay the reader so that the SSL socket can upgrade
if (sslSocket)
QObject::connect(qobject_cast<QSslSocket *>(sslSocket), &QSslSocket::encrypted, q, &QNativeSocketEngine::establishRead);
else
+#endif
q->establishRead();
return S_OK;
}
@@ -1308,7 +1419,15 @@ HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncBufferOperation *async
}
Q_Q(QNativeSocketEngine);
+ for (int i = 0; i < pendingReadOps.count(); ++i) {
+ if (pendingReadOps.at(i).Get() == asyncInfo) {
+ pendingReadOps.takeAt(i);
+ break;
+ }
+ }
+ static QMutex mutex;
+ mutex.lock();
// A read in UnconnectedState will close the socket and return -1 and thus tell the caller,
// that the connection was closed. The socket cannot be closed here, as the subsequent read
// might fail then.
@@ -1356,10 +1475,12 @@ HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncBufferOperation *async
Q_ASSERT(readBytes.atEnd());
readBytes.write(reinterpret_cast<const char*>(data), qint64(bufferLength));
readBytes.seek(readPos);
+ bytesAvailable = readBytes.size() - readBytes.pos();
readMutex.unlock();
if (notifyOnRead)
emit q->readReady();
+ mutex.unlock();
hr = QEventDispatcherWinRT::runOnXamlThread([buffer, q, this]() {
UINT32 readBufferLength;
@@ -1373,12 +1494,14 @@ HRESULT QNativeSocketEnginePrivate::handleReadyRead(IAsyncBufferOperation *async
hr = buffer->put_Length(0);
RETURN_HR_IF_FAILED("handleReadyRead(): Could not set buffer length");
+ ComPtr<IAsyncBufferOperation> readOp;
hr = stream->ReadAsync(buffer.Get(), readBufferLength, InputStreamOptions_Partial, &readOp);
if (FAILED(hr)) {
qErrnoWarning(hr, "handleReadyRead(): Could not read into socket stream buffer (%s).",
socketDescription(q).constData());
return S_OK;
}
+ pendingReadOps.append(readOp);
hr = readOp->put_Completed(Callback<SocketReadCompletedHandler>(this, &QNativeSocketEnginePrivate::handleReadyRead).Get());
if (FAILED(hr)) {
qErrnoWarning(hr, "handleReadyRead(): Failed to set socket read callback (%s).",
@@ -1420,11 +1543,11 @@ HRESULT QNativeSocketEnginePrivate::handleNewDatagram(IDatagramSocket *socket, I
datagram.data.resize(length);
hr = reader->ReadBytes(length, reinterpret_cast<BYTE *>(datagram.data.data()));
RETURN_OK_IF_FAILED("Could not read datagram");
- pendingDatagrams.append(datagram);
- if (notifyOnRead)
- emit q->readReady();
+ emit q->newDatagramReceived(datagram);
return S_OK;
}
QT_END_NAMESPACE
+
+#include "qnativesocketengine_winrt.moc"