-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmav_device.py
More file actions
90 lines (79 loc) · 2.75 KB
/
mav_device.py
File metadata and controls
90 lines (79 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import pymavlink.mavutil as utility
import threading
import time
from .mav_protocol import MAVProtocol
from .mav_receiver import Receiver
from .mav_sender import Sender
from .mav_message import MAVMessage
class MAVDevice:
"""
Primary class for MAVCore. Manages drone connection, sending, and receiving mavlink messages.
"""
def __init__(
self,
device_address: str,
baud_rate: int = 115200,
source_system: int = 255,
source_component: int = 0,
attempt_reconnect: bool = True,
):
self.attempt_reconnect = attempt_reconnect
self.receiver = Receiver()
self.connection: utility.mavudp | utility.mavserial = self._connect(
device_address, baud_rate, source_system, source_component
)
self.sender = Sender(
self.connection.target_system,
self.connection.target_component,
self.connection,
)
self.receiver.start_receiving()
self.reading = True
self.thread = threading.Thread(target=self._main_loop, daemon=True)
self.thread.start()
time.sleep(1)
def _connect(
self,
device_address: str,
baud_rate: int,
source_system: int,
source_component: int,
) -> utility.mavudp | utility.mavserial:
"""
device_address follows by pymavlink standards: 'udp:127.0.0.1:14550' or '/dev/ttyACM0'
"""
connection = utility.mavlink_connection(
device=device_address,
baud=baud_rate,
source_system=source_system,
source_component=source_component,
autoreconnect=self.attempt_reconnect,
)
if (
type(connection) is not utility.mavudp
and type(connection) is not utility.mavserial
):
raise RuntimeError("Connection is not udp or serial.")
return connection
def stop_reading(self):
"""
Will stop the thread that reads messages.
"""
self.reading = False
def add_listener(self, listener: MAVMessage) -> MAVMessage:
"""
Pass in a MAVMessage to listen for. Will save occurences of this message and update this message object accordingly.
"""
self.receiver.add_listener(listener)
return listener
def run_protocol(self, protocol: MAVProtocol) -> MAVProtocol:
"""
Runs a MAVProtocol object that sends and receives messages to complete the protcol.
"""
protocol.run(self.sender, self.receiver)
return protocol
def _main_loop(self):
while self.reading:
msg = self.connection.recv_match(blocking=True, timeout=1)
if msg:
self.receiver.update_queue(time.time(), msg)