Skip to content

Commit 58e5c65

Browse files
committed
New Handler and SharedPV methods with example
1 parent ab76714 commit 58e5c65

File tree

2 files changed

+290
-0
lines changed

2 files changed

+290
-0
lines changed

example/persist.py

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
"""
2+
Use a handler to automatically persist values to an SQLite3 file database.
3+
Any values persisted this way will be automatically restored when the
4+
program is rerun. The details of users (account name and IP address) are
5+
recorded for puts.
6+
7+
Try monitoring the PV `demo:pv:optime` then quit, wait, and restart the
8+
program while continuing to monitor the PV. Compare with the value of
9+
`demo:pv:uptime` which resets on each program start. Try setting the value of
10+
demo:pv:optime while continuing to monitor it. It is recommended to
11+
inspect the persisted file, e.g. `sqlite3 persist_pvs.db "select * from pvs"`.
12+
13+
There is an important caveat for this simple demo:
14+
The `PersistHandler` will not work as expected if anything other than the
15+
value of a field is changed, e.g. if a Control field was added to an NTScalar
16+
if would not be persisted correctly. This could be resolved by correctly
17+
merging the pv.current().raw and value.raw appropriately in the post().
18+
"""
19+
20+
import json
21+
import sqlite3
22+
import time
23+
24+
from p4p import Value
25+
from p4p.nt.scalar import NTScalar
26+
from p4p.server import Server, ServerOperation
27+
from p4p.server.raw import Handler
28+
from p4p.server.thread import SharedPV
29+
30+
31+
class PersistHandler(Handler):
32+
"""
33+
A handler that will allow simple persistence of values and timestamps
34+
across retarts. It requires a post handler in order to persist values
35+
set within the program.
36+
"""
37+
38+
def __init__(self, pv_name: str, conn: sqlite3.Connection, open_restore=True):
39+
self._conn = conn
40+
self._pv_name = pv_name
41+
self._open_restore = open_restore
42+
43+
def open(self, value, **kws):
44+
# If there is a value already in the database we always use that
45+
# instead of the supplied initial value, unless the
46+
# handler_open_restore flag indicates otherwise.
47+
if not self._open_restore:
48+
return
49+
50+
# We could, in theory, re-apply authentication here if we queried for
51+
# that information and then did something with it!
52+
res = self._conn.execute("SELECT data FROM pvs WHERE id=?", [self._pv_name])
53+
query_val = res.fetchone()
54+
55+
if query_val is not None:
56+
json_val = json.loads(query_val[0])
57+
print(f"Will restore to {self._pv_name} value: {json_val['value']}")
58+
59+
# Override initial value
60+
value["value"] = json_val["value"]
61+
62+
value["timeStamp.secondsPastEpoch"] = json_val["timeStamp"][
63+
"secondsPastEpoch"
64+
]
65+
value["timeStamp.nanoseconds"] = json_val["timeStamp"]["nanoseconds"]
66+
else:
67+
# We are using an initial value so persist it
68+
self._upsert(value)
69+
70+
def post(
71+
self,
72+
pv: SharedPV,
73+
value: Value,
74+
):
75+
self._update_timestamp(value)
76+
77+
self._upsert(
78+
value,
79+
)
80+
81+
def put(self, pv: SharedPV, op: ServerOperation):
82+
# The post does all the real work, we just add info only available
83+
# from the ServerOperation
84+
self._update_timestamp(op.value())
85+
86+
self._upsert(
87+
op.value(), op.account(), op.peer()
88+
)
89+
90+
op.done()
91+
92+
def _update_timestamp(self, value) -> None:
93+
if not value.changed("timeStamp") or (
94+
value["timeStamp.nanoseconds"] == value["timeStamp.nanoseconds"] == 0
95+
):
96+
now = time.time()
97+
value["timeStamp.secondsPastEpoch"] = now // 1
98+
value["timeStamp.nanoseconds"] = int((now % 1) * 1e9)
99+
100+
def _upsert(self, value, account=None, peer=None) -> None:
101+
# Persist the data; turn into JSON and write it to the DB
102+
val_json = json.dumps(value.todict())
103+
104+
# Use UPSERT: https://sqlite.org/lang_upsert.html
105+
conn.execute(
106+
"""
107+
INSERT INTO pvs (id, data, account, peer)
108+
VALUES (:name, :json_data, :account, :peer)
109+
ON CONFLICT(id)
110+
DO UPDATE SET data = :json_data, account = :account, peer = :peer;
111+
""",
112+
{
113+
"name": self._pv_name,
114+
"json_data": val_json,
115+
"account": account,
116+
"peer": peer,
117+
},
118+
)
119+
conn.commit()
120+
121+
122+
# Create an SQLite dayabase to function as our persistence store
123+
conn = sqlite3.connect("persist_pvs.db", check_same_thread=False)
124+
#conn.execute("DROP TABLE IF EXISTS pvs")
125+
conn.execute(
126+
"CREATE TABLE IF NOT EXISTS pvs (id VARCHAR(255), data JSON, account VARCHAR(30), peer VARCHAR(55), PRIMARY KEY (id));"
127+
) # IPv6 addresses can be long and will contain port number as well!
128+
129+
duplicate_pv = SharedPV(
130+
nt=NTScalar("i"), handler=PersistHandler("demo:pv:int", conn), initial=12
131+
)
132+
pvs = {
133+
"demo:pv:optime": SharedPV(
134+
nt=NTScalar("i"),
135+
handler=PersistHandler("demo:pv:optime", conn),
136+
initial=0,
137+
), # Operational time; total time running
138+
"demo:pv:uptime": SharedPV(
139+
nt=NTScalar("i"),
140+
handler=PersistHandler("demo:pv:uptime", conn, open_restore=False),
141+
timestamp=time.time(),
142+
initial=0,
143+
), # Uptime since most recent (re)start
144+
"demo:pv:int": duplicate_pv,
145+
"demo:pv:float": SharedPV(
146+
nt=NTScalar("d"),
147+
handler=PersistHandler("demo:pv:float", conn),
148+
initial=9.99,
149+
),
150+
"demo:pv:string": SharedPV(
151+
nt=NTScalar("s"),
152+
handler=PersistHandler("demo:pv:string", conn),
153+
initial="Hello!",
154+
),
155+
"demo:pv:alias_int": duplicate_pv, # It works except for reporting its restore
156+
}
157+
158+
159+
# Make the uptime PV readonly; maybe we want to be able to update optime
160+
# after major system upgrades?
161+
uptime_pv = pvs["demo:pv:uptime"]
162+
163+
164+
@uptime_pv.put
165+
def read_only(pv: SharedPV, op: ServerOperation):
166+
op.done(error="Read-only")
167+
return
168+
169+
170+
print(f"Starting server with the following PVs: {pvs}")
171+
172+
server = None
173+
try:
174+
server = Server(providers=[pvs])
175+
while True:
176+
# Every second increment the values of uptime and optime
177+
time.sleep(1)
178+
increment_value = pvs["demo:pv:uptime"].current().raw["value"] + 1
179+
pvs["demo:pv:uptime"].post(increment_value)
180+
increment_value = pvs["demo:pv:optime"].current().raw["value"] + 1
181+
pvs["demo:pv:optime"].post(increment_value)
182+
except KeyboardInterrupt:
183+
pass
184+
finally:
185+
if server:
186+
server.stop()
187+
conn.close()

