-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
93 lines (75 loc) · 3.2 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import pyuaf
from pyuaf.client import Client
from pyuaf.client.settings import ClientSettings, SessionSettings
from pyuaf.util import Address, NodeId, ExpandedNodeId, opcuaidentifiers
from pyuaf.util.errors import UafError, ConnectionError
import pyuaf.util.primitives
import pdb
from pyuaf.util import nodeclasses
# URI of the OPC UA server; used as the server part of every Address built
# from a plain NodeId in this module.
server_uri = "urn:AtlasDcs:OpcUaCanOpenServer"

# Endpoint URL handed to ClientSettings in connect().
server_address = "opc.tcp://pcatldcsp11.cern.ch:4841"

# Module-wide pyuaf client; stays None until connect() assigns it.
client = None

# When true, ObjectifiedNode.__init__ recursively wraps child Object nodes;
# when false, child objects are not recorded at all.
unfold = True
def connect():
    """Create the module-wide pyuaf client and browse the server's root folder.

    Rebinds the global ``client`` to a new ``Client`` configured with the
    application name "myClient" and ``server_address``, then issues a browse
    on the OPC UA root folder (namespace 0) of ``server_uri``.

    Returns:
        None. The browse result is discarded.
    """
    global client

    settings = ClientSettings("myClient", [server_address])
    client = Client(settings)

    root_id = NodeId(opcuaidentifiers.OpcUaId_RootFolder, 0)
    root_address = Address(root_id, server_uri)
    # NOTE(review): the result is never inspected; presumably this browse only
    # exists to force/check the connection right away -- confirm.
    client.browse([root_address])
class ObjectifiedNode(object):
    """Expose the children of an OPC UA node as Python attributes.

    On construction the node at ``addr`` is browsed with the module-level
    ``client``. Every referenced Variable node is remembered by display name
    in ``variables`` (name -> Address); every referenced Object node is, when
    the module-level ``unfold`` flag is true, wrapped recursively in another
    ``ObjectifiedNode`` and stored in ``contained``.

    Afterwards:

    * ``node.<variable>`` reads the variable from the server and returns
      ``targets[0].data.value`` of the read result;
    * ``node.<object>`` returns the nested ``ObjectifiedNode``;
    * ``node.<variable> = value`` writes to the server. ``value`` is either
      a plain ``int`` (sent as Int32) or a ``(value, type_name)`` tuple with
      ``type_name`` one of "Int16" or "UInt32".

    A child whose display name is "a", "contained" or "variables" cannot be
    reached through attribute access, because those names hold the instance's
    own state.
    """

    # Names stored as real instance attributes; every other attribute name is
    # resolved against the browsed variables / contained objects.
    _INTERNAL_NAMES = ("a", "contained", "variables")

    # Type names accepted in the explicit ``(value, type_name)`` write form.
    # Each one is also the name of the class in pyuaf.util.primitives.
    _EXPLICIT_TYPES = ("Int16", "UInt32")

    def __init__(self, addr):
        """Browse ``addr`` and index its Variable and Object references.

        Args:
            addr: pyuaf Address of the node to wrap.
        """
        self.a = addr
        self.contained = {}
        self.variables = {}

        browse_result = client.browse([self.a])
        references = browse_result.targets[0].references
        # Index-based access is kept from the original code so nothing more
        # than len() and [] is required from the references container.
        for index in range(len(references)):
            ref = references[index]
            print(ref.displayName)
            child_name = str(ref.displayName.text())
            if ref.nodeClass == nodeclasses.Variable:
                self.variables[child_name] = Address(ExpandedNodeId(ref.nodeId))
            elif ref.nodeClass == nodeclasses.Object:
                if unfold:
                    # Recursion: constructing the child browses it in turn.
                    self.contained[child_name] = ObjectifiedNode(
                        Address(ExpandedNodeId(ref.nodeId))
                    )

    def __getattr__(self, name):
        """Resolve ``name`` as a server variable (read) or a nested object.

        Only invoked when normal attribute lookup fails.

        Raises:
            AttributeError: ``name`` is neither a known variable nor a
                contained object.
        """
        # Read the maps straight from __dict__: going through self.variables
        # would re-enter __getattr__ and recurse forever on an instance whose
        # __init__ has not run (e.g. created by copy or unpickling).
        state = self.__dict__
        variables = state.get("variables", {})
        if name in variables:
            read_result = client.read([variables[name]])
            return read_result.targets[0].data.value
        contained = state.get("contained", {})
        if name in contained:
            return contained[name]
        raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store internal state locally, or write ``value`` to a server variable.

        Args:
            name: attribute name; one of the internal names or the display
                name of a browsed variable.
            value: for variables, an ``int`` (written as Int32) or a
                ``(value, type_name)`` tuple selecting "Int16" or "UInt32".

        Raises:
            AttributeError: ``name`` is not internal and not a known variable.
            Exception: the requested or inferred type has no mapping, or the
                server reported a bad status for the write.
        """
        if name in self._INTERNAL_NAMES:
            self.__dict__[name] = value
            return

        # Same __dict__ access as in __getattr__, for the same reason.
        variables = self.__dict__.get("variables", {})
        if name not in variables:
            raise AttributeError(name)

        print("change OPC UA mapped variable")
        if isinstance(value, tuple):
            # Explicit type mapping requested by the caller.
            raw_value, type_name = value[0], value[1]
            if type_name not in self._EXPLICIT_TYPES:
                raise Exception('Requested type is unknown:' + str(type_name))
            primitive_cls = getattr(pyuaf.util.primitives, type_name)
            output_value = primitive_cls(int(raw_value))
        elif isinstance(value, int) and not isinstance(value, bool):
            # Best guess for plain integers; bool is excluded so it keeps
            # being rejected as it was by the original exact-type check.
            output_value = pyuaf.util.primitives.Int32(value)
        else:
            raise Exception('There is no default mapping for type:' + str(type(value)))

        write_result = client.write([variables[name]], [output_value])
        if not write_result.overallStatus.isGood():
            raise Exception("Setting attribute failed:" + str(write_result.targets[0].status))
def get_object(string_a, ns):
    """Return an ObjectifiedNode for the node ``(string_a, ns)`` on ``server_uri``.

    Args:
        string_a: first argument passed to ``NodeId`` (presumably the string
            identifier of the node -- confirm against callers).
        ns: second argument passed to ``NodeId`` (presumably the namespace
            index -- confirm against callers).

    Returns:
        A new ObjectifiedNode; constructing it browses the server through the
        module-level client.
    """
    return ObjectifiedNode(Address(NodeId(string_a, ns), server_uri))