Skip to content

Commit 3d1f5d4

Browse files
authored
Merge pull request #80 from klemie/telemetry-packet-details
Telemetry Packet Extraction
2 parents 4eccfe5 + 999acf1 commit 3d1f5d4

4 files changed

Lines changed: 109 additions & 31 deletions

File tree

client/src/components/pdp-monitoring/feed-system/InstrumentationNode.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { memo } from 'react';
22

3-
import InstrumentationSymbol from '../../../static/instrumentation/instrumentationSymbol.svg';
3+
import InstrumentationSymbol from '../../../static/instrumentation/InstrumentationSymbol.svg';
44
import { Stack, Typography } from '@mui/material';
55
import { Handle, Position } from 'reactflow';
66

services/telemetry/Cargo.lock

Lines changed: 39 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

services/telemetry/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ clap = { version = "4.3.0", features = ["derive"] }
1414
sha1 = "0.10.6"
1515
serde = { version = "1.0.190", features = ["derive"] }
1616
serde_json = "1.0.107"
17+
regex = "1.10.5"

services/telemetry/src/telemetry.rs

Lines changed: 68 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use std::io::prelude::*;
77
use std::string::FromUtf8Error;
88
use std::thread::spawn;
99
use std::collections::HashMap;
10+
use regex::Regex;
11+
use std::str;
1012

1113
use crate::data::DataPacket;
1214

@@ -16,41 +18,58 @@ use std::thread::sleep;
1618
/// Takes one line of a packet as a Vec<u8>
1719
/// Updates the state with information in that line
1820
async fn extract_packets(state_ref: Arc<Mutex<State>>, received_line: Vec<u8>) {
19-
let line = String::from_utf8_lossy(&received_line);
21+
let data = match str::from_utf8(&received_line) {
22+
Ok(v) => v,
23+
Err(e) => panic!("Invalid UTF-8: {}", e)
24+
};
25+
let mut packet_data: HashMap<String, String> = HashMap::new();
2026

21-
let words: Vec<&str> = line.split_whitespace().collect();
27+
let callsign_regex = Regex::new(r"([A-Z0-9]+)-\d").unwrap();
28+
let lat_long_regex = Regex::new(r"(\d{4}\.\d{2}N)/(\d{5}\.\d{2}W)").unwrap();
29+
let altitude_regex = Regex::new(r"/A=(\d{6})\*").unwrap();
2230

23-
// Search for the word "alt" in the words
24-
if let Some(alt_index) = words.iter().position(|&word| word == "alt") {
25-
// Extract numbers after "alt" until a non-number is hit
26-
let mut altitude = String::new();
27-
for word in &words[(alt_index + 1)..] {
28-
if let Some(_) = word.chars().next().unwrap().to_digit(10) {
29-
altitude.push_str(word);
30-
altitude.push(' ');
31-
} else {
32-
break;
33-
}
34-
}
35-
let mut packet_data: HashMap<String, String> = HashMap::new();
36-
packet_data.insert("altitude".to_string(), altitude.to_string());
37-
let packet_id: u64 = match state_ref.lock().unwrap().last_packet_id {
38-
Some(id) => id.checked_add(1).unwrap_or(1), // Add 1 if possible, otherwise set to 1
39-
None => 1, // If None, set to 1
40-
};
41-
packet_id.checked_add(1);
31+
if let Some(callsign_caps) = callsign_regex.captures(data) {
32+
let callsign = &callsign_caps[1];
33+
packet_data.insert("callsign".to_string(), callsign.to_string());
34+
println!("Callsign: {}", callsign);
35+
} else {
36+
packet_data.insert("callsign".to_string(), "".to_string());
37+
println!("Failed to get callsign");
38+
}
39+
40+
if let Some(lat_long_caps) = lat_long_regex.captures(data) {
41+
let latitude = &lat_long_caps[1];
42+
let longitude = &lat_long_caps[2];
43+
packet_data.insert("latitude".to_string(), latitude.to_string());
44+
packet_data.insert("longitude".to_string(), longitude.to_string());
45+
println!("Latitude: {}", latitude);
46+
println!("Longitude: {}", longitude);
47+
} else {
48+
packet_data.insert("latitude".to_string(), "".to_string());
49+
packet_data.insert("longitude".to_string(), "".to_string());
50+
println!("Failed to get lat long")
51+
}
4252

43-
let mut packet = DataPacket{
44-
id: packet_id, data: packet_data
45-
};
46-
state_ref.lock().unwrap().append_packet(packet);
53+
if let Some(altitude_caps) = altitude_regex.captures(data) {
54+
let altitude = &altitude_caps[1];
55+
packet_data.insert("altitude".to_string(), altitude.to_string());
4756
println!("Altitude: {}", altitude);
4857
} else {
49-
// prints here and above are for logging
50-
println!("{}", line);
58+
packet_data.insert("altitude".to_string(), "".to_string());
59+
println!("Failed to get altitude");
5160
}
52-
}
5361

62+
let packet_id: u64 = match state_ref.lock().unwrap().last_packet_id {
63+
Some(id) => id.checked_add(1).unwrap_or(1), // Add 1 if possible, otherwise set to 1
64+
None => 1, // If None, set to 1
65+
};
66+
packet_id.checked_add(1);
67+
68+
let mut packet = DataPacket{
69+
id: packet_id, data: packet_data
70+
};
71+
state_ref.lock().unwrap().append_packet(packet);
72+
}
5473
/// starts telemetry by calling decode.sh which in turn calls linux direwolf and rtl_fm binaries
5574
///
5675
/// calls extract_packets when new data received from rocket
@@ -67,11 +86,30 @@ pub async fn start_telemetry(state_ref: Arc<Mutex<State>>) -> io::Result<()> {
6786

6887
let mut child_out = BufReader::new(decode_aprs.stdout.as_mut().unwrap());
6988
let mut readbuf = vec![0; 256];
89+
let mut buffer = Vec::new();
7090

7191
loop {
7292
// read() blocks until data is available. This makes this not polling.
73-
let mut b = child_out.read(&mut readbuf);
74-
extract_packets(Arc::clone(&state_ref), readbuf.clone()).await;
93+
let mut bytes_read = child_out.read(&mut readbuf);
94+
match bytes_read {
95+
Ok(value) => {
96+
let int_bytes_read: i32 = value as i32;
97+
if (int_bytes_read > 0) {
98+
buffer.extend_from_slice(&readbuf[..value]);
99+
// Check for two successive newlines
100+
if let Some(pos) = buffer.windows(2).position(|window| window == b"\n\n") {
101+
// Extract the data before the two newlines
102+
let data_to_process = buffer[..pos].to_vec();
103+
extract_packets(Arc::clone(&state_ref), data_to_process).await;
104+
// Remove the processed data and the two newlines from the buffer
105+
buffer.drain(..pos + 2);
106+
}
107+
}
108+
}
109+
Err(e) => {
110+
println!("An error occurred: {}", e);
111+
}
112+
}
75113
}
76114
Ok(())
77115
}

0 commit comments

Comments
 (0)