diff options
author | Jannis Voelker <jannis.voelker@basyskom.com> | 2018-02-22 08:26:09 +0100 |
---|---|---|
committer | Frank Meerkoetter <frank.meerkoetter@basyskom.com> | 2018-03-23 19:54:25 +0000 |
commit | 0741b0094a4a5a32f5e6e0d657c2fa943cb1ffb6 (patch) | |
tree | 1978a73de94c9f829ac4cfb065f77bc0edb2f2a7 | |
parent | 55be10dcbb6cd78568b61fc73e3fb6a1f4af6b6a (diff) |
Remove subscriptions on timeout (open62541, freeopcua)
If a subscription timeout is detected, the subscription is
deleted and the associated monitored item information is
removed from the backend to allow re-adding the item.
A disableMonitoringFinished signal with BadTimeout is
generated for each monitoring on the subscription.
Change-Id: Ibe28873fec140faf24b1ebcd61a0ea5ef2eb6cea
Reviewed-by: Frank Meerkoetter <frank.meerkoetter@basyskom.com>
8 files changed, 92 insertions, 12 deletions
diff --git a/src/plugins/opcua/freeopcua/qfreeopcuasubscription.cpp b/src/plugins/opcua/freeopcua/qfreeopcuasubscription.cpp index 1005782..f15c9b6 100644 --- a/src/plugins/opcua/freeopcua/qfreeopcuasubscription.cpp +++ b/src/plugins/opcua/freeopcua/qfreeopcuasubscription.cpp @@ -50,6 +50,7 @@ QFreeOpcUaSubscription::QFreeOpcUaSubscription(QFreeOpcUaWorker *backend, const : m_interval(settings.publishingInterval()) , m_shared(settings.shared()) , m_backend(backend) + , m_timeout(false) { Q_ASSERT(m_backend); } @@ -90,7 +91,7 @@ bool QFreeOpcUaSubscription::removeOnServer() for (auto it : qAsConst(m_itemIdToItemMapping)) { QOpcUaMonitoringParameters s; - s.setStatusCode(QOpcUa::UaStatusCode::BadDisconnect); + s.setStatusCode(m_timeout ? QOpcUa::UaStatusCode::BadTimeout : QOpcUa::UaStatusCode::BadDisconnect); emit m_backend->monitoringEnableDisable(it->handle, it->attr, false, s); } @@ -104,8 +105,17 @@ bool QFreeOpcUaSubscription::removeOnServer() void QFreeOpcUaSubscription::StatusChange(OpcUa::StatusCode status) { - if (status == OpcUa::StatusCode::BadDisconnect) - removeOnServer(); + if (status != OpcUa::StatusCode::BadTimeout) + return; + + QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items; + for (auto it : qAsConst(m_handleToItemMapping)) { + for (auto item : it) { + items.push_back({item->handle, item->attr}); + } + } + m_timeout = true; + emit timeout(this, items); } void QFreeOpcUaSubscription::modifyMonitoring(uintptr_t handle, QOpcUa::NodeAttribute attr, QOpcUaMonitoringParameters::Parameter item, QVariant value) diff --git a/src/plugins/opcua/freeopcua/qfreeopcuasubscription.h b/src/plugins/opcua/freeopcua/qfreeopcuasubscription.h index 47264de..b8bf8e9 100644 --- a/src/plugins/opcua/freeopcua/qfreeopcuasubscription.h +++ b/src/plugins/opcua/freeopcua/qfreeopcuasubscription.h @@ -47,8 +47,10 @@ QT_BEGIN_NAMESPACE class QFreeOpcUaWorker; -class QFreeOpcUaSubscription : public OpcUa::SubscriptionHandler +class QFreeOpcUaSubscription : public QObject, public OpcUa::SubscriptionHandler { + Q_OBJECT + public: QFreeOpcUaSubscription(QFreeOpcUaWorker *backend, const QOpcUaMonitoringParameters &settings); ~QFreeOpcUaSubscription() override; @@ -88,6 +90,9 @@ public: {} }; +signals: + void timeout(QFreeOpcUaSubscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items); + private: MonitoredItem *getItemForAttribute(uintptr_t handle, QOpcUa::NodeAttribute attr); @@ -99,6 +104,8 @@ private: QHash<quint32, MonitoredItem *> m_itemIdToItemMapping; QHash<uintptr_t, QHash<QOpcUa::NodeAttribute, MonitoredItem *>> m_handleToItemMapping; + + bool m_timeout; }; QT_END_NAMESPACE diff --git a/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp b/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp index 2edce97..2f6ebd6 100644 --- a/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp +++ b/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp @@ -269,6 +269,7 @@ QFreeOpcUaSubscription *QFreeOpcUaWorker::getSubscription(const QOpcUaMonitoring delete sub; return nullptr; } + QObject::connect(sub, &QFreeOpcUaSubscription::timeout, this, &QFreeOpcUaWorker::handleSubscriptionTimeout, Qt::QueuedConnection); m_subscriptions[id] = sub; return sub; } @@ -411,4 +412,16 @@ void QFreeOpcUaWorker::callMethod(uintptr_t handle, OpcUa::NodeId objectId, OpcU } } +void QFreeOpcUaWorker::handleSubscriptionTimeout(QFreeOpcUaSubscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute> > items) +{ + for (auto it : items) { + auto item = m_attributeMapping.find(it.first); + if (item == m_attributeMapping.end()) + continue; + item->remove(it.second); + } + m_subscriptions.remove(sub->subscriptionId()); + delete sub; +} + QT_END_NAMESPACE diff --git a/src/plugins/opcua/freeopcua/qfreeopcuaworker.h b/src/plugins/opcua/freeopcua/qfreeopcuaworker.h index 7a28fb9..c3edb7d 100644 --- a/src/plugins/opcua/freeopcua/qfreeopcuaworker.h +++ b/src/plugins/opcua/freeopcua/qfreeopcuaworker.h @@ -74,6 +74,7 @@ public slots: void modifyMonitoring(uintptr_t handle, QOpcUa::NodeAttribute attr, QOpcUaMonitoringParameters::Parameter item, QVariant value); void callMethod(uintptr_t handle, OpcUa::NodeId objectId, OpcUa::NodeId methodId, QVector<QOpcUa::TypedVariant> args); + void handleSubscriptionTimeout(QFreeOpcUaSubscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items); private: QFreeOpcUaSubscription *getSubscriptionForItem(uintptr_t handle, QOpcUa::NodeAttribute attr); diff --git a/src/plugins/opcua/open62541/qopen62541backend.cpp b/src/plugins/opcua/open62541/qopen62541backend.cpp index 63289c4..29fe8d6 100644 --- a/src/plugins/opcua/open62541/qopen62541backend.cpp +++ b/src/plugins/opcua/open62541/qopen62541backend.cpp @@ -289,6 +289,8 @@ QOpen62541Subscription *Open62541AsyncBackend::getSubscription(const QOpcUaMonit return nullptr; } m_subscriptions[id] = sub; + // This must be a queued connection to prevent the slot from being called while the client is inside UA_Client_runAsync(). + QObject::connect(sub, &QOpen62541Subscription::timeout, this, &Open62541AsyncBackend::handleSubscriptionTimeout, Qt::QueuedConnection); return sub; } @@ -299,6 +301,7 @@ bool Open62541AsyncBackend::removeSubscription(UA_UInt32 subscriptionId) sub.value()->removeOnServer(); delete sub.value(); m_subscriptions.remove(subscriptionId); + modifyPublishRequests(); return true; } return false; @@ -505,6 +508,19 @@ void Open62541AsyncBackend::modifyPublishRequests() sendPublishRequest(); } +void Open62541AsyncBackend::handleSubscriptionTimeout(QOpen62541Subscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items) +{ + for (auto it : qAsConst(items)) { + auto item = m_attributeMapping.find(it.first); + if (item == m_attributeMapping.end()) + continue; + item->remove(it.second); + } + m_subscriptions.remove(sub->subscriptionId()); + delete sub; + modifyPublishRequests(); +} + QOpen62541Subscription *Open62541AsyncBackend::getSubscriptionForItem(uintptr_t handle, QOpcUa::NodeAttribute attr) { auto nodeEntry = m_attributeMapping.find(handle); diff --git a/src/plugins/opcua/open62541/qopen62541backend.h b/src/plugins/opcua/open62541/qopen62541backend.h index 1a2a46a..d5d6743 100644 --- a/src/plugins/opcua/open62541/qopen62541backend.h +++ b/src/plugins/opcua/open62541/qopen62541backend.h @@ -71,6 +71,7 @@ public Q_SLOTS: bool removeSubscription(UA_UInt32 subscriptionId); void sendPublishRequest(); void modifyPublishRequests(); + void handleSubscriptionTimeout(QOpen62541Subscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items); public: UA_Client *m_uaclient; diff --git a/src/plugins/opcua/open62541/qopen62541subscription.cpp b/src/plugins/opcua/open62541/qopen62541subscription.cpp index 36332f9..e184186 100644 --- a/src/plugins/opcua/open62541/qopen62541subscription.cpp +++ b/src/plugins/opcua/open62541/qopen62541subscription.cpp @@ -56,6 +56,18 @@ static void monitoredValueHandler(UA_Client *client, UA_UInt32 subId, void *subC subscription->monitoredValueUpdated(monId, value); } +static void stateChangeHandler(UA_Client *client, UA_UInt32 subId, void *subContext, UA_StatusChangeNotification *notification) +{ + Q_UNUSED(client); + Q_UNUSED(subId); + + if (notification->status != UA_STATUSCODE_BADTIMEOUT) + return; + + QOpen62541Subscription *sub = static_cast<QOpen62541Subscription *>(subContext); + sub->sendTimeoutNotification(); +} + QOpen62541Subscription::QOpen62541Subscription(Open62541AsyncBackend *backend, const QOpcUaMonitoringParameters &settings) : m_backend(backend) , m_interval(settings.publishingInterval()) @@ -66,6 +78,7 @@ QOpen62541Subscription::QOpen62541Subscription(Open62541AsyncBackend *backend, c , m_priority(settings.priority()) , m_maxNotificationsPerPublish(settings.maxNotificationsPerPublish()) , m_clientHandle(0) + , m_timeout(false) { } @@ -82,7 +95,7 @@ UA_UInt32 QOpen62541Subscription::createOnServer() req.requestedMaxKeepAliveCount = m_maxKeepaliveCount; req.priority = m_priority; req.maxNotificationsPerPublish = m_maxNotificationsPerPublish; - UA_CreateSubscriptionResponse res = UA_Client_Subscriptions_create(m_backend->m_uaclient, req, this, NULL, NULL); + UA_CreateSubscriptionResponse res = UA_Client_Subscriptions_create(m_backend->m_uaclient, req, this, stateChangeHandler, NULL); if (res.responseHeader.serviceResult != UA_STATUSCODE_GOOD) { qCWarning(QT_OPCUA_PLUGINS_OPEN62541) << "Could not create subscription with interval" << m_interval << UA_StatusCode_name(res.responseHeader.serviceResult); @@ -98,15 +111,15 @@ UA_UInt32 QOpen62541Subscription::createOnServer() bool QOpen62541Subscription::removeOnServer() { - if (m_subscriptionId == 0) - return false; - - UA_StatusCode res = UA_Client_Subscriptions_deleteSingle(m_backend->m_uaclient, m_subscriptionId); - m_subscriptionId = 0; + UA_StatusCode res = UA_STATUSCODE_GOOD; + if (m_subscriptionId) { + res = UA_Client_Subscriptions_deleteSingle(m_backend->m_uaclient, m_subscriptionId); + m_subscriptionId = 0; + } for (auto it : qAsConst(m_itemIdToItemMapping)) { QOpcUaMonitoringParameters s; - s.setStatusCode(QOpcUa::UaStatusCode::BadDisconnect); + s.setStatusCode(m_timeout ? QOpcUa::UaStatusCode::BadTimeout : QOpcUa::UaStatusCode::BadDisconnect); emit m_backend->monitoringEnableDisable(it->handle, it->attr, false, s); } @@ -310,6 +323,18 @@ void QOpen62541Subscription::monitoredValueUpdated(UA_UInt32 monId, UA_DataValue emit m_backend->attributeUpdated(item.value()->handle, res); } +void QOpen62541Subscription::sendTimeoutNotification() +{ + QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items; + for (auto it : qAsConst(m_handleToItemMapping)) { + for (auto item : it) { + items.push_back({item->handle, item->attr}); + } + } + emit timeout(this, items); + m_timeout = true; +} + double QOpen62541Subscription::interval() const { return m_interval; diff --git a/src/plugins/opcua/open62541/qopen62541subscription.h b/src/plugins/opcua/open62541/qopen62541subscription.h index 58dd407..a572817 100644 --- a/src/plugins/opcua/open62541/qopen62541subscription.h +++ b/src/plugins/opcua/open62541/qopen62541subscription.h @@ -44,8 +44,10 @@ QT_BEGIN_NAMESPACE class Open62541AsyncBackend; -class QOpen62541Subscription +class QOpen62541Subscription : public QObject { + Q_OBJECT + public: QOpen62541Subscription(Open62541AsyncBackend *backend, const QOpcUaMonitoringParameters &settings); ~QOpen62541Subscription(); @@ -59,6 +61,7 @@ public: bool removeAttributeMonitoredItem(uintptr_t handle, QOpcUa::NodeAttribute attr); void monitoredValueUpdated(UA_UInt32 monId, UA_DataValue *value); + void sendTimeoutNotification(); struct MonitoredItem { uintptr_t handle; @@ -83,6 +86,9 @@ public: QOpcUaMonitoringParameters::SubscriptionType shared() const; +signals: + void timeout(QOpen62541Subscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items); + private: MonitoredItem *getItemForAttribute(uintptr_t handle, QOpcUa::NodeAttribute attr); UA_ExtensionObject createFilter(const QVariant &filterData); @@ -103,6 +109,7 @@ private: QHash<UA_UInt32, MonitoredItem *> m_itemIdToItemMapping; // ItemId -> Item for fast lookup on data change quint32 m_clientHandle; + bool m_timeout; }; QT_END_NAMESPACE |