-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathenv.py
82 lines (70 loc) · 2.16 KB
/
env.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python3
import meshtastic.serial_interface, meshtastic.tcp_interface, meshtastic.ble_interface
try:
from meshtastic.protobuf import telemetry_pb2, portnums_pb2
except ImportError:
from meshtastic import telemetry_pb2, portnums_pb2
import argparse
import sys
import time
parser = argparse.ArgumentParser(
add_help=False,
epilog="If no connection arguments are specified, we attempt a serial connection and then a TCP connection to localhost.")
connOuter = parser.add_argument_group('Connection', 'Optional arguments to specify a device to connect to and how.')
conn = connOuter.add_mutually_exclusive_group()
conn.add_argument(
"--port",
"--serial",
"-s",
help="The port to connect to via serial, e.g. `/dev/ttyUSB0`.",
nargs="?",
default=None,
const=None,
)
conn.add_argument(
"--host",
"--tcp",
"-t",
help="The hostname or IP address to connect to using TCP.",
nargs="?",
default=None,
const="localhost",
)
conn.add_argument(
"--ble",
"-b",
help="The BLE device MAC address or name to connect to.",
nargs="?",
default=None,
const="any"
)
parser.add_argument('nodeid')
args = parser.parse_args()
if args.ble:
interface = meshtastic.ble_interface.BLEInterface(args.ble if args.ble != "any" else None)
elif args.host:
interface = meshtastic.tcp_interface.TCPInterface(args.host)
else:
try:
interface = meshtastic.serial_interface.SerialInterface(args.port)
except PermissionError as ex:
print("You probably need to add yourself to the `dialout` group to use a serial connection.")
if interface.devPath is None:
interface = meshtastic.tcp_interface.TCPInterface("localhost")
print(f"Sending environment metrics request to {args.nodeid}")
t = telemetry_pb2.Telemetry()
t.environment_metrics.CopyFrom(telemetry_pb2.EnvironmentMetrics())
gotresp = False
def onresp(packet):
global gotresp
print(packet)
gotresp = True
interface.sendData(
t,
destinationId=args.nodeid,
portNum=portnums_pb2.PortNum.TELEMETRY_APP,
wantResponse=True,
onResponse=onresp,
)
while not gotresp:
time.sleep(1)