src/p4p/server/raw.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ class Handler(object):
3939
4040
Use of this as a base class is optional.
4141
"""
42+
def open(self, value, **kws):
43+
"""
44+
Called each time an Open operation is performed on this Channel
45+
46+
:param value: A Value, or appropriate object (see nt= and wrap= of the constructor).
47+
"""
48+
pass
4249

4350
def put(self, pv, op):
4451
"""
@@ -50,6 +57,17 @@ def put(self, pv, op):
5057
"""
5158
op.done(error='Not supported')
5259

60+
def post(self, pv, value, **kws):
61+
"""
62+
Called each time a client issues a post
63+
operation on this Channel.
64+
65+
:param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with.
66+
:param value: A Value, or appropriate object (see nt= and wrap= of the constructor).
67+
:param dict options: A dictionary of configuration options.
68+
"""
69+
pass
70+
5371
def rpc(self, pv, op):
5472
"""
5573
Called each time a client issues a Remote Procedure Call
@@ -77,6 +95,14 @@ def onLastDisconnect(self, pv):
7795
pass
7896

7997

98+
def close(self, pv):
99+
"""
100+
Called when the Channel is closed.
101+
102+
:param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with.
103+
"""
104+
pass
105+
80106
class SharedPV(_SharedPV):
81107

