-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathserver.py
31 lines (27 loc) · 947 Bytes
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# -*- coding: utf-8 -*-
import asyncio
import websockets
async def websocket_handler(websocket, path):
"""
实现websocket命令路由
"""
path = str(path[1:]).split("/")
if len(path) == 1:
path.append("execute")
try:
controller = __import__("controllers." + path[0], fromlist=True)
class_name = str(path[0]).capitalize()
if hasattr(controller, class_name):
class_name = getattr(controller, class_name)
if not hasattr(class_name, path[1]):
raise ModuleNotFoundError()
else:
raise ModuleNotFoundError()
obj = class_name()
func = getattr(obj, path[1])
await func(websocket)
except ModuleNotFoundError:
print("RouterNotFound")
start_server = websockets.serve(websocket_handler, "localhost", 8080)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()