summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJannis Voelker <jannis.voelker@basyskom.com>2018-02-22 08:26:09 +0100
committerFrank Meerkoetter <frank.meerkoetter@basyskom.com>2018-03-23 19:54:25 +0000
commit0741b0094a4a5a32f5e6e0d657c2fa943cb1ffb6 (patch)
tree1978a73de94c9f829ac4cfb065f77bc0edb2f2a7
parent55be10dcbb6cd78568b61fc73e3fb6a1f4af6b6a (diff)
Remove subscriptions on timeout (open62541, freeopcua)
If a subscription timeout is detected, the subscription is deleted and the associated monitored item information is removed from the backend to allow re-adding the item. A disableMonitoringFinished signal with BadTimeout is generated for each monitoring on the subscription. Change-Id: Ibe28873fec140faf24b1ebcd61a0ea5ef2eb6cea Reviewed-by: Frank Meerkoetter <frank.meerkoetter@basyskom.com>
-rw-r--r--src/plugins/opcua/freeopcua/qfreeopcuasubscription.cpp16
-rw-r--r--src/plugins/opcua/freeopcua/qfreeopcuasubscription.h9
-rw-r--r--src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp13
-rw-r--r--src/plugins/opcua/freeopcua/qfreeopcuaworker.h1
-rw-r--r--src/plugins/opcua/open62541/qopen62541backend.cpp16
-rw-r--r--src/plugins/opcua/open62541/qopen62541backend.h1
-rw-r--r--src/plugins/opcua/open62541/qopen62541subscription.cpp39
-rw-r--r--src/plugins/opcua/open62541/qopen62541subscription.h9
8 files changed, 92 insertions, 12 deletions
diff --git a/src/plugins/opcua/freeopcua/qfreeopcuasubscription.cpp b/src/plugins/opcua/freeopcua/qfreeopcuasubscription.cpp
index 1005782..f15c9b6 100644
--- a/src/plugins/opcua/freeopcua/qfreeopcuasubscription.cpp
+++ b/src/plugins/opcua/freeopcua/qfreeopcuasubscription.cpp
@@ -50,6 +50,7 @@ QFreeOpcUaSubscription::QFreeOpcUaSubscription(QFreeOpcUaWorker *backend, const
: m_interval(settings.publishingInterval())
, m_shared(settings.shared())
, m_backend(backend)
+ , m_timeout(false)
{
Q_ASSERT(m_backend);
}
@@ -90,7 +91,7 @@ bool QFreeOpcUaSubscription::removeOnServer()
for (auto it : qAsConst(m_itemIdToItemMapping)) {
QOpcUaMonitoringParameters s;
- s.setStatusCode(QOpcUa::UaStatusCode::BadDisconnect);
+ s.setStatusCode(m_timeout ? QOpcUa::UaStatusCode::BadTimeout : QOpcUa::UaStatusCode::BadDisconnect);
emit m_backend->monitoringEnableDisable(it->handle, it->attr, false, s);
}
@@ -104,8 +105,17 @@ bool QFreeOpcUaSubscription::removeOnServer()
void QFreeOpcUaSubscription::StatusChange(OpcUa::StatusCode status)
{
- if (status == OpcUa::StatusCode::BadDisconnect)
- removeOnServer();
+ if (status != OpcUa::StatusCode::BadTimeout)
+ return;
+
+ QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items;
+ for (auto it : qAsConst(m_handleToItemMapping)) {
+ for (auto item : it) {
+ items.push_back({item->handle, item->attr});
+ }
+ }
+ m_timeout = true;
+ emit timeout(this, items);
}
void QFreeOpcUaSubscription::modifyMonitoring(uintptr_t handle, QOpcUa::NodeAttribute attr, QOpcUaMonitoringParameters::Parameter item, QVariant value)
diff --git a/src/plugins/opcua/freeopcua/qfreeopcuasubscription.h b/src/plugins/opcua/freeopcua/qfreeopcuasubscription.h
index 47264de..b8bf8e9 100644
--- a/src/plugins/opcua/freeopcua/qfreeopcuasubscription.h
+++ b/src/plugins/opcua/freeopcua/qfreeopcuasubscription.h
@@ -47,8 +47,10 @@ QT_BEGIN_NAMESPACE
class QFreeOpcUaWorker;
-class QFreeOpcUaSubscription : public OpcUa::SubscriptionHandler
+class QFreeOpcUaSubscription : public QObject, public OpcUa::SubscriptionHandler
{
+ Q_OBJECT
+
public:
QFreeOpcUaSubscription(QFreeOpcUaWorker *backend, const QOpcUaMonitoringParameters &settings);
~QFreeOpcUaSubscription() override;
@@ -88,6 +90,9 @@ public:
{}
};
+signals:
+ void timeout(QFreeOpcUaSubscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items);
+
private:
MonitoredItem *getItemForAttribute(uintptr_t handle, QOpcUa::NodeAttribute attr);
@@ -99,6 +104,8 @@ private:
QHash<quint32, MonitoredItem *> m_itemIdToItemMapping;
QHash<uintptr_t, QHash<QOpcUa::NodeAttribute, MonitoredItem *>> m_handleToItemMapping;
+
+ bool m_timeout;
};
QT_END_NAMESPACE
diff --git a/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp b/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp
index 2edce97..2f6ebd6 100644
--- a/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp
+++ b/src/plugins/opcua/freeopcua/qfreeopcuaworker.cpp
@@ -269,6 +269,7 @@ QFreeOpcUaSubscription *QFreeOpcUaWorker::getSubscription(const QOpcUaMonitoring
delete sub;
return nullptr;
}
+ QObject::connect(sub, &QFreeOpcUaSubscription::timeout, this, &QFreeOpcUaWorker::handleSubscriptionTimeout, Qt::QueuedConnection);
m_subscriptions[id] = sub;
return sub;
}
@@ -411,4 +412,16 @@ void QFreeOpcUaWorker::callMethod(uintptr_t handle, OpcUa::NodeId objectId, OpcU
}
}
+void QFreeOpcUaWorker::handleSubscriptionTimeout(QFreeOpcUaSubscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute> > items)
+{
+ for (auto it : items) {
+ auto item = m_attributeMapping.find(it.first);
+ if (item == m_attributeMapping.end())
+ continue;
+ item->remove(it.second);
+ }
+ m_subscriptions.remove(sub->subscriptionId());
+ delete sub;
+}
+
QT_END_NAMESPACE
diff --git a/src/plugins/opcua/freeopcua/qfreeopcuaworker.h b/src/plugins/opcua/freeopcua/qfreeopcuaworker.h
index 7a28fb9..c3edb7d 100644
--- a/src/plugins/opcua/freeopcua/qfreeopcuaworker.h
+++ b/src/plugins/opcua/freeopcua/qfreeopcuaworker.h
@@ -74,6 +74,7 @@ public slots:
void modifyMonitoring(uintptr_t handle, QOpcUa::NodeAttribute attr, QOpcUaMonitoringParameters::Parameter item, QVariant value);
void callMethod(uintptr_t handle, OpcUa::NodeId objectId, OpcUa::NodeId methodId, QVector<QOpcUa::TypedVariant> args);
+ void handleSubscriptionTimeout(QFreeOpcUaSubscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items);
private:
QFreeOpcUaSubscription *getSubscriptionForItem(uintptr_t handle, QOpcUa::NodeAttribute attr);
diff --git a/src/plugins/opcua/open62541/qopen62541backend.cpp b/src/plugins/opcua/open62541/qopen62541backend.cpp
index 63289c4..29fe8d6 100644
--- a/src/plugins/opcua/open62541/qopen62541backend.cpp
+++ b/src/plugins/opcua/open62541/qopen62541backend.cpp
@@ -289,6 +289,8 @@ QOpen62541Subscription *Open62541AsyncBackend::getSubscription(const QOpcUaMonit
return nullptr;
}
m_subscriptions[id] = sub;
+ // This must be a queued connection to prevent the slot from being called while the client is inside UA_Client_runAsync().
+ QObject::connect(sub, &QOpen62541Subscription::timeout, this, &Open62541AsyncBackend::handleSubscriptionTimeout, Qt::QueuedConnection);
return sub;
}
@@ -299,6 +301,7 @@ bool Open62541AsyncBackend::removeSubscription(UA_UInt32 subscriptionId)
sub.value()->removeOnServer();
delete sub.value();
m_subscriptions.remove(subscriptionId);
+ modifyPublishRequests();
return true;
}
return false;
@@ -505,6 +508,19 @@ void Open62541AsyncBackend::modifyPublishRequests()
sendPublishRequest();
}
+void Open62541AsyncBackend::handleSubscriptionTimeout(QOpen62541Subscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items)
+{
+ for (auto it : qAsConst(items)) {
+ auto item = m_attributeMapping.find(it.first);
+ if (item == m_attributeMapping.end())
+ continue;
+ item->remove(it.second);
+ }
+ m_subscriptions.remove(sub->subscriptionId());
+ delete sub;
+ modifyPublishRequests();
+}
+
QOpen62541Subscription *Open62541AsyncBackend::getSubscriptionForItem(uintptr_t handle, QOpcUa::NodeAttribute attr)
{
auto nodeEntry = m_attributeMapping.find(handle);
diff --git a/src/plugins/opcua/open62541/qopen62541backend.h b/src/plugins/opcua/open62541/qopen62541backend.h
index 1a2a46a..d5d6743 100644
--- a/src/plugins/opcua/open62541/qopen62541backend.h
+++ b/src/plugins/opcua/open62541/qopen62541backend.h
@@ -71,6 +71,7 @@ public Q_SLOTS:
bool removeSubscription(UA_UInt32 subscriptionId);
void sendPublishRequest();
void modifyPublishRequests();
+ void handleSubscriptionTimeout(QOpen62541Subscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items);
public:
UA_Client *m_uaclient;
diff --git a/src/plugins/opcua/open62541/qopen62541subscription.cpp b/src/plugins/opcua/open62541/qopen62541subscription.cpp
index 36332f9..e184186 100644
--- a/src/plugins/opcua/open62541/qopen62541subscription.cpp
+++ b/src/plugins/opcua/open62541/qopen62541subscription.cpp
@@ -56,6 +56,18 @@ static void monitoredValueHandler(UA_Client *client, UA_UInt32 subId, void *subC
subscription->monitoredValueUpdated(monId, value);
}
+static void stateChangeHandler(UA_Client *client, UA_UInt32 subId, void *subContext, UA_StatusChangeNotification *notification)
+{
+ Q_UNUSED(client);
+ Q_UNUSED(subId);
+
+ if (notification->status != UA_STATUSCODE_BADTIMEOUT)
+ return;
+
+ QOpen62541Subscription *sub = static_cast<QOpen62541Subscription *>(subContext);
+ sub->sendTimeoutNotification();
+}
+
QOpen62541Subscription::QOpen62541Subscription(Open62541AsyncBackend *backend, const QOpcUaMonitoringParameters &settings)
: m_backend(backend)
, m_interval(settings.publishingInterval())
@@ -66,6 +78,7 @@ QOpen62541Subscription::QOpen62541Subscription(Open62541AsyncBackend *backend, c
, m_priority(settings.priority())
, m_maxNotificationsPerPublish(settings.maxNotificationsPerPublish())
, m_clientHandle(0)
+ , m_timeout(false)
{
}
@@ -82,7 +95,7 @@ UA_UInt32 QOpen62541Subscription::createOnServer()
req.requestedMaxKeepAliveCount = m_maxKeepaliveCount;
req.priority = m_priority;
req.maxNotificationsPerPublish = m_maxNotificationsPerPublish;
- UA_CreateSubscriptionResponse res = UA_Client_Subscriptions_create(m_backend->m_uaclient, req, this, NULL, NULL);
+ UA_CreateSubscriptionResponse res = UA_Client_Subscriptions_create(m_backend->m_uaclient, req, this, stateChangeHandler, NULL);
if (res.responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
qCWarning(QT_OPCUA_PLUGINS_OPEN62541) << "Could not create subscription with interval" << m_interval << UA_StatusCode_name(res.responseHeader.serviceResult);
@@ -98,15 +111,15 @@ UA_UInt32 QOpen62541Subscription::createOnServer()
bool QOpen62541Subscription::removeOnServer()
{
- if (m_subscriptionId == 0)
- return false;
-
- UA_StatusCode res = UA_Client_Subscriptions_deleteSingle(m_backend->m_uaclient, m_subscriptionId);
- m_subscriptionId = 0;
+ UA_StatusCode res = UA_STATUSCODE_GOOD;
+ if (m_subscriptionId) {
+ res = UA_Client_Subscriptions_deleteSingle(m_backend->m_uaclient, m_subscriptionId);
+ m_subscriptionId = 0;
+ }
for (auto it : qAsConst(m_itemIdToItemMapping)) {
QOpcUaMonitoringParameters s;
- s.setStatusCode(QOpcUa::UaStatusCode::BadDisconnect);
+ s.setStatusCode(m_timeout ? QOpcUa::UaStatusCode::BadTimeout : QOpcUa::UaStatusCode::BadDisconnect);
emit m_backend->monitoringEnableDisable(it->handle, it->attr, false, s);
}
@@ -310,6 +323,18 @@ void QOpen62541Subscription::monitoredValueUpdated(UA_UInt32 monId, UA_DataValue
emit m_backend->attributeUpdated(item.value()->handle, res);
}
+void QOpen62541Subscription::sendTimeoutNotification()
+{
+ QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items;
+ for (auto it : qAsConst(m_handleToItemMapping)) {
+ for (auto item : it) {
+ items.push_back({item->handle, item->attr});
+ }
+ }
+ emit timeout(this, items);
+ m_timeout = true;
+}
+
double QOpen62541Subscription::interval() const
{
return m_interval;
diff --git a/src/plugins/opcua/open62541/qopen62541subscription.h b/src/plugins/opcua/open62541/qopen62541subscription.h
index 58dd407..a572817 100644
--- a/src/plugins/opcua/open62541/qopen62541subscription.h
+++ b/src/plugins/opcua/open62541/qopen62541subscription.h
@@ -44,8 +44,10 @@ QT_BEGIN_NAMESPACE
class Open62541AsyncBackend;
-class QOpen62541Subscription
+class QOpen62541Subscription : public QObject
{
+ Q_OBJECT
+
public:
QOpen62541Subscription(Open62541AsyncBackend *backend, const QOpcUaMonitoringParameters &settings);
~QOpen62541Subscription();
@@ -59,6 +61,7 @@ public:
bool removeAttributeMonitoredItem(uintptr_t handle, QOpcUa::NodeAttribute attr);
void monitoredValueUpdated(UA_UInt32 monId, UA_DataValue *value);
+ void sendTimeoutNotification();
struct MonitoredItem {
uintptr_t handle;
@@ -83,6 +86,9 @@ public:
QOpcUaMonitoringParameters::SubscriptionType shared() const;
+signals:
+ void timeout(QOpen62541Subscription *sub, QVector<QPair<uintptr_t, QOpcUa::NodeAttribute>> items);
+
private:
MonitoredItem *getItemForAttribute(uintptr_t handle, QOpcUa::NodeAttribute attr);
UA_ExtensionObject createFilter(const QVariant &filterData);
@@ -103,6 +109,7 @@ private:
QHash<UA_UInt32, MonitoredItem *> m_itemIdToItemMapping; // ItemId -> Item for fast lookup on data change
quint32 m_clientHandle;
+ bool m_timeout;
};
QT_END_NAMESPACE