Skip to content

Commit 676fe8b

Browse files
committed
Merge pull request #8 from openearth/jack-recv
recv_array now supports receiving an array with timeout
2 parents 4df333a + c6983d2 commit 676fe8b

File tree

3 files changed

+101
-35
lines changed

3 files changed

+101
-35
lines changed

CHANGES.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
- recv_array now supports polling for timeout.
2+
13
- Added MMIClient.
24

35
- Added tracker client.

mmi/__init__.py

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,22 @@
22
Model Message Interface
33
"""
44

5-
__version__ = '0.1.4'
5+
__version__ = '0.1.5'
66
import datetime
77

88
import numpy as np
99
import zmq
1010
import zlib
1111

12+
13+
class NoResponseException(Exception):
14+
pass
15+
16+
17+
class EmptyResponseException(Exception):
18+
pass
19+
20+
1221
def send_array(socket, A=None, metadata=None, flags=0, copy=False, track=False, compress=None):
1322
"""send a numpy array with metadata over zmq"""
1423

@@ -49,11 +58,44 @@ def send_array(socket, A=None, metadata=None, flags=0, copy=False, track=False,
4958
return
5059

5160

52-
def recv_array(socket, flags=0, copy=False, track=False):
53-
"""recv a metadata and an optional numpy array from a zmq socket"""
54-
md = socket.recv_json(flags=flags)
61+
def recv_array(
62+
socket, flags=0, copy=False, track=False, poll=None, poll_timeout=10000):
63+
"""recv a metadata and an optional numpy array from a zmq socket
64+
65+
Optionally provide poll object to use recv_array with timeout
66+
67+
poll_timeout is in millis
68+
"""
69+
if poll is None:
70+
md = socket.recv_json(flags=flags)
71+
else:
72+
# one-try "Lazy Pirate" method: http://zguide.zeromq.org/php:chapter4
73+
socks = dict(poll.poll(poll_timeout))
74+
if socks.get(socket) == zmq.POLLIN:
75+
reply = socket.recv_json(flags=flags)
76+
if not reply:
77+
raise EmptyResponseException(
78+
"Recv_array got an empty response (1)")
79+
md = reply
80+
else:
81+
raise NoResponseException(
82+
"Recv_array got no response within timeout (1)")
83+
5584
if socket.getsockopt(zmq.RCVMORE):
56-
msg = socket.recv(flags=flags, copy=copy, track=track)
85+
if poll is None:
86+
msg = socket.recv(flags=flags, copy=copy, track=track)
87+
else:
88+
# one-try "Lazy Pirate" method: http://zguide.zeromq.org/php:chapter4
89+
socks = dict(poll.poll(poll_timeout))
90+
if socks.get(socket) == zmq.POLLIN:
91+
reply = socket.recv(flags=flags, copy=copy, track=track)
92+
if not reply:
93+
raise EmptyResponseException(
94+
"Recv_array got an empty response (2)")
95+
msg = reply
96+
else:
97+
raise NoResponseException(
98+
"Recv_array got no response within timeout (2)")
5799
buf = buffer(msg)
58100
A = np.frombuffer(buf, dtype=md['dtype'])
59101
A = A.reshape(md['shape'])
@@ -63,5 +105,3 @@ def recv_array(socket, flags=0, copy=False, track=False):
63105
# No array expected
64106
A = None
65107
return A, md
66-
67-

mmi/mmi_client.py

Lines changed: 52 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
from mmi import send_array, recv_array
44
from bmi.api import IBmi
55

6+
67
class MMIClient(IBmi):
7-
def __init__(self, zmq_address):
8+
def __init__(self, zmq_address, poll_timeout=10000):
89
"""
910
Constructor
1011
"""
@@ -15,6 +16,16 @@ def __init__(self, zmq_address):
1516
self.socket = context.socket(zmq.REQ)
1617
self.socket.connect(zmq_address)
1718

19+
self.poll = zmq.Poller()
20+
self.poll.register(self.socket, zmq.POLLIN)
21+
22+
self.poll_timeout = poll_timeout
23+
24+
def _close_sockets(self):
25+
self.socket.setsockopt(zmq.LINGER, 0)
26+
self.socket.close()
27+
self.poll.unregister(self.socket)
28+
1829
# from here: BMI commands that get translated to MMI.
1930
def initialize(self, configfile=None):
2031
"""
@@ -24,10 +35,11 @@ def initialize(self, configfile=None):
2435
method = "initialize"
2536

2637
A = None
27-
metadata = {method : configfile}
38+
metadata = {method: configfile}
2839

