Skip to content

Commit

Permalink
Implement Event Subscription handling for channels
Browse files Browse the repository at this point in the history
Signed-off-by: Jack Chen <[email protected]>
  • Loading branch information
jackchenjc committed Jan 18, 2025
1 parent 8cdefd8 commit 669ad21
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 51 deletions.
144 changes: 123 additions & 21 deletions examples/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/gopcua/opcua"
"github.com/gopcua/opcua/debug"
"github.com/gopcua/opcua/id"
"github.com/gopcua/opcua/monitor"
"github.com/gopcua/opcua/ua"
)
Expand All @@ -24,6 +25,7 @@ func main() {
keyFile = flag.String("key", "", "Path to private key.pem. Required for security mode/policy != None")
nodeID = flag.String("node", "", "node id to subscribe to")
interval = flag.Duration("interval", opcua.DefaultSubscriptionInterval, "subscription interval")
event = flag.Bool("event", false, "are you subscribing to events")
)
flag.BoolVar(&debug.Enable, "debug", false, "enable debug logging")
flag.Parse()
Expand Down Expand Up @@ -83,34 +85,117 @@ func main() {
})
wg := &sync.WaitGroup{}

// start callback-based subscription
wg.Add(1)
go startCallbackSub(ctx, m, *interval, 0, wg, *nodeID)
fieldNames := []string{"EventId", "EventType", "Severity", "Time", "Message"}
selects := make([]*ua.SimpleAttributeOperand, len(fieldNames))
for i, name := range fieldNames {
selects[i] = &ua.SimpleAttributeOperand{
TypeDefinitionID: ua.NewNumericNodeID(0, id.BaseEventType),
BrowsePath: []*ua.QualifiedName{{NamespaceIndex: 0, Name: name}},
AttributeID: ua.AttributeIDValue,
}
}

wheres := &ua.ContentFilter{
Elements: []*ua.ContentFilterElement{
{
FilterOperator: ua.FilterOperatorGreaterThanOrEqual,
FilterOperands: []*ua.ExtensionObject{
{
EncodingMask: 1,
TypeID: &ua.ExpandedNodeID{
NodeID: ua.NewNumericNodeID(0, id.SimpleAttributeOperand_Encoding_DefaultBinary),
},
Value: ua.SimpleAttributeOperand{
TypeDefinitionID: ua.NewNumericNodeID(0, id.BaseEventType),
BrowsePath: []*ua.QualifiedName{{NamespaceIndex: 0, Name: "Severity"}},
AttributeID: ua.AttributeIDValue,
},
},
{
EncodingMask: 1,
TypeID: &ua.ExpandedNodeID{
NodeID: ua.NewNumericNodeID(0, id.LiteralOperand_Encoding_DefaultBinary),
},
Value: ua.LiteralOperand{
Value: ua.MustVariant(uint16(0)),
},
},
},
},
},
}

// start channel-based subscription
wg.Add(1)
go startChanSub(ctx, m, *interval, 0, wg, *nodeID)
filter := ua.EventFilter{
SelectClauses: selects,
WhereClause: wheres,
}

filterExtObj := ua.ExtensionObject{
EncodingMask: ua.ExtensionObjectBinary,
TypeID: &ua.ExpandedNodeID{
NodeID: ua.NewNumericNodeID(0, id.EventFilter_Encoding_DefaultBinary),
},
Value: filter,
}

if *event {
// start callback-based subscription
wg.Add(1)
go startCallbackSub(ctx, m, *interval, 0, wg, *event, &filterExtObj, *nodeID)

// start channel-based subscription
wg.Add(1)
go startChanSub(ctx, m, *interval, 0, wg, *event, &filterExtObj, *nodeID)
} else {
// start callback-based subscription
wg.Add(1)
go startCallbackSub(ctx, m, *interval, 0, wg, *event, nil, *nodeID)

// start channel-based subscription
wg.Add(1)
go startChanSub(ctx, m, *interval, 0, wg, *event, nil, *nodeID)
}
<-ctx.Done()
wg.Wait()
}

