diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6866505..39a04d5 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -56,7 +56,7 @@ jobs: make pip install -e . - synapse-sim --iface-ip 127.0.0.1 --rpc-port 50051 & + synapse-sim --rpc-port 50051 & sleep 2 diff --git a/README.md b/README.md index e3e3cd1..bb1183a 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ As well as the base for a device implementation (`synapse/server`), And a toy device `synapse-sim` for local development, % synapse-sim --help - usage: synapse-sim [-h] --iface-ip IFACE_IP [--rpc-port RPC_PORT] + usage: synapse-sim [-h] [--rpc-port RPC_PORT] [--discovery-port DISCOVERY_PORT] [--discovery-addr DISCOVERY_ADDR] [--name NAME] [--serial SERIAL] [-v] @@ -43,12 +43,11 @@ And a toy device `synapse-sim` for local development, options: -h, --help show this help message and exit - --iface-ip IFACE_IP IP of the network interface to use for multicast traffic --rpc-port RPC_PORT Port to listen for RPC requests --discovery-port DISCOVERY_PORT Port to listen for discovery requests --discovery-addr DISCOVERY_ADDR - Multicast address to listen for discovery requests + UDP address to listen for discovery requests --name NAME Device name --serial SERIAL Device serial number -v, --verbose Enable verbose output diff --git a/synapse-api b/synapse-api index 6d7898f..62ddda4 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 6d7898fd29f734acd7a63bda98c24ea48f768768 +Subproject commit 62ddda4e36a3f0f3c4608e3a3c30100d9716e0dd diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index 6f08386..aea1ae0 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -7,6 +7,7 @@ from typing import Optional from operator import itemgetter import copy +import sys from google.protobuf.json_format import Parse, MessageToJson @@ -22,6 +23,129 @@ from rich.pretty import pprint +class PacketMonitor: + def __init__(self): + # Packet tracking + self.packet_count = 0 + self.seq_number = None + self.dropped_packets = 0 + self.out_of_order_packets = 0 + + # Timing metrics + self.start_time = None + self.first_packet_time = None + + # Bandwidth tracking + self.bytes_received = 0 + self.bytes_received_in_interval = 0 + self.bandwidth_samples = [] + self.last_bandwidth_time = None + self.last_bytes_received = 0 + + # Jitter tracking + self.last_packet_time = None + self.last_jitter = 0 + self.avg_jitter = 0 + + def start_monitoring(self): + """Initialize monitoring timers""" + self.start_time = time.time() + self.last_stats_time = self.start_time + self.last_bandwidth_time = self.start_time + + def process_packet(self, header, data, bytes_read): + if not data: + return False + packet_received_time = time.time() + # Record first packet time + if self.packet_count == 0: + self.first_packet_time = packet_received_time + self.last_packet_time = packet_received_time + print( + f"First packet received after {packet_received_time - self.start_time:.3f} seconds\n\n" + ) + else: + # Calculate jitter + interval = packet_received_time - self.last_packet_time + # Update jitter using RFC 3550 algorithm (smoother than just max-min) + # https://datatracker.ietf.org/doc/html/rfc3550#appendix-A.8 + if self.packet_count > 1: + jitter_diff = abs(interval - self.last_jitter) + self.avg_jitter += (jitter_diff - self.avg_jitter) / 16 + + # Save current values for next calculation + self.last_jitter = interval + self.last_packet_time = packet_received_time + + # We got a new packet + self.packet_count += 1 + self.bytes_received += bytes_read + self.bytes_received_in_interval += bytes_read + # Sequence number checking for packet loss/ordering + if self.seq_number is None: + self.seq_number = header.seq_number + else: + expected = (self.seq_number + 1) % (2**16) + if header.seq_number != expected: + # Lost some packets + if header.seq_number > expected: + lost = (header.seq_number - expected) % (2**16) + self.dropped_packets += lost + else: + self.out_of_order_packets += 1 + self.seq_number = header.seq_number + + return True + + def print_stats(self): + # Use carriage return to update on the same line + sys.stdout.write("\r") + try: + terminal_width = os.get_terminal_size().columns + except (AttributeError, OSError): + # Fallback if not available + terminal_width = 80 + + sys.stdout.write(" " * terminal_width) + sys.stdout.write("\r") + + now = time.time() + stats = f"Runtime {now - self.start_time:.1f}s | " + + # Drop calculation + drop_percent = (self.dropped_packets / max(1, self.packet_count)) * 100.0 + stats += f"Dropped: {self.dropped_packets}/{self.packet_count} ({drop_percent:.1f}%) | " + + # Bandwidth calcs + + dt_sec = now - self.last_bandwidth_time + if dt_sec > 0: + bytes_per_second = self.bytes_received_in_interval / dt_sec + megabits_per_second = (bytes_per_second * 8) / 1_000_000 + stats += f"Mbit/sec: {megabits_per_second:.1f} | " + + # Jitter (in milliseconds) + jitter_ms = self.avg_jitter * 1000 # Convert to ms + stats += f"Jitter: {jitter_ms:.2f} ms | " + + # Out of order + stats += f"Out of Order: {self.out_of_order_packets}" + + # Flush the output to ensure it's displayed + if len(stats) > terminal_width - 1: + stats = stats[: terminal_width - 4] + "..." + + # Write the stats and flush + sys.stdout.write(stats) + sys.stdout.flush() + + # Reset for the next run + self.bytes_received_in_interval = 0 + self.last_bandwidth_time = now + + return True + + def add_commands(subparsers): a = subparsers.add_parser("read", help="Read from a device's StreamOut node") a.add_argument("uri", type=str, help="IP address of Synapse device") @@ -76,9 +200,7 @@ def read(args): return else: console.print(f"[bold yellow]Overwriting existing files in {output_base}") - device = syn.Device(args.uri, args.verbose) - with console.status( "Reading from Synapse Device", spinner="bouncingBall", spinner_style="green" ) as status: @@ -158,9 +280,6 @@ def read(args): return console.print("[bold green]Device configured successfully") - if not device.configure(config): - raise ValueError("Failed to configure device") - if info.status.state != DeviceState.kRunning: print("Starting device...") if not device.start(): @@ -216,49 +335,46 @@ def read(args): if args.duration else "Streaming data indefinitely" ) - with console.status( - status_title, spinner="bouncingBall", spinner_style="green" - ) as status: - q = queue.Queue() - plot_q = queue.Queue() if args.plot else None - - threads = [] - stop = threading.Event() - if args.bin: - threads.append( - threading.Thread( - target=_binary_writer, args=(stop, q, num_ch, output_base) - ) - ) - else: - threads.append( - threading.Thread(target=_data_writer, args=(stop, q, output_base)) - ) + console.print(status_title) - if args.plot: - threads.append( - threading.Thread( - target=_plot_data, args=(stop, plot_q, sample_rate_hz, num_ch) - ) + q = queue.Queue() + plot_q = queue.Queue() if args.plot else None + + threads = [] + stop = threading.Event() + if args.bin: + threads.append( + threading.Thread(target=_binary_writer, args=(stop, q, num_ch, output_base)) + ) + else: + threads.append( + threading.Thread(target=_data_writer, args=(stop, q, output_base)) + ) + + if args.plot: + threads.append( + threading.Thread( + target=_plot_data, args=(stop, plot_q, sample_rate_hz, num_ch) ) + ) + for thread in threads: + thread.start() + + try: + read_packets(stream_out, q, plot_q, args.duration) + except KeyboardInterrupt: + pass + finally: + print("Stopping read...") + stop.set() for thread in threads: - thread.start() + thread.join() - try: - read_packets(stream_out, q, plot_q, args.duration) - except KeyboardInterrupt: - pass - finally: - print("Stopping read...") - stop.set() - for thread in threads: - thread.join() - - if args.config: - console.print("Stopping device...") - if not device.stop(): - console.print("[red]Failed to stop device") - console.print("Stopped") + if args.config: + console.print("Stopping device...") + if not device.stop(): + console.print("[red]Failed to stop device") + console.print("Stopped") console.print("[bold green]Streaming complete") console.print("[cyan]================") @@ -277,11 +393,8 @@ def read_packets( duration: Optional[int] = None, num_ch: int = 32, ): - packet_count = 0 - seq_number = None - dropped_packets = 0 start = time.time() - print_interval = 1000 + last_print_time = start print( f"Reading packets for duration {duration} seconds" @@ -289,42 +402,30 @@ def read_packets( else "Reading packets..." ) + # Keep track of our statistics + monitor = PacketMonitor() + monitor.start_monitoring() while True: - header, data = node.read() - if not data: + synapse_data, bytes_read = node.read() + if synapse_data is None or bytes_read == 0: + print("Could not read data from node") continue - - packet_count += 1 - - # Detect dropped packets via seq_number - if seq_number is None: - seq_number = header.seq_number - else: - expected = (seq_number + 1) % (2**16) - if header.seq_number != expected: - print(f"Seq out of order: got {header.seq_number}, expected {expected}") - dropped_packets += header.seq_number - expected - seq_number = header.seq_number + header, data = synapse_data + monitor.process_packet(header, data, bytes_read) # Always add the data to the writer queues q.put(data) if plot_q: plot_q.put(copy.deepcopy(data)) - if packet_count == 1: - print(f"First packet received at {time.time() - start:.2f} seconds") + # Print every second our current statistics + if time.time() - last_print_time > 1.0: + monitor.print_stats() + last_print_time = time.time() - if packet_count % print_interval == 0: - print( - f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)" - ) if duration and (time.time() - start) > duration: break - print( - f"Recieved {packet_count} packets in {time.time() - start} seconds. Dropped {dropped_packets} packets ({(dropped_packets / packet_count) * 100}%)" - ) - def _binary_writer(stop, q, num_ch, output_base): filename = f"{output_base}.dat" diff --git a/synapse/client/node.py b/synapse/client/node.py index 8b76e5e..c7041ee 100644 --- a/synapse/client/node.py +++ b/synapse/client/node.py @@ -29,7 +29,6 @@ def from_proto(cls, proto: NodeConfig): oneof = proto.WhichOneof("config") if oneof and hasattr(proto, oneof): config = getattr(proto, oneof) - node = cls._from_proto(config) node.id = proto.id return node diff --git a/synapse/client/nodes/stream_out.py b/synapse/client/nodes/stream_out.py index 82043ee..a567124 100644 --- a/synapse/client/nodes/stream_out.py +++ b/synapse/client/nodes/stream_out.py @@ -1,9 +1,7 @@ -import os import logging import socket -import struct import traceback -from typing import Optional +from typing import Optional, Tuple from synapse.api.datatype_pb2 import DataType from synapse.api.node_pb2 import NodeConfig, NodeType @@ -16,93 +14,101 @@ SynapseData, ) +DEFAULT_STREAM_OUT_PORT = 50038 + + +# Try to get the current user's ip for setting the destination address +def get_client_ip(): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # This won't actually establish a connection, but helps us figure out the ip + s.connect(("8.8.8.8", 80)) + local_ip = s.getsockname()[0] + except Exception as e: + logging.error(f"Failed to get client IP: {e}") + return None + finally: + s.close() + return local_ip + class StreamOut(Node): type = NodeType.kStreamOut - def __init__(self, label=None, multicast_group=None): + def __init__(self, label=None, destination_address=None, destination_port=None): self.__socket = None self.__label = label - self.__multicast_group: Optional[str] = multicast_group - def read(self) -> Optional[SynapseData]: + # If we have been passed a None for destination address, try to resolve it + if not destination_address: + self.__destination_address = get_client_ip() + else: + self.__destination_address = destination_address + + if not destination_port: + self.__destination_port = DEFAULT_STREAM_OUT_PORT + else: + self.__destination_port = destination_port + + def read(self) -> Tuple[Optional[SynapseData], int]: if self.__socket is None: if self.open_socket() is None: return None data, _ = self.__socket.recvfrom(8192) - return self._unpack(data) + bytes_read = len(data) + return self._unpack(data), bytes_read def open_socket(self): - print("Opening socket") + logging.info( + f"Opening socket at {self.__destination_address}:{self.__destination_port}" + ) if self.device is None: logging.error("Node has no device") return None - node_socket = next( - (s for s in self.device.sockets if s.node_id == self.id), None - ) - if node_socket is None: - logging.error("Couldnt find socket for node") - return None - - bind = node_socket.bind.split(":") - if len(bind) != 2: - logging.error("Invalid bind address") - return None - - addr = ( - self.__multicast_group - if self.__multicast_group - else self.device.uri.split(":")[0] - ) - if addr is None: - logging.error("Invalid bind address") - return None - - port = int(bind[1]) - if not port: - logging.error(f"Invalid bind port. Bind string: {bind}") - return None - - logging.info(f"Opening UDP multicast socket to {addr}:{port}") - + # UDP socket self.__socket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP ) + + # Allow reuse for easy restart self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - SOCKET_BUFSIZE_BYTES = 5 * 1024 * 1024 # 5MB - self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE_BYTES) + # Try to set a large recv buffer + SOCKET_BUFSIZE_BYTES = 5 * 1024 * 1024 # 5MB + self.__socket.setsockopt( + socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE_BYTES + ) recvbuf = self.__socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) - if recvbuf < SOCKET_BUFSIZE_BYTES: - logging.warning(f"Could not set socket buffer size to {SOCKET_BUFSIZE_BYTES}. Current size is {recvbuf}. Consider increasing the system limit.") - - if os.name != "nt": - logging.info(f"binding to {addr}:{port}") - self.__socket.bind((addr, port)) - else: - logging.info(f"binding to {port}") - self.__socket.bind(('', port)) - - if self.__multicast_group: - logging.info(f"joining multicast group {self.__multicast_group}") - host = socket.gethostbyname(socket.gethostname()) - mreq = socket.inet_aton(addr) + socket.inet_aton(host) - - mreq = struct.pack("4sL", socket.inet_aton(addr), socket.INADDR_ANY) - self.__socket.setsockopt( - socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq + logging.warning( + f"Could not set socket buffer size to {SOCKET_BUFSIZE_BYTES}. Current size is {recvbuf}. Consider increasing the system limit." ) + # Bind to the destination address (our ip) and port + try: + self.__socket.bind((self.__destination_address, self.__destination_port)) + except Exception as e: + logging.error( + f"Failed to bind to {self.__destination_address}:{self.__destination_port}: {e}" + ) + return None return self.__socket def _to_proto(self): n = NodeConfig() o = StreamOutConfig() - o.label = self.__label - o.multicast_group = self.__multicast_group + if self.__label: + o.label = self.__label + else: + o.label = "Stream Out" + + if self.__destination_address: + o.udp_unicast.destination_address = self.__destination_address + + if self.__destination_port: + o.udp_unicast.destination_port = self.__destination_port n.stream_out.CopyFrom(o) return n @@ -133,4 +139,28 @@ def _from_proto(proto: Optional[StreamOutConfig]): if not isinstance(proto, StreamOutConfig): raise ValueError("proto is not of type StreamOutConfig") - return StreamOut(proto.label, proto.multicast_group) + # We currently only support udp unicast + selected_transport = proto.WhichOneof("transport") + + if selected_transport is None: + # Set the defaults + destination_address = get_client_ip() + destination_port = DEFAULT_STREAM_OUT_PORT + logging.info( + f"Requesting StreamOut to: {destination_address}:{destination_port}" + ) + return StreamOut(proto.label, destination_address, destination_port) + elif selected_transport == "udp_unicast": + dest_address = proto.udp_unicast.destination_address + dest_port = proto.udp_unicast.destination_port + if dest_address == "": + dest_address = get_client_ip() + if dest_port == 0: + dest_port = DEFAULT_STREAM_OUT_PORT + logging.info( + f"Using user provided StreamOut destination: {dest_address}:{dest_port}" + ) + return StreamOut(proto.label, dest_address, dest_port) + else: + logging.error(f"Unsupported transport: {selected_transport}") + return None diff --git a/synapse/examples/stream_out.py b/synapse/examples/stream_out.py index 10b1f85..82a9838 100644 --- a/synapse/examples/stream_out.py +++ b/synapse/examples/stream_out.py @@ -1,5 +1,8 @@ import synapse as syn import sys +import time + +SIMULATED_PERIPHERAL_ID = 100 if __name__ == "__main__": uri = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1:647" @@ -10,18 +13,21 @@ print("Device info:") print(info) - stream_out = syn.StreamOut(label="my broadband", multicast_group="224.0.0.1") - + # The StreamOut node will automatically set your dest IP and port + stream_out = syn.StreamOut(label="my broadband") + channels = [ syn.Channel( id=channel_num, electrode_id=channel_num * 2, - reference_id=channel_num * 2 + 1 - ) for channel_num in range(32) + reference_id=channel_num * 2 + 1, + ) + for channel_num in range(32) ] - + broadband = syn.BroadbandSource( - peripheral_id=2, + # Use the simulated peripheral (100), or replace with your own + peripheral_id=SIMULATED_PERIPHERAL_ID, sample_rate_hz=30000, bit_width=12, gain=20.0, @@ -31,7 +37,7 @@ low_cutoff_hz=500.0, high_cutoff_hz=6000.0, ) - ) + ), ) config = syn.Config() @@ -46,5 +52,37 @@ assert info is not None, "Couldn't get device info" print("Configured device info:") print(info) - + + should_run = True + total_bytes_read = 0 + start_time = time.time() + last_update_time = start_time + update_interval_sec = 1 + while should_run: + try: + # Wait for data + syn_data, bytes_read = stream_out.read() + if syn_data is None or bytes_read == 0: + print("Failed to read data from node") + continue + # Do something with the data + _header, _data = syn_data + total_bytes_read += bytes_read + + current_time = time.time() + if (current_time - last_update_time) >= update_interval_sec: + sys.stdout.write("\r") + sys.stdout.write( + f"{total_bytes_read} bytes in {time.time() - start_time:.2f} sec" + ) + last_update_time = current_time + + if current_time - start_time > 5: + should_run = False + + except KeyboardInterrupt: + print("Keyboard interrupt detected, stopping") + should_run = False + + print("Stopping device") device.stop() diff --git a/synapse/server/entrypoint.py b/synapse/server/entrypoint.py index abdcd00..d2d4ce2 100644 --- a/synapse/server/entrypoint.py +++ b/synapse/server/entrypoint.py @@ -1,18 +1,15 @@ import signal -import struct import socket import sys import asyncio import logging import argparse from coolname import generate_slug - -logging.basicConfig(level=logging.INFO) - from synapse.server.rpc import serve from synapse.server.autodiscovery import BroadcastDiscoveryProtocol from synapse.server.nodes import SERVER_NODE_OBJECT_MAP +logging.basicConfig(level=logging.INFO) ENTRY_DEFAULTS = { "iface_ip": None, @@ -32,11 +29,6 @@ def main( # formatter_class=argparse.ArgumentDefaultsHelpFormatter formatter_class=lambda prog: argparse.HelpFormatter(prog, width=124), ) - parser.add_argument( - "--iface-ip", - help="IP of the network interface to use for multicast traffic", - required=True, - ) parser.add_argument( "--rpc-port", help="Port to listen for RPC requests", @@ -51,7 +43,7 @@ def main( ) parser.add_argument( "--discovery-addr", - help="Multicast address to listen for discovery requests", + help="UDP address to listen for discovery requests", default=defaults["discovery_addr"], ) parser.add_argument("--name", help="Device name", default=defaults["server_name"]) @@ -62,16 +54,6 @@ def main( "-v", "--verbose", action="store_true", help="Enable verbose output" ) args = parser.parse_args() - # verify that network interface is real - try: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind((args.iface_ip, 8000)) - logging.info(f"Binding to {s.getsockname()[0]}...") - except Exception as e: - parser.error("Invalid --iface-ip given, could not bind to interface") - finally: - s.close() if args.verbose: logging.basicConfig(level=logging.DEBUG) @@ -101,7 +83,6 @@ async def async_main(args, node_object_map, peripherals): args.name, args.serial, args.rpc_port, - args.iface_ip, node_object_map, peripherals, ) diff --git a/synapse/server/nodes/stream_out.py b/synapse/server/nodes/stream_out.py index e70ff66..ef95135 100644 --- a/synapse/server/nodes/stream_out.py +++ b/synapse/server/nodes/stream_out.py @@ -1,18 +1,13 @@ import asyncio -import queue import socket -import struct from typing import List from synapse.api.node_pb2 import NodeType from synapse.api.nodes.stream_out_pb2 import StreamOutConfig from synapse.server.nodes.base import BaseNode -from synapse.server.status import Status, StatusCode +from synapse.server.status import Status from synapse.utils.ndtp_types import SynapseData -PORT = 6480 -MULTICAST_TTL = 3 - class StreamOut(BaseNode): __n = 0 @@ -22,8 +17,8 @@ def __init__(self, id): self.__i = StreamOut.__n StreamOut.__n += 1 self.__sequence_number = 0 - self.__iface_ip = None self.__config = None + self.socket_endpoint = None def config(self): c = super().config() @@ -34,46 +29,32 @@ def config(self): return c def configure(self, config: StreamOutConfig) -> Status: - if not self.__iface_ip: - return Status(StatusCode.kUndefinedError, "No interface IP specified") - - if not config.multicast_group: - return Status(StatusCode.kUndefinedError, "No multicast group specified") - self.__config = config + if not config.udp_unicast: + self.logger.error( + "Cannot conifgure StreamOut, only udp unicast is supported" + ) + raise Exception("Only udp unicast is supported for streamout") + self.__socket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP ) - self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - port = PORT + self.__i + dest_address = self.__config.udp_unicast.destination_address + dest_port = self.__config.udp_unicast.destination_port - self.__socket.bind((self.__iface_ip, port)) - - mreq = struct.pack( - "=4sl", socket.inet_aton(config.multicast_group), socket.INADDR_ANY - ) - self.__socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) - self.__socket.setsockopt( - socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, MULTICAST_TTL - ) - - self.socket = [config.multicast_group, port] - self.logger.info( - f"created multicast socket on {self.socket}, group {config.multicast_group}" - ) + self.__socket.bind((dest_address, dest_port)) + self.socket_endpoint = (dest_address, dest_port) + self.logger.info(f"created stream out socket on {self.__socket}") return Status() - def configure_iface_ip(self, iface_ip): - self.__iface_ip = iface_ip - async def run(self): loop = asyncio.get_running_loop() while self.running: - if not self.socket: + if not self.socket_endpoint: self.logger.error("socket not configured") return @@ -85,7 +66,7 @@ async def run(self): None, self.__socket.sendto, packet, - (self.socket[0], self.socket[1]), + self.socket_endpoint, ) def _pack(self, data: SynapseData) -> List[bytes]: diff --git a/synapse/server/rpc.py b/synapse/server/rpc.py index eae2fa1..1c5ead8 100644 --- a/synapse/server/rpc.py +++ b/synapse/server/rpc.py @@ -13,13 +13,11 @@ async def serve( - server_name, device_serial, rpc_port, iface_ip, node_object_map, peripherals + server_name, device_serial, rpc_port, node_object_map, peripherals ) -> None: server = grpc.aio.server() add_SynapseDeviceServicer_to_server( - SynapseServicer( - server_name, device_serial, iface_ip, node_object_map, peripherals - ), + SynapseServicer(server_name, device_serial, node_object_map, peripherals), server, ) server.add_insecure_port("[::]:%d" % rpc_port) @@ -35,10 +33,9 @@ class SynapseServicer(SynapseDeviceServicer): connections = [] nodes = [] - def __init__(self, name, serial, iface_ip, node_object_map, peripherals): + def __init__(self, name, serial, node_object_map, peripherals): self.name = name self.serial = serial - self.iface_ip = iface_ip self.node_object_map = node_object_map self.peripherals = peripherals self.logger = logging.getLogger("server") @@ -170,7 +167,7 @@ def _reconfigure(self, configuration): "Creating %s node(%d)" % (NodeType.Name(node.type), node.id) ) node = self.node_object_map[node.type](node.id) - if node.type in [NodeType.kStreamOut, NodeType.kStreamIn]: + if node.type in [NodeType.kStreamIn]: node.configure_iface_ip(self.iface_ip) status = node.configure(config) diff --git a/test_config.json b/test_config.json index e63c330..ed32943 100644 --- a/test_config.json +++ b/test_config.json @@ -4,7 +4,6 @@ "type": "kStreamOut", "id": 1, "stream_out": { - "multicast_group": "224.0.0.115" } }, {