2940
send_array(self.socket, A, metadata)
30-
A, metadata = recv_array(self.socket)
41+
A, metadata = recv_array(
42+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
3143

3244
def finalize(self):
3345
"""
@@ -37,10 +49,11 @@ def finalize(self):
3749
method = "finalize"
3850

3951
A = None
40-
metadata = {method : -1}
52+
metadata = {method: -1}
4153

4254
send_array(self.socket, A, metadata)
43-
A, metadata = recv_array(self.socket)
55+
A, metadata = recv_array(
56+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
4457

4558
def get_var_count(self):
4659
"""
@@ -50,10 +63,11 @@ def get_var_count(self):
5063
method = "get_var_count"
5164

5265
A = None
53-
metadata = {method : -1}
66+
metadata = {method: -1}
5467

5568
send_array(self.socket, A, metadata)
56-
A, metadata = recv_array(self.socket)
69+
A, metadata = recv_array(
70+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
5771

5872
return metadata[method]
5973

@@ -65,10 +79,11 @@ def get_var_name(self, i):
6579
method = "get_var_name"
6680

6781
A = None
68-
metadata = {method : i}
82+
metadata = {method: i}
6983

7084
send_array(self.socket, A, metadata)
71-
A, metadata = recv_array(self.socket)
85+
A, metadata = recv_array(
86+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
7287

7388
return metadata[method]
7489

@@ -80,10 +95,11 @@ def get_var_type(self, name):
8095
method = "get_var_type"
8196

8297
A = None
83-
metadata = {method : name}
98+
metadata = {method: name}
8499

85100
send_array(self.socket, A, metadata)
86-
A, metadata = recv_array(self.socket)
101+
A, metadata = recv_array(
102+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
87103

88104
return metadata[method]
89105

@@ -95,10 +111,11 @@ def get_var_rank(self, name):
95111
method = "get_var_rank"
96112

97113
A = None
98-
metadata = {method : name}
114+
metadata = {method: name}
99115

100116
send_array(self.socket, A, metadata)
101-
A, metadata = recv_array(self.socket)
117+
A, metadata = recv_array(
118+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
102119

103120
return metadata[method]
104121

@@ -110,10 +127,11 @@ def get_var_shape(self, name):
110127
method = "get_var_shape"
111128

112129
A = None
113-
metadata = {method : rank}
130+
metadata = {method: name}
114131

115132
send_array(self.socket, A, metadata)
116-
A, metadata = recv_array(self.socket)
133+
A, metadata = recv_array(
134+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
117135

118136
return metadata[method]
119137

@@ -125,10 +143,11 @@ def get_var(self, name):
125143
method = "get_var"
126144

127145
A = None
128-
metadata = {method : name}
146+
metadata = {method: name}
129147

130148
send_array(self.socket, A, metadata)
131-
A, metadata = recv_array(self.socket)
149+
A, metadata = recv_array(
150+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
132151

133152
return A
134153

@@ -140,10 +159,11 @@ def set_var(self, name, var):
140159
method = "set_var"
141160

142161
A = var
143-
metadata = {method : name}
162+
metadata = {method: name}
144163

145164
send_array(self.socket, A, metadata)
146-
A, metadata = recv_array(self.socket)
165+
A, metadata = recv_array(
166+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
147167

148168
def get_start_time(self):
149169
"""
@@ -153,10 +173,11 @@ def get_start_time(self):
153173
method = "get_start_time"
154174

155175
A = None
156-
metadata = {method : -1}
176+
metadata = {method: -1}
157177

158178
send_array(self.socket, A, metadata)
159-
A, metadata = recv_array(self.socket)
179+
A, metadata = recv_array(
180+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
160181

161182
return metadata[method]
162183

@@ -168,10 +189,11 @@ def get_end_time(self):
168189
method = "get_end_time"
169190

170191
A = None
171-
metadata = {method : -1}
192+
metadata = {method: -1}
172193

173194
send_array(self.socket, A, metadata)
174-
A, metadata = recv_array(self.socket)
195+
A, metadata = recv_array(
196+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
175197

176198
return metadata[method]
177199

@@ -183,10 +205,11 @@ def get_current_time(self):
183205
method = "get_current_time"
184206

185207
A = None
186-
metadata = {method : -1}
208+
metadata = {method: -1}
187209

188210
send_array(self.socket, A, metadata)
189-
A, metadata = recv_array(self.socket)
211+
A, metadata = recv_array(
212+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
190213

191214
return metadata[method]
192215

@@ -198,10 +221,11 @@ def update(self, dt):
198221
method = "update"
199222

200223
A = None
201-
metadata = {method : dt}
224+
metadata = {method: dt}
202225

203226
send_array(self.socket, A, metadata)
204-
A, metadata = recv_array(self.socket)
227+
A, metadata = recv_array(
228+
self.socket, poll=self.poll, poll_timeout=self.poll_timeout)
205229

206230
# TODO: Do we really need these two?
207231
def inq_compound(self, name):
@@ -227,6 +251,6 @@ def remote(self, action):
227251
metadata = {method: action}
228252

229253
send_array(self.socket, A, metadata)
230-
A, metadata = recv_array(self.socket)
254+
A, metadata = recv_array(self.socket, poll=self.poll)
231255

232256
return metadata[method]

0 commit comments

Comments
 (0)