diff options
Diffstat (limited to 'src/mqtt/qmqttconnection.cpp')
-rw-r--r-- | src/mqtt/qmqttconnection.cpp | 61 |
1 files changed, 49 insertions, 12 deletions
diff --git a/src/mqtt/qmqttconnection.cpp b/src/mqtt/qmqttconnection.cpp index 69e153b..d86e8ca 100644 --- a/src/mqtt/qmqttconnection.cpp +++ b/src/mqtt/qmqttconnection.cpp @@ -31,6 +31,7 @@ #include "qmqttconnectionproperties_p.h" #include "qmqttcontrolpacket_p.h" #include "qmqttmessage_p.h" +#include "qmqttpublishproperties_p.h" #include "qmqttsubscription_p.h" #include "qmqttclient_p.h" @@ -892,6 +893,32 @@ void QMqttConnection::readConnackProperties() m_clientPrivate->m_serverConnectionProperties = serverProperties; } +void QMqttConnection::readMessageStatusProperties(QMqttMessageStatusProperties &properties) +{ + qint64 propertyLength = readVariableByteInteger(); + + m_missingData -= propertyLength; + while (propertyLength > 0) { + const quint8 propertyId = readBufferTyped<quint8>(&propertyLength); + switch (propertyId) { + case 0x1f: { // 3.4.2.2.2 Reason String + const QString content = readBufferTyped<QString>(&propertyLength); + properties.data->reasonString = content; + break; + } + case 0x26: { // 3.4.2.2.3 User Properites + const QString propertyName = readBufferTyped<QString>(&propertyLength); + const QString propertyValue = readBufferTyped<QString>(&propertyLength); + properties.data->userProperties.append(QMqttStringPair(propertyName, propertyValue)); + break; + } + default: + qCDebug(lcMqttConnection) << "Unknown subscription property received."; + break; + } + } +} + void QMqttConnection::readPublishProperties(QMqttPublishProperties &properties) { qint32 propertySize = 0; @@ -1537,6 +1564,11 @@ void QMqttConnection::finalize_publish() m_currentPublish.dup, m_currentPublish.retain); qmsg.d->m_publishProperties = publishProperties; + if (id != 0) { + QMqttMessageStatusProperties statusProp; + statusProp.data->userProperties = publishProperties.userProperties(); + emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Published, statusProp); + } for (auto sub = m_activeSubscriptions.constBegin(); sub != m_activeSubscriptions.constEnd(); sub++) { if (sub.key().match(topic)) @@ -1554,27 +1586,20 @@ void QMqttConnection::finalize_pubAckRecComp() qCDebug(lcMqttConnectionVerbose) << "Finalize PUBACK/REC/COMP"; const quint16 id = readBufferTyped<quint16>(&m_missingData); + QMqttMessageStatusProperties properties; if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) { // Reason Code (1byte) const quint8 reasonCode = readBufferTyped<quint8>(&m_missingData); - Q_UNUSED(reasonCode); // ### TODO: Do something with it, currently silences compiler - if (m_missingData > 0) { - // Property Length (Variable Int) - qint32 byteCount = 0; - const qint32 propertyLength = readVariableByteInteger(&byteCount); - m_missingData -= byteCount; - // ### TODO: Publish ACK/REC/COMP property handling - if (propertyLength > 0) { - readBuffer(quint64(propertyLength)); - m_missingData -= propertyLength; - } - } + properties.data->reasonCode = QMqtt::ReasonCode(reasonCode); + if (m_missingData > 0) + readMessageStatusProperties(properties); } if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBCOMP) { qCDebug(lcMqttConnectionVerbose) << " PUBCOMP:" << id; auto pendingRelease = m_pendingReleaseMessages.take(id); if (!pendingRelease) qCDebug(lcMqttConnection) << "Received PUBCOMP for unknown released message."; + emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Completed, properties); emit m_clientPrivate->m_client->messageSent(id); return; } @@ -1587,9 +1612,11 @@ void QMqttConnection::finalize_pubAckRecComp() if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) { qCDebug(lcMqttConnectionVerbose) << " PUBREC:" << id; m_pendingReleaseMessages.insert(id, pendingMsg); + emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Received, properties); sendControlPublishRelease(id); } else { qCDebug(lcMqttConnectionVerbose) << " PUBACK:" << id; + emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Acknowledged, properties); emit m_clientPrivate->m_client->messageSent(id); } } @@ -1600,6 +1627,16 @@ void QMqttConnection::finalize_pubrel() qCDebug(lcMqttConnectionVerbose) << "Finalize PUBREL:" << id; + QMqttMessageStatusProperties properties; + if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) { + const quint8 reasonCode = readBufferTyped<quint8>(&m_missingData); + properties.data->reasonCode = QMqtt::ReasonCode(reasonCode); + if (m_missingData > 0) + readMessageStatusProperties(properties); + } + + emit m_clientPrivate->m_client->messageStatusChanged(id, QMqtt::MessageStatus::Released, properties); + // ### TODO: send to our app now or not??? // See standard Figure 4.3 Method A or B ??? sendControlPublishComp(id); |