From 512fa907665e875d0ec4ee8b079ced5308d77802 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Tue, 4 Mar 2025 10:59:39 -0800 Subject: [PATCH 01/12] WIP: UDP Unicast --- synapse/cli/streaming.py | 423 ++++++++++++++++++++++++++++++++++----- 1 file changed, 373 insertions(+), 50 deletions(-) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index 6f08386..a1efcd3 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -7,6 +7,7 @@ from typing import Optional from operator import itemgetter import copy +import math from google.protobuf.json_format import Parse, MessageToJson @@ -22,6 +23,324 @@ from rich.pretty import pprint +class PacketMonitor: + """ + A class to monitor and analyze packet streaming statistics including: + - Effective receive bandwidth (Mbps) + - Jitter (variation in packet arrival intervals) + - Total packets received + - Packet loss detection + - Out-of-order packet detection + """ + + def __init__(self, print_interval=100, history_size=1000): + # Configuration + self.print_interval = print_interval + self.history_size = history_size + + # Packet tracking + self.packet_count = 0 + self.seq_number = None + self.dropped_packets = 0 + self.out_of_order_packets = 0 + + # Timing metrics + self.start_time = None + self.last_stats_time = None + self.first_packet_time = None + + # Bandwidth tracking + self.bytes_received = 0 + self.bandwidth_samples = [] + self.last_bandwidth_time = None + self.last_bytes_received = 0 + + # Jitter tracking + self.packet_arrival_times = [] + self.packet_intervals = [] + + # Stats dictionary + self.stats = { + "total_packets": 0, + "dropped_packets": 0, + "out_of_order_packets": 0, + "drop_rate": 0.0, + "bandwidth_mbps": 0.0, + "jitter_ms": 0.0, + "run_time_seconds": 0.0, + } + + def start_monitoring(self): + """Initialize monitoring timers""" + self.start_time = time.time() + self.last_stats_time = self.start_time + self.last_bandwidth_time = self.start_time + + def process_packet(self, header, data): + """ + Process a single packet and update statistics + + Args: + header: Packet header containing sequence number + data: Packet data + + Returns: + bool: True if packet was processed, False otherwise + """ + if not data: + return False + + packet_received_time = time.time() + + # Record first packet time + if self.packet_count == 0: + self.first_packet_time = packet_received_time + print( + f"First packet received at {packet_received_time - self.start_time:.3f} seconds" + ) + + # Track packet arrival for jitter calculation + self.packet_arrival_times.append(packet_received_time) + + # Calculate packet intervals for jitter + if len(self.packet_arrival_times) > 1: + interval = ( + self.packet_arrival_times[-1] - self.packet_arrival_times[-2] + ) * 1000.0 # Convert to ms + self.packet_intervals.append(interval) + + # Update byte count for bandwidth calculation + # Calculate exact size based on the NDTP message structure + + # Header size from NDTPHeader.STRUCT.size (1 + 1 + 8 + 2 = 12 bytes) + # >BBQH: > (big-endian), B (unsigned char, 1 byte), B (unsigned char, 1 byte), + # Q (unsigned long long, 8 bytes), H (unsigned short, 2 bytes) + header_size = 12 # We know this from the struct format + + # Payload size calculation based on ElectricalBroadbandData + payload_size = 0 + + if hasattr(data, "samples"): + # Add bytes for each channel's samples + for ch_id, samples in data.samples: + # Calculate sample size from bit_width + if hasattr(data, "bit_width"): + bytes_per_sample = math.ceil(data.bit_width / 8) + else: + bytes_per_sample = 4 # Default to 4 bytes if bit_width unknown + + # Calculate this channel's data size + channel_data_size = len(samples) * bytes_per_sample + + # Add overhead for channel metadata (ID, length indicator, etc.) + # This is an estimate; adjust based on actual protocol + channel_overhead = 8 # Typically a few bytes for channel ID and length + + payload_size += channel_data_size + channel_overhead + + # Add general payload overhead (type indicators, length fields, etc.) + # This is an estimate; adjust based on actual protocol + payload_size += 16 # General payload metadata + else: + # Fallback if we can't determine the payload structure + payload_size = 1024 # Assume 1KB payload + + packet_size = header_size + payload_size + self.bytes_received += packet_size + + self.packet_count += 1 + + # Sequence number checking for packet loss/ordering + if self.seq_number is None: + self.seq_number = header.seq_number + else: + expected = (self.seq_number + 1) % (2**16) + if header.seq_number != expected: + if header.seq_number > expected: + # Packet loss scenario + lost = (header.seq_number - expected) % (2**16) + self.dropped_packets += lost + print( + f"Packet loss detected: expected {expected}, got {header.seq_number}, lost {lost} packets" + ) + else: + # Out of order scenario + self.out_of_order_packets += 1 + print( + f"Out of order packet: expected {expected}, got {header.seq_number}" + ) + + self.seq_number = header.seq_number + + # Limit history sizes to prevent memory growth + if len(self.packet_arrival_times) > self.history_size: + self.packet_arrival_times = self.packet_arrival_times[-self.history_size :] + if len(self.packet_intervals) > self.history_size: + self.packet_intervals = self.packet_intervals[-self.history_size :] + + return True + + def calculate_bandwidth(self): + """Calculate current bandwidth in Mbps""" + current_time = time.time() + time_delta = current_time - self.last_bandwidth_time + + # Only calculate bandwidth if enough time has passed (avoid division by small numbers) + if time_delta >= 0.5: # Calculate bandwidth every 500ms minimum + bytes_delta = self.bytes_received - self.last_bytes_received + + # Calculate bandwidth in Mbps (megabits per second) + # bytes_delta * 8 converts bytes to bits, / 1_000_000 converts to megabits + bandwidth_mbps = (bytes_delta * 8) / (time_delta * 1_000_000) + + self.bandwidth_samples.append(bandwidth_mbps) + # Limit bandwidth history + if len(self.bandwidth_samples) > self.history_size: + self.bandwidth_samples = self.bandwidth_samples[-self.history_size :] + + self.last_bandwidth_time = current_time + self.last_bytes_received = self.bytes_received + + return bandwidth_mbps + + # If not enough time has passed, return the last calculated bandwidth or 0 + return self.bandwidth_samples[-1] if self.bandwidth_samples else 0 + + def calculate_jitter(self): + """Calculate jitter in milliseconds""" + if len(self.packet_intervals) < 2: + return 0 + + # Jitter is the mean deviation in packet interval times + # RFC 3550 defines jitter as the mean deviation of the packet spacing + jitter = 0 + + # Calculate mean deviation + for i in range(1, len(self.packet_intervals)): + jitter += abs(self.packet_intervals[i] - self.packet_intervals[i - 1]) + + return jitter / (len(self.packet_intervals) - 1) + + def print_periodic_stats(self, force=False): + """ + Print periodic statistics if interval has elapsed or if forced + + Args: + force: Force printing stats regardless of interval + + Returns: + bool: True if stats were printed, False otherwise + """ + current_time = time.time() + stats_interval = current_time - self.last_stats_time + + if ( + not force + and self.packet_count % self.print_interval != 0 + and stats_interval < 5.0 + ): + return False + + # Calculate summary statistics + run_time = current_time - self.start_time + + # Calculate bandwidth + bandwidth_mbps = self.calculate_bandwidth() + + # Calculate jitter + jitter_ms = self.calculate_jitter() + + # Update stats dictionary + self.stats["total_packets"] = self.packet_count + self.stats["dropped_packets"] = self.dropped_packets + self.stats["out_of_order_packets"] = self.out_of_order_packets + self.stats["drop_rate"] = ( + (self.dropped_packets / (self.packet_count + self.dropped_packets)) * 100 + if self.packet_count + self.dropped_packets > 0 + else 0 + ) + self.stats["bandwidth_mbps"] = bandwidth_mbps + self.stats["jitter_ms"] = jitter_ms + self.stats["run_time_seconds"] = run_time + + # Pretty print stats + print("\n--- Packet Statistics ---") + print(f"Run time: {run_time:.2f} seconds") + print(f"Total packets: {self.packet_count}") + print(f"Effective bandwidth: {bandwidth_mbps:.2f} Mbps") + print( + f"Dropped packets: {self.dropped_packets} ({self.stats['drop_rate']:.2f}%)" + ) + print(f"Out-of-order packets: {self.out_of_order_packets}") + print(f"Jitter: {jitter_ms:.3f} ms") + + throughput_packets = self.packet_count / run_time if run_time > 0 else 0 + print(f"Packet throughput: {throughput_packets:.2f} packets/second") + + print("-----------------------\n") + + # Reset for next interval + self.last_stats_time = current_time + return True + + def print_final_stats(self): + """Print final comprehensive statistics""" + end_time = time.time() + total_runtime = end_time - self.start_time + + # Calculate final bandwidth using total bytes received + avg_bandwidth_mbps = (self.bytes_received * 8) / (total_runtime * 1_000_000) + + print("\n=== Final Statistics ===") + print(f"Total runtime: {total_runtime:.2f} seconds") + print(f"Total packets received: {self.packet_count}") + print(f"Total bytes received: {self.bytes_received:,} bytes") + print(f"Average bandwidth: {avg_bandwidth_mbps:.2f} Mbps") + print( + f"Packet throughput: {self.packet_count / total_runtime:.2f} packets/second" + ) + print( + f"Dropped packets: {self.dropped_packets} ({(self.dropped_packets / (self.packet_count + self.dropped_packets)) * 100:.2f}% loss)" + ) + print(f"Out-of-order packets: {self.out_of_order_packets}") + + if self.packet_intervals: + avg_jitter = self.calculate_jitter() + print(f"Average jitter: {avg_jitter:.3f} ms") + + # Print max bandwidth observed + if self.bandwidth_samples: + max_bandwidth = max(self.bandwidth_samples) + print(f"Peak bandwidth: {max_bandwidth:.2f} Mbps") + + print("=====================") + + def get_stats_dict(self): + """Return a dictionary with all current statistics""" + # Update with latest values before returning + self.calculate_bandwidth() + + stats = self.stats.copy() + + # Add additional derived statistics + stats["peak_bandwidth_mbps"] = ( + max(self.bandwidth_samples) if self.bandwidth_samples else 0 + ) + stats["avg_bandwidth_mbps"] = ( + sum(self.bandwidth_samples) / len(self.bandwidth_samples) + if self.bandwidth_samples + else 0 + ) + stats["total_bytes_received"] = self.bytes_received + stats["packet_throughput"] = ( + self.packet_count / self.stats["run_time_seconds"] + if self.stats["run_time_seconds"] > 0 + else 0 + ) + + return stats + + def add_commands(subparsers): a = subparsers.add_parser("read", help="Read from a device's StreamOut node") a.add_argument("uri", type=str, help="IP address of Synapse device") @@ -216,49 +535,46 @@ def read(args): if args.duration else "Streaming data indefinitely" ) - with console.status( - status_title, spinner="bouncingBall", spinner_style="green" - ) as status: - q = queue.Queue() - plot_q = queue.Queue() if args.plot else None - - threads = [] - stop = threading.Event() - if args.bin: - threads.append( - threading.Thread( - target=_binary_writer, args=(stop, q, num_ch, output_base) - ) - ) - else: - threads.append( - threading.Thread(target=_data_writer, args=(stop, q, output_base)) - ) + console.print(status_title) - if args.plot: - threads.append( - threading.Thread( - target=_plot_data, args=(stop, plot_q, sample_rate_hz, num_ch) - ) + q = queue.Queue() + plot_q = queue.Queue() if args.plot else None + + threads = [] + stop = threading.Event() + if args.bin: + threads.append( + threading.Thread(target=_binary_writer, args=(stop, q, num_ch, output_base)) + ) + else: + threads.append( + threading.Thread(target=_data_writer, args=(stop, q, output_base)) + ) + + if args.plot: + threads.append( + threading.Thread( + target=_plot_data, args=(stop, plot_q, sample_rate_hz, num_ch) ) + ) + for thread in threads: + thread.start() + + try: + read_packets(stream_out, q, plot_q, args.duration) + except KeyboardInterrupt: + pass + finally: + print("Stopping read...") + stop.set() for thread in threads: - thread.start() + thread.join() - try: - read_packets(stream_out, q, plot_q, args.duration) - except KeyboardInterrupt: - pass - finally: - print("Stopping read...") - stop.set() - for thread in threads: - thread.join() - - if args.config: - console.print("Stopping device...") - if not device.stop(): - console.print("[red]Failed to stop device") - console.print("Stopped") + if args.config: + console.print("Stopping device...") + if not device.stop(): + console.print("[red]Failed to stop device") + console.print("Stopped") console.print("[bold green]Streaming complete") console.print("[cyan]================") @@ -281,7 +597,7 @@ def read_packets( seq_number = None dropped_packets = 0 start = time.time() - print_interval = 1000 + # print_interval = 1000 print( f"Reading packets for duration {duration} seconds" @@ -289,10 +605,14 @@ def read_packets( else "Reading packets..." ) + monitor = PacketMonitor() + monitor.start_monitoring() + while True: header, data = node.read() if not data: continue + monitor.process_packet(header, data) packet_count += 1 @@ -302,7 +622,6 @@ def read_packets( else: expected = (seq_number + 1) % (2**16) if header.seq_number != expected: - print(f"Seq out of order: got {header.seq_number}, expected {expected}") dropped_packets += header.seq_number - expected seq_number = header.seq_number @@ -311,19 +630,23 @@ def read_packets( if plot_q: plot_q.put(copy.deepcopy(data)) - if packet_count == 1: - print(f"First packet received at {time.time() - start:.2f} seconds") + monitor.print_periodic_stats() - if packet_count % print_interval == 0: - print( - f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)" - ) + # if packet_count == 1: + # print(f"First packet received at {time.time() - start:.2f} seconds") + + # if packet_count % print_interval == 0: + # print( + # f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)" + # ) if duration and (time.time() - start) > duration: break - print( - f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)" - ) + monitor.print_final_stats() + + # print( + # f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)" + # ) def _binary_writer(stop, q, num_ch, output_base): From c9d5b6de3ab001efb5e07854d3ff88d5605c9853 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Wed, 5 Mar 2025 14:30:26 -0800 Subject: [PATCH 02/12] Nicer prints --- synapse-api | 2 +- synapse/cli/streaming.py | 334 ++++------------------------- synapse/client/nodes/stream_out.py | 130 ++++++----- 3 files changed, 116 insertions(+), 350 deletions(-) diff --git a/synapse-api b/synapse-api index 6d7898f..62ddda4 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 6d7898fd29f734acd7a63bda98c24ea48f768768 +Subproject commit 62ddda4e36a3f0f3c4608e3a3c30100d9716e0dd diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index a1efcd3..39b1ff5 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -7,7 +7,7 @@ from typing import Optional from operator import itemgetter import copy -import math +import sys from google.protobuf.json_format import Parse, MessageToJson @@ -24,20 +24,7 @@ class PacketMonitor: - """ - A class to monitor and analyze packet streaming statistics including: - - Effective receive bandwidth (Mbps) - - Jitter (variation in packet arrival intervals) - - Total packets received - - Packet loss detection - - Out-of-order packet detection - """ - - def __init__(self, print_interval=100, history_size=1000): - # Configuration - self.print_interval = print_interval - self.history_size = history_size - + def __init__(self): # Packet tracking self.packet_count = 0 self.seq_number = None @@ -46,11 +33,11 @@ def __init__(self, print_interval=100, history_size=1000): # Timing metrics self.start_time = None - self.last_stats_time = None self.first_packet_time = None # Bandwidth tracking self.bytes_received = 0 + self.bytes_received_in_interval = 0 self.bandwidth_samples = [] self.last_bandwidth_time = None self.last_bytes_received = 0 @@ -59,286 +46,74 @@ def __init__(self, print_interval=100, history_size=1000): self.packet_arrival_times = [] self.packet_intervals = [] - # Stats dictionary - self.stats = { - "total_packets": 0, - "dropped_packets": 0, - "out_of_order_packets": 0, - "drop_rate": 0.0, - "bandwidth_mbps": 0.0, - "jitter_ms": 0.0, - "run_time_seconds": 0.0, - } - def start_monitoring(self): """Initialize monitoring timers""" self.start_time = time.time() self.last_stats_time = self.start_time self.last_bandwidth_time = self.start_time - def process_packet(self, header, data): - """ - Process a single packet and update statistics - - Args: - header: Packet header containing sequence number - data: Packet data - - Returns: - bool: True if packet was processed, False otherwise - """ + def process_packet(self, header, data, bytes_read): if not data: return False - packet_received_time = time.time() - # Record first packet time if self.packet_count == 0: self.first_packet_time = packet_received_time print( - f"First packet received at {packet_received_time - self.start_time:.3f} seconds" + f"First packet received after {packet_received_time - self.start_time:.3f} seconds" ) - - # Track packet arrival for jitter calculation - self.packet_arrival_times.append(packet_received_time) - - # Calculate packet intervals for jitter - if len(self.packet_arrival_times) > 1: - interval = ( - self.packet_arrival_times[-1] - self.packet_arrival_times[-2] - ) * 1000.0 # Convert to ms - self.packet_intervals.append(interval) - - # Update byte count for bandwidth calculation - # Calculate exact size based on the NDTP message structure - - # Header size from NDTPHeader.STRUCT.size (1 + 1 + 8 + 2 = 12 bytes) - # >BBQH: > (big-endian), B (unsigned char, 1 byte), B (unsigned char, 1 byte), - # Q (unsigned long long, 8 bytes), H (unsigned short, 2 bytes) - header_size = 12 # We know this from the struct format - - # Payload size calculation based on ElectricalBroadbandData - payload_size = 0 - - if hasattr(data, "samples"): - # Add bytes for each channel's samples - for ch_id, samples in data.samples: - # Calculate sample size from bit_width - if hasattr(data, "bit_width"): - bytes_per_sample = math.ceil(data.bit_width / 8) - else: - bytes_per_sample = 4 # Default to 4 bytes if bit_width unknown - - # Calculate this channel's data size - channel_data_size = len(samples) * bytes_per_sample - - # Add overhead for channel metadata (ID, length indicator, etc.) - # This is an estimate; adjust based on actual protocol - channel_overhead = 8 # Typically a few bytes for channel ID and length - - payload_size += channel_data_size + channel_overhead - - # Add general payload overhead (type indicators, length fields, etc.) - # This is an estimate; adjust based on actual protocol - payload_size += 16 # General payload metadata - else: - # Fallback if we can't determine the payload structure - payload_size = 1024 # Assume 1KB payload - - packet_size = header_size + payload_size - self.bytes_received += packet_size - + # We got a new packet self.packet_count += 1 - + self.bytes_received += bytes_read + self.bytes_received_in_interval += bytes_read # Sequence number checking for packet loss/ordering if self.seq_number is None: self.seq_number = header.seq_number else: expected = (self.seq_number + 1) % (2**16) if header.seq_number != expected: + # Lost some packets if header.seq_number > expected: - # Packet loss scenario lost = (header.seq_number - expected) % (2**16) self.dropped_packets += lost - print( - f"Packet loss detected: expected {expected}, got {header.seq_number}, lost {lost} packets" - ) else: - # Out of order scenario self.out_of_order_packets += 1 - print( - f"Out of order packet: expected {expected}, got {header.seq_number}" - ) - self.seq_number = header.seq_number - # Limit history sizes to prevent memory growth - if len(self.packet_arrival_times) > self.history_size: - self.packet_arrival_times = self.packet_arrival_times[-self.history_size :] - if len(self.packet_intervals) > self.history_size: - self.packet_intervals = self.packet_intervals[-self.history_size :] - return True - def calculate_bandwidth(self): - """Calculate current bandwidth in Mbps""" - current_time = time.time() - time_delta = current_time - self.last_bandwidth_time - - # Only calculate bandwidth if enough time has passed (avoid division by small numbers) - if time_delta >= 0.5: # Calculate bandwidth every 500ms minimum - bytes_delta = self.bytes_received - self.last_bytes_received - - # Calculate bandwidth in Mbps (megabits per second) - # bytes_delta * 8 converts bytes to bits, / 1_000_000 converts to megabits - bandwidth_mbps = (bytes_delta * 8) / (time_delta * 1_000_000) - - self.bandwidth_samples.append(bandwidth_mbps) - # Limit bandwidth history - if len(self.bandwidth_samples) > self.history_size: - self.bandwidth_samples = self.bandwidth_samples[-self.history_size :] - - self.last_bandwidth_time = current_time - self.last_bytes_received = self.bytes_received - - return bandwidth_mbps - - # If not enough time has passed, return the last calculated bandwidth or 0 - return self.bandwidth_samples[-1] if self.bandwidth_samples else 0 - - def calculate_jitter(self): - """Calculate jitter in milliseconds""" - if len(self.packet_intervals) < 2: - return 0 - - # Jitter is the mean deviation in packet interval times - # RFC 3550 defines jitter as the mean deviation of the packet spacing - jitter = 0 + def print_stats(self): + # Use carriage return to update on the same line + sys.stdout.write("\r") - # Calculate mean deviation - for i in range(1, len(self.packet_intervals)): - jitter += abs(self.packet_intervals[i] - self.packet_intervals[i - 1]) - - return jitter / (len(self.packet_intervals) - 1) - - def print_periodic_stats(self, force=False): - """ - Print periodic statistics if interval has elapsed or if forced - - Args: - force: Force printing stats regardless of interval - - Returns: - bool: True if stats were printed, False otherwise - """ - current_time = time.time() - stats_interval = current_time - self.last_stats_time - - if ( - not force - and self.packet_count % self.print_interval != 0 - and stats_interval < 5.0 - ): - return False - - # Calculate summary statistics - run_time = current_time - self.start_time - - # Calculate bandwidth - bandwidth_mbps = self.calculate_bandwidth() + # Drop calculation + drop_percent = (self.dropped_packets / max(1, self.packet_count)) * 100.0 + sys.stdout.write( + f"Dropped: {self.dropped_packets}/{self.packet_count} ({drop_percent:.1f}%) | " + ) - # Calculate jitter - jitter_ms = self.calculate_jitter() + # Bandwidth calcs + now = time.time() + dt_sec = now - self.last_bandwidth_time + if dt_sec > 0: + bytes_per_second = self.bytes_received_in_interval / dt_sec + megabits_per_second = (bytes_per_second * 8) / 1_000_000 + sys.stdout.write(f"Mbit/sec: {megabits_per_second:.1f} | ") - # Update stats dictionary - self.stats["total_packets"] = self.packet_count - self.stats["dropped_packets"] = self.dropped_packets - self.stats["out_of_order_packets"] = self.out_of_order_packets - self.stats["drop_rate"] = ( - (self.dropped_packets / (self.packet_count + self.dropped_packets)) * 100 - if self.packet_count + self.dropped_packets > 0 - else 0 - ) - self.stats["bandwidth_mbps"] = bandwidth_mbps - self.stats["jitter_ms"] = jitter_ms - self.stats["run_time_seconds"] = run_time - - # Pretty print stats - print("\n--- Packet Statistics ---") - print(f"Run time: {run_time:.2f} seconds") - print(f"Total packets: {self.packet_count}") - print(f"Effective bandwidth: {bandwidth_mbps:.2f} Mbps") - print( - f"Dropped packets: {self.dropped_packets} ({self.stats['drop_rate']:.2f}%)" - ) - print(f"Out-of-order packets: {self.out_of_order_packets}") - print(f"Jitter: {jitter_ms:.3f} ms") + # Out of order + sys.stdout.write(f"Out of Order: {self.out_of_order_packets}") - throughput_packets = self.packet_count / run_time if run_time > 0 else 0 - print(f"Packet throughput: {throughput_packets:.2f} packets/second") + # Flush the output to ensure it's displayed + sys.stdout.flush() - print("-----------------------\n") + # Reset for the next run + self.bytes_received_in_interval = 0 + self.last_bandwidth_time = now - # Reset for next interval - self.last_stats_time = current_time return True def print_final_stats(self): - """Print final comprehensive statistics""" - end_time = time.time() - total_runtime = end_time - self.start_time - - # Calculate final bandwidth using total bytes received - avg_bandwidth_mbps = (self.bytes_received * 8) / (total_runtime * 1_000_000) - - print("\n=== Final Statistics ===") - print(f"Total runtime: {total_runtime:.2f} seconds") - print(f"Total packets received: {self.packet_count}") - print(f"Total bytes received: {self.bytes_received:,} bytes") - print(f"Average bandwidth: {avg_bandwidth_mbps:.2f} Mbps") - print( - f"Packet throughput: {self.packet_count / total_runtime:.2f} packets/second" - ) - print( - f"Dropped packets: {self.dropped_packets} ({(self.dropped_packets / (self.packet_count + self.dropped_packets)) * 100:.2f}% loss)" - ) - print(f"Out-of-order packets: {self.out_of_order_packets}") - - if self.packet_intervals: - avg_jitter = self.calculate_jitter() - print(f"Average jitter: {avg_jitter:.3f} ms") - - # Print max bandwidth observed - if self.bandwidth_samples: - max_bandwidth = max(self.bandwidth_samples) - print(f"Peak bandwidth: {max_bandwidth:.2f} Mbps") - - print("=====================") - - def get_stats_dict(self): - """Return a dictionary with all current statistics""" - # Update with latest values before returning - self.calculate_bandwidth() - - stats = self.stats.copy() - - # Add additional derived statistics - stats["peak_bandwidth_mbps"] = ( - max(self.bandwidth_samples) if self.bandwidth_samples else 0 - ) - stats["avg_bandwidth_mbps"] = ( - sum(self.bandwidth_samples) / len(self.bandwidth_samples) - if self.bandwidth_samples - else 0 - ) - stats["total_bytes_received"] = self.bytes_received - stats["packet_throughput"] = ( - self.packet_count / self.stats["run_time_seconds"] - if self.stats["run_time_seconds"] > 0 - else 0 - ) - - return stats + print("final") def add_commands(subparsers): @@ -477,9 +252,6 @@ def read(args): return console.print("[bold green]Device configured successfully") - if not device.configure(config): - raise ValueError("Failed to configure device") - if info.status.state != DeviceState.kRunning: print("Starting device...") if not device.start(): @@ -593,11 +365,8 @@ def read_packets( duration: Optional[int] = None, num_ch: int = 32, ): - packet_count = 0 - seq_number = None - dropped_packets = 0 start = time.time() - # print_interval = 1000 + last_print_time = start print( f"Reading packets for duration {duration} seconds" @@ -605,49 +374,32 @@ def read_packets( else "Reading packets..." ) + # Keep track of our statistics monitor = PacketMonitor() monitor.start_monitoring() - while True: - header, data = node.read() - if not data: + synapse_data, bytes_read = node.read() + if synapse_data is None or bytes_read == 0: + print("Could not read data from node") continue - monitor.process_packet(header, data) - - packet_count += 1 - - # Detect dropped packets via seq_number - if seq_number is None: - seq_number = header.seq_number - else: - expected = (seq_number + 1) % (2**16) - if header.seq_number != expected: - dropped_packets += header.seq_number - expected - seq_number = header.seq_number + header, data = synapse_data + monitor.process_packet(header, data, bytes_read) # Always add the data to the writer queues q.put(data) if plot_q: plot_q.put(copy.deepcopy(data)) - monitor.print_periodic_stats() + # Print every second our current statistics + if time.time() - last_print_time > 1.0: + monitor.print_stats() + last_print_time = time.time() - # if packet_count == 1: - # print(f"First packet received at {time.time() - start:.2f} seconds") - - # if packet_count % print_interval == 0: - # print( - # f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)" - # ) if duration and (time.time() - start) > duration: break monitor.print_final_stats() - # print( - # f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)" - # ) - def _binary_writer(stop, q, num_ch, output_base): filename = f"{output_base}.dat" diff --git a/synapse/client/nodes/stream_out.py b/synapse/client/nodes/stream_out.py index 82043ee..c026225 100644 --- a/synapse/client/nodes/stream_out.py +++ b/synapse/client/nodes/stream_out.py @@ -1,9 +1,7 @@ -import os import logging import socket -import struct import traceback -from typing import Optional +from typing import Optional, Tuple from synapse.api.datatype_pb2 import DataType from synapse.api.node_pb2 import NodeConfig, NodeType @@ -16,85 +14,76 @@ SynapseData, ) +DEFAULT_STREAM_OUT_PORT = 50038 + + +# Try to get the current user's ip for setting the destination address +def get_client_ip(): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # This won't actually establish a connection, but helps us figure out the ip + s.connect(("8.8.8.8", 80)) + local_ip = s.getsockname()[0] + except Exception as e: + logging.error(f"Failed to get client IP: {e}") + return None + finally: + s.close() + return local_ip + class StreamOut(Node): type = NodeType.kStreamOut - def __init__(self, label=None, multicast_group=None): + def __init__(self, label=None, destination_address=None, destination_port=None): self.__socket = None self.__label = label - self.__multicast_group: Optional[str] = multicast_group + self.__destination_address = destination_address + self.__destination_port = destination_port - def read(self) -> Optional[SynapseData]: + def read(self) -> Tuple[Optional[SynapseData], int]: if self.__socket is None: if self.open_socket() is None: return None data, _ = self.__socket.recvfrom(8192) - return self._unpack(data) + bytes_read = len(data) + return self._unpack(data), bytes_read def open_socket(self): - print("Opening socket") + logging.info( + f"Opening socket at {self.__destination_address}:{self.__destination_port}" + ) if self.device is None: logging.error("Node has no device") return None - node_socket = next( - (s for s in self.device.sockets if s.node_id == self.id), None - ) - if node_socket is None: - logging.error("Couldnt find socket for node") - return None - - bind = node_socket.bind.split(":") - if len(bind) != 2: - logging.error("Invalid bind address") - return None - - addr = ( - self.__multicast_group - if self.__multicast_group - else self.device.uri.split(":")[0] - ) - if addr is None: - logging.error("Invalid bind address") - return None - - port = int(bind[1]) - if not port: - logging.error(f"Invalid bind port. Bind string: {bind}") - return None - - logging.info(f"Opening UDP multicast socket to {addr}:{port}") - + # UDP socket self.__socket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP ) + + # Allow reuse for easy restart self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - SOCKET_BUFSIZE_BYTES = 5 * 1024 * 1024 # 5MB - self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE_BYTES) + # Try to set a large recv buffer + SOCKET_BUFSIZE_BYTES = 5 * 1024 * 1024 # 5MB + self.__socket.setsockopt( + socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE_BYTES + ) recvbuf = self.__socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) - if recvbuf < SOCKET_BUFSIZE_BYTES: - logging.warning(f"Could not set socket buffer size to {SOCKET_BUFSIZE_BYTES}. Current size is {recvbuf}. Consider increasing the system limit.") - - if os.name != "nt": - logging.info(f"binding to {addr}:{port}") - self.__socket.bind((addr, port)) - else: - logging.info(f"binding to {port}") - self.__socket.bind(('', port)) - - if self.__multicast_group: - logging.info(f"joining multicast group {self.__multicast_group}") - host = socket.gethostbyname(socket.gethostname()) - mreq = socket.inet_aton(addr) + socket.inet_aton(host) - - mreq = struct.pack("4sL", socket.inet_aton(addr), socket.INADDR_ANY) - self.__socket.setsockopt( - socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq + logging.warning( + f"Could not set socket buffer size to {SOCKET_BUFSIZE_BYTES}. Current size is {recvbuf}. Consider increasing the system limit." ) + # Bind to the destination address (our ip) and port + try: + self.__socket.bind((self.__destination_address, self.__destination_port)) + except Exception as e: + logging.error( + f"Failed to bind to {self.__destination_address}:{self.__destination_port}: {e}" + ) + return None return self.__socket def _to_proto(self): @@ -102,7 +91,8 @@ def _to_proto(self): o = StreamOutConfig() o.label = self.__label - o.multicast_group = self.__multicast_group + o.udp_unicast.destination_address = self.__destination_address + o.udp_unicast.destination_port = self.__destination_port n.stream_out.CopyFrom(o) return n @@ -133,4 +123,28 @@ def _from_proto(proto: Optional[StreamOutConfig]): if not isinstance(proto, StreamOutConfig): raise ValueError("proto is not of type StreamOutConfig") - return StreamOut(proto.label, proto.multicast_group) + # We currently only support udp unicast + selected_transport = proto.WhichOneof("transport") + + if selected_transport is None: + # Set the defaults + destination_address = get_client_ip() + destination_port = DEFAULT_STREAM_OUT_PORT + logging.info( + f"Requesting StreamOut to: {destination_address}:{destination_port}" + ) + return StreamOut(proto.label, destination_address, destination_port) + elif selected_transport == "udp_unicast": + dest_address = proto.udp_unicast.destination_address + dest_port = proto.udp_unicast.destination_port + if dest_address == "": + dest_address = get_client_ip() + if dest_port == 0: + dest_port = DEFAULT_STREAM_OUT_PORT + logging.info( + f"Using user provided StreamOut destination: {dest_address}:{dest_port}" + ) + return StreamOut(proto.label, dest_address, dest_port) + else: + logging.error(f"Unsupported transport: {selected_transport}") + return None From 8df8ff0807daa19b7d096eca533d93e7daaecc2e Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Wed, 5 Mar 2025 14:35:37 -0800 Subject: [PATCH 03/12] Added jitter --- synapse/cli/streaming.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index 39b1ff5..7069b7d 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -43,8 +43,9 @@ def __init__(self): self.last_bytes_received = 0 # Jitter tracking - self.packet_arrival_times = [] - self.packet_intervals = [] + self.last_packet_time = None + self.last_jitter = 0 + self.avg_jitter = 0 def start_monitoring(self): """Initialize monitoring timers""" @@ -59,9 +60,23 @@ def process_packet(self, header, data, bytes_read): # Record first packet time if self.packet_count == 0: self.first_packet_time = packet_received_time + self.last_packet_time = packet_received_time print( - f"First packet received after {packet_received_time - self.start_time:.3f} seconds" + f"First packet received after {packet_received_time - self.start_time:.3f} seconds\n\n" ) + else: + # Calculate jitter + interval = packet_received_time - self.last_packet_time + # Update jitter using RFC 3550 algorithm (smoother than just max-min) + # https://datatracker.ietf.org/doc/html/rfc3550#appendix-A.8 + if self.packet_count > 1: + jitter_diff = abs(interval - self.last_jitter) + self.avg_jitter += (jitter_diff - self.avg_jitter) / 16 + + # Save current values for next calculation + self.last_jitter = interval + self.last_packet_time = packet_received_time + # We got a new packet self.packet_count += 1 self.bytes_received += bytes_read @@ -100,6 +115,10 @@ def print_stats(self): megabits_per_second = (bytes_per_second * 8) / 1_000_000 sys.stdout.write(f"Mbit/sec: {megabits_per_second:.1f} | ") + # Jitter (in milliseconds) + jitter_ms = self.avg_jitter * 1000 # Convert to ms + sys.stdout.write(f"Jitter: {jitter_ms:.2f} ms | ") + # Out of order sys.stdout.write(f"Out of Order: {self.out_of_order_packets}") From 405aa1c4d770546a6b4ae1c3e08356f30c604bb9 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Wed, 5 Mar 2025 15:30:05 -0800 Subject: [PATCH 04/12] Nicer outputs, show runtime --- synapse/cli/streaming.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index 7069b7d..cb84f0e 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -100,29 +100,43 @@ def process_packet(self, header, data, bytes_read): def print_stats(self): # Use carriage return to update on the same line sys.stdout.write("\r") + try: + terminal_width = os.get_terminal_size().columns + except (AttributeError, OSError): + # Fallback if not available + terminal_width = 80 + + sys.stdout.write(" " * terminal_width) + sys.stdout.write("\r") + + now = time.time() + stats = f"Runtime {now - self.start_time:.1f}s | " # Drop calculation drop_percent = (self.dropped_packets / max(1, self.packet_count)) * 100.0 - sys.stdout.write( - f"Dropped: {self.dropped_packets}/{self.packet_count} ({drop_percent:.1f}%) | " - ) + stats += f"Dropped: {self.dropped_packets}/{self.packet_count} ({drop_percent:.1f}%) | " # Bandwidth calcs - now = time.time() + dt_sec = now - self.last_bandwidth_time if dt_sec > 0: bytes_per_second = self.bytes_received_in_interval / dt_sec megabits_per_second = (bytes_per_second * 8) / 1_000_000 - sys.stdout.write(f"Mbit/sec: {megabits_per_second:.1f} | ") + stats += f"Mbit/sec: {megabits_per_second:.1f} | " # Jitter (in milliseconds) jitter_ms = self.avg_jitter * 1000 # Convert to ms - sys.stdout.write(f"Jitter: {jitter_ms:.2f} ms | ") + stats += f"Jitter: {jitter_ms:.2f} ms | " # Out of order - sys.stdout.write(f"Out of Order: {self.out_of_order_packets}") + stats += f"Out of Order: {self.out_of_order_packets}" # Flush the output to ensure it's displayed + if len(stats) > terminal_width - 1: + stats = stats[: terminal_width - 4] + "..." + + # Write the stats and flush + sys.stdout.write(stats) sys.stdout.flush() # Reset for the next run From cdcdda9cb0b60c5d158fb755789bc39baff7fd93 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Wed, 5 Mar 2025 16:42:34 -0800 Subject: [PATCH 05/12] fixed loading of protofile --- synapse/cli/streaming.py | 13 ++++++------ synapse/client/node.py | 1 - synapse/client/nodes/stream_out.py | 32 +++++++++++++++++++++++++----- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index cb84f0e..a98fbb6 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -145,9 +145,6 @@ def print_stats(self): return True - def print_final_stats(self): - print("final") - def add_commands(subparsers): a = subparsers.add_parser("read", help="Read from a device's StreamOut node") @@ -203,9 +200,9 @@ def read(args): return else: console.print(f"[bold yellow]Overwriting existing files in {output_base}") - + print("Before a device") device = syn.Device(args.uri, args.verbose) - + print("After a device") with console.status( "Reading from Synapse Device", spinner="bouncingBall", spinner_style="green" ) as status: @@ -221,14 +218,17 @@ def read(args): console.print("\n") status.update("Loading recording configuration") + print("Loading our config") # Keep track of the sample rate in case we need to plot sample_rate_hz = 32000 if args.config: + print("Loading config from file") config = load_config_from_file(args.config) if not config: console.print(f"[bold red]Failed to load config from {args.config}") return + print("Before finding a streamout") stream_out = next( (n for n in config.nodes if n.type == NodeType.kStreamOut), None ) @@ -265,6 +265,7 @@ def read(args): with console.status( "Configuring device", spinner="bouncingBall", spinner_style="green" ) as status: + print("Before configure with status") configure_status = device.configure_with_status(config) if configure_status is None: console.print( @@ -431,8 +432,6 @@ def read_packets( if duration and (time.time() - start) > duration: break - monitor.print_final_stats() - def _binary_writer(stop, q, num_ch, output_base): filename = f"{output_base}.dat" diff --git a/synapse/client/node.py b/synapse/client/node.py index 8b76e5e..c7041ee 100644 --- a/synapse/client/node.py +++ b/synapse/client/node.py @@ -29,7 +29,6 @@ def from_proto(cls, proto: NodeConfig): oneof = proto.WhichOneof("config") if oneof and hasattr(proto, oneof): config = getattr(proto, oneof) - node = cls._from_proto(config) node.id = proto.id return node diff --git a/synapse/client/nodes/stream_out.py b/synapse/client/nodes/stream_out.py index c026225..2494851 100644 --- a/synapse/client/nodes/stream_out.py +++ b/synapse/client/nodes/stream_out.py @@ -38,8 +38,17 @@ class StreamOut(Node): def __init__(self, label=None, destination_address=None, destination_port=None): self.__socket = None self.__label = label - self.__destination_address = destination_address - self.__destination_port = destination_port + + # If we have been passed a None for destination address, try to resolve it + if not destination_address: + self.__destination_address = get_client_ip() + else: + self.__destination_address = destination_address + + if not destination_port: + self.__destination_port = DEFAULT_STREAM_OUT_PORT + else: + self.__destination_port = destination_port def read(self) -> Tuple[Optional[SynapseData], int]: if self.__socket is None: @@ -90,9 +99,21 @@ def _to_proto(self): n = NodeConfig() o = StreamOutConfig() - o.label = self.__label - o.udp_unicast.destination_address = self.__destination_address - o.udp_unicast.destination_port = self.__destination_port + + print("Calling _to_proto") + print(f"Dest address: {self.__destination_address}") + if self.__label: + o.label = self.__label + else: + o.label = "Stream Out" + + print("Checking for destination address") + + if self.__destination_address: + o.udp_unicast.destination_address = self.__destination_address + + if self.__destination_port: + o.udp_unicast.destination_port = self.__destination_port n.stream_out.CopyFrom(o) return n @@ -117,6 +138,7 @@ def _unpack(self, data: bytes) -> SynapseData: @staticmethod def _from_proto(proto: Optional[StreamOutConfig]): + print(f"Got a {proto} for _from_proto") if proto is None: return StreamOut() From 8a44e7d81c88ae334109149e8e84755eb6be3505 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 10 Mar 2025 10:31:27 -0700 Subject: [PATCH 06/12] Remove debug logs --- synapse/cli/streaming.py | 6 ------ synapse/client/nodes/stream_out.py | 6 ------ 2 files changed, 12 deletions(-) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index a98fbb6..aea1ae0 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -200,9 +200,7 @@ def read(args): return else: console.print(f"[bold yellow]Overwriting existing files in {output_base}") - print("Before a device") device = syn.Device(args.uri, args.verbose) - print("After a device") with console.status( "Reading from Synapse Device", spinner="bouncingBall", spinner_style="green" ) as status: @@ -218,17 +216,14 @@ def read(args): console.print("\n") status.update("Loading recording configuration") - print("Loading our config") # Keep track of the sample rate in case we need to plot sample_rate_hz = 32000 if args.config: - print("Loading config from file") config = load_config_from_file(args.config) if not config: console.print(f"[bold red]Failed to load config from {args.config}") return - print("Before finding a streamout") stream_out = next( (n for n in config.nodes if n.type == NodeType.kStreamOut), None ) @@ -265,7 +260,6 @@ def read(args): with console.status( "Configuring device", spinner="bouncingBall", spinner_style="green" ) as status: - print("Before configure with status") configure_status = device.configure_with_status(config) if configure_status is None: console.print( diff --git a/synapse/client/nodes/stream_out.py b/synapse/client/nodes/stream_out.py index 2494851..a567124 100644 --- a/synapse/client/nodes/stream_out.py +++ b/synapse/client/nodes/stream_out.py @@ -99,16 +99,11 @@ def _to_proto(self): n = NodeConfig() o = StreamOutConfig() - - print("Calling _to_proto") - print(f"Dest address: {self.__destination_address}") if self.__label: o.label = self.__label else: o.label = "Stream Out" - print("Checking for destination address") - if self.__destination_address: o.udp_unicast.destination_address = self.__destination_address @@ -138,7 +133,6 @@ def _unpack(self, data: bytes) -> SynapseData: @staticmethod def _from_proto(proto: Optional[StreamOutConfig]): - print(f"Got a {proto} for _from_proto") if proto is None: return StreamOut() From e1e20982d9e14fd2b495535fd44ef4b61410a5a0 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 10 Mar 2025 10:35:10 -0700 Subject: [PATCH 07/12] Fixing the test code --- test_config.json | 1 - 1 file changed, 1 deletion(-) diff --git a/test_config.json b/test_config.json index e63c330..ed32943 100644 --- a/test_config.json +++ b/test_config.json @@ -4,7 +4,6 @@ "type": "kStreamOut", "id": 1, "stream_out": { - "multicast_group": "224.0.0.115" } }, { From 449d7acf56c35b06ec50326b55d37e070c12dd83 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 10 Mar 2025 10:52:55 -0700 Subject: [PATCH 08/12] Added an example --- synapse/examples/stream_out.py | 50 ++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/synapse/examples/stream_out.py b/synapse/examples/stream_out.py index 10b1f85..9fc45fd 100644 --- a/synapse/examples/stream_out.py +++ b/synapse/examples/stream_out.py @@ -1,5 +1,8 @@ import synapse as syn import sys +import time + +SIMULATED_PERIPHERAL_ID = 100 if __name__ == "__main__": uri = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1:647" @@ -10,18 +13,21 @@ print("Device info:") print(info) - stream_out = syn.StreamOut(label="my broadband", multicast_group="224.0.0.1") - + # The StreamOut node will automatically set your dest IP and port + stream_out = syn.StreamOut(label="my broadband") + channels = [ syn.Channel( id=channel_num, electrode_id=channel_num * 2, - reference_id=channel_num * 2 + 1 - ) for channel_num in range(32) + reference_id=channel_num * 2 + 1, + ) + for channel_num in range(32) ] - + broadband = syn.BroadbandSource( - peripheral_id=2, + # Use the simulated peripheral (100), or replace with your own + peripheral_id=SIMULATED_PERIPHERAL_ID, sample_rate_hz=30000, bit_width=12, gain=20.0, @@ -31,7 +37,7 @@ low_cutoff_hz=500.0, high_cutoff_hz=6000.0, ) - ) + ), ) config = syn.Config() @@ -46,5 +52,33 @@ assert info is not None, "Couldn't get device info" print("Configured device info:") print(info) - + + should_run = True + total_bytes_read = 0 + start_time = time.time() + last_update_time = start_time + update_interval_sec = 1 + while should_run: + try: + # Wait for data + syn_data, bytes_read = stream_out.read() + if syn_data is None or bytes_read == 0: + print("Failed to read data from node") + continue + # Do something with the data + _header, _data = syn_data + total_bytes_read += bytes_read + + current_time = time.time() + if (current_time - last_update_time) >= update_interval_sec: + sys.stdout.write("\r") + sys.stdout.write( + f"{total_bytes_read} bytes in {time.time() - start_time:.2f} sec" + ) + last_update_time = current_time + except KeyboardInterrupt: + print("Keyboard interrupt detected, stopping") + should_run = False + + print("Stopping device") device.stop() From 2eadd7854f5b473341f6950facfd1746aa3335ff Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 10 Mar 2025 11:16:48 -0700 Subject: [PATCH 09/12] updated server --- README.md | 2 +- synapse/server/entrypoint.py | 23 ++------------ synapse/server/nodes/stream_out.py | 49 +++++++++--------------------- synapse/server/rpc.py | 11 +++---- 4 files changed, 22 insertions(+), 63 deletions(-) diff --git a/README.md b/README.md index e3e3cd1..8b40fe7 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ And a toy device `synapse-sim` for local development, --discovery-port DISCOVERY_PORT Port to listen for discovery requests --discovery-addr DISCOVERY_ADDR - Multicast address to listen for discovery requests + UDP address to listen for discovery requests --name NAME Device name --serial SERIAL Device serial number -v, --verbose Enable verbose output diff --git a/synapse/server/entrypoint.py b/synapse/server/entrypoint.py index abdcd00..d2d4ce2 100644 --- a/synapse/server/entrypoint.py +++ b/synapse/server/entrypoint.py @@ -1,18 +1,15 @@ import signal -import struct import socket import sys import asyncio import logging import argparse from coolname import generate_slug - -logging.basicConfig(level=logging.INFO) - from synapse.server.rpc import serve from synapse.server.autodiscovery import BroadcastDiscoveryProtocol from synapse.server.nodes import SERVER_NODE_OBJECT_MAP +logging.basicConfig(level=logging.INFO) ENTRY_DEFAULTS = { "iface_ip": None, @@ -32,11 +29,6 @@ def main( # formatter_class=argparse.ArgumentDefaultsHelpFormatter formatter_class=lambda prog: argparse.HelpFormatter(prog, width=124), ) - parser.add_argument( - "--iface-ip", - help="IP of the network interface to use for multicast traffic", - required=True, - ) parser.add_argument( "--rpc-port", help="Port to listen for RPC requests", @@ -51,7 +43,7 @@ def main( ) parser.add_argument( "--discovery-addr", - help="Multicast address to listen for discovery requests", + help="UDP address to listen for discovery requests", default=defaults["discovery_addr"], ) parser.add_argument("--name", help="Device name", default=defaults["server_name"]) @@ -62,16 +54,6 @@ def main( "-v", "--verbose", action="store_true", help="Enable verbose output" ) args = parser.parse_args() - # verify that network interface is real - try: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind((args.iface_ip, 8000)) - logging.info(f"Binding to {s.getsockname()[0]}...") - except Exception as e: - parser.error("Invalid --iface-ip given, could not bind to interface") - finally: - s.close() if args.verbose: logging.basicConfig(level=logging.DEBUG) @@ -101,7 +83,6 @@ async def async_main(args, node_object_map, peripherals): args.name, args.serial, args.rpc_port, - args.iface_ip, node_object_map, peripherals, ) diff --git a/synapse/server/nodes/stream_out.py b/synapse/server/nodes/stream_out.py index e70ff66..ef95135 100644 --- a/synapse/server/nodes/stream_out.py +++ b/synapse/server/nodes/stream_out.py @@ -1,18 +1,13 @@ import asyncio -import queue import socket -import struct from typing import List from synapse.api.node_pb2 import NodeType from synapse.api.nodes.stream_out_pb2 import StreamOutConfig from synapse.server.nodes.base import BaseNode -from synapse.server.status import Status, StatusCode +from synapse.server.status import Status from synapse.utils.ndtp_types import SynapseData -PORT = 6480 -MULTICAST_TTL = 3 - class StreamOut(BaseNode): __n = 0 @@ -22,8 +17,8 @@ def __init__(self, id): self.__i = StreamOut.__n StreamOut.__n += 1 self.__sequence_number = 0 - self.__iface_ip = None self.__config = None + self.socket_endpoint = None def config(self): c = super().config() @@ -34,46 +29,32 @@ def config(self): return c def configure(self, config: StreamOutConfig) -> Status: - if not self.__iface_ip: - return Status(StatusCode.kUndefinedError, "No interface IP specified") - - if not config.multicast_group: - return Status(StatusCode.kUndefinedError, "No multicast group specified") - self.__config = config + if not config.udp_unicast: + self.logger.error( + "Cannot conifgure StreamOut, only udp unicast is supported" + ) + raise Exception("Only udp unicast is supported for streamout") + self.__socket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP ) - self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - port = PORT + self.__i + dest_address = self.__config.udp_unicast.destination_address + dest_port = self.__config.udp_unicast.destination_port - self.__socket.bind((self.__iface_ip, port)) - - mreq = struct.pack( - "=4sl", socket.inet_aton(config.multicast_group), socket.INADDR_ANY - ) - self.__socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) - self.__socket.setsockopt( - socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, MULTICAST_TTL - ) - - self.socket = [config.multicast_group, port] - self.logger.info( - f"created multicast socket on {self.socket}, group {config.multicast_group}" - ) + self.__socket.bind((dest_address, dest_port)) + self.socket_endpoint = (dest_address, dest_port) + self.logger.info(f"created stream out socket on {self.__socket}") return Status() - def configure_iface_ip(self, iface_ip): - self.__iface_ip = iface_ip - async def run(self): loop = asyncio.get_running_loop() while self.running: - if not self.socket: + if not self.socket_endpoint: self.logger.error("socket not configured") return @@ -85,7 +66,7 @@ async def run(self): None, self.__socket.sendto, packet, - (self.socket[0], self.socket[1]), + self.socket_endpoint, ) def _pack(self, data: SynapseData) -> List[bytes]: diff --git a/synapse/server/rpc.py b/synapse/server/rpc.py index eae2fa1..1c5ead8 100644 --- a/synapse/server/rpc.py +++ b/synapse/server/rpc.py @@ -13,13 +13,11 @@ async def serve( - server_name, device_serial, rpc_port, iface_ip, node_object_map, peripherals + server_name, device_serial, rpc_port, node_object_map, peripherals ) -> None: server = grpc.aio.server() add_SynapseDeviceServicer_to_server( - SynapseServicer( - server_name, device_serial, iface_ip, node_object_map, peripherals - ), + SynapseServicer(server_name, device_serial, node_object_map, peripherals), server, ) server.add_insecure_port("[::]:%d" % rpc_port) @@ -35,10 +33,9 @@ class SynapseServicer(SynapseDeviceServicer): connections = [] nodes = [] - def __init__(self, name, serial, iface_ip, node_object_map, peripherals): + def __init__(self, name, serial, node_object_map, peripherals): self.name = name self.serial = serial - self.iface_ip = iface_ip self.node_object_map = node_object_map self.peripherals = peripherals self.logger = logging.getLogger("server") @@ -170,7 +167,7 @@ def _reconfigure(self, configuration): "Creating %s node(%d)" % (NodeType.Name(node.type), node.id) ) node = self.node_object_map[node.type](node.id) - if node.type in [NodeType.kStreamOut, NodeType.kStreamIn]: + if node.type in [NodeType.kStreamIn]: node.configure_iface_ip(self.iface_ip) status = node.configure(config) From 1dcff15a9130d5e8fd9cb0e4620848b8445de222 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 10 Mar 2025 11:17:10 -0700 Subject: [PATCH 10/12] updated server --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 8b40fe7..bb1183a 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ As well as the base for a device implementation (`synapse/server`), And a toy device `synapse-sim` for local development, % synapse-sim --help - usage: synapse-sim [-h] --iface-ip IFACE_IP [--rpc-port RPC_PORT] + usage: synapse-sim [-h] [--rpc-port RPC_PORT] [--discovery-port DISCOVERY_PORT] [--discovery-addr DISCOVERY_ADDR] [--name NAME] [--serial SERIAL] [-v] @@ -43,7 +43,6 @@ And a toy device `synapse-sim` for local development, options: -h, --help show this help message and exit - --iface-ip IFACE_IP IP of the network interface to use for multicast traffic --rpc-port RPC_PORT Port to listen for RPC requests --discovery-port DISCOVERY_PORT Port to listen for discovery requests From 35148a9edd7d876edfe3392d686bf432617ae729 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 10 Mar 2025 11:38:30 -0700 Subject: [PATCH 11/12] Fix tests --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6866505..39a04d5 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -56,7 +56,7 @@ jobs: make pip install -e . - synapse-sim --iface-ip 127.0.0.1 --rpc-port 50051 & + synapse-sim --rpc-port 50051 & sleep 2 From dfe73787983d6e78b0b9d41be16d020f7b68f8aa Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 10 Mar 2025 12:11:15 -0700 Subject: [PATCH 12/12] Added timeout --- synapse/examples/stream_out.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/examples/stream_out.py b/synapse/examples/stream_out.py index 9fc45fd..82a9838 100644 --- a/synapse/examples/stream_out.py +++ b/synapse/examples/stream_out.py @@ -76,6 +76,10 @@ f"{total_bytes_read} bytes in {time.time() - start_time:.2f} sec" ) last_update_time = current_time + + if current_time - start_time > 5: + should_run = False + except KeyboardInterrupt: print("Keyboard interrupt detected, stopping") should_run = False