From 5ef7e409fb3b4a5f44a424b5f1998437478ccccc Mon Sep 17 00:00:00 2001 From: Maurice Kalinowski Date: Tue, 31 Jul 2018 12:24:08 +0200 Subject: Add support for message status updates A user might be interested in knowing the exact state of a message while publishing or when a message is received. Task-number: QTPM-1453 Task-number: QTBUG-66599 Change-Id: Ibf977f76ba6078b5b525ba00b6988d4b69960176 Reviewed-by: hjk --- src/mqtt/mqtt.pro | 1 + src/mqtt/qmqttclient.h | 1 + src/mqtt/qmqttconnection.cpp | 61 ++++++++++++++---- src/mqtt/qmqttconnection_p.h | 1 + src/mqtt/qmqttglobal.h | 22 +++++++ src/mqtt/qmqttpublishproperties.cpp | 46 ++++++++----- src/mqtt/qmqttpublishproperties.h | 19 ++++++ src/mqtt/qmqttpublishproperties_p.h | 75 ++++++++++++++++++++++ tests/auto/qmqttclient/tst_qmqttclient.cpp | 100 +++++++++++++++++++++++++++++ 9 files changed, 299 insertions(+), 27 deletions(-) create mode 100644 src/mqtt/qmqttpublishproperties_p.h diff --git a/src/mqtt/mqtt.pro b/src/mqtt/mqtt.pro index adbfc54..aa6c99c 100644 --- a/src/mqtt/mqtt.pro +++ b/src/mqtt/mqtt.pro @@ -26,6 +26,7 @@ PRIVATE_HEADERS += \ qmqttconnectionproperties_p.h \ qmqttcontrolpacket_p.h \ qmqttmessage_p.h \ + qmqttpublishproperties_p.h \ qmqttsubscription_p.h SOURCES += \ diff --git a/src/mqtt/qmqttclient.h b/src/mqtt/qmqttclient.h index d462e35..f818c3c 100644 --- a/src/mqtt/qmqttclient.h +++ b/src/mqtt/qmqttclient.h @@ -155,6 +155,7 @@ Q_SIGNALS: void connected(); void disconnected(); void messageReceived(const QByteArray &message, const QMqttTopicName &topic = QMqttTopicName()); + void messageStatusChanged(qint32 id, QMqtt::MessageStatus s, const QMqttMessageStatusProperties &properties); void messageSent(qint32 id); void pingResponseReceived(); void brokerSessionRestored(); diff --git a/src/mqtt/qmqttconnection.cpp b/src/mqtt/qmqttconnection.cpp index 69e153b..d86e8ca 100644 --- a/src/mqtt/qmqttconnection.cpp +++ b/src/mqtt/qmqttconnection.cpp @@ -31,6 +31,7 @@ #include "qmqttconnectionproperties_p.h" #include "qmqttcontrolpacket_p.h" #include "qmqttmessage_p.h" +#include "qmqttpublishproperties_p.h" #include "qmqttsubscription_p.h" #include "qmqttclient_p.h" @@ -892,6 +893,32 @@ void QMqttConnection::readConnackProperties() m_clientPrivate->m_serverConnectionProperties = serverProperties; } +void QMqttConnection::readMessageStatusProperties(QMqttMessageStatusProperties &properties) +{ + qint64 propertyLength = readVariableByteInteger(); + + m_missingData -= propertyLength; + while (propertyLength > 0) { + const quint8 propertyId = readBufferTyped(&propertyLength); + switch (propertyId) { + case 0x1f: { // 3.4.2.2.2 Reason String + const QString content = readBufferTyped(&propertyLength); + properties.data->reasonString = content; + break; + } + case 0x26: { // 3.4.2.2.3 User Properites + const QString propertyName = readBufferTyped(&propertyLength); + const QString propertyValue = readBufferTyped(&propertyLength); + properties.data->userProperties.append(QMqttStringPair(propertyName, propertyValue)); + break; + } + default: + qCDebug(lcMqttConnection) << "Unknown subscription property received."; + break; + } + } +} + void QMqttConnection::readPublishProperties(QMqttPublishProperties &properties) { qint32 propertySize = 0; @@ -1537,6 +1564,11 @@ void QMqttConnection::finalize_publish() m_currentPublish.dup, m_currentPublish.retain); qmsg.d->m_publishProperties = publishProperties; + if (id != 0) { + QMqttMessageStatusProperties statusProp; + statusProp.data->userProperties = publishProperties.userProperties(); + emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Published, statusProp); + } for (auto sub = m_activeSubscriptions.constBegin(); sub != m_activeSubscriptions.constEnd(); sub++) { if (sub.key().match(topic)) @@ -1554,27 +1586,20 @@ void QMqttConnection::finalize_pubAckRecComp() qCDebug(lcMqttConnectionVerbose) << "Finalize PUBACK/REC/COMP"; const quint16 id = readBufferTyped(&m_missingData); + QMqttMessageStatusProperties properties; if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) { // Reason Code (1byte) const quint8 reasonCode = readBufferTyped(&m_missingData); - Q_UNUSED(reasonCode); // ### TODO: Do something with it, currently silences compiler - if (m_missingData > 0) { - // Property Length (Variable Int) - qint32 byteCount = 0; - const qint32 propertyLength = readVariableByteInteger(&byteCount); - m_missingData -= byteCount; - // ### TODO: Publish ACK/REC/COMP property handling - if (propertyLength > 0) { - readBuffer(quint64(propertyLength)); - m_missingData -= propertyLength; - } - } + properties.data->reasonCode = QMqtt::ReasonCode(reasonCode); + if (m_missingData > 0) + readMessageStatusProperties(properties); } if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBCOMP) { qCDebug(lcMqttConnectionVerbose) << " PUBCOMP:" << id; auto pendingRelease = m_pendingReleaseMessages.take(id); if (!pendingRelease) qCDebug(lcMqttConnection) << "Received PUBCOMP for unknown released message."; + emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Completed, properties); emit m_clientPrivate->m_client->messageSent(id); return; } @@ -1587,9 +1612,11 @@ void QMqttConnection::finalize_pubAckRecComp() if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) { qCDebug(lcMqttConnectionVerbose) << " PUBREC:" << id; m_pendingReleaseMessages.insert(id, pendingMsg); + emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Received, properties); sendControlPublishRelease(id); } else { qCDebug(lcMqttConnectionVerbose) << " PUBACK:" << id; + emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Acknowledged, properties); emit m_clientPrivate->m_client->messageSent(id); } } @@ -1600,6 +1627,16 @@ void QMqttConnection::finalize_pubrel() qCDebug(lcMqttConnectionVerbose) << "Finalize PUBREL:" << id; + QMqttMessageStatusProperties properties; + if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) { + const quint8 reasonCode = readBufferTyped(&m_missingData); + properties.data->reasonCode = QMqtt::ReasonCode(reasonCode); + if (m_missingData > 0) + readMessageStatusProperties(properties); + } + + emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Released, properties); + // ### TODO: send to our app now or not??? // See standard Figure 4.3 Method A or B ??? sendControlPublishComp(id); diff --git a/src/mqtt/qmqttconnection_p.h b/src/mqtt/qmqttconnection_p.h index e2dafa1..f703599 100644 --- a/src/mqtt/qmqttconnection_p.h +++ b/src/mqtt/qmqttconnection_p.h @@ -122,6 +122,7 @@ private: qint32 readVariableByteInteger(qint32 *byteCount = nullptr); void readAuthProperties(QMqttAuthenticationProperties &properties); void readConnackProperties(); + void readMessageStatusProperties(QMqttMessageStatusProperties &properties); void readPublishProperties(QMqttPublishProperties &properties); void readSubscriptionProperties(QMqttSubscription *sub); QByteArray writeConnectProperties(); diff --git a/src/mqtt/qmqttglobal.h b/src/mqtt/qmqttglobal.h index 1524947..dfe2971 100644 --- a/src/mqtt/qmqttglobal.h +++ b/src/mqtt/qmqttglobal.h @@ -50,6 +50,28 @@ enum class PayloadFormatIndicator : quint8 { Unspecified = 0, UTF8Encoded = 1 }; + +enum class MessageStatus : quint8 { + Unknown = 0, + Published, + Acknowledged, + Received, + Released, + Completed +}; + +enum class ReasonCode : quint16 { + Success = 0, + NoMatchingSubscriber = 0x10, + UnspecifiedError = 0x80, + ImplementationSpecificError = 0x83, + NotAuthorized = 0x87, + InvalidTopicName = 0x90, + MessageIdInUse = 0x91, + MessageIdNotFound = 0x92, + QuotaExceeded = 0x97, + InvalidPayloadFormat = 0x99 +}; } QT_END_NAMESPACE diff --git a/src/mqtt/qmqttpublishproperties.cpp b/src/mqtt/qmqttpublishproperties.cpp index c04cadd..77e2118 100644 --- a/src/mqtt/qmqttpublishproperties.cpp +++ b/src/mqtt/qmqttpublishproperties.cpp @@ -28,6 +28,7 @@ ******************************************************************************/ #include "qmqttpublishproperties.h" +#include "qmqttpublishproperties_p.h" #include "qmqtttype.h" #include @@ -84,23 +85,8 @@ Q_DECLARE_LOGGING_CATEGORY(lcMqttClient) A description of the content of the message. */ -class QMqttPublishPropertiesData : public QSharedData -{ -public: - QString responseTopic; - QString contentType; - QByteArray correlationData; - quint32 messageExpiry{0}; - QList subscriptionIdentifier; - QMqttPublishProperties::PublishPropertyDetails details{QMqttPublishProperties::None}; - quint16 topicAlias{0}; - QMqtt::PayloadFormatIndicator payloadIndicator{QMqtt::PayloadFormatIndicator::Unspecified}; - QMqttUserProperties userProperties; -}; - QMqttPublishProperties::QMqttPublishProperties() : data(new QMqttPublishPropertiesData) { - } /*! @@ -283,4 +269,34 @@ void QMqttPublishProperties::setContentType(const QString &type) data->contentType = type; } +QMqttMessageStatusProperties::QMqttMessageStatusProperties() : data(new QMqttMessageStatusPropertiesData) +{ + +} + +QMqttMessageStatusProperties &QMqttMessageStatusProperties::operator=(const QMqttMessageStatusProperties &rhs) +{ + if (this != &rhs) + data.operator=(rhs.data); + return *this; +} + +QMqtt::ReasonCode QMqttMessageStatusProperties::reasonCode() const +{ + return data->reasonCode; +} + +QString QMqttMessageStatusProperties::reason() const +{ + return data->reasonString; +} + +QMqttUserProperties QMqttMessageStatusProperties::userProperties() const +{ + return data->userProperties; +} + +QMqttMessageStatusProperties::~QMqttMessageStatusProperties() = default; +QMqttMessageStatusProperties::QMqttMessageStatusProperties(const QMqttMessageStatusProperties &) = default; + QT_END_NAMESPACE diff --git a/src/mqtt/qmqttpublishproperties.h b/src/mqtt/qmqttpublishproperties.h index 8549fc2..a239b4d 100644 --- a/src/mqtt/qmqttpublishproperties.h +++ b/src/mqtt/qmqttpublishproperties.h @@ -38,6 +38,7 @@ QT_BEGIN_NAMESPACE class QMqttPublishPropertiesData; +class QMqttMessageStatusPropertiesData; class Q_MQTT_EXPORT QMqttPublishProperties { @@ -93,6 +94,24 @@ private: Q_DECLARE_OPERATORS_FOR_FLAGS(QMqttPublishProperties::PublishPropertyDetails) +class Q_MQTT_EXPORT QMqttMessageStatusProperties +{ + Q_GADGET +public: + QMqttMessageStatusProperties(); + QMqttMessageStatusProperties(const QMqttMessageStatusProperties &); + QMqttMessageStatusProperties &operator=(const QMqttMessageStatusProperties &); + ~QMqttMessageStatusProperties(); + + QMqtt::ReasonCode reasonCode() const; + QString reason() const; + QMqttUserProperties userProperties() const; + +private: + friend class QMqttConnection; + QSharedDataPointer data; +}; + QT_END_NAMESPACE #endif // QMQTTPUBLISHPROPERTIES_H diff --git a/src/mqtt/qmqttpublishproperties_p.h b/src/mqtt/qmqttpublishproperties_p.h new file mode 100644 index 0000000..2fc9d21 --- /dev/null +++ b/src/mqtt/qmqttpublishproperties_p.h @@ -0,0 +1,75 @@ +/****************************************************************************** +** +** Copyright (C) 2018 The Qt Company Ltd. +** Contact: https://www.qt.io/licensing/ +** +** This file is part of the QtMqtt module. +** +** $QT_BEGIN_LICENSE:GPL$ +** Commercial License Usage +** Licensees holding valid commercial Qt licenses may use this file in +** accordance with the commercial license agreement provided with the +** Software or, alternatively, in accordance with the terms contained in +** a written agreement between you and The Qt Company. For licensing terms +** and conditions see https://www.qt.io/terms-conditions. For further +** information use the contact form at https://www.qt.io/contact-us. +** +** GNU General Public License Usage +** Alternatively, this file may be used under the terms of the GNU +** General Public License version 3 or (at your option) any later version +** approved by the KDE Free Qt Foundation. The licenses are as published by +** the Free Software Foundation and appearing in the file LICENSE.GPL3 +** included in the packaging of this file. Please review the following +** information to ensure the GNU General Public License requirements will +** be met: https://www.gnu.org/licenses/gpl-3.0.html. +** +** $QT_END_LICENSE$ +** +******************************************************************************/ + +#ifndef QMQTTPUBLISHPROPERTIES_P_H +#define QMQTTPUBLISHPROPERTIES_P_H + +// +// W A R N I N G +// ------------- +// +// This file is not part of the Qt API. It exists purely as an +// implementation detail. This header file may change from version to +// version without notice, or even be removed. +// +// We mean it. +// + +#include "qmqttglobal.h" +#include "qmqttpublishproperties.h" + +#include + +QT_BEGIN_NAMESPACE + +class QMqttPublishPropertiesData : public QSharedData +{ +public: + QString responseTopic; + QString contentType; + QByteArray correlationData; + quint32 messageExpiry{0}; + QList subscriptionIdentifier; + QMqttPublishProperties::PublishPropertyDetails details{QMqttPublishProperties::None}; + quint16 topicAlias{0}; + QMqtt::PayloadFormatIndicator payloadIndicator{QMqtt::PayloadFormatIndicator::Unspecified}; + QMqttUserProperties userProperties; +}; + +class QMqttMessageStatusPropertiesData : public QSharedData +{ +public: + QMqttUserProperties userProperties; + QString reasonString; + QMqtt::ReasonCode reasonCode{QMqtt::ReasonCode::Success}; +}; + +QT_END_NAMESPACE + +#endif // QMQTTPUBLISHPROPERTIES_P_H diff --git a/tests/auto/qmqttclient/tst_qmqttclient.cpp b/tests/auto/qmqttclient/tst_qmqttclient.cpp index f2e0888..e483da9 100644 --- a/tests/auto/qmqttclient/tst_qmqttclient.cpp +++ b/tests/auto/qmqttclient/tst_qmqttclient.cpp @@ -69,6 +69,10 @@ private Q_SLOTS: void staticProperties_QTBUG_67176_data(); void staticProperties_QTBUG_67176(); void authentication(); + void messageStatus_data(); + void messageStatus(); + void messageStatusReceive_data(); + void messageStatusReceive(); private: QProcess m_brokerProcess; QString m_testBroker; @@ -625,6 +629,102 @@ void Tst_QMqttClient::authentication() QTRY_COMPARE(client.state(), QMqttClient::Connected); } +Q_DECLARE_METATYPE(QMqtt::MessageStatus) + +void Tst_QMqttClient::messageStatus_data() +{ + QTest::addColumn("qos"); + QTest::addColumn>("expectedStatus"); + + QTest::newRow("QoS1") << 1 << (QList() << QMqtt::MessageStatus::Acknowledged); + QTest::newRow("QoS2") << 2 << (QList() << QMqtt::MessageStatus::Received + << QMqtt::MessageStatus::Completed); +} + +void Tst_QMqttClient::messageStatus() +{ + QFETCH(int, qos); + QFETCH(QList, expectedStatus); + + QMqttClient client; + client.setProtocolVersion(QMqttClient::MQTT_5_0); + client.setHostname(m_testBroker); + client.setPort(m_port); + + client.connectToHost(); + QTRY_VERIFY2(client.state() == QMqttClient::Connected, "Could not connect to broker."); + + const QString topic = QLatin1String("Qt/client/statusCheck"); + + connect(&client, &QMqttClient::messageStatusChanged, [&expectedStatus](qint32, + QMqtt::MessageStatus s, + const QMqttMessageStatusProperties &) + { + QCOMPARE(s, expectedStatus.first()); + expectedStatus.takeFirst(); + }); + + QSignalSpy publishSpy(&client, &QMqttClient::messageSent); + client.publish(topic, QByteArray("someContent"), quint8(qos)); + QTRY_VERIFY2(publishSpy.count() == 1, "Could not publish message"); + QTRY_VERIFY2(expectedStatus.isEmpty(), "Did not receive all status updates."); +} + +void Tst_QMqttClient::messageStatusReceive_data() +{ + QTest::addColumn("qos"); + QTest::addColumn>("expectedStatus"); + + QTest::newRow("QoS1") << 1 << (QList() << QMqtt::MessageStatus::Published); + QTest::newRow("QoS2") << 2 << (QList() << QMqtt::MessageStatus::Published + << QMqtt::MessageStatus::Released); +} + +void Tst_QMqttClient::messageStatusReceive() +{ + QFETCH(int, qos); + QFETCH(QList, expectedStatus); + + QMqttClient publisher; + publisher.setProtocolVersion(QMqttClient::MQTT_5_0); + publisher.setHostname(m_testBroker); + publisher.setPort(m_port); + + publisher.connectToHost(); + QTRY_VERIFY2(publisher.state() == QMqttClient::Connected, "Could not connect to broker."); + + QMqttClient subscriber; + subscriber.setProtocolVersion(QMqttClient::MQTT_5_0); + subscriber.setHostname(m_testBroker); + subscriber.setPort(m_port); + + subscriber.connectToHost(); + QTRY_VERIFY2(subscriber.state() == QMqttClient::Connected, "Could not connect to broker."); + + const QString topic = QLatin1String("Qt/client/statusCheckReceive"); + + auto subscription = subscriber.subscribe(topic, quint8(qos)); + QTRY_VERIFY2(subscription->state() == QMqttSubscription::Subscribed, "Could not subscribe to topic"); + QVERIFY(subscription->qos() >= qos); + + connect(&subscriber, &QMqttClient::messageStatusChanged, [&expectedStatus](qint32, + QMqtt::MessageStatus s, + const QMqttMessageStatusProperties &) + { + QCOMPARE(s, expectedStatus.first()); + expectedStatus.takeFirst(); + }); + + QSignalSpy publishSpy(&publisher, &QMqttClient::messageSent); + QSignalSpy receiveSpy(&subscriber, &QMqttClient::messageReceived); + + publisher.publish(topic, QByteArray("someContent"), quint8(qos)); + QTRY_VERIFY2(publishSpy.count() == 1, "Could not publish message"); + QTRY_VERIFY2(receiveSpy.count() == 1, "Did not receive message"); + + QTRY_VERIFY2(expectedStatus.isEmpty(), "Did not receive all status updates."); +} + QTEST_MAIN(Tst_QMqttClient) #include "tst_qmqttclient.moc" -- cgit v1.2.3