summaryrefslogtreecommitdiffstats
path: root/src/mqtt/qmqttconnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mqtt/qmqttconnection.cpp')
-rw-r--r--src/mqtt/qmqttconnection.cpp60
1 files changed, 13 insertions, 47 deletions
diff --git a/src/mqtt/qmqttconnection.cpp b/src/mqtt/qmqttconnection.cpp
index a9c1b35..40e3908 100644
--- a/src/mqtt/qmqttconnection.cpp
+++ b/src/mqtt/qmqttconnection.cpp
@@ -245,12 +245,12 @@ bool QMqttConnection::sendControlConnect()
return true;
}
-qint32 QMqttConnection::sendControlPublish(const QString &topic, const QByteArray &message, quint8 qos, bool retain)
+qint32 QMqttConnection::sendControlPublish(const QMqttTopicName &topic, const QByteArray &message, quint8 qos, bool retain)
{
qCDebug(lcMqttConnection) << Q_FUNC_INFO << topic << " Size:" << message.size() << " bytes."
<< "QoS:" << qos << " Retain:" << retain;
- if (topic.contains(QLatin1Char('#')) || topic.contains('+'))
+ if (!topic.isValid())
return -1;
quint8 header = QMqttControlPacket::PUBLISH;
@@ -263,15 +263,7 @@ qint32 QMqttConnection::sendControlPublish(const QString &topic, const QByteArra
header |= 0x01;
QSharedPointer<QMqttControlPacket> packet(new QMqttControlPacket(header));
-
- QByteArray topicArray = topic.toUtf8();
- const std::uint16_t u16max = std::numeric_limits<std::uint16_t>::max();
- if (topicArray.size() > u16max) {
- qWarning("Published topic is too long. Need to truncate");
- topicArray.truncate(u16max);
- }
-
- packet->append(topicArray);
+ packet->append(topic.name().toUtf8());
quint16 identifier = 0;
if (qos > 0) {
identifier = unusedPacketIdentifier();
@@ -322,7 +314,7 @@ bool QMqttConnection::sendControlPublishComp(quint16 id)
return writePacketToTransport(packet);
}
-QMqttSubscription *QMqttConnection::sendControlSubscribe(const QString &topic, quint8 qos)
+QMqttSubscription *QMqttConnection::sendControlSubscribe(const QMqttTopicFilter &topic, quint8 qos)
{
qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic << " qos:" << qos;
@@ -340,13 +332,12 @@ QMqttSubscription *QMqttConnection::sendControlSubscribe(const QString &topic, q
packet.append(identifier);
// Overflow protection
- QByteArray topicArray = topic.toUtf8();
- if (topicArray.size() > std::numeric_limits<std::uint16_t>::max()) {
- qWarning("Subscribed topic is too long.");
+ if (!topic.isValid()) {
+ qWarning("Subscribed topic filter is not valid.");
return nullptr;
}
- packet.append(topicArray);
+ packet.append(topic.filter().toUtf8());
switch (qos) {
case 0: packet.append(char(0x0)); break;
@@ -356,7 +347,7 @@ QMqttSubscription *QMqttConnection::sendControlSubscribe(const QString &topic, q
}
auto result = new QMqttSubscription(this);
- result->setTopic(QString::fromUtf8(topicArray));
+ result->setTopic(topic);
result->setClient(m_clientPrivate->m_client);
result->setQos(qos);
result->setState(QMqttSubscription::SubscriptionPending);
@@ -372,12 +363,12 @@ QMqttSubscription *QMqttConnection::sendControlSubscribe(const QString &topic, q
return result;
}
-bool QMqttConnection::sendControlUnsubscribe(const QString &topic)
+bool QMqttConnection::sendControlUnsubscribe(const QMqttTopicFilter &topic)
{
qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic;
// MQTT-3.10.3-2
- if (topic.isEmpty())
+ if (!topic.isValid())
return false;
if (!m_activeSubscriptions.contains(topic))
@@ -398,7 +389,7 @@ bool QMqttConnection::sendControlUnsubscribe(const QString &topic)
packet.append(identifier);
- packet.append(topic.toUtf8());
+ packet.append(topic.filter().toUtf8());
auto sub = m_activeSubscriptions[topic];
sub->setState(QMqttSubscription::UnsubscriptionPending);
@@ -619,7 +610,7 @@ void QMqttConnection::finalize_publish()
{
// String topic
const quint16 topicLength = qFromBigEndian<quint16>(reinterpret_cast<const quint16 *>(readBuffer(2).constData()));
- const QString topic = QString::fromUtf8(reinterpret_cast<const char *>(readBuffer(topicLength).constData()));
+ const QMqttTopicName topic = QString::fromUtf8(reinterpret_cast<const char *>(readBuffer(topicLength).constData()));
quint16 id = 0;
if (m_currentPublish.qos > 0) {
@@ -639,33 +630,8 @@ void QMqttConnection::finalize_publish()
m_currentPublish.dup, m_currentPublish.retain);
for (auto sub = m_activeSubscriptions.constBegin(); sub != m_activeSubscriptions.constEnd(); sub++) {
- const QString subTopic = sub.key();
-
- if (subTopic == topic) {
- emit sub.value()->messageReceived(qmsg);
- continue;
- } else if (subTopic.endsWith(QLatin1Char('#')) && topic.startsWith(subTopic.leftRef(subTopic.size() - 1))) {
+ if (sub.key().match(topic))
emit sub.value()->messageReceived(qmsg);
- continue;
- }
-
- if (!subTopic.contains(QLatin1Char('+')))
- continue;
-
- const QVector<QStringRef> subTopicSplit = subTopic.splitRef(QLatin1Char('/'));
- const QVector<QStringRef> topicSplit = topic.splitRef(QLatin1Char('/'));
- if (subTopicSplit.size() != topicSplit.size())
- continue;
- bool match = true;
- for (int i = 0; i < subTopicSplit.size() && match; ++i) {
- if (subTopicSplit.at(i) == QLatin1Char('+') || subTopicSplit.at(i) == topicSplit.at(i))
- continue;
- match = false;
- }
-
- if (match) {
- emit sub.value()->messageReceived(qmsg);
- }
}
if (m_currentPublish.qos == 1)