-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathacquisition.py
143 lines (110 loc) · 4.54 KB
/
acquisition.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
##
## This file is part of the sigrok-meter project.
##
## Copyright (C) 2013 Uwe Hermann <[email protected]>
## Copyright (C) 2014 Jens Steinhauser <[email protected]>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
import qtcompat
import re
import sigrok.core as sr
import time
QtCore = qtcompat.QtCore
class Acquisition(QtCore.QObject):
    '''Class that handles the sigrok session and the reception of data.

    A session is created from the given sigrok context on construction.
    Devices are attached with add_device(); incoming analog packets are
    forwarded through the 'measured' signal and the end of the session is
    reported through the 'stopped' signal.
    '''

    # Signal emitted when new data arrived. The arguments are the reception
    # timestamp (as returned by time.time()), the device, the channel and a
    # tuple of the form (value, unit, mq_flags).
    measured = QtCore.Signal(float, sr.classes.Device, sr.classes.Channel, tuple)

    # Signal emitted when the session has stopped.
    stopped = QtCore.Signal()

    def __init__(self, context):
        '''Create a session on 'context' and register the callbacks.'''
        # Name the class explicitly: 'super(self.__class__, self)' recurses
        # endlessly as soon as this class is subclassed.
        super(Acquisition, self).__init__()

        self.context = context
        self.session = self.context.create_session()
        self.session.add_datafeed_callback(self._datafeed_callback)
        self.session.set_stopped_callback(self._stopped_callback)

    def _parse_configstring(self, cs):
        '''Dissect a config string and return the options as a dictionary.

        The string has the form 'key=value[:key=value]...'; an empty string
        yields an empty dictionary. The keys of the returned dictionary are
        the option identifiers, the values are the parsed option values.

        Raises a ValueError if the string is malformed, if an option is
        unknown or if a value can't be parsed.
        '''

        def parse_option(k, v):
            '''Parse the value for a single option.'''
            # Catch 'Exception' instead of using a bare 'except', so that
            # KeyboardInterrupt and SystemExit are not turned into a
            # ValueError.
            try:
                ck = sr.ConfigKey.get_by_identifier(k)
            except Exception:
                raise ValueError('No option named "{}".'.format(k))

            try:
                val = ck.parse_string(v)
            except Exception:
                raise ValueError(
                    'Invalid value "{}" for option "{}".'.format(v, k))

            return (k, val)

        if not re.match('(([^:=]+=[^:=]+)(:[^:=]+=[^:=]+)*)?$', cs):
            raise ValueError(
                '"{}" is not a valid configuration string.'.format(cs))

        if not cs:
            return {}

        # The regular expression above guarantees that every ':'-separated
        # item contains exactly one '='.
        opts = cs.split(':')
        opts = [tuple(kv.split('=')) for kv in opts]
        opts = [parse_option(k, v) for (k, v) in opts]
        return dict(opts)

    def _parse_driverstring(self, ds):
        '''Dissect the driver string and return a tuple consisting of
        the driver name and the options (as a dictionary).

        The string has the form 'name[:key=value]...'. Raises a ValueError
        if the string (or one of its options) is not valid.
        '''

        m = re.match('(?P<name>[^:]+)(?P<opts>(:[^:=]+=[^:=]+)*)$', ds)
        if not m:
            raise ValueError('"{}" is not a valid driver string.'.format(ds))

        # Strip the leading ':' to get a plain configuration string.
        opts = m.group('opts')[1:]
        return (m.group('name'), self._parse_configstring(opts))

    def add_device(self, driverstring, configstring):
        '''Add a device to the session.

        'driverstring' names the driver, optionally followed by scan
        options ('name[:key=value]...'); 'configstring' holds the options
        that are set on the device ('key=value[:key=value]...', may be
        empty). Only the first device found by the scan is used.

        Raises a ValueError if one of the strings is not valid and a
        RuntimeError if the driver is unknown or no device was found.
        '''

        # Process driver string.
        (name, opts) = self._parse_driverstring(driverstring)
        if name not in self.context.drivers:
            raise RuntimeError('No driver named "{}".'.format(name))

        driver = self.context.drivers[name]
        devs = driver.scan(**opts)
        if not devs:
            raise RuntimeError('No devices found.')

        device = devs[0]

        # Process configuration string.
        cfgs = self._parse_configstring(configstring)
        for k, v in cfgs.items():
            device.config_set(sr.ConfigKey.get_by_identifier(k), v)

        self.session.add_device(device)
        device.open()

    def is_running(self):
        '''Return whether the session is running.'''
        return self.session.is_running()

    @QtCore.Slot()
    def start(self):
        '''Start the session.'''
        self.session.start()

    @QtCore.Slot()
    def stop(self):
        '''Stop the session (does nothing if it is not running).'''
        if self.is_running():
            self.session.stop()

    def _datafeed_callback(self, device, packet):
        '''Handle a packet from the session's data feed.

        Packets that are not analog or that carry no channels are ignored;
        for all others the most recent value is emitted through the
        'measured' signal.
        '''
        # Take the timestamp first, before any other work is done.
        now = time.time()

        if packet.type != sr.PacketType.ANALOG:
            return

        if not len(packet.payload.channels):
            return

        # TODO: Find a device with multiple channels in one packet.
        channel = packet.payload.channels[0]

        # The most recent value.
        value = packet.payload.data[0][-1]

        self.measured.emit(now, device, channel,
                (value, packet.payload.unit, packet.payload.mq_flags))

    def _stopped_callback(self, **kwargs):
        '''Forward the session's stop notification as the 'stopped' signal.'''
        self.stopped.emit()