-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathasync_server.py
executable file
·154 lines (129 loc) · 5.31 KB
/
async_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python
#
# See also
# https://github.com/grpc/grpc/blob/v1.13.x/examples/python/route_guide/route_guide_server.py
#
from grpclib.server import Server
from grpclib.utils import graceful_exit
from mdt_grpc_dialout_grpc import gRPCMdtDialoutBase
from proto_to_dict import proto_to_dict
from telemetry_pb2 import Telemetry
from threading import Lock
from walk_fields import walk_fields
import asyncio
import json
import logging
import mdt_grpc_dialout_pb2
import os
import ssl
import time
from argparse import ArgumentParser
logger = logging.getLogger(__name__)
logger_grpc = logging.getLogger("grpc._server")
logger_p2d = logging.getLogger("proto_to_dict")
logging_lock = Lock()
class TelemetryReceiver(gRPCMdtDialoutBase):
msgs_recvd = 0
async def MdtDialout(self, stream):
while True:
logger.debug('awaiting telemetry messge...')
req: MdtDialoutArgs = await stream.recv_message()
self.msgs_recvd += 1
t = Telemetry()
t.ParseFromString(req.data)
#
# Convert telemetry message as a whole into a Python dict
#
d = proto_to_dict(t)
with logging_lock:
logger.debug('--> CALLBACK START')
logger.debug('Messages Received = %d', self.msgs_recvd)
logger.debug('Node Id = "{}"'.format(t.node_id_str))
logger.debug('Subscription Id = "{}"'.format(t.subscription_id_str))
logger.debug('Encoding Path = "{}"'.format(t.encoding_path))
logger.debug('Msg Timestamp = "{}"'.format(t.msg_timestamp))
logger.debug('Collection Id = "{}"'.format(t.collection_id))
logger.debug('Collection Start = "{}"'.format(t.collection_start_time))
logger.debug('Collection End = "{}"'.format(t.collection_end_time))
if t.collection_end_time > 0:
logger.debug('last message for collection_id {}'.format(t.collection_id))
# json dict of the raw data
data_gpbkv = d.get('data_gpbkv')
if data_gpbkv:
for l in json.dumps(data_gpbkv, indent=2).splitlines():
logger.debug(l)
# retval = mdt_grpc_dialout_pb2.MdtDialoutArgs()
# retval.ReqId = req.ReqId
# await stream.send_message(retval)
#
# Really simple gRPC server dor dialout telemetry. No TLS, plain TCP.
#
async def serve(bind_address='0.0.0.0', port=57850, certfile=None, keyfile=None, client_ca=None):
logger.debug('Create gRPC server')
server = Server([TelemetryReceiver()])
ssl_options = {}
if certfile is not None:
logger.debug(f"Running with TLS, certfile {certfile} and keyfile {keyfile}")
ssl_ctx = ssl.SSLContext()
ssl_ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
if client_ca is not None:
ssl_ctx.load_verify_locations(cafile=client_ca)
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
ssl_ctx.set_alpn_protocols(['h2'])
keylog_filename = os.environ.get('SSLKEYLOGFILE')
if keylog_filename is not None:
ssl_ctx.keylog_filename = keylog_filename
ssl_options = {"ssl": ssl_ctx}
with graceful_exit([server]):
await server.start(bind_address, port, **ssl_options)
logger.debug('serving on %s:%d', bind_address, port)
await server.wait_closed()
if __name__ == "__main__":
#
# parse arguments
#
parser = ArgumentParser(description='Options:')
parser.add_argument(
'-b', '--bind-address', type=str,
default='0.0.0.0',
help="Specify bind address (default=0.0.0.0)")
parser.add_argument(
'-p', '--port', type=int,
default=57850,
help="Specify telemetry listening port (default=57850)")
parser.add_argument(
'-v', '--verbose', action='store_true',
help="Exceedingly verbose logging to the console")
parser.add_argument(
'-c', '--cert',
help="Specify certificate chain file (will use TLS; requires -k)")
parser.add_argument(
'-k', '--key',
help="Specify key file (will use TLS; requires -c)")
parser.add_argument(
'--client-ca',
help="Specify CA file to verify client certificate (will use TLS; requires -c and -k)")
args = parser.parse_args()
if args.cert is not None:
assert args.key is not None, "Key must be specified if certificate is specified"
if args.key is not None:
assert args.cert is not None, "Certificate must be specified if key is specified"
if args.client_ca is not None:
assert args.cert is not None, "Certificate and key must be specified if client CA is specified"
#
# setup logging to have a wauy to see what's happening
#
if args.verbose:
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter('%(asctime)s:%(name)s:%(levelname)s:%(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
#
# run the server
#
asyncio.run(serve(bind_address=args.bind_address,
port=args.port,
certfile=args.cert,
keyfile=args.key,
client_ca=args.client_ca))