summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaurice Kalinowski <maurice.kalinowski@qt.io>2018-07-31 12:24:08 +0200
committerMaurice Kalinowski <maurice.kalinowski@qt.io>2018-08-14 03:47:04 +0000
commit5ef7e409fb3b4a5f44a424b5f1998437478ccccc (patch)
treeae426afaad7db6468f147f8b71495471fcd90686
parentf4648a7df89f31907ea2a4718148daae06bec110 (diff)
Add support for message status updates
A user might be interested in knowing the exact state of a message while publishing or when a message is received. Task-number: QTPM-1453 Task-number: QTBUG-66599 Change-Id: Ibf977f76ba6078b5b525ba00b6988d4b69960176 Reviewed-by: hjk <hjk@qt.io>
-rw-r--r--src/mqtt/mqtt.pro1
-rw-r--r--src/mqtt/qmqttclient.h1
-rw-r--r--src/mqtt/qmqttconnection.cpp61
-rw-r--r--src/mqtt/qmqttconnection_p.h1
-rw-r--r--src/mqtt/qmqttglobal.h22
-rw-r--r--src/mqtt/qmqttpublishproperties.cpp46
-rw-r--r--src/mqtt/qmqttpublishproperties.h19
-rw-r--r--src/mqtt/qmqttpublishproperties_p.h75
-rw-r--r--tests/auto/qmqttclient/tst_qmqttclient.cpp100
9 files changed, 299 insertions, 27 deletions
diff --git a/src/mqtt/mqtt.pro b/src/mqtt/mqtt.pro
index adbfc54..aa6c99c 100644
--- a/src/mqtt/mqtt.pro
+++ b/src/mqtt/mqtt.pro
@@ -26,6 +26,7 @@ PRIVATE_HEADERS += \
qmqttconnectionproperties_p.h \
qmqttcontrolpacket_p.h \
qmqttmessage_p.h \
+ qmqttpublishproperties_p.h \
qmqttsubscription_p.h
SOURCES += \
diff --git a/src/mqtt/qmqttclient.h b/src/mqtt/qmqttclient.h
index d462e35..f818c3c 100644
--- a/src/mqtt/qmqttclient.h
+++ b/src/mqtt/qmqttclient.h
@@ -155,6 +155,7 @@ Q_SIGNALS:
void connected();
void disconnected();
void messageReceived(const QByteArray &message, const QMqttTopicName &topic = QMqttTopicName());
+ void messageStatusChanged(qint32 id, QMqtt::MessageStatus s, const QMqttMessageStatusProperties &properties);
void messageSent(qint32 id);
void pingResponseReceived();
void brokerSessionRestored();
diff --git a/src/mqtt/qmqttconnection.cpp b/src/mqtt/qmqttconnection.cpp
index 69e153b..d86e8ca 100644
--- a/src/mqtt/qmqttconnection.cpp
+++ b/src/mqtt/qmqttconnection.cpp
@@ -31,6 +31,7 @@
#include "qmqttconnectionproperties_p.h"
#include "qmqttcontrolpacket_p.h"
#include "qmqttmessage_p.h"
+#include "qmqttpublishproperties_p.h"
#include "qmqttsubscription_p.h"
#include "qmqttclient_p.h"
@@ -892,6 +893,32 @@ void QMqttConnection::readConnackProperties()
m_clientPrivate->m_serverConnectionProperties = serverProperties;
}
+void QMqttConnection::readMessageStatusProperties(QMqttMessageStatusProperties &properties)
+{
+ qint64 propertyLength = readVariableByteInteger();
+
+ m_missingData -= propertyLength;
+ while (propertyLength > 0) {
+ const quint8 propertyId = readBufferTyped<quint8>(&propertyLength);
+ switch (propertyId) {
+ case 0x1f: { // 3.4.2.2.2 Reason String
+ const QString content = readBufferTyped<QString>(&propertyLength);
+ properties.data->reasonString = content;
+ break;
+ }
+ case 0x26: { // 3.4.2.2.3 User Properites
+ const QString propertyName = readBufferTyped<QString>(&propertyLength);
+ const QString propertyValue = readBufferTyped<QString>(&propertyLength);
+ properties.data->userProperties.append(QMqttStringPair(propertyName, propertyValue));
+ break;
+ }
+ default:
+ qCDebug(lcMqttConnection) << "Unknown subscription property received.";
+ break;
+ }
+ }
+}
+
void QMqttConnection::readPublishProperties(QMqttPublishProperties &properties)
{
qint32 propertySize = 0;
@@ -1537,6 +1564,11 @@ void QMqttConnection::finalize_publish()
m_currentPublish.dup, m_currentPublish.retain);
qmsg.d->m_publishProperties = publishProperties;
+ if (id != 0) {
+ QMqttMessageStatusProperties statusProp;
+ statusProp.data->userProperties = publishProperties.userProperties();
+ emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Published, statusProp);
+ }
for (auto sub = m_activeSubscriptions.constBegin(); sub != m_activeSubscriptions.constEnd(); sub++) {
if (sub.key().match(topic))
@@ -1554,27 +1586,20 @@ void QMqttConnection::finalize_pubAckRecComp()
qCDebug(lcMqttConnectionVerbose) << "Finalize PUBACK/REC/COMP";
const quint16 id = readBufferTyped<quint16>(&m_missingData);
+ QMqttMessageStatusProperties properties;
if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) {
// Reason Code (1byte)
const quint8 reasonCode = readBufferTyped<quint8>(&m_missingData);
- Q_UNUSED(reasonCode); // ### TODO: Do something with it, currently silences compiler
- if (m_missingData > 0) {
- // Property Length (Variable Int)
- qint32 byteCount = 0;
- const qint32 propertyLength = readVariableByteInteger(&byteCount);
- m_missingData -= byteCount;
- // ### TODO: Publish ACK/REC/COMP property handling
- if (propertyLength > 0) {
- readBuffer(quint64(propertyLength));
- m_missingData -= propertyLength;
- }
- }
+ properties.data->reasonCode = QMqtt::ReasonCode(reasonCode);
+ if (m_missingData > 0)
+ readMessageStatusProperties(properties);
}
if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBCOMP) {
qCDebug(lcMqttConnectionVerbose) << " PUBCOMP:" << id;
auto pendingRelease = m_pendingReleaseMessages.take(id);
if (!pendingRelease)
qCDebug(lcMqttConnection) << "Received PUBCOMP for unknown released message.";
+ emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Completed, properties);
emit m_clientPrivate->m_client->messageSent(id);
return;
}
@@ -1587,9 +1612,11 @@ void QMqttConnection::finalize_pubAckRecComp()
if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) {
qCDebug(lcMqttConnectionVerbose) << " PUBREC:" << id;
m_pendingReleaseMessages.insert(id, pendingMsg);
+ emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Received, properties);
sendControlPublishRelease(id);
} else {
qCDebug(lcMqttConnectionVerbose) << " PUBACK:" << id;
+ emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Acknowledged, properties);
emit m_clientPrivate->m_client->messageSent(id);
}
}
@@ -1600,6 +1627,16 @@ void QMqttConnection::finalize_pubrel()
qCDebug(lcMqttConnectionVerbose) << "Finalize PUBREL:" << id;
+ QMqttMessageStatusProperties properties;
+ if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) {
+ const quint8 reasonCode = readBufferTyped<quint8>(&m_missingData);
+ properties.data->reasonCode = QMqtt::ReasonCode(reasonCode);
+ if (m_missingData > 0)
+ readMessageStatusProperties(properties);
+ }
+
+ emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Released, properties);
+
// ### TODO: send to our app now or not???
// See standard Figure 4.3 Method A or B ???
sendControlPublishComp(id);
diff --git a/src/mqtt/qmqttconnection_p.h b/src/mqtt/qmqttconnection_p.h
index e2dafa1..f703599 100644
--- a/src/mqtt/qmqttconnection_p.h
+++ b/src/mqtt/qmqttconnection_p.h
@@ -122,6 +122,7 @@ private:
qint32 readVariableByteInteger(qint32 *byteCount = nullptr);
void readAuthProperties(QMqttAuthenticationProperties &properties);
void readConnackProperties();
+ void readMessageStatusProperties(QMqttMessageStatusProperties &properties);
void readPublishProperties(QMqttPublishProperties &properties);
void readSubscriptionProperties(QMqttSubscription *sub);
QByteArray writeConnectProperties();
diff --git a/src/mqtt/qmqttglobal.h b/src/mqtt/qmqttglobal.h
index 1524947..dfe2971 100644
--- a/src/mqtt/qmqttglobal.h
+++ b/src/mqtt/qmqttglobal.h
@@ -50,6 +50,28 @@ enum class PayloadFormatIndicator : quint8 {
Unspecified = 0,
UTF8Encoded = 1
};
+
+enum class MessageStatus : quint8 {
+ Unknown = 0,
+ Published,
+ Acknowledged,
+ Received,
+ Released,
+ Completed
+};
+
+enum class ReasonCode : quint16 {
+ Success = 0,
+ NoMatchingSubscriber = 0x10,
+ UnspecifiedError = 0x80,
+ ImplementationSpecificError = 0x83,
+ NotAuthorized = 0x87,
+ InvalidTopicName = 0x90,
+ MessageIdInUse = 0x91,
+ MessageIdNotFound = 0x92,
+ QuotaExceeded = 0x97,
+ InvalidPayloadFormat = 0x99
+};
}
QT_END_NAMESPACE
diff --git a/src/mqtt/qmqttpublishproperties.cpp b/src/mqtt/qmqttpublishproperties.cpp
index c04cadd..77e2118 100644
--- a/src/mqtt/qmqttpublishproperties.cpp
+++ b/src/mqtt/qmqttpublishproperties.cpp
@@ -28,6 +28,7 @@
******************************************************************************/
#include "qmqttpublishproperties.h"
+#include "qmqttpublishproperties_p.h"
#include "qmqtttype.h"
#include <QtCore/QLoggingCategory>
@@ -84,23 +85,8 @@ Q_DECLARE_LOGGING_CATEGORY(lcMqttClient)
A description of the content of the message.
*/
-class QMqttPublishPropertiesData : public QSharedData
-{
-public:
- QString responseTopic;
- QString contentType;
- QByteArray correlationData;
- quint32 messageExpiry{0};
- QList<quint32> subscriptionIdentifier;
- QMqttPublishProperties::PublishPropertyDetails details{QMqttPublishProperties::None};
- quint16 topicAlias{0};
- QMqtt::PayloadFormatIndicator payloadIndicator{QMqtt::PayloadFormatIndicator::Unspecified};
- QMqttUserProperties userProperties;
-};
-
QMqttPublishProperties::QMqttPublishProperties() : data(new QMqttPublishPropertiesData)
{
-
}
/*!
@@ -283,4 +269,34 @@ void QMqttPublishProperties::setContentType(const QString &type)
data->contentType = type;
}
+QMqttMessageStatusProperties::QMqttMessageStatusProperties() : data(new QMqttMessageStatusPropertiesData)
+{
+
+}
+
+QMqttMessageStatusProperties &QMqttMessageStatusProperties::operator=(const QMqttMessageStatusProperties &rhs)
+{
+ if (this != &rhs)
+ data.operator=(rhs.data);
+ return *this;
+}
+
+QMqtt::ReasonCode QMqttMessageStatusProperties::reasonCode() const
+{
+ return data->reasonCode;
+}
+
+QString QMqttMessageStatusProperties::reason() const
+{
+ return data->reasonString;
+}
+
+QMqttUserProperties QMqttMessageStatusProperties::userProperties() const
+{
+ return data->userProperties;
+}
+
+QMqttMessageStatusProperties::~QMqttMessageStatusProperties() = default;
+QMqttMessageStatusProperties::QMqttMessageStatusProperties(const QMqttMessageStatusProperties &) = default;
+
QT_END_NAMESPACE
diff --git a/src/mqtt/qmqttpublishproperties.h b/src/mqtt/qmqttpublishproperties.h
index 8549fc2..a239b4d 100644
--- a/src/mqtt/qmqttpublishproperties.h
+++ b/src/mqtt/qmqttpublishproperties.h
@@ -38,6 +38,7 @@
QT_BEGIN_NAMESPACE
class QMqttPublishPropertiesData;
+class QMqttMessageStatusPropertiesData;
class Q_MQTT_EXPORT QMqttPublishProperties
{
@@ -93,6 +94,24 @@ private:
Q_DECLARE_OPERATORS_FOR_FLAGS(QMqttPublishProperties::PublishPropertyDetails)
+class Q_MQTT_EXPORT QMqttMessageStatusProperties
+{
+ Q_GADGET
+public:
+ QMqttMessageStatusProperties();
+ QMqttMessageStatusProperties(const QMqttMessageStatusProperties &);
+ QMqttMessageStatusProperties &operator=(const QMqttMessageStatusProperties &);
+ ~QMqttMessageStatusProperties();
+
+ QMqtt::ReasonCode reasonCode() const;
+ QString reason() const;
+ QMqttUserProperties userProperties() const;
+
+private:
+ friend class QMqttConnection;
+ QSharedDataPointer<QMqttMessageStatusPropertiesData> data;
+};
+
QT_END_NAMESPACE
#endif // QMQTTPUBLISHPROPERTIES_H
diff --git a/src/mqtt/qmqttpublishproperties_p.h b/src/mqtt/qmqttpublishproperties_p.h
new file mode 100644
index 0000000..2fc9d21
--- /dev/null
+++ b/src/mqtt/qmqttpublishproperties_p.h
@@ -0,0 +1,75 @@
+/******************************************************************************
+**
+** Copyright (C) 2018 The Qt Company Ltd.
+** Contact: https://www.qt.io/licensing/
+**
+** This file is part of the QtMqtt module.
+**
+** $QT_BEGIN_LICENSE:GPL$
+** Commercial License Usage
+** Licensees holding valid commercial Qt licenses may use this file in
+** accordance with the commercial license agreement provided with the
+** Software or, alternatively, in accordance with the terms contained in
+** a written agreement between you and The Qt Company. For licensing terms
+** and conditions see https://www.qt.io/terms-conditions. For further
+** information use the contact form at https://www.qt.io/contact-us.
+**
+** GNU General Public License Usage
+** Alternatively, this file may be used under the terms of the GNU
+** General Public License version 3 or (at your option) any later version
+** approved by the KDE Free Qt Foundation. The licenses are as published by
+** the Free Software Foundation and appearing in the file LICENSE.GPL3
+** included in the packaging of this file. Please review the following
+** information to ensure the GNU General Public License requirements will
+** be met: https://www.gnu.org/licenses/gpl-3.0.html.
+**
+** $QT_END_LICENSE$
+**
+******************************************************************************/
+
+#ifndef QMQTTPUBLISHPROPERTIES_P_H
+#define QMQTTPUBLISHPROPERTIES_P_H
+
+//
+// W A R N I N G
+// -------------
+//
+// This file is not part of the Qt API. It exists purely as an
+// implementation detail. This header file may change from version to
+// version without notice, or even be removed.
+//
+// We mean it.
+//
+
+#include "qmqttglobal.h"
+#include "qmqttpublishproperties.h"
+
+#include <QtCore/QSharedData>
+
+QT_BEGIN_NAMESPACE
+
+class QMqttPublishPropertiesData : public QSharedData
+{
+public:
+ QString responseTopic;
+ QString contentType;
+ QByteArray correlationData;
+ quint32 messageExpiry{0};
+ QList<quint32> subscriptionIdentifier;
+ QMqttPublishProperties::PublishPropertyDetails details{QMqttPublishProperties::None};
+ quint16 topicAlias{0};
+ QMqtt::PayloadFormatIndicator payloadIndicator{QMqtt::PayloadFormatIndicator::Unspecified};
+ QMqttUserProperties userProperties;
+};
+
+class QMqttMessageStatusPropertiesData : public QSharedData
+{
+public:
+ QMqttUserProperties userProperties;
+ QString reasonString;
+ QMqtt::ReasonCode reasonCode{QMqtt::ReasonCode::Success};
+};
+
+QT_END_NAMESPACE
+
+#endif // QMQTTPUBLISHPROPERTIES_P_H
diff --git a/tests/auto/qmqttclient/tst_qmqttclient.cpp b/tests/auto/qmqttclient/tst_qmqttclient.cpp
index f2e0888..e483da9 100644
--- a/tests/auto/qmqttclient/tst_qmqttclient.cpp
+++ b/tests/auto/qmqttclient/tst_qmqttclient.cpp
@@ -69,6 +69,10 @@ private Q_SLOTS:
void staticProperties_QTBUG_67176_data();
void staticProperties_QTBUG_67176();
void authentication();
+ void messageStatus_data();
+ void messageStatus();
+ void messageStatusReceive_data();
+ void messageStatusReceive();
private:
QProcess m_brokerProcess;
QString m_testBroker;
@@ -625,6 +629,102 @@ void Tst_QMqttClient::authentication()
QTRY_COMPARE(client.state(), QMqttClient::Connected);
}
+Q_DECLARE_METATYPE(QMqtt::MessageStatus)
+
+void Tst_QMqttClient::messageStatus_data()
+{
+ QTest::addColumn<int>("qos");
+ QTest::addColumn<QList<QMqtt::MessageStatus>>("expectedStatus");
+
+ QTest::newRow("QoS1") << 1 << (QList<QMqtt::MessageStatus>() << QMqtt::MessageStatus::Acknowledged);
+ QTest::newRow("QoS2") << 2 << (QList<QMqtt::MessageStatus>() << QMqtt::MessageStatus::Received
+ << QMqtt::MessageStatus::Completed);
+}
+
+void Tst_QMqttClient::messageStatus()
+{
+ QFETCH(int, qos);
+ QFETCH(QList<QMqtt::MessageStatus>, expectedStatus);
+
+ QMqttClient client;
+ client.setProtocolVersion(QMqttClient::MQTT_5_0);
+ client.setHostname(m_testBroker);
+ client.setPort(m_port);
+
+ client.connectToHost();
+ QTRY_VERIFY2(client.state() == QMqttClient::Connected, "Could not connect to broker.");
+
+ const QString topic = QLatin1String("Qt/client/statusCheck");
+
+ connect(&client, &QMqttClient::messageStatusChanged, [&expectedStatus](qint32,
+ QMqtt::MessageStatus s,
+ const QMqttMessageStatusProperties &)
+ {
+ QCOMPARE(s, expectedStatus.first());
+ expectedStatus.takeFirst();
+ });
+
+ QSignalSpy publishSpy(&client, &QMqttClient::messageSent);
+ client.publish(topic, QByteArray("someContent"), quint8(qos));
+ QTRY_VERIFY2(publishSpy.count() == 1, "Could not publish message");
+ QTRY_VERIFY2(expectedStatus.isEmpty(), "Did not receive all status updates.");
+}
+
+void Tst_QMqttClient::messageStatusReceive_data()
+{
+ QTest::addColumn<int>("qos");
+ QTest::addColumn<QList<QMqtt::MessageStatus>>("expectedStatus");
+
+ QTest::newRow("QoS1") << 1 << (QList<QMqtt::MessageStatus>() << QMqtt::MessageStatus::Published);
+ QTest::newRow("QoS2") << 2 << (QList<QMqtt::MessageStatus>() << QMqtt::MessageStatus::Published
+ << QMqtt::MessageStatus::Released);
+}
+
+void Tst_QMqttClient::messageStatusReceive()
+{
+ QFETCH(int, qos);
+ QFETCH(QList<QMqtt::MessageStatus>, expectedStatus);
+
+ QMqttClient publisher;
+ publisher.setProtocolVersion(QMqttClient::MQTT_5_0);
+ publisher.setHostname(m_testBroker);
+ publisher.setPort(m_port);
+
+ publisher.connectToHost();
+ QTRY_VERIFY2(publisher.state() == QMqttClient::Connected, "Could not connect to broker.");
+
+ QMqttClient subscriber;
+ subscriber.setProtocolVersion(QMqttClient::MQTT_5_0);
+ subscriber.setHostname(m_testBroker);
+ subscriber.setPort(m_port);
+
+ subscriber.connectToHost();
+ QTRY_VERIFY2(subscriber.state() == QMqttClient::Connected, "Could not connect to broker.");
+
+ const QString topic = QLatin1String("Qt/client/statusCheckReceive");
+
+ auto subscription = subscriber.subscribe(topic, quint8(qos));
+ QTRY_VERIFY2(subscription->state() == QMqttSubscription::Subscribed, "Could not subscribe to topic");
+ QVERIFY(subscription->qos() >= qos);
+
+ connect(&subscriber, &QMqttClient::messageStatusChanged, [&expectedStatus](qint32,
+ QMqtt::MessageStatus s,
+ const QMqttMessageStatusProperties &)
+ {
+ QCOMPARE(s, expectedStatus.first());
+ expectedStatus.takeFirst();
+ });
+
+ QSignalSpy publishSpy(&publisher, &QMqttClient::messageSent);
+ QSignalSpy receiveSpy(&subscriber, &QMqttClient::messageReceived);
+
+ publisher.publish(topic, QByteArray("someContent"), quint8(qos));
+ QTRY_VERIFY2(publishSpy.count() == 1, "Could not publish message");
+ QTRY_VERIFY2(receiveSpy.count() == 1, "Did not receive message");
+
+ QTRY_VERIFY2(expectedStatus.isEmpty(), "Did not receive all status updates.");
+}
+
QTEST_MAIN(Tst_QMqttClient)
#include "tst_qmqttclient.moc"