summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorQt Forward Merge Bot <qt_forward_merge_bot@qt-project.org>2019-07-09 03:00:18 +0200
committerEdward Welbourne <edward.welbourne@qt.io>2019-07-11 14:45:03 +0200
commit15853d77b19c0439e424947d81b52fafa32d45c8 (patch)
tree2e5d593506c624d816a88dc583e4cee4e5c4f471
parent2c24a9ec62741cbfc6a9fe0f5c05ff370a9e9414 (diff)
parent9a2e77fc1fe85ccfcb76dcae386a85ad96e66769 (diff)
Merge "Merge remote-tracking branch 'origin/5.13' into dev"
-rw-r--r--src/mqtt/qmqttclient.cpp4
-rw-r--r--src/mqtt/qmqttconnection.cpp54
-rw-r--r--src/mqtt/qmqttconnection_p.h19
-rw-r--r--src/mqtt/qmqttconnectionproperties.cpp28
-rw-r--r--src/mqtt/qmqttconnectionproperties.h2
-rw-r--r--tests/auto/qmqttclient/tst_qmqttclient.cpp15
6 files changed, 82 insertions, 40 deletions
diff --git a/src/mqtt/qmqttclient.cpp b/src/mqtt/qmqttclient.cpp
index 11d3af3..bd5bb12 100644
--- a/src/mqtt/qmqttclient.cpp
+++ b/src/mqtt/qmqttclient.cpp
@@ -90,9 +90,7 @@ Q_LOGGING_CATEGORY(lcMqttClient, "qt.mqtt.client")
frequent updates to propagate it can still be reached. The interval between
those updates is specified by this property.
- The interval is specified in milliseconds. However, most brokers are not
- capable of using such a high granularity and will fall back to an interval
- specified in seconds.
+ The interval is specified in seconds.
If the broker does not respond within a grace period the connection will be
closed.
diff --git a/src/mqtt/qmqttconnection.cpp b/src/mqtt/qmqttconnection.cpp
index bb27f66..2f9fb78 100644
--- a/src/mqtt/qmqttconnection.cpp
+++ b/src/mqtt/qmqttconnection.cpp
@@ -94,8 +94,6 @@ QByteArray QMqttConnection::readBufferTyped(qint64 *dataSize)
QMqttConnection::QMqttConnection(QObject *parent) : QObject(parent)
{
- m_pingTimer.setSingleShot(false);
- m_pingTimer.connect(&m_pingTimer, &QTimer::timeout, this, &QMqttConnection::sendControlPingRequest);
}
QMqttConnection::~QMqttConnection()
@@ -107,6 +105,16 @@ QMqttConnection::~QMqttConnection()
delete m_transport;
}
+void QMqttConnection::timerEvent(QTimerEvent *event)
+{
+ if (Q_LIKELY(event->timerId() == m_pingTimer.timerId())) {
+ sendControlPingRequest();
+ return;
+ }
+
+ QObject::timerEvent(event);
+}
+
void QMqttConnection::setTransport(QIODevice *device, QMqttClient::TransportType transport)
{
qCDebug(lcMqttConnection) << Q_FUNC_INFO << device << " Type:" << transport;
@@ -255,7 +263,7 @@ bool QMqttConnection::sendControlConnect()
if (m_clientPrivate->m_cleanSession)
flags |= 1 << 1;
- if (!m_clientPrivate->m_willMessage.isEmpty()) {
+ if (!m_clientPrivate->m_willTopic.isEmpty()) {
flags |= 1 << 2;
if (m_clientPrivate->m_willQoS > 2) {
qCDebug(lcMqttConnection) << "Invalid Will QoS specified.";
@@ -292,7 +300,7 @@ bool QMqttConnection::sendControlConnect()
packet.append(char(0));
}
- if (!m_clientPrivate->m_willMessage.isEmpty()) {
+ if (!m_clientPrivate->m_willTopic.isEmpty()) {
if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
packet.appendRaw(writeLastWillProperties());
@@ -468,15 +476,22 @@ QMqttSubscription *QMqttConnection::sendControlSubscribe(const QMqttTopicFilter
qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic << " qos:" << qos;
if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
- if (!topic.sharedSubscriptionName().isEmpty()) {
+ const QString sharedSubscriptionName = topic.sharedSubscriptionName();
+ if (!sharedSubscriptionName.isEmpty()) {
const QMqttTopicFilter filter(topic.filter().section(QLatin1Char('/'), 2));
- if (m_activeSubscriptions.contains(filter)
- && m_activeSubscriptions.value(filter)->sharedSubscriptionName() == topic.sharedSubscriptionName())
- return m_activeSubscriptions[filter];
- } else if (m_activeSubscriptions.contains(topic) && !m_activeSubscriptions.value(topic)->isSharedSubscription())
- return m_activeSubscriptions[topic];
- } else if (m_activeSubscriptions.contains(topic))
- return m_activeSubscriptions[topic];
+ auto it = m_activeSubscriptions.constFind(filter);
+ if (it != m_activeSubscriptions.cend() && (*it)->sharedSubscriptionName() == sharedSubscriptionName)
+ return *it;
+ } else {
+ auto it = m_activeSubscriptions.constFind(topic);
+ if (it != m_activeSubscriptions.cend() && !(*it)->isSharedSubscription())
+ return *it;
+ }
+ } else {
+ auto it = m_activeSubscriptions.constFind(topic);
+ if (it != m_activeSubscriptions.cend())
+ return *it;
+ }
// has to have 0010 as bits 3-0, maybe update SUBSCRIBE instead?
// MQTT-3.8.1-1
@@ -1457,20 +1472,19 @@ void QMqttConnection::finalize_connack()
m_internalState = BrokerConnected;
m_clientPrivate->setStateAndError(QMqttClient::Connected);
- m_pingTimer.setInterval(m_clientPrivate->m_keepAlive * 1000);
- m_pingTimer.start();
+ m_pingTimer.start(m_clientPrivate->m_keepAlive * 1000, this);
}
void QMqttConnection::finalize_suback()
{
const quint16 id = readBufferTyped<quint16>(&m_missingData);
- if (!m_pendingSubscriptionAck.contains(id)) {
+
+ auto sub = m_pendingSubscriptionAck.take(id);
+ if (Q_UNLIKELY(sub == nullptr)) {
qCDebug(lcMqttConnection) << "Received SUBACK for unknown subscription request.";
return;
}
- auto sub = m_pendingSubscriptionAck.take(id);
-
if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
readSubscriptionProperties(sub);
@@ -1498,11 +1512,13 @@ void QMqttConnection::finalize_unsuback()
{
const quint16 id = readBufferTyped<quint16>(&m_missingData);
qCDebug(lcMqttConnectionVerbose) << "Finalize UNSUBACK: " << id;
- if (!m_pendingUnsubscriptions.contains(id)) {
+
+ auto sub = m_pendingUnsubscriptions.take(id);
+ if (Q_UNLIKELY(sub == nullptr)) {
qCDebug(lcMqttConnection) << "Received UNSUBACK for unknown request.";
return;
}
- auto sub = m_pendingUnsubscriptions.take(id);
+
sub->setState(QMqttSubscription::Unsubscribed);
m_activeSubscriptions.remove(sub->topic());
}
diff --git a/src/mqtt/qmqttconnection_p.h b/src/mqtt/qmqttconnection_p.h
index 626addc..91c6b77 100644
--- a/src/mqtt/qmqttconnection_p.h
+++ b/src/mqtt/qmqttconnection_p.h
@@ -45,11 +45,11 @@
#include "qmqttcontrolpacket_p.h"
#include "qmqttmessage.h"
#include "qmqttsubscription.h"
+#include <QtCore/QBasicTimer>
#include <QtCore/QBuffer>
-#include <QtCore/QMap>
+#include <QtCore/QHash>
#include <QtCore/QObject>
#include <QtCore/QSharedPointer>
-#include <QtCore/QTimer>
#include <QtCore/QtEndian>
QT_BEGIN_NAMESPACE
@@ -102,6 +102,9 @@ public Q_SLOTS:
void transportReadReady();
void transportError(QAbstractSocket::SocketError e);
+protected:
+ void timerEvent(QTimerEvent *event) override;
+
public:
QIODevice *m_transport{nullptr};
QMqttClient::TransportType m_transportType{QMqttClient::IODevice};
@@ -150,13 +153,13 @@ private:
QMqttControlPacket::PacketType m_currentPacket{QMqttControlPacket::UNKNOWN};
bool writePacketToTransport(const QMqttControlPacket &p);
- QMap<quint16, QMqttSubscription *> m_pendingSubscriptionAck;
- QMap<quint16, QMqttSubscription *> m_pendingUnsubscriptions;
- QMap<QMqttTopicFilter, QMqttSubscription *> m_activeSubscriptions;
- QMap<quint16, QSharedPointer<QMqttControlPacket>> m_pendingMessages;
- QMap<quint16, QSharedPointer<QMqttControlPacket>> m_pendingReleaseMessages;
+ QHash<quint16, QMqttSubscription *> m_pendingSubscriptionAck;
+ QHash<quint16, QMqttSubscription *> m_pendingUnsubscriptions;
+ QHash<QMqttTopicFilter, QMqttSubscription *> m_activeSubscriptions;
+ QHash<quint16, QSharedPointer<QMqttControlPacket>> m_pendingMessages;
+ QHash<quint16, QSharedPointer<QMqttControlPacket>> m_pendingReleaseMessages;
InternalConnectionState m_internalState{BrokerDisconnected};
- QTimer m_pingTimer;
+ QBasicTimer m_pingTimer;
int m_pingTimeout{0};
QVector<QMqttTopicName> m_receiveAliases;
diff --git a/src/mqtt/qmqttconnectionproperties.cpp b/src/mqtt/qmqttconnectionproperties.cpp
index aae5b58..815588d 100644
--- a/src/mqtt/qmqttconnectionproperties.cpp
+++ b/src/mqtt/qmqttconnectionproperties.cpp
@@ -311,17 +311,19 @@ void QMqttConnectionProperties::setSessionExpiryInterval(quint32 expiry)
}
/*!
- Sets the maximum QoS level the client is allowed to receive to \a qos.
+ Sets the maximum amount of QoS 1 and QoS 2 publications
+ that the client is willing to process concurrently for this session
+ to \a maximumReceive.
A maximum receive value of 0 is not allowed.
*/
-void QMqttConnectionProperties::setMaximumReceive(quint16 qos)
+void QMqttConnectionProperties::setMaximumReceive(quint16 maximumReceive)
{
- if (qos == 0) {
+ if (maximumReceive == 0) {
qCDebug(lcMqttConnection) << "Maximum Receive is not allowed to be 0.";
return;
}
- data->maximumReceive = qos;
+ data->maximumReceive = maximumReceive;
}
/*!
@@ -430,7 +432,10 @@ quint32 QMqttConnectionProperties::sessionExpiryInterval() const
}
/*!
- Returns the maximum QoS level the client can receive.
+ Returns the maximum amount of QoS 1 and QoS 2 publications
+ that the client (when obtained from \l QMqttClient::connectionProperties())
+ or the server (when obtained from \l QMqttClient::serverConnectionProperties())
+ is willing to process concurrently for this session.
*/
quint16 QMqttConnectionProperties::maximumReceive() const
{
@@ -542,8 +547,17 @@ bool QMqttServerConnectionProperties::isValid() const
}
/*!
- Returns the maximum QoS level the server is able to understand.
- The default value is 2.
+ Returns the maximum QoS level the server supports for publishing messages.
+ Publishing messages with QoS level exceeding the maximum QoS level reported by the server
+ is a protocol violation.
+
+ If the client does not need to support QoS 1 or QoS 2, it should restrict the maximum QoS level
+ in any subscription it does to a value it can support; the server would then publish messages
+ with the maximum of supported and restricted QoS levels.
+
+ The default value is \c 2.
+
+ \sa QMqttClient::publish(), QMqttClient::subscribe()
*/
quint8 QMqttServerConnectionProperties::maximumQoS() const
{
diff --git a/src/mqtt/qmqttconnectionproperties.h b/src/mqtt/qmqttconnectionproperties.h
index fb41853..fd7301a 100644
--- a/src/mqtt/qmqttconnectionproperties.h
+++ b/src/mqtt/qmqttconnectionproperties.h
@@ -89,7 +89,7 @@ public:
QByteArray authenticationData() const;
void setSessionExpiryInterval(quint32 expiry);
- void setMaximumReceive(quint16 qos);
+ void setMaximumReceive(quint16 maximumReceive);
void setMaximumPacketSize(quint32 packetSize);
void setMaximumTopicAlias(quint16 alias);
void setRequestResponseInformation(bool response);
diff --git a/tests/auto/qmqttclient/tst_qmqttclient.cpp b/tests/auto/qmqttclient/tst_qmqttclient.cpp
index c4da40d..21bd512 100644
--- a/tests/auto/qmqttclient/tst_qmqttclient.cpp
+++ b/tests/auto/qmqttclient/tst_qmqttclient.cpp
@@ -263,14 +263,25 @@ void Tst_QMqttClient::retainMessage()
publisher.disconnect();
}
-DefaultVersionTestData(Tst_QMqttClient::willMessage_data)
+void Tst_QMqttClient::willMessage_data()
+{
+ QTest::addColumn<QMqttClient::ProtocolVersion>("mqttVersion");
+ QTest::addColumn<QByteArray>("willMessage");
+
+ QList<QMqttClient::ProtocolVersion> versions{QMqttClient::MQTT_3_1_1, QMqttClient::MQTT_5_0};
+
+ for (int i = 0; i < 2; ++i) {
+ QTest::newRow(qPrintable(QString::number(versions[i]) + ":simple")) << versions[i] << QByteArray("The client connection is gone.");
+ QTest::newRow(qPrintable(QString::number(versions[i]) + ":empty")) << versions[i] << QByteArray();
+ }
+}
void Tst_QMqttClient::willMessage()
{
QFETCH(QMqttClient::ProtocolVersion, mqttVersion);
+ QFETCH(QByteArray, willMessage);
const QString willTopic = QLatin1String("Qt/QMqttClient/will/topic");
- const QByteArray willMessage("The client died....");
// Client A connects
VersionClient(mqttVersion, client1);