summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaurice Kalinowski <maurice.kalinowski@qt.io>2017-11-14 07:21:46 +0100
committerMaurice Kalinowski <maurice.kalinowski@qt.io>2017-11-15 14:06:41 +0000
commit6b6d2749e6107a77390cc67572e86294115350a2 (patch)
tree980d73648c3210741bf4f951568d5203c5b495a3
parent42c9128e48c8ec66dd8affd53d8e0513427c75a3 (diff)
Fix subscriptions at reconnectv1.0
When a client reconnects with a cleanSession, all previous subscriptions need to be disabled due to not being valid anymore. Furthermore, a broker can decide to not restore a session even though cleanSession was false. Take this into account by disabling existing subscriptions in that case as well. Task-number: QTBUG-64042 Change-Id: I618bc1fe4bb30651a34d70ec9d9e95e48b1c25d5 Reviewed-by: Alex Blasche <alexander.blasche@qt.io>
-rw-r--r--src/mqtt/qmqttclient.cpp3
-rw-r--r--src/mqtt/qmqttconnection.cpp19
-rw-r--r--src/mqtt/qmqttconnection_p.h2
-rw-r--r--tests/auto/qmqttsubscription/tst_qmqttsubscription.cpp76
4 files changed, 100 insertions, 0 deletions
diff --git a/src/mqtt/qmqttclient.cpp b/src/mqtt/qmqttclient.cpp
index 22ee311..a765f60 100644
--- a/src/mqtt/qmqttclient.cpp
+++ b/src/mqtt/qmqttclient.cpp
@@ -421,6 +421,9 @@ void QMqttClient::connectToHost(bool encrypted, const QString &sslPeerName)
}
d->setStateAndError(Connecting);
+ if (d->m_cleanSession)
+ d->m_connection.cleanSubscriptions();
+
if (!d->m_connection.ensureTransportOpen(sslPeerName)) {
qWarning("Could not ensure that connection is open");
d->setStateAndError(Disconnected, TransportInvalid);
diff --git a/src/mqtt/qmqttconnection.cpp b/src/mqtt/qmqttconnection.cpp
index 40e3908..622edba 100644
--- a/src/mqtt/qmqttconnection.cpp
+++ b/src/mqtt/qmqttconnection.cpp
@@ -471,6 +471,21 @@ quint16 QMqttConnection::unusedPacketIdentifier() const
return packetIdentifierCounter;
}
+void QMqttConnection::cleanSubscriptions()
+{
+ for (auto item : m_pendingSubscriptionAck)
+ item->setState(QMqttSubscription::Unsubscribed);
+ m_pendingSubscriptionAck.clear();
+
+ for (auto item : m_pendingUnsubscriptions)
+ item->setState(QMqttSubscription::Unsubscribed);
+ m_pendingUnsubscriptions.clear();
+
+ for (auto item : m_activeSubscriptions)
+ item->setState(QMqttSubscription::Unsubscribed);
+ m_activeSubscriptions.clear();
+}
+
void QMqttConnection::transportConnectionEstablished()
{
if (m_internalState != BrokerConnecting) {
@@ -540,6 +555,10 @@ void QMqttConnection::finalize_connack()
emit m_clientPrivate->m_client->brokerSessionRestored();
if (m_clientPrivate->m_cleanSession)
qWarning("Connected with a clean session, ack contains session present");
+ } else {
+ // MQTT-4.1.0.-1 MQTT-4.1.0-2 Session not stored on broker side
+ // regardless whether cleanSession is false
+ cleanSubscriptions();
}
quint8 connectResultValue;
diff --git a/src/mqtt/qmqttconnection_p.h b/src/mqtt/qmqttconnection_p.h
index 128573c..cf9c912 100644
--- a/src/mqtt/qmqttconnection_p.h
+++ b/src/mqtt/qmqttconnection_p.h
@@ -92,6 +92,8 @@ public:
inline quint16 unusedPacketIdentifier() const;
inline InternalConnectionState internalState() const { return m_internalState; }
+ void cleanSubscriptions();
+
public Q_SLOTS:
void transportConnectionEstablished();
void transportConnectionClosed();
diff --git a/tests/auto/qmqttsubscription/tst_qmqttsubscription.cpp b/tests/auto/qmqttsubscription/tst_qmqttsubscription.cpp
index 7f51dd5..fda8ef6 100644
--- a/tests/auto/qmqttsubscription/tst_qmqttsubscription.cpp
+++ b/tests/auto/qmqttsubscription/tst_qmqttsubscription.cpp
@@ -47,6 +47,7 @@ private Q_SLOTS:
void getSetCheck();
void wildCards_data();
void wildCards();
+ void reconnect();
private:
QProcess m_brokerProcess;
QString m_testBroker;
@@ -141,6 +142,81 @@ void Tst_QMqttSubscription::wildCards()
QTRY_VERIFY2(publisher.state() == QMqttClient::Disconnected, "Could not disconnect.");
}
+void Tst_QMqttSubscription::reconnect()
+{
+ // QTBUG-64042
+ // - Connect with clean session
+ QMqttClient client;
+
+ client.setHostname(m_testBroker);
+ client.setPort(m_port);
+ client.setCleanSession(true);
+ client.connectToHost();
+ QTRY_VERIFY2(client.state() == QMqttClient::Connected, "Could not connect to broker.");
+
+ // - Subscribe to topic A
+ const QString subscription("topics/resub");
+ auto sub = client.subscribe(subscription, 1);
+ QTRY_VERIFY2(sub->state() == QMqttSubscription::Subscribed, "Could not subscribe to topic.");
+
+ // - Loose connection / connection drop
+ QAbstractSocket *transport = qobject_cast<QAbstractSocket *>(client.transport());
+ QVERIFY2(transport, "Transport has to be QAbstractSocket-based.");
+ transport->disconnectFromHost();
+ QTRY_VERIFY2(client.state() == QMqttClient::Disconnected, "State not correctly switched.");
+
+ // - Reconnect (keeping cleansession)
+ client.connectToHost();
+ QTRY_VERIFY2(client.state() == QMqttClient::Connected, "Could not connect to broker.");
+ // old subs should get updated / invalidated
+ QCOMPARE(sub->state(), QMqttSubscription::Unsubscribed);
+
+ // - Resubscribe
+ auto reSub = client.subscribe(subscription, 1);
+ QTRY_VERIFY2(reSub->state() == QMqttSubscription::Subscribed, "Could not re-subscribe to topic.");
+ QSignalSpy receivalSpy(reSub, SIGNAL(messageReceived(QMqttMessage)));
+
+ QSignalSpy pubSpy(&client, SIGNAL(messageSent(qint32)));
+ client.publish(subscription, "Sending after reconnect 1", 1);
+ QTRY_VERIFY2(pubSpy.size() == 1, "Could not publish message.");
+
+ QTRY_VERIFY2(receivalSpy.size() == 1, "Did not receive message on re-subscribe.");
+
+ // - Loose connection / connection drop
+ transport = qobject_cast<QAbstractSocket *>(client.transport());
+ QVERIFY2(transport, "Transport has to be QAbstractSocket-based.");
+ transport->disconnectFromHost();
+ QTRY_VERIFY2(client.state() == QMqttClient::Disconnected, "State not correctly switched.");
+
+ // - Reconnect (no cleansession)
+ QSignalSpy restoredSpy(&client, SIGNAL(brokerSessionRestored()));
+
+ client.setCleanSession(false);
+ client.connectToHost();
+ QTRY_VERIFY2(client.state() == QMqttClient::Connected, "Could not connect to broker.");
+ // Could not identify a broker doing this. The specs states:
+ // [MQTT-4.1.0-1] The Client and Server MUST store Session state for the entire
+ // duration of the Session.
+ // [MQTT-4.1.0-2] A Session MUST last at least as long it has an active Network Connection.
+ // All testbrokers delete the session at transport disconnect, regardless of DISCONNECT been
+ // send before or not.
+ if (restoredSpy.count() > 0) {
+ QCOMPARE(reSub->state(), QMqttSubscription::Subscribed);
+ pubSpy.clear();
+ receivalSpy.clear();
+ client.publish(subscription, "Sending after reconnect 2", 1);
+ QTRY_VERIFY2(pubSpy.size() == 1, "Could not publish message.");
+ QTRY_VERIFY2(receivalSpy.size() == 1, "Did not receive message on re-subscribe.");
+ } else {
+ // No need to test this
+ qDebug() << "Test broker does not support long-livety sessions.";
+ }
+ // - Old subscription is still active
+
+ client.disconnectFromHost();
+ QTRY_VERIFY2(client.state() == QMqttClient::Disconnected, "Could not disconnect.");
+}
+
QTEST_MAIN(Tst_QMqttSubscription)
#include "tst_qmqttsubscription.moc"