82108
"""Shared state Process Variable. Callback based implementation.
@@ -158,6 +184,14 @@ def open(self, value, nt=None, wrap=None, unwrap=None, **kws):
158184
V = self._wrap(value, **kws)
159185
except: # py3 will chain automatically, py2 won't
160186
raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws))
187+
188+
# Guard goes here because we can have handlers that don't inherit from
189+
# the Handler base class
190+
try:
191+
self._handler.open(V, **kws)
192+
except AttributeError as err:
193+
pass
194+
161195
_SharedPV.open(self, V)
162196

163197
def post(self, value, **kws):
@@ -174,8 +208,35 @@ def post(self, value, **kws):
174208
V = self._wrap(value, **kws)
175209
except: # py3 will chain automatically, py2 won't
176210
raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws))
211+
212+
# Guard goes here because we can have handlers that don't inherit from
213+
# the Handler base class
214+
try:
215+
self._handler.post(self, V, **kws)
216+
except AttributeError:
217+
pass
218+
177219
_SharedPV.post(self, V)
178220

221+
def close(self, destroy=False, sync=False, timeout=None):
222+
"""Close PV, disconnecting any clients.
223+
224+
:param bool destroy: Indicate "permanent" closure. Current clients will not see subsequent open().
225+
:param bool sync: When block until any pending onLastDisconnect() is delivered (timeout applies).
226+
:param float timeout: Applies only when sync=True. None for no timeout, otherwise a non-negative floating point value.
227+
228+
close() with destory=True or sync=True will not prevent clients from re-connecting.
229+
New clients may prevent sync=True from succeeding.
230+
Prevent reconnection by __first__ stopping the Server, removing with :py:meth:`StaticProvider.remove()`,
231+
or preventing a :py:class:`DynamicProvider` from making new channels to this SharedPV.
232+
"""
233+
try:
234+
self._handler.close(self)
235+
except AttributeError:
236+
pass
237+
238+
_SharedPV.close(self)
239+
179240
def current(self):
180241
V = _SharedPV.current(self)
181242
try:
@@ -208,6 +269,13 @@ def __init__(self, pv, real):
208269
self._pv = pv # this creates a reference cycle, which should be collectable since SharedPV supports GC
209270
self._real = real
210271

272+
def open(self, value, **kws):
273+
_log.debug('OPEN %s %s', self._pv, value)
274+
try:
275+
self._pv._exec(None, self._real.open, value, **kws)
276+
except AttributeError:
277+
pass
278+
211279
def onFirstConnect(self):
212280
self._pv._exec(None, self._pv._onFirstConnect, None)
213281
try: # user handler may omit onFirstConnect()
@@ -239,6 +307,20 @@ def rpc(self, op):
239307
except AttributeError:
240308
op.done(error="RPC not supported")
241309

310+
def post(self, value, **kws):
311+
_log.debug('POST %s %s', self._pv, value)
312+
try:
313+
self._pv._exec(None, self._real.post, self._pv, value, **kws)
314+
except AttributeError:
315+
pass
316+
317+
def close(self, pv):
318+
_log.debug('CLOSE %s', self._pv)
319+
try:
320+
self._pv._exec(None, self._real.close, self._pv)
321+
except AttributeError:
322+
pass
323+
242324
@property
243325
def onFirstConnect(self):
244326
def decorate(fn):
@@ -253,6 +335,20 @@ def decorate(fn):
253335
return fn
254336
return decorate
255337

338+
@property
339+
def on_open(self):
340+
def decorate(fn):
341+
self._handler.open = fn
342+
return fn
343+
return decorate
344+
345+
@property
346+
def on_post(self):
347+
def decorate(fn):
348+
self._handler.post = fn
349+
return fn
350+
return decorate
351+
256352
@property
257353
def put(self):
258354
def decorate(fn):
@@ -267,6 +363,13 @@ def decorate(fn):
267363
return fn
268364
return decorate
269365

366+
@property
367+
def on_close(self):
368+
def decorate(fn):
369+
self._handler.close = fn
370+
return fn
371+
return decorate
372+
270373
def __repr__(self):
271374
if self.isOpen():
272375
return '%s(value=%s)' % (self.__class__.__name__, repr(self.current()))

0 commit comments

Comments
 (0)