diff options
author | Qt Forward Merge Bot <qt_forward_merge_bot@qt-project.org> | 2019-07-09 03:00:18 +0200 |
---|---|---|
committer | Edward Welbourne <edward.welbourne@qt.io> | 2019-07-11 14:45:03 +0200 |
commit | 15853d77b19c0439e424947d81b52fafa32d45c8 (patch) | |
tree | 2e5d593506c624d816a88dc583e4cee4e5c4f471 | |
parent | 2c24a9ec62741cbfc6a9fe0f5c05ff370a9e9414 (diff) | |
parent | 9a2e77fc1fe85ccfcb76dcae386a85ad96e66769 (diff) |
Merge "Merge remote-tracking branch 'origin/5.13' into dev"
-rw-r--r-- | src/mqtt/qmqttclient.cpp | 4 | ||||
-rw-r--r-- | src/mqtt/qmqttconnection.cpp | 54 | ||||
-rw-r--r-- | src/mqtt/qmqttconnection_p.h | 19 | ||||
-rw-r--r-- | src/mqtt/qmqttconnectionproperties.cpp | 28 | ||||
-rw-r--r-- | src/mqtt/qmqttconnectionproperties.h | 2 | ||||
-rw-r--r-- | tests/auto/qmqttclient/tst_qmqttclient.cpp | 15 |
6 files changed, 82 insertions, 40 deletions
diff --git a/src/mqtt/qmqttclient.cpp b/src/mqtt/qmqttclient.cpp index 11d3af3..bd5bb12 100644 --- a/src/mqtt/qmqttclient.cpp +++ b/src/mqtt/qmqttclient.cpp @@ -90,9 +90,7 @@ Q_LOGGING_CATEGORY(lcMqttClient, "qt.mqtt.client") frequent updates to propagate it can still be reached. The interval between those updates is specified by this property. - The interval is specified in milliseconds. However, most brokers are not - capable of using such a high granularity and will fall back to an interval - specified in seconds. + The interval is specified in seconds. If the broker does not respond within a grace period the connection will be closed. diff --git a/src/mqtt/qmqttconnection.cpp b/src/mqtt/qmqttconnection.cpp index bb27f66..2f9fb78 100644 --- a/src/mqtt/qmqttconnection.cpp +++ b/src/mqtt/qmqttconnection.cpp @@ -94,8 +94,6 @@ QByteArray QMqttConnection::readBufferTyped(qint64 *dataSize) QMqttConnection::QMqttConnection(QObject *parent) : QObject(parent) { - m_pingTimer.setSingleShot(false); - m_pingTimer.connect(&m_pingTimer, &QTimer::timeout, this, &QMqttConnection::sendControlPingRequest); } QMqttConnection::~QMqttConnection() @@ -107,6 +105,16 @@ QMqttConnection::~QMqttConnection() delete m_transport; } +void QMqttConnection::timerEvent(QTimerEvent *event) +{ + if (Q_LIKELY(event->timerId() == m_pingTimer.timerId())) { + sendControlPingRequest(); + return; + } + + QObject::timerEvent(event); +} + void QMqttConnection::setTransport(QIODevice *device, QMqttClient::TransportType transport) { qCDebug(lcMqttConnection) << Q_FUNC_INFO << device << " Type:" << transport; @@ -255,7 +263,7 @@ bool QMqttConnection::sendControlConnect() if (m_clientPrivate->m_cleanSession) flags |= 1 << 1; - if (!m_clientPrivate->m_willMessage.isEmpty()) { + if (!m_clientPrivate->m_willTopic.isEmpty()) { flags |= 1 << 2; if (m_clientPrivate->m_willQoS > 2) { qCDebug(lcMqttConnection) << "Invalid Will QoS specified."; @@ -292,7 +300,7 @@ bool QMqttConnection::sendControlConnect() packet.append(char(0)); } - if (!m_clientPrivate->m_willMessage.isEmpty()) { + if (!m_clientPrivate->m_willTopic.isEmpty()) { if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) packet.appendRaw(writeLastWillProperties()); @@ -468,15 +476,22 @@ QMqttSubscription *QMqttConnection::sendControlSubscribe(const QMqttTopicFilter qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic << " qos:" << qos; if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { - if (!topic.sharedSubscriptionName().isEmpty()) { + const QString sharedSubscriptionName = topic.sharedSubscriptionName(); + if (!sharedSubscriptionName.isEmpty()) { const QMqttTopicFilter filter(topic.filter().section(QLatin1Char('/'), 2)); - if (m_activeSubscriptions.contains(filter) - && m_activeSubscriptions.value(filter)->sharedSubscriptionName() == topic.sharedSubscriptionName()) - return m_activeSubscriptions[filter]; - } else if (m_activeSubscriptions.contains(topic) && !m_activeSubscriptions.value(topic)->isSharedSubscription()) - return m_activeSubscriptions[topic]; - } else if (m_activeSubscriptions.contains(topic)) - return m_activeSubscriptions[topic]; + auto it = m_activeSubscriptions.constFind(filter); + if (it != m_activeSubscriptions.cend() && (*it)->sharedSubscriptionName() == sharedSubscriptionName) + return *it; + } else { + auto it = m_activeSubscriptions.constFind(topic); + if (it != m_activeSubscriptions.cend() && !(*it)->isSharedSubscription()) + return *it; + } + } else { + auto it = m_activeSubscriptions.constFind(topic); + if (it != m_activeSubscriptions.cend()) + return *it; + } // has to have 0010 as bits 3-0, maybe update SUBSCRIBE instead? // MQTT-3.8.1-1 @@ -1457,20 +1472,19 @@ void QMqttConnection::finalize_connack() m_internalState = BrokerConnected; m_clientPrivate->setStateAndError(QMqttClient::Connected); - m_pingTimer.setInterval(m_clientPrivate->m_keepAlive * 1000); - m_pingTimer.start(); + m_pingTimer.start(m_clientPrivate->m_keepAlive * 1000, this); } void QMqttConnection::finalize_suback() { const quint16 id = readBufferTyped<quint16>(&m_missingData); - if (!m_pendingSubscriptionAck.contains(id)) { + + auto sub = m_pendingSubscriptionAck.take(id); + if (Q_UNLIKELY(sub == nullptr)) { qCDebug(lcMqttConnection) << "Received SUBACK for unknown subscription request."; return; } - auto sub = m_pendingSubscriptionAck.take(id); - if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) readSubscriptionProperties(sub); @@ -1498,11 +1512,13 @@ void QMqttConnection::finalize_unsuback() { const quint16 id = readBufferTyped<quint16>(&m_missingData); qCDebug(lcMqttConnectionVerbose) << "Finalize UNSUBACK: " << id; - if (!m_pendingUnsubscriptions.contains(id)) { + + auto sub = m_pendingUnsubscriptions.take(id); + if (Q_UNLIKELY(sub == nullptr)) { qCDebug(lcMqttConnection) << "Received UNSUBACK for unknown request."; return; } - auto sub = m_pendingUnsubscriptions.take(id); + sub->setState(QMqttSubscription::Unsubscribed); m_activeSubscriptions.remove(sub->topic()); } diff --git a/src/mqtt/qmqttconnection_p.h b/src/mqtt/qmqttconnection_p.h index 626addc..91c6b77 100644 --- a/src/mqtt/qmqttconnection_p.h +++ b/src/mqtt/qmqttconnection_p.h @@ -45,11 +45,11 @@ #include "qmqttcontrolpacket_p.h" #include "qmqttmessage.h" #include "qmqttsubscription.h" +#include <QtCore/QBasicTimer> #include <QtCore/QBuffer> -#include <QtCore/QMap> +#include <QtCore/QHash> #include <QtCore/QObject> #include <QtCore/QSharedPointer> -#include <QtCore/QTimer> #include <QtCore/QtEndian> QT_BEGIN_NAMESPACE @@ -102,6 +102,9 @@ public Q_SLOTS: void transportReadReady(); void transportError(QAbstractSocket::SocketError e); +protected: + void timerEvent(QTimerEvent *event) override; + public: QIODevice *m_transport{nullptr}; QMqttClient::TransportType m_transportType{QMqttClient::IODevice}; @@ -150,13 +153,13 @@ private: QMqttControlPacket::PacketType m_currentPacket{QMqttControlPacket::UNKNOWN}; bool writePacketToTransport(const QMqttControlPacket &p); - QMap<quint16, QMqttSubscription *> m_pendingSubscriptionAck; - QMap<quint16, QMqttSubscription *> m_pendingUnsubscriptions; - QMap<QMqttTopicFilter, QMqttSubscription *> m_activeSubscriptions; - QMap<quint16, QSharedPointer<QMqttControlPacket>> m_pendingMessages; - QMap<quint16, QSharedPointer<QMqttControlPacket>> m_pendingReleaseMessages; + QHash<quint16, QMqttSubscription *> m_pendingSubscriptionAck; + QHash<quint16, QMqttSubscription *> m_pendingUnsubscriptions; + QHash<QMqttTopicFilter, QMqttSubscription *> m_activeSubscriptions; + QHash<quint16, QSharedPointer<QMqttControlPacket>> m_pendingMessages; + QHash<quint16, QSharedPointer<QMqttControlPacket>> m_pendingReleaseMessages; InternalConnectionState m_internalState{BrokerDisconnected}; - QTimer m_pingTimer; + QBasicTimer m_pingTimer; int m_pingTimeout{0}; QVector<QMqttTopicName> m_receiveAliases; diff --git a/src/mqtt/qmqttconnectionproperties.cpp b/src/mqtt/qmqttconnectionproperties.cpp index aae5b58..815588d 100644 --- a/src/mqtt/qmqttconnectionproperties.cpp +++ b/src/mqtt/qmqttconnectionproperties.cpp @@ -311,17 +311,19 @@ void QMqttConnectionProperties::setSessionExpiryInterval(quint32 expiry) } /*! - Sets the maximum QoS level the client is allowed to receive to \a qos. + Sets the maximum amount of QoS 1 and QoS 2 publications + that the client is willing to process concurrently for this session + to \a maximumReceive. A maximum receive value of 0 is not allowed. */ -void QMqttConnectionProperties::setMaximumReceive(quint16 qos) +void QMqttConnectionProperties::setMaximumReceive(quint16 maximumReceive) { - if (qos == 0) { + if (maximumReceive == 0) { qCDebug(lcMqttConnection) << "Maximum Receive is not allowed to be 0."; return; } - data->maximumReceive = qos; + data->maximumReceive = maximumReceive; } /*! @@ -430,7 +432,10 @@ quint32 QMqttConnectionProperties::sessionExpiryInterval() const } /*! - Returns the maximum QoS level the client can receive. + Returns the maximum amount of QoS 1 and QoS 2 publications + that the client (when obtained from \l QMqttClient::connectionProperties()) + or the server (when obtained from \l QMqttClient::serverConnectionProperties()) + is willing to process concurrently for this session. */ quint16 QMqttConnectionProperties::maximumReceive() const { @@ -542,8 +547,17 @@ bool QMqttServerConnectionProperties::isValid() const } /*! - Returns the maximum QoS level the server is able to understand. - The default value is 2. + Returns the maximum QoS level the server supports for publishing messages. + Publishing messages with QoS level exceeding the maximum QoS level reported by the server + is a protocol violation. + + If the client does not need to support QoS 1 or QoS 2, it should restrict the maximum QoS level + in any subscription it does to a value it can support; the server would then publish messages + with the maximum of supported and restricted QoS levels. + + The default value is \c 2. + + \sa QMqttClient::publish(), QMqttClient::subscribe() */ quint8 QMqttServerConnectionProperties::maximumQoS() const { diff --git a/src/mqtt/qmqttconnectionproperties.h b/src/mqtt/qmqttconnectionproperties.h index fb41853..fd7301a 100644 --- a/src/mqtt/qmqttconnectionproperties.h +++ b/src/mqtt/qmqttconnectionproperties.h @@ -89,7 +89,7 @@ public: QByteArray authenticationData() const; void setSessionExpiryInterval(quint32 expiry); - void setMaximumReceive(quint16 qos); + void setMaximumReceive(quint16 maximumReceive); void setMaximumPacketSize(quint32 packetSize); void setMaximumTopicAlias(quint16 alias); void setRequestResponseInformation(bool response); diff --git a/tests/auto/qmqttclient/tst_qmqttclient.cpp b/tests/auto/qmqttclient/tst_qmqttclient.cpp index c4da40d..21bd512 100644 --- a/tests/auto/qmqttclient/tst_qmqttclient.cpp +++ b/tests/auto/qmqttclient/tst_qmqttclient.cpp @@ -263,14 +263,25 @@ void Tst_QMqttClient::retainMessage() publisher.disconnect(); } -DefaultVersionTestData(Tst_QMqttClient::willMessage_data) +void Tst_QMqttClient::willMessage_data() +{ + QTest::addColumn<QMqttClient::ProtocolVersion>("mqttVersion"); + QTest::addColumn<QByteArray>("willMessage"); + + QList<QMqttClient::ProtocolVersion> versions{QMqttClient::MQTT_3_1_1, QMqttClient::MQTT_5_0}; + + for (int i = 0; i < 2; ++i) { + QTest::newRow(qPrintable(QString::number(versions[i]) + ":simple")) << versions[i] << QByteArray("The client connection is gone."); + QTest::newRow(qPrintable(QString::number(versions[i]) + ":empty")) << versions[i] << QByteArray(); + } +} void Tst_QMqttClient::willMessage() { QFETCH(QMqttClient::ProtocolVersion, mqttVersion); + QFETCH(QByteArray, willMessage); const QString willTopic = QLatin1String("Qt/QMqttClient/will/topic"); - const QByteArray willMessage("The client died...."); // Client A connects VersionClient(mqttVersion, client1); |