diff options
-rw-r--r-- | src/opcua/client/qopcuabackend.cpp | 5 | ||||
-rw-r--r-- | src/opcua/client/qopcuabackend_p.h | 2 | ||||
-rw-r--r-- | src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp | 22 | ||||
-rw-r--r-- | src/plugins/opcua/freeopcua/qfreeopcuaworker.h | 3 | ||||
-rw-r--r-- | src/plugins/opcua/open62541/qopen62541backend.cpp | 28 | ||||
-rw-r--r-- | src/plugins/opcua/open62541/qopen62541backend.h | 3 | ||||
-rw-r--r-- | src/plugins/opcua/uacpp/quacppbackend.cpp | 8 | ||||
-rw-r--r-- | src/plugins/opcua/uacpp/quacppbackend.h | 1 | ||||
-rw-r--r-- | tests/auto/qopcuaclient/tst_client.cpp | 63 |
9 files changed, 122 insertions, 13 deletions
diff --git a/src/opcua/client/qopcuabackend.cpp b/src/opcua/client/qopcuabackend.cpp index 847e1eb..8dc3652 100644 --- a/src/opcua/client/qopcuabackend.cpp +++ b/src/opcua/client/qopcuabackend.cpp @@ -84,4 +84,9 @@ QOpcUa::Types QOpcUaBackend::attributeIdToTypeId(QOpcUa::NodeAttribute attr) } } +double QOpcUaBackend::revisePublishingInterval(double requestedValue, double minimumValue) +{ + return requestedValue == 0 ? 0 : std::max(requestedValue, minimumValue); +} + QT_END_NAMESPACE diff --git a/src/opcua/client/qopcuabackend_p.h b/src/opcua/client/qopcuabackend_p.h index acfc603..4f9ccab 100644 --- a/src/opcua/client/qopcuabackend_p.h +++ b/src/opcua/client/qopcuabackend_p.h @@ -74,6 +74,8 @@ public: QOpcUa::Types attributeIdToTypeId(QOpcUa::NodeAttribute attr); + double revisePublishingInterval(double requestedValue, double minimumValue); + Q_SIGNALS: void stateAndOrErrorChanged(QOpcUaClient::ClientState state, QOpcUaClient::ClientError error); diff --git a/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp b/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp index 3acdf8b..75a4692 100644 --- a/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp +++ b/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp @@ -52,15 +52,18 @@ Q_DECLARE_LOGGING_CATEGORY(QT_OPCUA_PLUGINS_FREEOPCUA) QFreeOpcUaWorker::QFreeOpcUaWorker(QFreeOpcUaClientImpl *client) : QOpcUaBackend() , m_client(client) + , m_minPublishingInterval(0) {} QFreeOpcUaWorker::~QFreeOpcUaWorker() { - qDeleteAll(m_subscriptions); + cleanupSubscriptions(); } void QFreeOpcUaWorker::asyncConnectToEndpoint(const QUrl &url) { + cleanupSubscriptions(); + try { QString sNodeName = QHostInfo::localHostName(); SetApplicationURI(QString("urn:%1:%2:%3").arg( @@ -102,8 +105,7 @@ void QFreeOpcUaWorker::asyncDisconnectFromEndpoint() qCWarning(QT_OPCUA_PLUGINS_FREEOPCUA) << "Could not disconnect from endpoint:" << ex.what(); } - qDeleteAll(m_subscriptions); - m_subscriptions.clear(); + cleanupSubscriptions(); emit stateAndOrErrorChanged(QOpcUaClient::Disconnected, QOpcUaClient::UnknownError); } @@ -258,8 +260,10 @@ void QFreeOpcUaWorker::writeAttributes(uintptr_t handle, OpcUa::Node node, QOpcU QFreeOpcUaSubscription *QFreeOpcUaWorker::getSubscription(const QOpcUaMonitoringParameters &settings) { if (settings.shared() == QOpcUaMonitoringParameters::SubscriptionType::Shared) { + // Requesting multiple subscriptions with publishing interval < minimum publishing interval breaks subscription sharing + double interval = revisePublishingInterval(settings.publishingInterval(), m_minPublishingInterval); for (auto entry : qAsConst(m_subscriptions)) - if (entry->interval() == settings.publishingInterval() && entry->shared() == QOpcUaMonitoringParameters::SubscriptionType::Shared) + if (qFuzzyCompare(entry->interval(), interval) && entry->shared() == QOpcUaMonitoringParameters::SubscriptionType::Shared) return entry; } @@ -269,6 +273,8 @@ QFreeOpcUaSubscription *QFreeOpcUaWorker::getSubscription(const QOpcUaMonitoring delete sub; return nullptr; } + if (sub->interval() > settings.samplingInterval()) // The publishing interval has been revised by the server. + m_minPublishingInterval = sub->interval(); QObject::connect(sub, &QFreeOpcUaSubscription::timeout, this, &QFreeOpcUaWorker::handleSubscriptionTimeout, Qt::QueuedConnection); m_subscriptions[id] = sub; return sub; @@ -373,6 +379,14 @@ QFreeOpcUaSubscription *QFreeOpcUaWorker::getSubscriptionForItem(uintptr_t handl return subscription.value(); } +void QFreeOpcUaWorker::cleanupSubscriptions() +{ + qDeleteAll(m_subscriptions); + m_subscriptions.clear(); + m_attributeMapping.clear(); + m_minPublishingInterval = 0; +} + void QFreeOpcUaWorker::callMethod(uintptr_t handle, OpcUa::NodeId objectId, OpcUa::NodeId methodId, QVector<QOpcUa::TypedVariant> args) { try { diff --git a/src/plugins/opcua/freeopcua/qfreeopcuaworker.h b/src/plugins/opcua/freeopcua/qfreeopcuaworker.h index c3edb7d..e65ca81 100644 --- a/src/plugins/opcua/freeopcua/qfreeopcuaworker.h +++ b/src/plugins/opcua/freeopcua/qfreeopcuaworker.h @@ -77,11 +77,14 @@ public slots: void handleSubscriptionTimeout(QFreeOpcUaSubscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items); private: QFreeOpcUaSubscription *getSubscriptionForItem(uintptr_t handle, QOpcUa::NodeAttribute attr); + void cleanupSubscriptions(); QFreeOpcUaClientImpl *m_client; QHash<quint32, QFreeOpcUaSubscription *> m_subscriptions; QHash<uintptr_t, QHash<QOpcUa::NodeAttribute, QFreeOpcUaSubscription *>> m_attributeMapping; // Handle -> Attribute -> Subscription + + double m_minPublishingInterval; }; QT_END_NAMESPACE diff --git a/src/plugins/opcua/open62541/qopen62541backend.cpp b/src/plugins/opcua/open62541/qopen62541backend.cpp index 15d1166..93a568a 100644 --- a/src/plugins/opcua/open62541/qopen62541backend.cpp +++ b/src/plugins/opcua/open62541/qopen62541backend.cpp @@ -66,6 +66,7 @@ Open62541AsyncBackend::Open62541AsyncBackend(QOpen62541Client *parent) , m_useStateCallback(false) , m_subscriptionTimer(this) , m_sendPublishRequests(false) + , m_minPublishingInterval(0) { m_subscriptionTimer.setSingleShot(true); QObject::connect(&m_subscriptionTimer, &QTimer::timeout, @@ -74,7 +75,7 @@ Open62541AsyncBackend::Open62541AsyncBackend(QOpen62541Client *parent) Open62541AsyncBackend::~Open62541AsyncBackend() { - qDeleteAll(m_subscriptions); + cleanupSubscriptions(); if (m_uaclient) UA_Client_delete(m_uaclient); } @@ -280,8 +281,10 @@ void Open62541AsyncBackend::modifyMonitoring(uintptr_t handle, QOpcUa::NodeAttri QOpen62541Subscription *Open62541AsyncBackend::getSubscription(const QOpcUaMonitoringParameters &settings) { if (settings.shared() == QOpcUaMonitoringParameters::SubscriptionType::Shared) { + // Requesting multiple subscriptions with publishing interval < minimum publishing interval breaks subscription sharing + double interval = revisePublishingInterval(settings.publishingInterval(), m_minPublishingInterval); for (auto entry : qAsConst(m_subscriptions)) { - if (qFuzzyCompare(entry->interval(), settings.publishingInterval()) && entry->shared() == QOpcUaMonitoringParameters::SubscriptionType::Shared) + if (qFuzzyCompare(entry->interval(), interval) && entry->shared() == QOpcUaMonitoringParameters::SubscriptionType::Shared) return entry; } } @@ -293,6 +296,8 @@ QOpen62541Subscription *Open62541AsyncBackend::getSubscription(const QOpcUaMonit return nullptr; } m_subscriptions[id] = sub; + if (sub->interval() > settings.samplingInterval()) // The publishing interval has been revised by the server. + m_minPublishingInterval = sub->interval(); // This must be a queued connection to prevent the slot from being called while the client is inside UA_Client_runAsync(). QObject::connect(sub, &QOpen62541Subscription::timeout, this, &Open62541AsyncBackend::handleSubscriptionTimeout, Qt::QueuedConnection); return sub; @@ -429,11 +434,14 @@ static void clientStateCallback(UA_Client *client, UA_ClientState state) if (state == UA_CLIENTSTATE_DISCONNECTED) { emit backend->stateAndOrErrorChanged(QOpcUaClient::Disconnected, QOpcUaClient::ConnectionError); backend->m_useStateCallback = false; + backend->cleanupSubscriptions(); } } void Open62541AsyncBackend::connectToEndpoint(const QUrl &url) { + cleanupSubscriptions(); + if (m_uaclient) UA_Client_delete(m_uaclient); @@ -468,9 +476,7 @@ void Open62541AsyncBackend::connectToEndpoint(const QUrl &url) void Open62541AsyncBackend::disconnectFromEndpoint() { m_subscriptionTimer.stop(); - qDeleteAll(m_subscriptions); - m_subscriptions.clear(); - m_attributeMapping.clear(); + cleanupSubscriptions(); m_useStateCallback = false; @@ -500,9 +506,7 @@ void Open62541AsyncBackend::sendPublishRequest() if (UA_Client_runAsync(m_uaclient, 1) == UA_STATUSCODE_BADSERVERNOTCONNECTED) { qCWarning(QT_OPCUA_PLUGINS_OPEN62541) << "Unable to send publish request"; m_sendPublishRequests = false; - qDeleteAll(m_subscriptions); - m_subscriptions.clear(); - m_attributeMapping.clear(); + cleanupSubscriptions(); return; } @@ -549,4 +553,12 @@ QOpen62541Subscription *Open62541AsyncBackend::getSubscriptionForItem(uintptr_t return subscription.value(); } +void Open62541AsyncBackend::cleanupSubscriptions() +{ + qDeleteAll(m_subscriptions); + m_subscriptions.clear(); + m_attributeMapping.clear(); + m_minPublishingInterval = 0; +} + QT_END_NAMESPACE diff --git a/src/plugins/opcua/open62541/qopen62541backend.h b/src/plugins/opcua/open62541/qopen62541backend.h index d5d6743..3a4c1fa 100644 --- a/src/plugins/opcua/open62541/qopen62541backend.h +++ b/src/plugins/opcua/open62541/qopen62541backend.h @@ -72,6 +72,7 @@ public Q_SLOTS: void sendPublishRequest(); void modifyPublishRequests(); void handleSubscriptionTimeout(QOpen62541Subscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items); + void cleanupSubscriptions(); public: UA_Client *m_uaclient; @@ -88,6 +89,8 @@ private: QHash<uintptr_t, QHash<QOpcUa::NodeAttribute, QOpen62541Subscription *>> m_attributeMapping; // Handle -> Attribute -> Subscription bool m_sendPublishRequests; + + double m_minPublishingInterval; }; QT_END_NAMESPACE diff --git a/src/plugins/opcua/uacpp/quacppbackend.cpp b/src/plugins/opcua/uacpp/quacppbackend.cpp index e85150d..52ad6a5 100644 --- a/src/plugins/opcua/uacpp/quacppbackend.cpp +++ b/src/plugins/opcua/uacpp/quacppbackend.cpp @@ -57,6 +57,7 @@ Q_DECLARE_LOGGING_CATEGORY(QT_OPCUA_PLUGINS_UACPP) UACppAsyncBackend::UACppAsyncBackend(QUACppClient *parent) : QOpcUaBackend() , m_clientImpl(parent) + , m_minPublishingInterval(0) { QMutexLocker locker(&m_lifecycleMutex); if (!m_platformLayerInitialized) { @@ -456,8 +457,10 @@ void UACppAsyncBackend::callMethod(uintptr_t handle, const UaNodeId &objectId, c QUACppSubscription *UACppAsyncBackend::getSubscription(const QOpcUaMonitoringParameters &settings) { if (settings.shared() == QOpcUaMonitoringParameters::SubscriptionType::Shared) { + // Requesting multiple subscriptions with publishing interval < minimum publishing interval breaks subscription sharing + double interval = revisePublishingInterval(settings.publishingInterval(), m_minPublishingInterval); for (auto entry : qAsConst(m_subscriptions)) { - if (qFuzzyCompare(entry->interval(), settings.publishingInterval()) && entry->shared() == QOpcUaMonitoringParameters::SubscriptionType::Shared) + if (qFuzzyCompare(entry->interval(), interval) && entry->shared() == QOpcUaMonitoringParameters::SubscriptionType::Shared) return entry; } } @@ -468,6 +471,8 @@ QUACppSubscription *UACppAsyncBackend::getSubscription(const QOpcUaMonitoringPar delete sub; return nullptr; } + if (sub->interval() > settings.samplingInterval()) // The publishing interval has been revised by the server. + m_minPublishingInterval = sub->interval(); m_subscriptions[id] = sub; return sub; } @@ -489,6 +494,7 @@ void UACppAsyncBackend::cleanupSubscriptions() qDeleteAll(m_subscriptions); m_subscriptions.clear(); m_attributeMapping.clear(); + m_minPublishingInterval = 0; } bool UACppAsyncBackend::removeSubscription(quint32 subscriptionId) diff --git a/src/plugins/opcua/uacpp/quacppbackend.h b/src/plugins/opcua/uacpp/quacppbackend.h index 07ac784..a96db60 100644 --- a/src/plugins/opcua/uacpp/quacppbackend.h +++ b/src/plugins/opcua/uacpp/quacppbackend.h @@ -75,6 +75,7 @@ public: static quint32 m_numClients; static bool m_platformLayerInitialized; QMutex m_lifecycleMutex; + double m_minPublishingInterval; }; QT_END_NAMESPACE diff --git a/tests/auto/qopcuaclient/tst_client.cpp b/tests/auto/qopcuaclient/tst_client.cpp index aa601dd..7bf8736 100644 --- a/tests/auto/qopcuaclient/tst_client.cpp +++ b/tests/auto/qopcuaclient/tst_client.cpp @@ -206,6 +206,8 @@ private slots: void dataChangeSubscription(); defineDataMethod(dataChangeSubscriptionInvalidNode_data) void dataChangeSubscriptionInvalidNode(); + defineDataMethod(dataChangeSubscriptionSharing_data) + void dataChangeSubscriptionSharing(); defineDataMethod(methodCall_data) void methodCall(); defineDataMethod(methodCallInvalid_data) @@ -743,6 +745,67 @@ void Tst_QOpcUaClient::dataChangeSubscriptionInvalidNode() QVERIFY(noDataNode->monitoringStatus(QOpcUa::NodeAttribute::Value).subscriptionId() == 0); } +void Tst_QOpcUaClient::dataChangeSubscriptionSharing() +{ + // The open62541 test server has a minimum publishing interval of 100ms. + // This test verifies that monitorings with smaller requested publishing interval and shared flag + // share the same subscription. + + QFETCH(QOpcUaClient *, opcuaClient); + OpcuaConnector connector(opcuaClient, m_endpoint); + + QScopedPointer<QOpcUaNode> node(opcuaClient->node(readWriteNode)); + QVERIFY(node != 0); + QSignalSpy monitoringEnabledSpy(node.data(), &QOpcUaNode::enableMonitoringFinished); + + node->enableMonitoring(QOpcUa::NodeAttribute::Value, QOpcUaMonitoringParameters(50)); + monitoringEnabledSpy.wait(); + + QVERIFY(monitoringEnabledSpy.size() == 1); + QVERIFY(monitoringEnabledSpy.at(0).at(0).value<QOpcUa::NodeAttribute>() == QOpcUa::NodeAttribute::Value); + QVERIFY(node->monitoringStatus(QOpcUa::NodeAttribute::Value).statusCode() == QOpcUa::UaStatusCode::Good); + + QOpcUaMonitoringParameters valueStatus = node->monitoringStatus(QOpcUa::NodeAttribute::Value); + QVERIFY(valueStatus.subscriptionId() != 0); + QVERIFY(valueStatus.statusCode() == QOpcUa::UaStatusCode::Good); + + monitoringEnabledSpy.clear(); + + node->enableMonitoring(QOpcUa::NodeAttribute::DisplayName, QOpcUaMonitoringParameters(25)); + monitoringEnabledSpy.wait(); + + QVERIFY(monitoringEnabledSpy.size() == 1); + QVERIFY(monitoringEnabledSpy.at(0).at(0).value<QOpcUa::NodeAttribute>() == QOpcUa::NodeAttribute::DisplayName); + QVERIFY(node->monitoringStatus(QOpcUa::NodeAttribute::DisplayName).statusCode() == QOpcUa::UaStatusCode::Good); + + QOpcUaMonitoringParameters displayNameStatus = node->monitoringStatus(QOpcUa::NodeAttribute::DisplayName); + QVERIFY(displayNameStatus.subscriptionId() == valueStatus.subscriptionId()); + QVERIFY(displayNameStatus.statusCode() == QOpcUa::UaStatusCode::Good); + + QVERIFY(valueStatus.subscriptionId() == displayNameStatus.subscriptionId()); + QCOMPARE(valueStatus.publishingInterval(), displayNameStatus.publishingInterval()); + QCOMPARE(valueStatus.publishingInterval(), 100.0); + + QSignalSpy monitoringDisabledSpy(node.data(), &QOpcUaNode::disableMonitoringFinished); + + node->disableMonitoring(QOpcUa::NodeAttribute::Value | QOpcUa::NodeAttribute::DisplayName | QOpcUa::NodeAttribute::NodeId); + monitoringDisabledSpy.wait(); + if (monitoringDisabledSpy.size() < 2) + monitoringDisabledSpy.wait(); + + QVERIFY(monitoringDisabledSpy.size() == 2); + + QVector<QOpcUa::NodeAttribute> attrs = {QOpcUa::NodeAttribute::Value, QOpcUa::NodeAttribute::DisplayName}; + for (auto it : qAsConst(monitoringDisabledSpy)) { + auto temp = it.at(0).value<QOpcUa::NodeAttribute>(); + QVERIFY(attrs.contains(temp)); + QVERIFY(node->monitoringStatus(temp).subscriptionId() == 0); + QVERIFY(node->monitoringStatus(temp).statusCode() == QOpcUa::UaStatusCode::BadAttributeIdInvalid); + attrs.removeOne(temp); + } + QVERIFY(attrs.size() == 0); +} + void Tst_QOpcUaClient::methodCall() { QFETCH(QOpcUaClient *, opcuaClient); |