forked from brandon-rhodes/fopnp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsrv_asyncio1.py
42 lines (36 loc) · 1.4 KB
/
srv_asyncio1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#!/usr/bin/env python3
# Foundations of Python Network Programming, Third Edition
# https://github.com/brandon-rhodes/fopnp/blob/m/py3/chapter07/srv_asyncio1.py
# Asynchronous I/O inside "asyncio" callback methods.
import asyncio, zen_utils
class ZenServer(asyncio.Protocol):
    """Callback-style protocol that answers questions terminated by ``?``.

    Incoming bytes are accumulated in ``self.data``.  As soon as the
    accumulated buffer ends with ``b'?'`` the whole buffer is handed to
    ``zen_utils.get_answer()``, the result is written back through the
    transport, and the buffer is cleared.

    NOTE(review): a reply is only triggered when the buffer ends exactly
    on ``b'?'``; a chunk that carries more than one question is passed to
    ``get_answer()`` as a single value -- confirm this matches what the
    clients send.
    """

    def connection_made(self, transport):
        """Remember the transport and peer address; start an empty buffer."""
        peer = transport.get_extra_info('peername')
        self.transport = transport
        self.address = peer
        self.data = b''
        print('Accepted connection from {}'.format(peer))

    def data_received(self, data):
        """Buffer *data* and reply once the buffer ends with ``b'?'``."""
        self.data = self.data + data
        if not self.data.endswith(b'?'):
            # Still waiting for the question mark that completes the request.
            return
        reply = zen_utils.get_answer(self.data)
        self.transport.write(reply)
        self.data = b''

    def connection_lost(self, exc):
        """Print why the connection ended.

        *exc* is truthy when the connection ended with an error; otherwise
        the message depends on whether unanswered bytes are still buffered.
        """
        if exc:
            report = 'Client {} error: {}'.format(self.address, exc)
        elif self.data:
            report = ('Client {} sent {} but then closed'
                      .format(self.address, self.data))
        else:
            report = 'Client {} closed socket'.format(self.address)
        print(report)
if __name__ == '__main__':
    # Script entry point: parse the listening address from the command
    # line, start a ZenServer listener on it, and serve until interrupted.
    address = zen_utils.parse_command_line('asyncio server using callbacks')

    # Create the loop explicitly.  asyncio.get_event_loop() with no current
    # loop is deprecated since Python 3.12 and raises RuntimeError on 3.14+.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # `address` is unpacked into create_server()'s positional arguments
        # that follow the protocol factory.
        coro = loop.create_server(ZenServer, *address)
        server = loop.run_until_complete(coro)
        print('Listening at {}'.format(address))
        try:
            loop.run_forever()
        finally:
            server.close()
    finally:
        # Close the loop even when the listener could not be created
        # (e.g. the address is already in use), not only after serving.
        loop.close()