func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) {
func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, isEvent bool, filter *ua.ExtensionObject, nodes ...string) {
fieldNames := []string{"EventId", "EventType", "Severity", "Time", "Message"}
sub, err := m.Subscribe(
ctx,
&opcua.SubscriptionParameters{
Interval: interval,
},
func(s *monitor.Subscription, msg *monitor.DataChangeMessage) {
if msg.Error != nil {
log.Printf("[callback] sub=%d error=%s", s.SubscriptionID(), msg.Error)
} else {
log.Printf("[callback] sub=%d ts=%s node=%s value=%v", s.SubscriptionID(), msg.SourceTimestamp.UTC().Format(time.RFC3339), msg.NodeID, msg.Value.Value())
func(s *monitor.Subscription, msg monitor.Message) {
switch v := msg.(type) {
case *monitor.DataChangeMessage:
if v.Error != nil {
log.Printf("[callback] sub=%d error=%s", s.SubscriptionID(), v.Error)
} else {
log.Printf("[callback] sub=%d ts=%s node=%s value=%v",
s.SubscriptionID(),
v.SourceTimestamp.UTC().Format(time.RFC3339),
v.NodeID,
v.Value.Value())
}
case *monitor.EventMessage:
if v.Error != nil {
log.Printf("[callback] sub=%d error=%s", s.SubscriptionID(), v.Error)
} else {
log.Printf("[callback] sub=%d event details:", s.SubscriptionID())
for i, field := range v.EventFields {
if i < len(fieldNames) {
fieldName := fieldNames[i]
log.Printf(" %s: %v", fieldName, field.Value.Value())
}
}
}
default:
log.Printf("[callback] sub=%d unknown message type=%T", s.SubscriptionID(), msg)
}
time.Sleep(lag)
},
nodes...)

isEvent, filter, nodes...)
if err != nil {
log.Fatal(err)
}
Expand All @@ -120,9 +205,9 @@ func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag
<-ctx.Done()
}

func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) {
ch := make(chan *monitor.DataChangeMessage, 16)
sub, err := m.ChanSubscribe(ctx, &opcua.SubscriptionParameters{Interval: interval}, ch, nodes...)
func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, isEvent bool, filter *ua.ExtensionObject, nodes ...string) {
ch := make(chan monitor.Message, 16)
sub, err := m.ChanSubscribe(ctx, &opcua.SubscriptionParameters{Interval: interval}, ch, isEvent, filter, nodes...)

if err != nil {
log.Fatal(err)
Expand All @@ -135,10 +220,27 @@ func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag tim
case <-ctx.Done():
return
case msg := <-ch:
if msg.Error != nil {
log.Printf("[channel ] sub=%d error=%s", sub.SubscriptionID(), msg.Error)
} else {
log.Printf("[channel ] sub=%d ts=%s node=%s value=%v", sub.SubscriptionID(), msg.SourceTimestamp.UTC().Format(time.RFC3339), msg.NodeID, msg.Value.Value())
switch v := msg.(type) {
case *monitor.DataChangeMessage:
if v.Error != nil {
log.Printf("[channel] sub=%d error=%s", sub.SubscriptionID(), v.Error)
} else {
log.Printf("[channel] sub=%d ts=%s node=%s value=%v",
sub.SubscriptionID(),
v.SourceTimestamp.UTC().Format(time.RFC3339),
v.NodeID,
v.Value.Value())
}
case *monitor.EventMessage:
if v.Error != nil {
log.Printf("[channel] sub=%d error=%s", sub.SubscriptionID(), v.Error)
} else {
out := v.EventFields[0].Value.Value()
log.Printf("[channel] sub=%d event fields=%d",
sub.SubscriptionID(), out)
}
default:
log.Printf("[channel] sub=%d unknown message type: %T", sub.SubscriptionID(), msg)
}
time.Sleep(lag)
}
Expand Down
6 changes: 5 additions & 1 deletion examples/subscribe/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ func main() {

func valueRequest(nodeID *ua.NodeID) *ua.MonitoredItemCreateRequest {
handle := uint32(42)
return opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, handle)
return opcua.NewDefaultMonitoredItemCreateRequest(opcua.MonitoredItemCreateRequestArgs{
NodeID: nodeID,
AttributeID: ua.AttributeIDValue,
ClientHandle: handle,
})
}

func eventRequest(nodeID *ua.NodeID) (*ua.MonitoredItemCreateRequest, []string) {
Expand Down
6 changes: 5 additions & 1 deletion examples/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ func main() {
}

miCreateRequests := []*ua.MonitoredItemCreateRequest{
opcua.NewMonitoredItemCreateRequestWithDefaults(triggeringNode, ua.AttributeIDValue, 42),
opcua.NewDefaultMonitoredItemCreateRequest(opcua.MonitoredItemCreateRequestArgs{
NodeID: triggeringNode,
AttributeID: ua.AttributeIDValue,
ClientHandle: 42,
}),
{
ItemToMonitor: &ua.ReadValueID{
NodeID: triggeredNode,
Expand Down
Loading

0 comments on commit 669ad21

Please sign in to comment.