Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: upgrade to use udp unicast #73

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
make
pip install -e .
synapse-sim --iface-ip 127.0.0.1 --rpc-port 50051 &
synapse-sim --rpc-port 50051 &
sleep 2
Expand Down
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,19 @@ As well as the base for a device implementation (`synapse/server`),
And a toy device `synapse-sim` for local development,

% synapse-sim --help
usage: synapse-sim [-h] --iface-ip IFACE_IP [--rpc-port RPC_PORT]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to assume the IP returned by getsockname() is the desired one? What if there are multiple interfaces with IPs?

usage: synapse-sim [-h] [--rpc-port RPC_PORT]
[--discovery-port DISCOVERY_PORT] [--discovery-addr DISCOVERY_ADDR] [--name NAME] [--serial SERIAL]
[-v]

Simple Synapse Device Simulator (Development)

options:
-h, --help show this help message and exit
--iface-ip IFACE_IP IP of the network interface to use for multicast traffic
--rpc-port RPC_PORT Port to listen for RPC requests
--discovery-port DISCOVERY_PORT
Port to listen for discovery requests
--discovery-addr DISCOVERY_ADDR
Multicast address to listen for discovery requests
UDP address to listen for discovery requests
--name NAME Device name
--serial SERIAL Device serial number
-v, --verbose Enable verbose output
Expand Down
245 changes: 173 additions & 72 deletions synapse/cli/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Optional
from operator import itemgetter
import copy
import sys

from google.protobuf.json_format import Parse, MessageToJson

Expand All @@ -22,6 +23,129 @@
from rich.pretty import pprint


class PacketMonitor:
def __init__(self):
# Packet tracking
self.packet_count = 0
self.seq_number = None
self.dropped_packets = 0
self.out_of_order_packets = 0

# Timing metrics
self.start_time = None
self.first_packet_time = None

# Bandwidth tracking
self.bytes_received = 0
self.bytes_received_in_interval = 0
self.bandwidth_samples = []
self.last_bandwidth_time = None
self.last_bytes_received = 0

# Jitter tracking
self.last_packet_time = None
self.last_jitter = 0
self.avg_jitter = 0

def start_monitoring(self):
"""Initialize monitoring timers"""
self.start_time = time.time()
self.last_stats_time = self.start_time
self.last_bandwidth_time = self.start_time

def process_packet(self, header, data, bytes_read):
if not data:
return False
packet_received_time = time.time()
# Record first packet time
if self.packet_count == 0:
self.first_packet_time = packet_received_time
self.last_packet_time = packet_received_time
print(
f"First packet received after {packet_received_time - self.start_time:.3f} seconds\n\n"
)
else:
# Calculate jitter
interval = packet_received_time - self.last_packet_time
# Update jitter using RFC 3550 algorithm (smoother than just max-min)
# https://datatracker.ietf.org/doc/html/rfc3550#appendix-A.8
if self.packet_count > 1:
jitter_diff = abs(interval - self.last_jitter)
self.avg_jitter += (jitter_diff - self.avg_jitter) / 16

# Save current values for next calculation
self.last_jitter = interval
self.last_packet_time = packet_received_time

# We got a new packet
self.packet_count += 1
self.bytes_received += bytes_read
self.bytes_received_in_interval += bytes_read
# Sequence number checking for packet loss/ordering
if self.seq_number is None:
self.seq_number = header.seq_number
else:
expected = (self.seq_number + 1) % (2**16)
if header.seq_number != expected:
# Lost some packets
if header.seq_number > expected:
lost = (header.seq_number - expected) % (2**16)
self.dropped_packets += lost
else:
self.out_of_order_packets += 1
self.seq_number = header.seq_number

return True

def print_stats(self):
# Use carriage return to update on the same line
sys.stdout.write("\r")
try:
terminal_width = os.get_terminal_size().columns
except (AttributeError, OSError):
# Fallback if not available
terminal_width = 80

sys.stdout.write(" " * terminal_width)
sys.stdout.write("\r")

now = time.time()
stats = f"Runtime {now - self.start_time:.1f}s | "

# Drop calculation
drop_percent = (self.dropped_packets / max(1, self.packet_count)) * 100.0
stats += f"Dropped: {self.dropped_packets}/{self.packet_count} ({drop_percent:.1f}%) | "

# Bandwidth calcs

dt_sec = now - self.last_bandwidth_time
if dt_sec > 0:
bytes_per_second = self.bytes_received_in_interval / dt_sec
megabits_per_second = (bytes_per_second * 8) / 1_000_000
stats += f"Mbit/sec: {megabits_per_second:.1f} | "

# Jitter (in milliseconds)
jitter_ms = self.avg_jitter * 1000 # Convert to ms
stats += f"Jitter: {jitter_ms:.2f} ms | "

# Out of order
stats += f"Out of Order: {self.out_of_order_packets}"

# Flush the output to ensure it's displayed
if len(stats) > terminal_width - 1:
stats = stats[: terminal_width - 4] + "..."

