Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Durable Subscriptions #2978

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/buildandtest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Build and Test .NET 9.0
on:
push:
pull_request:
branches: [ master, main ]
branches: [ master, main, develop/* ]
paths:
- '**.cs'
- '**.csproj'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: "CodeQL"

on:
push:
branches: [ master, main, release/* ]
branches: [ master, main, release/*, develop/* ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master, main ]
Expand Down
221 changes: 209 additions & 12 deletions Applications/ConsoleReferenceClient/ClientSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
Expand Down Expand Up @@ -69,6 +70,13 @@ public ClientSamples(TextWriter output, Action<IList, IList> validateResponse, M
m_validateResponse = validateResponse ?? ClientBase.ValidateResponse;
m_quitEvent = quitEvent;
m_verbose = verbose;
m_desiredEventFields = new Dictionary<int, QualifiedNameCollection>();
int eventIndexCounter = 0;
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.Time }));
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.ActiveState }));
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.Message }));
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.LimitState, BrowseNames.CurrentState }));
m_desiredEventFields.Add(eventIndexCounter++, new QualifiedNameCollection(new QualifiedName[] { BrowseNames.LimitState, BrowseNames.LastTransition }));
}

#region Public Sample Methods
Expand Down Expand Up @@ -290,34 +298,107 @@ public void CallMethod(ISession session)
}

/// <summary>
/// Create Subscription and MonitoredItems for DataChanges
/// Call the Start method for Alarming to enable events
/// </summary>
public void SubscribeToDataChanges(ISession session, uint minLifeTime)
public void EnableEvents(ISession session, uint timeToRun)
{
if (session == null || session.Connected == false)
{
m_output.WriteLine("Session not connected!");
return;
}

try
{
// Define the UA Method to call
// Parent node - Objects\CTT\Alarms
// Method node - Objects\CTT\Alarms\Start
NodeId objectId = new NodeId("ns=7;s=Alarms");
NodeId methodId = new NodeId("ns=7;s=Alarms.Start");

// Define the method parameters
// Input argument requires a Float and an UInt32 value
object[] inputArguments = new object[] { timeToRun };
IList<object> outputArguments = null;

// Invoke Call service
m_output.WriteLine("Calling UAMethod for node {0} ...", methodId);
outputArguments = session.Call(objectId, methodId, inputArguments);

// Display results
m_output.WriteLine("Method call returned {0} output argument(s):", outputArguments.Count);

foreach (var outputArgument in outputArguments)
{
m_output.WriteLine(" OutputValue = {0}", outputArgument.ToString());
}
}
catch (Exception ex)
{
m_output.WriteLine("Method call error: {0}", ex.Message);
}
}

/// <summary>
/// Create Subscription and MonitoredItems for DataChanges
/// </summary>
public bool SubscribeToDataChanges(ISession session, uint minLifeTime, bool enableDurableSubscriptions)
{
bool isDurable = false;

if (session == null || session.Connected == false)
{
m_output.WriteLine("Session not connected!");
return isDurable;
}

try
{
// Create a subscription for receiving data change notifications
int subscriptionPublishingInterval = 1000;
int itemSamplingInterval = 1000;
uint queueSize = 10;
uint lifetime = minLifeTime;

if (enableDurableSubscriptions)
{
queueSize = 100;
lifetime = 20;
}

// Define Subscription parameters
Subscription subscription = new Subscription(session.DefaultSubscription) {
DisplayName = "Console ReferenceClient Subscription",
PublishingEnabled = true,
PublishingInterval = 1000,
PublishingInterval = subscriptionPublishingInterval,
LifetimeCount = 0,
MinLifetimeInterval = minLifeTime,
MinLifetimeInterval = lifetime,
KeepAliveCount = 5,
};

session.AddSubscription(subscription);

// Create the subscription on Server side
subscription.Create();
m_output.WriteLine("New Subscription created with SubscriptionId = {0}.", subscription.Id);
m_output.WriteLine("New Subscription created with SubscriptionId = {0}, Sampling Interval {1}, Publishing Interval {2}.",
subscription.Id, itemSamplingInterval, subscriptionPublishingInterval);

if (enableDurableSubscriptions)
{
uint revisedLifetimeInHours = 0;

if (subscription.SetSubscriptionDurable(1, out revisedLifetimeInHours))
{
isDurable = true;

m_output.WriteLine("Subscription {0} is now durable, Revised Lifetime {1} in hours.",
subscription.Id, revisedLifetimeInHours);
}
else
{
m_output.WriteLine("Subscription {0} failed durable call", subscription.Id);
}
}

// Create MonitoredItems for data changes (Reference Server)

Expand All @@ -326,8 +407,8 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
intMonitoredItem.StartNodeId = new NodeId("ns=2;s=Scalar_Simulation_Int32");
intMonitoredItem.AttributeId = Attributes.Value;
intMonitoredItem.DisplayName = "Int32 Variable";
intMonitoredItem.SamplingInterval = 1000;
intMonitoredItem.QueueSize = 10;
intMonitoredItem.SamplingInterval = itemSamplingInterval;
intMonitoredItem.QueueSize = queueSize;
intMonitoredItem.DiscardOldest = true;
intMonitoredItem.Notification += OnMonitoredItemNotification;

Expand All @@ -338,8 +419,8 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
floatMonitoredItem.StartNodeId = new NodeId("ns=2;s=Scalar_Simulation_Float");
floatMonitoredItem.AttributeId = Attributes.Value;
floatMonitoredItem.DisplayName = "Float Variable";
floatMonitoredItem.SamplingInterval = 1000;
floatMonitoredItem.QueueSize = 10;
floatMonitoredItem.SamplingInterval = itemSamplingInterval;
floatMonitoredItem.QueueSize = queueSize;
floatMonitoredItem.Notification += OnMonitoredItemNotification;

subscription.AddItem(floatMonitoredItem);
Expand All @@ -349,12 +430,54 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
stringMonitoredItem.StartNodeId = new NodeId("ns=2;s=Scalar_Simulation_String");
stringMonitoredItem.AttributeId = Attributes.Value;
stringMonitoredItem.DisplayName = "String Variable";
stringMonitoredItem.SamplingInterval = 1000;
stringMonitoredItem.QueueSize = 10;
stringMonitoredItem.SamplingInterval = itemSamplingInterval;
stringMonitoredItem.QueueSize = queueSize;
stringMonitoredItem.Notification += OnMonitoredItemNotification;

subscription.AddItem(stringMonitoredItem);

MonitoredItem eventMonitoredItem = new MonitoredItem(subscription.DefaultItem);
eventMonitoredItem.StartNodeId = new NodeId(Opc.Ua.ObjectIds.Server);
eventMonitoredItem.AttributeId = Attributes.EventNotifier;
eventMonitoredItem.DisplayName = "Event Variable";
eventMonitoredItem.SamplingInterval = itemSamplingInterval;
eventMonitoredItem.QueueSize = queueSize;
eventMonitoredItem.Notification += OnMonitoredItemEventNotification;

EventFilter filter = new EventFilter();

SimpleAttributeOperandCollection simpleAttributeOperands = new SimpleAttributeOperandCollection();

foreach (QualifiedNameCollection desiredEventField in m_desiredEventFields.Values)
{
simpleAttributeOperands.Add(new SimpleAttributeOperand() {
AttributeId = Attributes.Value,
TypeDefinitionId = ObjectTypeIds.BaseEventType,
BrowsePath = desiredEventField
});
}
filter.SelectClauses = simpleAttributeOperands;

ContentFilter whereClause = new ContentFilter();
SimpleAttributeOperand existingEventType = new SimpleAttributeOperand() {
AttributeId = Attributes.Value,
TypeDefinitionId = ObjectTypeIds.ExclusiveLevelAlarmType,
BrowsePath = new QualifiedNameCollection(new QualifiedName[] { "EventType" })
};
LiteralOperand desiredEventType = new LiteralOperand();
desiredEventType.Value = new Variant(new NodeId(Opc.Ua.ObjectTypeIds.ExclusiveLevelAlarmType));


whereClause.Push(FilterOperator.Equals, new FilterOperand[] { existingEventType, desiredEventType });

filter.WhereClause = whereClause;

eventMonitoredItem.Filter = filter;
eventMonitoredItem.NodeClass = NodeClass.Object;


subscription.AddItem(eventMonitoredItem);

// Create the monitored items on Server side
subscription.ApplyChanges();
m_output.WriteLine("MonitoredItems created for SubscriptionId = {0}.", subscription.Id);
Expand All @@ -363,6 +486,8 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
{
m_output.WriteLine("Subscribe error: {0}", ex.Message);
}

return isDurable;
}
#endregion

Expand Down Expand Up @@ -1186,14 +1311,83 @@ private void OnMonitoredItemNotification(MonitoredItem monitoredItem, MonitoredI
{
// Log MonitoredItem Notification event
MonitoredItemNotification notification = e.NotificationValue as MonitoredItemNotification;
m_output.WriteLine("Notification: {0} \"{1}\" and Value = {2}.", notification.Message.SequenceNumber, monitoredItem.ResolvedNodeId, notification.Value);
DateTime localTime = notification.Value.SourceTimestamp.ToLocalTime();
m_output.WriteLine("Notification: {0} \"{1}\" and Value = {2} at [{3}].",
notification.Message.SequenceNumber,
monitoredItem.ResolvedNodeId,
notification.Value,
localTime.ToLongTimeString());
}
catch (Exception ex)
{
m_output.WriteLine("OnMonitoredItemNotification error: {0}", ex.Message);
}
}

/// <summary>
/// Handle Requested Event notifications from Server
/// </summary>
private void OnMonitoredItemEventNotification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e)
{
try
{
// Log MonitoredItem Notification event
EventFieldList notification = e.NotificationValue as EventFieldList;

foreach (KeyValuePair<int, QualifiedNameCollection> entry in m_desiredEventFields)
{
Variant field = notification.EventFields[entry.Key];
if (field.TypeInfo.BuiltInType != BuiltInType.Null)
{
StringBuilder fieldPath = new StringBuilder();

int lastIndex = entry.Value.Count - 1;
for (int index = 0; index < entry.Value.Count; index++)
{
fieldPath.Append(entry.Value[index].Name);
if (index < lastIndex)
{
fieldPath.Append(".");
}
}

string fieldName = fieldPath.ToString();
if (fieldName.Equals("Time"))
{
try
{
DateTime currentTime = (DateTime)field.Value;
TimeSpan timeSpan = currentTime - m_lastEventTime;
m_lastEventTime = currentTime;
m_processedEvents++;
string timeBetweenEvents = "";
if (m_processedEvents > 1)
{
timeBetweenEvents = ", time since last event = " + timeSpan.Seconds.ToString() + " seconds";
}

m_output.WriteLine("Event Received - total count = {0}{1}",
m_processedEvents.ToString(),
timeBetweenEvents);
}
catch (Exception ex)
{
m_output.WriteLine("Unexpected error retrieving Event Time Field Value: {0}", ex.Message);
}
}

m_output.WriteLine("\tField [{0}] \"{1}\" = [{2}]",
entry.Key.ToString(), fieldName, field.Value);
}
}
}
catch (Exception ex)
{
m_output.WriteLine("OnMonitoredItemEventNotification error: {0}", ex.Message);
}
}


/// <summary>
/// Event handler to defer publish response sequence number acknowledge.
/// </summary>
Expand Down Expand Up @@ -1256,5 +1450,8 @@ private static ByteStringCollection PrepareBrowseNext(BrowseResultCollection bro
private readonly TextWriter m_output;
private readonly ManualResetEvent m_quitEvent;
private readonly bool m_verbose;
private Dictionary<int, QualifiedNameCollection> m_desiredEventFields = null;
private int m_processedEvents = 0;
private DateTime m_lastEventTime = DateTime.Now;
}
}
Loading
Loading