diff options
author | Maurice Kalinowski <maurice.kalinowski@qt.io> | 2017-11-14 07:21:46 +0100 |
---|---|---|
committer | Maurice Kalinowski <maurice.kalinowski@qt.io> | 2017-11-15 14:06:41 +0000 |
commit | 6b6d2749e6107a77390cc67572e86294115350a2 (patch) | |
tree | 980d73648c3210741bf4f951568d5203c5b495a3 | |
parent | 42c9128e48c8ec66dd8affd53d8e0513427c75a3 (diff) |
Fix subscriptions at reconnectv1.0
When a client reconnects with a cleanSession, all previous subscriptions
need to be disabled due to not being valid anymore. Furthermore, a
broker can decide to not restore a session even though cleanSession was
false. Take this into account by disabling existing subscriptions in
that case as well.
Task-number: QTBUG-64042
Change-Id: I618bc1fe4bb30651a34d70ec9d9e95e48b1c25d5
Reviewed-by: Alex Blasche <alexander.blasche@qt.io>
-rw-r--r-- | src/mqtt/qmqttclient.cpp | 3 | ||||
-rw-r--r-- | src/mqtt/qmqttconnection.cpp | 19 | ||||
-rw-r--r-- | src/mqtt/qmqttconnection_p.h | 2 | ||||
-rw-r--r-- | tests/auto/qmqttsubscription/tst_qmqttsubscription.cpp | 76 |
4 files changed, 100 insertions, 0 deletions
diff --git a/src/mqtt/qmqttclient.cpp b/src/mqtt/qmqttclient.cpp index 22ee311..a765f60 100644 --- a/src/mqtt/qmqttclient.cpp +++ b/src/mqtt/qmqttclient.cpp @@ -421,6 +421,9 @@ void QMqttClient::connectToHost(bool encrypted, const QString &sslPeerName) } d->setStateAndError(Connecting); + if (d->m_cleanSession) + d->m_connection.cleanSubscriptions(); + if (!d->m_connection.ensureTransportOpen(sslPeerName)) { qWarning("Could not ensure that connection is open"); d->setStateAndError(Disconnected, TransportInvalid); diff --git a/src/mqtt/qmqttconnection.cpp b/src/mqtt/qmqttconnection.cpp index 40e3908..622edba 100644 --- a/src/mqtt/qmqttconnection.cpp +++ b/src/mqtt/qmqttconnection.cpp @@ -471,6 +471,21 @@ quint16 QMqttConnection::unusedPacketIdentifier() const return packetIdentifierCounter; } +void QMqttConnection::cleanSubscriptions() +{ + for (auto item : m_pendingSubscriptionAck) + item->setState(QMqttSubscription::Unsubscribed); + m_pendingSubscriptionAck.clear(); + + for (auto item : m_pendingUnsubscriptions) + item->setState(QMqttSubscription::Unsubscribed); + m_pendingUnsubscriptions.clear(); + + for (auto item : m_activeSubscriptions) + item->setState(QMqttSubscription::Unsubscribed); + m_activeSubscriptions.clear(); +} + void QMqttConnection::transportConnectionEstablished() { if (m_internalState != BrokerConnecting) { @@ -540,6 +555,10 @@ void QMqttConnection::finalize_connack() emit m_clientPrivate->m_client->brokerSessionRestored(); if (m_clientPrivate->m_cleanSession) qWarning("Connected with a clean session, ack contains session present"); + } else { + // MQTT-4.1.0.-1 MQTT-4.1.0-2 Session not stored on broker side + // regardless whether cleanSession is false + cleanSubscriptions(); } quint8 connectResultValue; diff --git a/src/mqtt/qmqttconnection_p.h b/src/mqtt/qmqttconnection_p.h index 128573c..cf9c912 100644 --- a/src/mqtt/qmqttconnection_p.h +++ b/src/mqtt/qmqttconnection_p.h @@ -92,6 +92,8 @@ public: inline quint16 unusedPacketIdentifier() const; inline InternalConnectionState internalState() const { return m_internalState; } + void cleanSubscriptions(); + public Q_SLOTS: void transportConnectionEstablished(); void transportConnectionClosed(); diff --git a/tests/auto/qmqttsubscription/tst_qmqttsubscription.cpp b/tests/auto/qmqttsubscription/tst_qmqttsubscription.cpp index 7f51dd5..fda8ef6 100644 --- a/tests/auto/qmqttsubscription/tst_qmqttsubscription.cpp +++ b/tests/auto/qmqttsubscription/tst_qmqttsubscription.cpp @@ -47,6 +47,7 @@ private Q_SLOTS: void getSetCheck(); void wildCards_data(); void wildCards(); + void reconnect(); private: QProcess m_brokerProcess; QString m_testBroker; @@ -141,6 +142,81 @@ void Tst_QMqttSubscription::wildCards() QTRY_VERIFY2(publisher.state() == QMqttClient::Disconnected, "Could not disconnect."); } +void Tst_QMqttSubscription::reconnect() +{ + // QTBUG-64042 + // - Connect with clean session + QMqttClient client; + + client.setHostname(m_testBroker); + client.setPort(m_port); + client.setCleanSession(true); + client.connectToHost(); + QTRY_VERIFY2(client.state() == QMqttClient::Connected, "Could not connect to broker."); + + // - Subscribe to topic A + const QString subscription("topics/resub"); + auto sub = client.subscribe(subscription, 1); + QTRY_VERIFY2(sub->state() == QMqttSubscription::Subscribed, "Could not subscribe to topic."); + + // - Loose connection / connection drop + QAbstractSocket *transport = qobject_cast<QAbstractSocket *>(client.transport()); + QVERIFY2(transport, "Transport has to be QAbstractSocket-based."); + transport->disconnectFromHost(); + QTRY_VERIFY2(client.state() == QMqttClient::Disconnected, "State not correctly switched."); + + // - Reconnect (keeping cleansession) + client.connectToHost(); + QTRY_VERIFY2(client.state() == QMqttClient::Connected, "Could not connect to broker."); + // old subs should get updated / invalidated + QCOMPARE(sub->state(), QMqttSubscription::Unsubscribed); + + // - Resubscribe + auto reSub = client.subscribe(subscription, 1); + QTRY_VERIFY2(reSub->state() == QMqttSubscription::Subscribed, "Could not re-subscribe to topic."); + QSignalSpy receivalSpy(reSub, SIGNAL(messageReceived(QMqttMessage))); + + QSignalSpy pubSpy(&client, SIGNAL(messageSent(qint32))); + client.publish(subscription, "Sending after reconnect 1", 1); + QTRY_VERIFY2(pubSpy.size() == 1, "Could not publish message."); + + QTRY_VERIFY2(receivalSpy.size() == 1, "Did not receive message on re-subscribe."); + + // - Loose connection / connection drop + transport = qobject_cast<QAbstractSocket *>(client.transport()); + QVERIFY2(transport, "Transport has to be QAbstractSocket-based."); + transport->disconnectFromHost(); + QTRY_VERIFY2(client.state() == QMqttClient::Disconnected, "State not correctly switched."); + + // - Reconnect (no cleansession) + QSignalSpy restoredSpy(&client, SIGNAL(brokerSessionRestored())); + + client.setCleanSession(false); + client.connectToHost(); + QTRY_VERIFY2(client.state() == QMqttClient::Connected, "Could not connect to broker."); + // Could not identify a broker doing this. The specs states: + // [MQTT-4.1.0-1] The Client and Server MUST store Session state for the entire + // duration of the Session. + // [MQTT-4.1.0-2] A Session MUST last at least as long it has an active Network Connection. + // All testbrokers delete the session at transport disconnect, regardless of DISCONNECT been + // send before or not. + if (restoredSpy.count() > 0) { + QCOMPARE(reSub->state(), QMqttSubscription::Subscribed); + pubSpy.clear(); + receivalSpy.clear(); + client.publish(subscription, "Sending after reconnect 2", 1); + QTRY_VERIFY2(pubSpy.size() == 1, "Could not publish message."); + QTRY_VERIFY2(receivalSpy.size() == 1, "Did not receive message on re-subscribe."); + } else { + // No need to test this + qDebug() << "Test broker does not support long-livety sessions."; + } + // - Old subscription is still active + + client.disconnectFromHost(); + QTRY_VERIFY2(client.state() == QMqttClient::Disconnected, "Could not disconnect."); +} + QTEST_MAIN(Tst_QMqttSubscription) #include "tst_qmqttsubscription.moc" |