# Write the stats and flush
sys.stdout.write(stats)
sys.stdout.flush()

# Reset for the next run
self.bytes_received_in_interval = 0
self.last_bandwidth_time = now

return True


def add_commands(subparsers):
a = subparsers.add_parser("read", help="Read from a device's StreamOut node")
a.add_argument("uri", type=str, help="IP address of Synapse device")
Expand Down Expand Up @@ -76,9 +200,7 @@ def read(args):
return
else:
console.print(f"[bold yellow]Overwriting existing files in {output_base}")

device = syn.Device(args.uri, args.verbose)

with console.status(
"Reading from Synapse Device", spinner="bouncingBall", spinner_style="green"
) as status:
Expand Down Expand Up @@ -158,9 +280,6 @@ def read(args):
return
console.print("[bold green]Device configured successfully")

if not device.configure(config):
raise ValueError("Failed to configure device")

if info.status.state != DeviceState.kRunning:
print("Starting device...")
if not device.start():
Expand Down Expand Up @@ -216,49 +335,46 @@ def read(args):
if args.duration
else "Streaming data indefinitely"
)
with console.status(
status_title, spinner="bouncingBall", spinner_style="green"
) as status:
q = queue.Queue()
plot_q = queue.Queue() if args.plot else None

threads = []
stop = threading.Event()
if args.bin:
threads.append(
threading.Thread(
target=_binary_writer, args=(stop, q, num_ch, output_base)
)
)
else:
threads.append(
threading.Thread(target=_data_writer, args=(stop, q, output_base))
)
console.print(status_title)

if args.plot:
threads.append(
threading.Thread(
target=_plot_data, args=(stop, plot_q, sample_rate_hz, num_ch)
)
q = queue.Queue()
plot_q = queue.Queue() if args.plot else None

threads = []
stop = threading.Event()
if args.bin:
threads.append(
threading.Thread(target=_binary_writer, args=(stop, q, num_ch, output_base))
)
else:
threads.append(
threading.Thread(target=_data_writer, args=(stop, q, output_base))
)

if args.plot:
threads.append(
threading.Thread(
target=_plot_data, args=(stop, plot_q, sample_rate_hz, num_ch)
)
)
for thread in threads:
thread.start()

try:
read_packets(stream_out, q, plot_q, args.duration)
except KeyboardInterrupt:
pass
finally:
print("Stopping read...")
stop.set()
for thread in threads:
thread.start()
thread.join()

try:
read_packets(stream_out, q, plot_q, args.duration)
except KeyboardInterrupt:
pass
finally:
print("Stopping read...")
stop.set()
for thread in threads:
thread.join()

if args.config:
console.print("Stopping device...")
if not device.stop():
console.print("[red]Failed to stop device")
console.print("Stopped")
if args.config:
console.print("Stopping device...")
if not device.stop():
console.print("[red]Failed to stop device")
console.print("Stopped")

console.print("[bold green]Streaming complete")
console.print("[cyan]================")
Expand All @@ -277,54 +393,39 @@ def read_packets(
duration: Optional[int] = None,
num_ch: int = 32,
):
packet_count = 0
seq_number = None
dropped_packets = 0
start = time.time()
print_interval = 1000
last_print_time = start

print(
f"Reading packets for duration {duration} seconds"
if duration
else "Reading packets..."
)

# Keep track of our statistics
monitor = PacketMonitor()
monitor.start_monitoring()
while True:
header, data = node.read()
if not data:
synapse_data, bytes_read = node.read()
if synapse_data is None or bytes_read == 0:
print("Could not read data from node")
continue

packet_count += 1

# Detect dropped packets via seq_number
if seq_number is None:
seq_number = header.seq_number
else:
expected = (seq_number + 1) % (2**16)
if header.seq_number != expected:
print(f"Seq out of order: got {header.seq_number}, expected {expected}")
dropped_packets += header.seq_number - expected
seq_number = header.seq_number
header, data = synapse_data
monitor.process_packet(header, data, bytes_read)

# Always add the data to the writer queues
q.put(data)
if plot_q:
plot_q.put(copy.deepcopy(data))

if packet_count == 1:
print(f"First packet received at {time.time() - start:.2f} seconds")
# Print every second our current statistics
if time.time() - last_print_time > 1.0:
monitor.print_stats()
last_print_time = time.time()

if packet_count % print_interval == 0:
print(
f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)"
)
if duration and (time.time() - start) > duration:
break

print(
f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)"
)


def _binary_writer(stop, q, num_ch, output_base):
filename = f"{output_base}.dat"
Expand Down
1 change: 0 additions & 1 deletion synapse/client/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def from_proto(cls, proto: NodeConfig):
oneof = proto.WhichOneof("config")
if oneof and hasattr(proto, oneof):
config = getattr(proto, oneof)

node = cls._from_proto(config)
node.id = proto.id
return node
Loading