summaryrefslogtreecommitdiffstats
path: root/src/mqtt/qmqttconnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mqtt/qmqttconnection.cpp')
-rw-r--r--src/mqtt/qmqttconnection.cpp61
1 files changed, 49 insertions, 12 deletions
diff --git a/src/mqtt/qmqttconnection.cpp b/src/mqtt/qmqttconnection.cpp
index 69e153b..d86e8ca 100644
--- a/src/mqtt/qmqttconnection.cpp
+++ b/src/mqtt/qmqttconnection.cpp
@@ -31,6 +31,7 @@
#include "qmqttconnectionproperties_p.h"
#include "qmqttcontrolpacket_p.h"
#include "qmqttmessage_p.h"
+#include "qmqttpublishproperties_p.h"
#include "qmqttsubscription_p.h"
#include "qmqttclient_p.h"
@@ -892,6 +893,32 @@ void QMqttConnection::readConnackProperties()
m_clientPrivate->m_serverConnectionProperties = serverProperties;
}
+void QMqttConnection::readMessageStatusProperties(QMqttMessageStatusProperties &properties)
+{
+ qint64 propertyLength = readVariableByteInteger();
+
+ m_missingData -= propertyLength;
+ while (propertyLength > 0) {
+ const quint8 propertyId = readBufferTyped<quint8>(&propertyLength);
+ switch (propertyId) {
+ case 0x1f: { // 3.4.2.2.2 Reason String
+ const QString content = readBufferTyped<QString>(&propertyLength);
+ properties.data->reasonString = content;
+ break;
+ }
+ case 0x26: { // 3.4.2.2.3 User Properites
+ const QString propertyName = readBufferTyped<QString>(&propertyLength);
+ const QString propertyValue = readBufferTyped<QString>(&propertyLength);
+ properties.data->userProperties.append(QMqttStringPair(propertyName, propertyValue));
+ break;
+ }
+ default:
+ qCDebug(lcMqttConnection) << "Unknown subscription property received.";
+ break;
+ }
+ }
+}
+
void QMqttConnection::readPublishProperties(QMqttPublishProperties &properties)
{
qint32 propertySize = 0;
@@ -1537,6 +1564,11 @@ void QMqttConnection::finalize_publish()
m_currentPublish.dup, m_currentPublish.retain);
qmsg.d->m_publishProperties = publishProperties;
+ if (id != 0) {
+ QMqttMessageStatusProperties statusProp;
+ statusProp.data->userProperties = publishProperties.userProperties();
+ emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Published, statusProp);
+ }
for (auto sub = m_activeSubscriptions.constBegin(); sub != m_activeSubscriptions.constEnd(); sub++) {
if (sub.key().match(topic))
@@ -1554,27 +1586,20 @@ void QMqttConnection::finalize_pubAckRecComp()
qCDebug(lcMqttConnectionVerbose) << "Finalize PUBACK/REC/COMP";
const quint16 id = readBufferTyped<quint16>(&m_missingData);
+ QMqttMessageStatusProperties properties;
if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) {
// Reason Code (1byte)
const quint8 reasonCode = readBufferTyped<quint8>(&m_missingData);
- Q_UNUSED(reasonCode); // ### TODO: Do something with it, currently silences compiler
- if (m_missingData > 0) {
- // Property Length (Variable Int)
- qint32 byteCount = 0;
- const qint32 propertyLength = readVariableByteInteger(&byteCount);
- m_missingData -= byteCount;
- // ### TODO: Publish ACK/REC/COMP property handling
- if (propertyLength > 0) {
- readBuffer(quint64(propertyLength));
- m_missingData -= propertyLength;
- }
- }
+ properties.data->reasonCode = QMqtt::ReasonCode(reasonCode);
+ if (m_missingData > 0)
+ readMessageStatusProperties(properties);
}
if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBCOMP) {
qCDebug(lcMqttConnectionVerbose) << " PUBCOMP:" << id;
auto pendingRelease = m_pendingReleaseMessages.take(id);
if (!pendingRelease)
qCDebug(lcMqttConnection) << "Received PUBCOMP for unknown released message.";
+ emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Completed, properties);
emit m_clientPrivate->m_client->messageSent(id);
return;
}
@@ -1587,9 +1612,11 @@ void QMqttConnection::finalize_pubAckRecComp()
if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) {
qCDebug(lcMqttConnectionVerbose) << " PUBREC:" << id;
m_pendingReleaseMessages.insert(id, pendingMsg);
+ emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Received, properties);
sendControlPublishRelease(id);
} else {
qCDebug(lcMqttConnectionVerbose) << " PUBACK:" << id;
+ emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Acknowledged, properties);
emit m_clientPrivate->m_client->messageSent(id);
}
}
@@ -1600,6 +1627,16 @@ void QMqttConnection::finalize_pubrel()
qCDebug(lcMqttConnectionVerbose) << "Finalize PUBREL:" << id;
+ QMqttMessageStatusProperties properties;
+ if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) {
+ const quint8 reasonCode = readBufferTyped<quint8>(&m_missingData);
+ properties.data->reasonCode = QMqtt::ReasonCode(reasonCode);
+ if (m_missingData > 0)
+ readMessageStatusProperties(properties);
+ }
+
+ emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Released, properties);
+
// ### TODO: send to our app now or not???
// See standard Figure 4.3 Method A or B ???
sendControlPublishComp(id);