Skip to content

Commit ebb6fe7

Browse files
committed
CBOR rpc client
1 parent b3af679 commit ebb6fe7

File tree

1 file changed

+175
-0
lines changed

1 file changed

+175
-0
lines changed

cbor/cbor_rpc_client.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
from __future__ import absolute_import
2+
import logging
3+
import random
4+
import socket
5+
import time
6+
7+
import cbor
8+
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class SocketReader(object):
14+
'''
15+
Simple adapter from socket.recv to file-like-read
16+
'''
17+
def __init__(self, sock):
18+
self.socket = sock
19+
self.timeout_seconds = 10.0
20+
21+
def read(self, num):
22+
start = time.time()
23+
data = self.socket.recv(num)
24+
while len(data) < num:
25+
now = time.time()
26+
if now > (start + self.timeout_seconds):
27+
break
28+
ndat = self.socket.recv(num - len(data))
29+
if ndat:
30+
data += ndat
31+
return data
32+
33+
34+
class CborRpcClient(object):
35+
'''Base class for all client objects.
36+
37+
This provides common `addr_family`, `address`, and `registry_addresses`
38+
configuration parameters, and manages the connection back to the server.
39+
40+
Automatic retry and time based fallback is managed from
41+
configuration parameters `retries` (default 5), and
42+
`base_retry_seconds` (default 0.5). Retry time doubles on each
43+
retry. E.g. try 0; wait 0.5s; try 1; wait 1s; try 2; wait 2s; try
44+
3; wait 4s; try 4; wait 8s; try 5; FAIL. Total time waited just
45+
under base_retry_seconds * (2 ** retries).
46+
47+
.. automethod:: __init__
48+
.. automethod:: _rpc
49+
.. automethod:: close
50+
51+
'''
52+
53+
def __init__(self, config=None):
54+
self._socket_family = config.get('addr_family', socket.AF_INET)
55+
# may need to be ('host', port)
56+
self._socket_addr = config.get('address')
57+
if self._socket_family == socket.AF_INET:
58+
if not isinstance(self._socket_addr, tuple):
59+
# python socket standard library insists this be tuple!
60+
tsocket_addr = tuple(self._socket_addr)
61+
assert len(tsocket_addr) == 2, 'address must be length-2 tuple ("hostname", port number), got {!r} tuplified to {!r}'.format(self._socket_addr, tsocket_addr)
62+
self._socket_addr = tsocket_addr
63+
self._socket = None
64+
self._rfile = None
65+
self._local_addr = None
66+
self._message_count = 0
67+
self._retries = config.get('retries', 5)
68+
self._base_retry_seconds = float(config.get('base_retry_seconds', 0.5))
69+
70+
def _conn(self):
71+
# lazy socket opener
72+
if self._socket is None:
73+
try:
74+
self._socket = socket.create_connection(self._socket_addr)
75+
self._local_addr = self._socket.getsockname()
76+
except:
77+
logger.error('error connecting to %r:%r', self._socket_addr[0],
78+
self._socket_addr[1], exc_info=True)
79+
raise
80+
return self._socket
81+
82+
def close(self):
83+
'''Close the connection to the server.
84+
85+
The next RPC call will reopen the connection.
86+
87+
'''
88+
if self._socket is not None:
89+
self._rfile = None
90+
try:
91+
self._socket.shutdown(socket.SHUT_RDWR)
92+
self._socket.close()
93+
except socket.error:
94+
logger.warn('error closing lockd client socket',
95+
exc_info=True)
96+
self._socket = None
97+
98+
@property
99+
def rfile(self):
100+
if self._rfile is None:
101+
conn = self._conn()
102+
self._rfile = SocketReader(conn)
103+
return self._rfile
104+
105+
def _rpc(self, method_name, params):
106+
'''Call a method on the server.
107+
108+
Calls ``method_name(*params)`` remotely, and returns the results
109+
of that function call. Expected return types are primitives, lists,
110+
and dictionaries.
111+
112+
:raise Exception: if the server response was a failure
113+
114+
'''
115+
mlog = logging.getLogger('cborrpc')
116+
tryn = 0
117+
delay = self._base_retry_seconds
118+
self._message_count += 1
119+
message = {
120+
'id': self._message_count,
121+
'method': method_name,
122+
'params': params
123+
}
124+
mlog.debug('request %r', message)
125+
buf = cbor.dumps(message)
126+
127+
errormessage = None
128+
while True:
129+
try:
130+
conn = self._conn()
131+
conn.send(buf)
132+
response = cbor.load(self.rfile)
133+
mlog.debug('response %r', response)
134+
assert response['id'] == message['id']
135+
if 'result' in response:
136+
return response['result']
137+
# From here on out we got a response, the server
138+
# didn't have some weird intermittent error or
139+
# non-connectivity, it gave us an error message. We
140+
# don't retry that, we raise it to the user.
141+
errormessage = response.get('error')
142+
if errormessage and hasattr(errormessage,'get'):
143+
errormessage = errormessage.get('message')
144+
if not errormessage:
145+
errormessage = repr(response)
146+
break
147+
except Exception as ex:
148+
if tryn < self._retries:
149+
tryn += 1
150+
logger.debug('ex in %r (%s), retrying %s in %s sec...',
151+
method_name, ex, tryn, delay, exc_info=True)
152+
self.close()
153+
time.sleep(delay)
154+
delay *= 2
155+
continue
156+
logger.error('failed in rpc %r %r', method_name, params,
157+
exc_info=True)
158+
raise
159+
raise Exception(errormessage)
160+
161+
162+
if __name__ == '__main__':
163+
import sys
164+
logging.basicConfig(level=logging.DEBUG)
165+
host,port = sys.argv[1].split(':')
166+
if not host:
167+
host = 'localhost'
168+
port = int(port)
169+
client = CborRpcClient({'address':(host,port)})
170+
print(client._rpc(u'connect', [u'127.0.0.1:5432', u'root', u'aoeu']))
171+
print(client._rpc(u'put', [[('k1','v1'), ('k2','v2')]]))
172+
#print(client._rpc(u'ping', []))
173+
#print(client._rpc(u'gnip', []))
174+
client.close()
175+

0 commit comments

Comments
 (0)