-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathServer_280_RDKX5.py
202 lines (166 loc) · 7.51 KB
/
Server_280_RDKX5.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import fcntl
import logging
import traceback
import socket
import struct
import serial
import Hobot.GPIO as GPIO
GPIO.setwarnings(False)
class GPIOProtocolCode:
SETUP_GPIO_MODE = 0xAA
SETUP_GPIO_STATE = 0xAB
SET_GPIO_OUTPUT = 0xAC
GET_GPIO_INPUT = 0xAD
def to_string(data: bytes):
return ' '.join(map(lambda x: f'{x:02x}', data))
def get_local_host(name: str):
host = None
dgram_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
pack_res = struct.pack('256s', bytes(name, encoding="utf8"))
host = socket.inet_ntoa(fcntl.ioctl(dgram_socket.fileno(), 0x8915, pack_res)[20:24])
except Exception as e:
print(e)
finally:
dgram_socket.close()
return host
localhost = get_local_host("wlan0")
class SocketTransport(object):
def __init__(self, host="0.0.0.0", port=30002):
self.port = port
self.host = host
self.running = True
self.log = logging.getLogger("socket")
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind((host, port))
self.socket.listen(5)
if host == "0.0.0.0":
host = localhost
self.log.info(f"start listening on {host}:{port}")
def accept(self):
while self.running is True:
yield self.socket.accept()
def context(self, conn, buffer_size=1024):
while self.running is True:
try:
data_buffer = conn.recv(buffer_size)
if len(data_buffer) == 0:
break
yield data_buffer
except Exception as e:
self.log.error(f"error while reading socket: {e}")
traceback.print_exc()
break
def close(self):
self.log.info(f"close socket on {self.host}:{self.port}")
self.running = False
self.socket.close()
class SerialTransport(object):
def __init__(self, comport="/dev/ttyS1", baudrate=100_0000, timeout=None):
self.serial = serial.Serial(port=comport, baudrate=baudrate, timeout=timeout)
self.log = logging.getLogger("serial")
self.baudrate = baudrate
self.comport = comport
self.open()
self.log.info(f"start serial on [{self.comport}] with baudrate [{self.baudrate}]")
def send(self, data):
self.serial.write(data)
def recv(self, size=1024):
return self.serial.read(size)
@property
def is_open(self):
return self.serial.is_open
def close(self):
self.serial.close()
def open(self):
if not self.serial.is_open:
self.serial.open()
class MyCobot280RDKX5Server(object):
"""
Server for 280 RDK-X5
1. System GPIO operating protocol adheres to protocol MyCobot 280 RDK X5.
2. This server only does the work of forwarding protocol and does not participate in the analysis of instructions.
3. The server is only responsible for forwarding the data received from the socket to the serial port and vice versa.
4. Instruction parsing is done entirely by the client
5. The server is responsible for setting the GPIO mode
"""
def __init__(self, socket_transport, serial_transport, debug=True):
self.debug = debug
self.socket_transport = socket_transport
self.serial_transport = serial_transport
self.log = logging.getLogger("server")
def mainloop(self):
try:
self.log.info("tcp server started.")
for conn, addr in self.socket_transport.accept():
self.log.info(f"{addr} accepted!")
for data in self.socket_transport.context(conn, buffer_size=1024):
self.serial_transport.log.info(f"{addr} recv << {to_string(data)}")
if data[3] == GPIOProtocolCode.SETUP_GPIO_MODE:
try:
mode = GPIO.BCM if data[4] == 0 else GPIO.BOARD
GPIO.setmode(mode)
self.log.debug(f"{addr} setup gpio mode => {mode}")
serial_data = bytes([0xfe, 0xfe, 0x03, data[3], 0x01, 0xfa])
except Exception as e:
self.log.error(f"{addr} setup gpio mode error: {e}")
serial_data = bytes([0xfe, 0xfe, 0x03, data[3], 0xff, 0xfa])
elif data[3] == GPIOProtocolCode.SETUP_GPIO_STATE:
try:
mode = GPIO.OUT if data[5] == 1 else GPIO.IN
level = GPIO.HIGH if data[6] == 1 else GPIO.LOW
self.log.debug(f"{addr} setup gpio state, mode => {mode}, level => {level}")
GPIO.setup(data[4], mode, initial=level)
serial_data = bytes([0xfe, 0xfe, 0x03, data[3], 0x01, 0xfa])
except Exception as e:
self.log.error(f"{addr} setup gpio state error: {e}")
serial_data = bytes([0xfe, 0xfe, 0x03, data[3], 0xff, 0xfa])
elif data[3] == GPIOProtocolCode.SET_GPIO_OUTPUT:
try:
level = GPIO.HIGH if data[5] == 1 else GPIO.LOW
self.log.debug(f"{addr} set gpio output, level => {level}")
GPIO.output(data[4], level)
serial_data = bytes([0xfe, 0xfe, 0x03, data[3], 0x01, 0xfa])
except Exception as e:
self.log.error(f"{addr} set gpio output error: {e}")
serial_data = bytes([0xfe, 0xfe, 0x03, data[3], 0xff, 0xfa])
elif data[3] == GPIOProtocolCode.GET_GPIO_INPUT:
try:
self.log.debug(f"{addr} get gpio input, channel => {data[4]}")
level = GPIO.input(data[4])
self.log.debug(f"{addr} get gpio input, level => {level}")
serial_data = bytes([0xfe, 0xfe, 0x03, data[3], level, 0xfa])
except Exception as e:
self.log.error(f"{addr} get gpio input error: {e}")
serial_data = bytes([0xfe, 0xfe, 0x03, data[3], 0xff, 0xfa])
else:
self.serial_transport.send(data)
serial_data = self.serial_transport.recv()
self.serial_transport.log.info(f"{addr} send >> {to_string(serial_data)}")
conn.send(serial_data)
else:
self.log.info(f"{addr} closed!")
except Exception as e:
self.log.error(f"server error: {e}")
self.log.exception(traceback.format_exc())
finally:
self.socket_transport.close()
self.serial_transport.close()
self.log.info("server closed")
def main(debug=False):
logging.basicConfig(
level=logging.DEBUG if debug else logging.INFO,
format="%(asctime)s - [%(name)s] %(levelname)7s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("server.log")
]
)
socket_transport = SocketTransport(host="0.0.0.0", port=30002)
serial_transport = SerialTransport(comport="/dev/ttyS1", baudrate=100_0000, timeout=0.1)
MyCobot280RDKX5Server(socket_transport, serial_transport).mainloop()
GPIO.cleanup()
if __name__ == '__main__':
main()