diff --git a/README.md b/README.md index cba9b39..2600556 100644 --- a/README.md +++ b/README.md @@ -222,7 +222,7 @@ resolved by name via entry points or a built-in default. | # | Layer | Interface | Default plugin | |---|---|---|---| -| 1 | Transport | `Transport` | `in_memory` (in-process event queue; no network I/O) | +| 1 | Transport | `Transport` | `in_memory` (in-process event queue; no network I/O) — also `realistic` (per-hop latency, jitter, bandwidth, queueing, drop-tail backpressure) | | 2 | Communication | `CommsProtocol` | `nest_native` (JSON envelope, base64 payload) | | 3 | Identity | `Identity` | `did_key` (deterministic public-key signatures for simulation; not Ed25519) | | 4 | Registry | `Registry` | `in_memory` (dict lookup, no persistence) | @@ -312,7 +312,11 @@ Two things to know that aren't obvious: *is correct*. That's what your plugin is for. - **Single process.** No distributed execution. - **In-memory transport only out of the box.** No TCP, gRPC, or HTTP - yet. + yet. The bundled `realistic` transport models per-hop latency, jitter, + bandwidth, queueing, and packet loss on top of the same in-process + event queue — useful for stressing protocol timing without leaving + Tier 1. Pick it via `layers.transport: realistic` and tune knobs in + `layers.transport_config` (see `scenarios/marketplace_realistic.yaml`). - **Tier 2 is non-deterministic.** Don't use it for benchmarks. 
--- diff --git a/packages/nest-core/nest_core/plugins.py b/packages/nest-core/nest_core/plugins.py index 8a4d76e..057a7df 100644 --- a/packages/nest-core/nest_core/plugins.py +++ b/packages/nest-core/nest_core/plugins.py @@ -18,6 +18,7 @@ _REF = "nest_plugins_reference" _BUILTINS: dict[tuple[str, str], str] = { ("transport", "in_memory"): f"{_REF}.transport.in_memory:StandaloneInMemoryTransport", + ("transport", "realistic"): f"{_REF}.transport.realistic:RealisticTransport", ("comms", "nest_native"): f"{_REF}.comms.nest_native:NestNativeComms", ("identity", "did_key"): f"{_REF}.identity.did_key:DidKeyIdentity", ("registry", "in_memory"): f"{_REF}.registry.in_memory:InMemoryRegistry", diff --git a/packages/nest-core/nest_core/runner.py b/packages/nest-core/nest_core/runner.py index 434ce71..46a2856 100644 --- a/packages/nest-core/nest_core/runner.py +++ b/packages/nest-core/nest_core/runner.py @@ -140,6 +140,25 @@ def _create_shell_agents(self, plugins: dict[str, Any]) -> dict[AgentId, Any]: raise KeyError(msg) return factory_fn(self._config, plugins, backend=backend) + def _build_network_model(self) -> Any: + """Build a NetworkModel from scenario config, or return None. + + For ``transport: realistic``, instantiates + :class:`~nest_plugins_reference.transport.realistic.RealisticNetwork` + from ``layers.transport_config``. Other transport plugins fall + back to the simulator's default (zero-latency) model. + + Example:: + + model = runner._build_network_model() + """ + layers = self._config.layers + if layers.transport != "realistic": + return None + from nest_plugins_reference.transport.realistic import RealisticNetwork + + return RealisticNetwork.from_config(layers.transport_config or {}) + async def run(self) -> Path: """Run the scenario and return the trace file path. 
@@ -159,6 +178,8 @@ async def run(self) -> Path: raw_groups = failures.network_partition.get("groups") partition_groups = _parse_partition_groups(raw_groups) + network_model = self._build_network_model() + sim = Simulator( seed=self._config.seed, trace_path=trace_path, @@ -166,6 +187,7 @@ async def run(self) -> Path: byzantine_fraction=failures.byzantine_agents, partition_groups=partition_groups, plugins=plugins, + network_model=network_model, ) agents = self._create_agents(plugins) diff --git a/packages/nest-core/nest_core/scenario.py b/packages/nest-core/nest_core/scenario.py index fb45849..f1592e1 100644 --- a/packages/nest-core/nest_core/scenario.py +++ b/packages/nest-core/nest_core/scenario.py @@ -65,12 +65,17 @@ def _count_non_negative(cls, value: int) -> int: class LayerConfig(BaseModel): """Plugin selection for each of the 12 layers. + ``transport_config`` is an optional dict forwarded to the transport + plugin (e.g. ``RealisticNetwork.from_config``). Most plugins accept + no config; the field is ignored when empty. 
+ Example:: layers = LayerConfig(transport="in_memory", comms="nest_native") """ transport: str = "in_memory" + transport_config: dict[str, Any] = Field(default_factory=dict) comms: str = "nest_native" identity: str = "did_key" registry: str = "in_memory" diff --git a/packages/nest-core/nest_core/sim/__init__.py b/packages/nest-core/nest_core/sim/__init__.py index 3668206..dcc0d00 100644 --- a/packages/nest-core/nest_core/sim/__init__.py +++ b/packages/nest-core/nest_core/sim/__init__.py @@ -11,6 +11,8 @@ from nest_core.sim.clock import VirtualClock as VirtualClock from nest_core.sim.events import Event as Event from nest_core.sim.events import EventQueue as EventQueue +from nest_core.sim.network import NetworkModel as NetworkModel +from nest_core.sim.network import ZeroLatencyNetworkModel as ZeroLatencyNetworkModel from nest_core.sim.simulator import Simulator as Simulator from nest_core.sim.trace import TraceWriter as TraceWriter from nest_core.sim.transport import InMemoryTransport as InMemoryTransport @@ -20,8 +22,10 @@ "Event", "EventQueue", "InMemoryTransport", + "NetworkModel", "Simulator", "StateMachineAgent", "TraceWriter", "VirtualClock", + "ZeroLatencyNetworkModel", ] diff --git a/packages/nest-core/nest_core/sim/network.py b/packages/nest-core/nest_core/sim/network.py new file mode 100644 index 0000000..13cbec6 --- /dev/null +++ b/packages/nest-core/nest_core/sim/network.py @@ -0,0 +1,95 @@ +# SPDX-License-Identifier: Apache-2.0 +"""Network model abstraction for the discrete-event simulator. + +The default ``in_memory`` transport delivers messages at ``time = now`` — fine +for correctness testing, but useless for any property that depends on +latency, queueing, or backpressure. ``NetworkModel`` is the seam: given a +send event, it returns the virtual time at which the message should be +delivered (or ``None`` to drop the message). 
+ +The default :class:`ZeroLatencyNetworkModel` reproduces the existing +zero-latency semantics so simulations that don't configure a network model +behave exactly as before. + +The scenario runner builds a model for the bundled ``realistic`` transport +from ``layers.transport_config``; any other :class:`NetworkModel` can be +passed directly via ``Simulator(network_model=...)``. + +Example:: + + class FixedDelayNetwork(NetworkModel): + def schedule(self, sender, target, payload_size, t_now, rng): + return t_now + 0.010 # 10 ms per hop, no jitter, no drops + + sim = Simulator(seed=42, network_model=FixedDelayNetwork()) +""" + +from __future__ import annotations + +import random +from typing import Protocol, runtime_checkable + +from nest_core.types import AgentId + + +@runtime_checkable +class NetworkModel(Protocol): + """Pluggable per-hop scheduling for the simulator's transport. + + Implementations must be deterministic given the supplied ``rng``: the + simulator passes a dedicated network RNG derived from its seed so that + traces remain reproducible across runs with the same seed. + + Example:: + + class MyNetwork: + def schedule(self, sender, target, payload_size, t_now, rng): + return t_now + rng.uniform(0.001, 0.005) + """ + + def schedule( + self, + sender: AgentId, + target: AgentId, + payload_size: int, + t_now: float, + rng: random.Random, + ) -> float | None: + """Return the delivery time for a message, or ``None`` to drop it. + + The returned time must be ``>= t_now``. Returning ``None`` signals + a transport-level drop; the simulator records a ``dropped`` event + with reason ``"network"``. + + Example:: + + t_deliver = model.schedule(a1, a2, 128, ctx.time, rng) + """ + ... + + +class ZeroLatencyNetworkModel: + """Default model: every message is delivered at the current simulation time. + + This preserves the existing in-memory behavior and ensures backwards + compatibility for traces and validators that assume zero-latency + delivery. 
+ + Example:: + + sim = Simulator(seed=0, network_model=ZeroLatencyNetworkModel()) + """ + + def schedule( + self, + sender: AgentId, # noqa: ARG002 — part of the protocol surface + target: AgentId, # noqa: ARG002 + payload_size: int, # noqa: ARG002 + t_now: float, + rng: random.Random, # noqa: ARG002 + ) -> float | None: + return t_now + + +# Static check only: a type checker verifies the default satisfies the protocol. +_proto_check: type[NetworkModel] = ZeroLatencyNetworkModel # noqa: F841 diff --git a/packages/nest-core/nest_core/sim/simulator.py b/packages/nest-core/nest_core/sim/simulator.py index c61c343..1fb7bbc 100644 --- a/packages/nest-core/nest_core/sim/simulator.py +++ b/packages/nest-core/nest_core/sim/simulator.py @@ -22,6 +22,7 @@ from nest_core.sim.agent import AgentContext, StateMachineAgent from nest_core.sim.clock import VirtualClock from nest_core.sim.events import Event, EventQueue +from nest_core.sim.network import NetworkModel, ZeroLatencyNetworkModel from nest_core.sim.trace import TraceWriter from nest_core.sim.transport import InMemoryTransport from nest_core.types import AgentId, CorrelationId @@ -99,7 +100,20 @@ async def send(self, to: AgentId, payload: bytes) -> None: "corr": str(cid), } ) - await self._transport.send(to, payload, correlation_id=cid) + _, accepted = await self._transport.send(to, payload, correlation_id=cid) + if not accepted and self._trace: + self._trace.record( + { + "ts": self._clock.now, + "agent": str(to), + "kind": "dropped", + "from": str(self._agent_id), + "size": len(payload), + "msg": payload.decode("utf-8", errors="replace"), + "corr": str(cid), + "reason": "network", + } + ) async def broadcast(self, payload: bytes) -> None: cid = self._corr.next() @@ -114,7 +128,22 @@ async def broadcast(self, payload: bytes) -> None: "corr": str(cid), } ) - await self._transport.broadcast(payload, correlation_id=cid) + results = await self._transport.broadcast(payload, correlation_id=cid) + if self._trace: + for target, _t, accepted in 
results: + if not accepted: + self._trace.record( + { + "ts": self._clock.now, + "agent": str(target), + "kind": "dropped", + "from": str(self._agent_id), + "size": len(payload), + "msg": payload.decode("utf-8", errors="replace"), + "corr": str(cid), + "reason": "network", + } + ) async def schedule(self, delay: float, payload: bytes) -> None: self._queue.push( @@ -150,6 +179,7 @@ def __init__( byzantine_fraction: float = 0.0, partition_groups: list[list[str]] | None = None, plugins: dict[str, Any] | None = None, + network_model: NetworkModel | None = None, ) -> None: if not 0.0 <= message_drop_rate <= 1.0: msg = f"message_drop_rate must be between 0 and 1: {message_drop_rate}" @@ -175,6 +205,10 @@ def __init__( self._byzantine_agents: set[AgentId] = set() self._partition_map: dict[AgentId, int] = {} self._failure_rng = random.Random(self._master_rng.randint(0, 2**63)) + # Separate RNG for the network model so byzantine/partition draws are + # not perturbed by per-hop latency sampling. + self._network_rng = random.Random(self._master_rng.randint(0, 2**63)) + self._network_model: NetworkModel = network_model or ZeroLatencyNetworkModel() self._plugins: dict[str, Any] = plugins or {} self._agent_plugins: dict[AgentId, dict[str, Any]] = {} @@ -227,7 +261,14 @@ def add_agent(self, agent_id: AgentId, agent: StateMachineAgent) -> None: """ agent_rng = random.Random(self._master_rng.randint(0, 2**63)) all_ids = [aid for aid in self._agents] - transport = InMemoryTransport(agent_id, self._queue, self._clock, all_ids) + transport = InMemoryTransport( + agent_id, + self._queue, + self._clock, + all_ids, + network_model=self._network_model, + network_rng=self._network_rng, + ) self._agents[agent_id] = _AgentSlot( agent=agent, transport=transport, @@ -250,17 +291,32 @@ def _init_failures(self) -> None: if aid in self._agents: self._partition_map[aid] = group_idx - def _should_drop(self, sender: AgentId, target: AgentId) -> bool: + def _drop_reason(self, sender: AgentId, target: 
AgentId) -> str | None: + """Return a reason string if the message should be dropped, else None. + + Failure-injection drop and network-partition drop are distinguished + in the trace so downstream metrics can attribute them correctly. + """ if self._message_drop_rate > 0 and self._failure_rng.random() < self._message_drop_rate: - return True + return "failure_injection" if self._partition_map: s_group = self._partition_map.get(sender, -1) t_group = self._partition_map.get(target, -2) if s_group >= 0 and t_group >= 0 and s_group != t_group: - return True + return "partition" + + return None - return False + def _should_drop(self, sender: AgentId, target: AgentId) -> bool: + """Backwards-compatible alias for ``_drop_reason() is not None``. + + Example:: + + if sim._should_drop(sender, target): + ... + """ + return self._drop_reason(sender, target) is not None async def run(self, max_ticks: int = 100_000, max_time: float | None = None) -> None: """Run the simulation until events are exhausted or limits are reached. @@ -314,7 +370,8 @@ async def run(self, max_ticks: int = 100_000, max_time: float | None = None) -> if target_slot is None: continue - if self._should_drop(event.target_id, event.agent_id): + drop_reason = self._drop_reason(event.target_id, event.agent_id) + if drop_reason is not None: self._dropped_count += 1 if self._trace: drop_rec: dict[str, Any] = { @@ -324,6 +381,7 @@ async def run(self, max_ticks: int = 100_000, max_time: float | None = None) -> "from": str(event.target_id), "size": len(event.payload), "msg": event.payload.decode("utf-8", errors="replace"), + "reason": drop_reason, } if event.correlation_id is not None: drop_rec["corr"] = str(event.correlation_id) @@ -368,6 +426,31 @@ async def run(self, max_ticks: int = 100_000, max_time: float | None = None) -> if self._trace: self._trace.close() + @property + def network_model(self) -> NetworkModel: + """The currently installed network model. 
+ + Example:: + + sim.network_model + """ + return self._network_model + + def set_network_model(self, model: NetworkModel) -> None: + """Install a network model after construction. + + Agents added afterwards pick the model up in :meth:`add_agent`; the + transports of agents that were already added are updated in place, + so it is also safe to call right before :meth:`run`. + + Example:: + + sim.set_network_model(RealisticNetwork(...)) + """ + self._network_model = model + for slot in self._agents.values(): + slot.transport.set_network(model, self._network_rng) + def set_agent_plugins(self, agent_id: AgentId, overrides: dict[str, Any]) -> None: """Set per-agent plugin overrides (merged on top of shared plugins). diff --git a/packages/nest-core/nest_core/sim/transport.py b/packages/nest-core/nest_core/sim/transport.py index 65dcc99..499a049 100644 --- a/packages/nest-core/nest_core/sim/transport.py +++ b/packages/nest-core/nest_core/sim/transport.py @@ -1,6 +1,11 @@ # SPDX-License-Identifier: Apache-2.0 """In-memory transport wired to the simulator's event queue. +Delivery time is delegated to a :class:`NetworkModel` so that plugins can +inject per-hop latency, jitter, queueing, and packet loss without +forking the simulator. The default model is zero-latency, preserving the +existing behavior. 
+ Example:: transport = InMemoryTransport(agent_id, event_queue, clock) @@ -9,8 +14,10 @@ from __future__ import annotations +import random from typing import TYPE_CHECKING +from nest_core.sim.network import NetworkModel, ZeroLatencyNetworkModel from nest_core.types import AgentId, CorrelationId, TransportCapabilities if TYPE_CHECKING: @@ -39,29 +46,63 @@ def __init__( event_queue: EventQueue, clock: VirtualClock, all_agents: list[AgentId] | None = None, + network_model: NetworkModel | None = None, + network_rng: random.Random | None = None, ) -> None: self._agent_id = agent_id self._queue = event_queue self._clock = clock self.all_agents = all_agents or [] + self._network_model: NetworkModel = network_model or ZeroLatencyNetworkModel() + # Falling back to a deterministic Random(0) keeps standalone uses of the + # transport reproducible. The simulator overrides this with its own + # dedicated network RNG so traces stay byte-identical across runs. + self._network_rng = network_rng if network_rng is not None else random.Random(0) + + def set_network(self, model: NetworkModel, rng: random.Random) -> None: + """Replace the network model and RNG (used by the simulator at wire-up). + + Example:: + + transport.set_network(RealisticNetwork(...), network_rng) + """ + self._network_model = model + self._network_rng = rng async def send( self, to: AgentId, payload: bytes, correlation_id: CorrelationId | None = None, - ) -> None: + ) -> tuple[float, bool]: """Enqueue a message delivery event. + Returns ``(delivery_time, accepted)`` so the caller (the simulator) + can record drop events when the network model rejects the message. 
+ Example:: - await transport.send(AgentId("a2"), b"hello") + t, ok = await transport.send(AgentId("a2"), b"hello") """ from nest_core.sim.events import Event + t_deliver = self._network_model.schedule( + sender=self._agent_id, + target=to, + payload_size=len(payload), + t_now=self._clock.now, + rng=self._network_rng, + ) + if t_deliver is None: + return (self._clock.now, False) + if t_deliver < self._clock.now: + # Network models must respect the arrow of time; clamp defensively + # rather than corrupt the event queue. + t_deliver = self._clock.now + self._queue.push( Event( - time=self._clock.now, + time=t_deliver, kind="deliver", agent_id=to, target_id=self._agent_id, @@ -69,6 +110,7 @@ async def send( correlation_id=correlation_id, ) ) + return (t_deliver, True) async def receive(self) -> tuple[AgentId, bytes]: """Not used in Tier 1 — the simulator pushes events to agents. @@ -83,13 +125,19 @@ async def broadcast( self, payload: bytes, correlation_id: CorrelationId | None = None, - ) -> None: + ) -> list[tuple[AgentId, float, bool]]: """Broadcast to all known agents. + Returns a list of ``(target, delivery_time, accepted)`` so the + simulator can record per-target trace events. + Example:: - await transport.broadcast(b"announcement") + results = await transport.broadcast(b"announcement") """ + results: list[tuple[AgentId, float, bool]] = [] for aid in self.all_agents: if aid != self._agent_id: - await self.send(aid, payload, correlation_id=correlation_id) + t, ok = await self.send(aid, payload, correlation_id=correlation_id) + results.append((aid, t, ok)) + return results diff --git a/packages/nest-core/tests/test_network_model.py b/packages/nest-core/tests/test_network_model.py new file mode 100644 index 0000000..69e70e9 --- /dev/null +++ b/packages/nest-core/tests/test_network_model.py @@ -0,0 +1,255 @@ +# SPDX-License-Identifier: Apache-2.0 +"""End-to-end tests for the NetworkModel hook in the Tier 1 simulator. + +Verifies that: +1. 
The default zero-latency model preserves backwards-compatible behavior. +2. A custom NetworkModel actually shifts the virtual clock and surfaces + in mean_latency / duration. +3. Network-level drops (returning None) produce trace ``dropped`` events + with a ``reason`` field distinct from scenario-level failure injection. +4. The model is invoked deterministically — same seed, same trace. +""" + +from __future__ import annotations + +import json +import math +import random +from pathlib import Path +from typing import Any + +import pytest +from nest_core.sim import ( + NetworkModel, + Simulator, + StateMachineAgent, + ZeroLatencyNetworkModel, +) +from nest_core.sim.agent import AgentContext +from nest_core.types import AgentId + + +class _PingOnce(StateMachineAgent): + """Sends one ping on start, records every message it receives.""" + + def __init__(self, target: AgentId) -> None: + self.target = target + self.received: list[tuple[float, bytes]] = [] + + async def on_start(self, ctx: AgentContext) -> None: + if self.target != ctx.agent_id: + await ctx.send(self.target, b"ping") + + async def on_message(self, ctx: AgentContext, sender: AgentId, payload: bytes) -> None: + del sender + self.received.append((ctx.time, payload)) + + +class _FixedDelayNetwork: + """Adds a constant 0.5-second delay to every send.""" + + def schedule( + self, + sender: AgentId, # noqa: ARG002 + target: AgentId, # noqa: ARG002 + payload_size: int, # noqa: ARG002 + t_now: float, + rng: random.Random, # noqa: ARG002 + ) -> float | None: + return t_now + 0.5 + + +class _DropEverythingNetwork: + """Drops every send to A2; lets others through with no delay.""" + + def schedule( + self, + sender: AgentId, # noqa: ARG002 + target: AgentId, + payload_size: int, # noqa: ARG002 + t_now: float, + rng: random.Random, # noqa: ARG002 + ) -> float | None: + if target == AgentId("a2"): + return None + return t_now + + +def _read_trace(p: Path) -> list[dict[str, Any]]: + return [json.loads(line) for line in 
p.read_text().splitlines()] + + +class TestDefaultBehavior: + @pytest.mark.asyncio + async def test_no_network_model_means_zero_latency(self, tmp_path: Path) -> None: + trace = tmp_path / "out.jsonl" + sim = Simulator(seed=0, trace_path=trace) + sim.add_agent(AgentId("a1"), _PingOnce(AgentId("a2"))) + sim.add_agent(AgentId("a2"), _PingOnce(AgentId("a1"))) + await sim.run(max_ticks=100) + assert sim.clock.now == 0.0 + + @pytest.mark.asyncio + async def test_explicit_zero_latency_matches_default(self, tmp_path: Path) -> None: + trace = tmp_path / "out.jsonl" + sim = Simulator(seed=0, trace_path=trace, network_model=ZeroLatencyNetworkModel()) + sim.add_agent(AgentId("a1"), _PingOnce(AgentId("a2"))) + sim.add_agent(AgentId("a2"), _PingOnce(AgentId("a1"))) + await sim.run(max_ticks=100) + assert sim.clock.now == 0.0 + + +class TestLatencyAdvancesClock: + @pytest.mark.asyncio + async def test_fixed_delay_shows_up_in_clock(self, tmp_path: Path) -> None: + trace = tmp_path / "out.jsonl" + sim = Simulator( + seed=0, + trace_path=trace, + network_model=_FixedDelayNetwork(), + ) + sim.add_agent(AgentId("a1"), _PingOnce(AgentId("a2"))) + sim.add_agent(AgentId("a2"), _PingOnce(AgentId("a1"))) + await sim.run(max_ticks=100) + # Two pings — each carries a 0.5 s hop, so the clock should reach 0.5. + assert sim.clock.now >= 0.5 + + @pytest.mark.asyncio + async def test_receive_events_carry_post_delay_timestamp(self, tmp_path: Path) -> None: + trace = tmp_path / "out.jsonl" + sim = Simulator( + seed=0, + trace_path=trace, + network_model=_FixedDelayNetwork(), + ) + sim.add_agent(AgentId("a1"), _PingOnce(AgentId("a2"))) + sim.add_agent(AgentId("a2"), _PingOnce(AgentId("a1"))) + await sim.run(max_ticks=100) + + events = _read_trace(trace) + sends = [e for e in events if e["kind"] == "send"] + receives = [e for e in events if e["kind"] == "receive"] + assert sends and receives + + # For each correlation_id, receive.ts should be send.ts + 0.5. 
+ send_by_corr = {e["corr"]: e["ts"] for e in sends} + for r in receives: + assert math.isclose(r["ts"], send_by_corr[r["corr"]] + 0.5, abs_tol=1e-9) + + +class TestNetworkDrop: + @pytest.mark.asyncio + async def test_drop_recorded_in_trace(self, tmp_path: Path) -> None: + trace = tmp_path / "out.jsonl" + sim = Simulator( + seed=0, + trace_path=trace, + network_model=_DropEverythingNetwork(), + ) + sim.add_agent(AgentId("a1"), _PingOnce(AgentId("a2"))) + sim.add_agent(AgentId("a2"), _PingOnce(AgentId("a1"))) + await sim.run(max_ticks=100) + + events = _read_trace(trace) + dropped = [e for e in events if e["kind"] == "dropped"] + # a1 -> a2 ping should be dropped; a2's send to a1 still arrives. + assert any(e.get("from") == "a1" and e.get("reason") == "network" for e in dropped) + receives = [e for e in events if e["kind"] == "receive"] + # a1 should still receive a2's ping. + assert any(r["agent"] == "a1" for r in receives) + + @pytest.mark.asyncio + async def test_failure_injection_drops_carry_distinct_reason(self, tmp_path: Path) -> None: + trace = tmp_path / "out.jsonl" + sim = Simulator( + seed=42, + trace_path=trace, + message_drop_rate=1.0, # drop everything at the failure layer + ) + sim.add_agent(AgentId("a1"), _PingOnce(AgentId("a2"))) + sim.add_agent(AgentId("a2"), _PingOnce(AgentId("a1"))) + await sim.run(max_ticks=100) + + events = _read_trace(trace) + dropped = [e for e in events if e["kind"] == "dropped"] + assert dropped + assert all(e.get("reason") == "failure_injection" for e in dropped) + + +class TestDeterminism: + @pytest.mark.asyncio + async def test_same_seed_same_trace(self, tmp_path: Path) -> None: + from nest_plugins_reference.transport.realistic import RealisticNetwork + + traces: list[str] = [] + for run_idx in range(2): + trace = tmp_path / f"out{run_idx}.jsonl" + sim = Simulator( + seed=2024, + trace_path=trace, + network_model=RealisticNetwork( + base_latency_ms=10.0, + jitter_sigma=0.4, + bandwidth_bps=1_000_000.0, + loss_rate=0.05, + 
), + ) + sim.add_agent(AgentId("a1"), _PingOnce(AgentId("a2"))) + sim.add_agent(AgentId("a2"), _PingOnce(AgentId("a1"))) + await sim.run(max_ticks=200) + traces.append(trace.read_text()) + + assert traces[0] == traces[1] + + +class TestProtocolHook: + def test_network_model_protocol_runtime_check(self) -> None: + m: NetworkModel = _FixedDelayNetwork() + assert hasattr(m, "schedule") + + +class _Chatterbox(StateMachineAgent): + """Spams N messages on start to stress the egress queue.""" + + def __init__(self, target: AgentId, n: int, size: int) -> None: + self.target = target + self.n = n + self.size = size + + async def on_start(self, ctx: AgentContext) -> None: + if self.target == ctx.agent_id: + return + payload = b"x" * self.size + for _ in range(self.n): + await ctx.send(self.target, payload) + + +class TestRealisticEndToEnd: + @pytest.mark.asyncio + async def test_queueing_observed_in_mean_latency(self, tmp_path: Path) -> None: + from nest_core.metrics import compute_metrics + from nest_plugins_reference.transport.realistic import RealisticNetwork + + trace = tmp_path / "out.jsonl" + # 1 Mbps, 1 kB messages => 8 ms serialization per message. + # 100 messages back-to-back => last one arrives ~ 800 ms after send. + sim = Simulator( + seed=0, + trace_path=trace, + network_model=RealisticNetwork( + base_latency_ms=5.0, + jitter_sigma=0.0, + bandwidth_bps=1_000_000.0, + loss_rate=0.0, + ), + ) + sim.add_agent(AgentId("a1"), _Chatterbox(AgentId("a2"), n=100, size=1000)) + sim.add_agent(AgentId("a2"), _PingOnce(AgentId("a1"))) + await sim.run(max_ticks=10_000) + + metrics = compute_metrics(trace, ["mean_latency", "duration", "message_count"]) + # Each message gets at least 5 ms base; the last queued one is much + # higher. The average should comfortably exceed 5 ms. 
+ assert metrics["mean_latency"] > 0.005 + assert metrics["duration"] > 0.0 + assert metrics["message_count"] >= 100 diff --git a/packages/nest-core/tests/test_runner_realistic.py b/packages/nest-core/tests/test_runner_realistic.py new file mode 100644 index 0000000..9f2609d --- /dev/null +++ b/packages/nest-core/tests/test_runner_realistic.py @@ -0,0 +1,220 @@ +# SPDX-License-Identifier: Apache-2.0 +"""End-to-end tests for wiring the realistic transport through the ScenarioRunner.""" + +from __future__ import annotations + +import json +import math +from pathlib import Path +from typing import Any + +import pytest +from nest_core.metrics import compute_metrics +from nest_core.runner import ScenarioRunner +from nest_core.scenario import ScenarioConfig + + +def _read_trace(p: Path) -> list[dict[str, Any]]: + return [json.loads(line) for line in p.read_text().splitlines()] + + +class TestRealisticTransportWiring: + @pytest.mark.asyncio + async def test_scenario_picks_up_realistic_transport(self, tmp_path: Path) -> None: + """When ``layers.transport: realistic`` is set, the simulator's + virtual clock must actually advance — proving that the network + model was installed and consulted.""" + trace_file = tmp_path / "trace.jsonl" + config = ScenarioConfig.from_dict( + { + "name": "rt-test", + "seed": 42, + "agents": { + "count": 6, + "roles": [ + {"name": "buyer", "count": 3}, + {"name": "seller", "count": 3}, + ], + }, + "layers": { + "transport": "realistic", + "transport_config": { + "base_latency_ms": 20.0, + "jitter_sigma": 0.0, + "bandwidth_bps": 100_000_000.0, + "loss_rate": 0.0, + }, + }, + "task": {"type": "marketplace", "config": {"rounds": 2}}, + "duration": "ticks: 5000", + "metrics": ["mean_latency", "duration", "message_count"], + "output": {"trace": str(trace_file)}, + } + ) + + runner = ScenarioRunner(config) + await runner.run() + + events = _read_trace(trace_file) + # All ``receive`` events should now have ts > 0 because every hop + # costs 20 ms. 
+ receives = [e for e in events if e["kind"] == "receive"] + assert receives, "scenario produced no receive events" + assert all(e["ts"] > 0 for e in receives) + + metrics = runner.metrics + assert metrics["mean_latency"] >= 0.020 - 1e-9 + assert metrics["duration"] > 0.0 + + @pytest.mark.asyncio + async def test_scenario_loss_increases_drop_count(self, tmp_path: Path) -> None: + """Setting a non-zero loss_rate on the realistic transport should + produce ``dropped`` trace events with reason=='network'.""" + trace_file = tmp_path / "trace.jsonl" + config = ScenarioConfig.from_dict( + { + "name": "loss-test", + "seed": 7, + "agents": { + "count": 6, + "roles": [ + {"name": "buyer", "count": 3}, + {"name": "seller", "count": 3}, + ], + }, + "layers": { + "transport": "realistic", + "transport_config": { + "base_latency_ms": 1.0, + "jitter_sigma": 0.0, + "bandwidth_bps": 1e12, + "loss_rate": 0.5, # half of messages drop at the link + }, + }, + "task": {"type": "marketplace", "config": {"rounds": 3}}, + "duration": "ticks: 5000", + "output": {"trace": str(trace_file)}, + } + ) + + runner = ScenarioRunner(config) + await runner.run() + + events = _read_trace(trace_file) + network_drops = [ + e for e in events if e["kind"] == "dropped" and e.get("reason") == "network" + ] + assert network_drops, "expected network-level drops with loss_rate=0.5" + + @pytest.mark.asyncio + async def test_in_memory_default_still_zero_latency(self, tmp_path: Path) -> None: + """Backwards-compat guard: scenarios that don't pick the realistic + transport must keep the historical zero-latency behavior.""" + trace_file = tmp_path / "trace.jsonl" + config = ScenarioConfig.from_dict( + { + "name": "default-rt", + "seed": 42, + "agents": { + "count": 6, + "roles": [ + {"name": "buyer", "count": 3}, + {"name": "seller", "count": 3}, + ], + }, + "task": {"type": "marketplace", "config": {"rounds": 2}}, + "duration": "ticks: 5000", + "metrics": ["mean_latency", "duration"], + "output": {"trace": 
str(trace_file)}, + } + ) + + runner = ScenarioRunner(config) + await runner.run() + + events = _read_trace(trace_file) + receives = [e for e in events if e["kind"] == "receive"] + # All receives happen at ts==0 with the default in-memory transport. + assert all(e["ts"] == 0 for e in receives) + assert runner.metrics.get("mean_latency", 0.0) == 0.0 + + @pytest.mark.asyncio + async def test_realistic_run_is_deterministic(self, tmp_path: Path) -> None: + """Same seed + same realistic-network config => byte-identical trace.""" + traces: list[str] = [] + for i in range(2): + trace_file = tmp_path / f"trace-{i}.jsonl" + config = ScenarioConfig.from_dict( + { + "name": "det", + "seed": 99, + "agents": { + "count": 6, + "roles": [ + {"name": "buyer", "count": 3}, + {"name": "seller", "count": 3}, + ], + }, + "layers": { + "transport": "realistic", + "transport_config": { + "base_latency_ms": 5.0, + "jitter_sigma": 0.4, + "bandwidth_bps": 1_000_000.0, + "loss_rate": 0.05, + }, + }, + "task": {"type": "marketplace", "config": {"rounds": 2}}, + "duration": "ticks: 3000", + "output": {"trace": str(trace_file)}, + } + ) + await ScenarioRunner(config).run() + traces.append(trace_file.read_text()) + + assert traces[0] == traces[1] + + def test_metrics_compute_handles_realistic_trace(self, tmp_path: Path) -> None: + """A trace with positive timestamps should produce non-zero latency + and throughput, exercising the previously-dormant code paths.""" + import asyncio + + trace_file = tmp_path / "trace.jsonl" + config = ScenarioConfig.from_dict( + { + "name": "metrics-test", + "seed": 1, + "agents": { + "count": 4, + "roles": [ + {"name": "buyer", "count": 2}, + {"name": "seller", "count": 2}, + ], + }, + "layers": { + "transport": "realistic", + "transport_config": { + "base_latency_ms": 50.0, + "jitter_sigma": 0.0, + "bandwidth_bps": 1e12, + "loss_rate": 0.0, + }, + }, + "task": {"type": "marketplace", "config": {"rounds": 2}}, + "duration": "ticks: 5000", + "output": {"trace": 
1. **Per-hop base latency + lognormal jitter.** ``base_latency_ms`` is the
   *median* propagation delay; ``jitter_sigma`` shapes the spread. Each hop
   multiplies the base by ``exp(N(0, jitter_sigma))``, so the mean delay is
   slightly above the base whenever ``jitter_sigma > 0``. Lognormal keeps
   the tail heavy (a real network never has a Gaussian latency
   distribution).
This is where ``mean_latency`` stops being a + constant and starts to show the load-curve shape every backend + engineer knows. +4. **Egress queue capacity / load shedding.** ``max_queue_bytes`` caps how + much can be queued per sender. When exceeded, the message is dropped + with a ``"network"`` reason in the trace — drop-tail backpressure, the + crude but honest baseline. +5. **Random packet loss.** ``loss_rate`` applies an additional per-hop + Bernoulli drop, distinct from the simulator's scenario-level + ``failures.message_drop``. This lives at the link layer; failure + injection lives above it. +6. **Per-link overrides.** Specific ``(sender, target)`` pairs can carry + their own latency, jitter, bandwidth, and loss — useful for modeling + a slow datacenter cross-link, a flaky satellite hop, or a hot pair. + +What it does *not* do +--------------------- + +This is still a single-process discrete-event simulator. There are no +TCP windows, no congestion control loops, no FEC. The point is: give +NEST the smallest, sharpest knobs that change protocol-level behavior +in a controlled way. Real TCP is what your production transport does; +NEST is for stressing the protocol that runs *on top* of it. 
def _require(condition: bool, message: str) -> None:
    """Raise ``ValueError(message)`` unless *condition* is true.

    Callers phrase conditions positively (``x >= 0``) so that NaN, which
    compares false against everything, is rejected along with out-of-range
    values.
    """
    if not condition:
        raise ValueError(message)


@dataclass(frozen=True)
class LinkConfig:
    """Per-link overrides for the realistic network.

    Any field left as ``None`` falls back to the network-wide default.
    Non-``None`` values are range-checked on construction with the same
    rules :class:`RealisticNetwork` applies to its defaults, so a bad
    override (e.g. ``bandwidth_bps=0``) fails here instead of surfacing
    as a ``ZeroDivisionError`` in the middle of a run.

    Raises ``ValueError`` if an override is out of range or NaN.

    Example::

        slow = LinkConfig(base_latency_ms=200.0, loss_rate=0.05)
    """

    base_latency_ms: float | None = None
    jitter_sigma: float | None = None
    bandwidth_bps: float | None = None
    loss_rate: float | None = None

    def __post_init__(self) -> None:
        _require(
            self.base_latency_ms is None or self.base_latency_ms >= 0,
            "base_latency_ms must be >= 0",
        )
        _require(
            self.jitter_sigma is None or self.jitter_sigma >= 0,
            "jitter_sigma must be >= 0",
        )
        _require(
            self.bandwidth_bps is None or self.bandwidth_bps > 0,
            "bandwidth_bps must be > 0",
        )
        _require(
            self.loss_rate is None or 0.0 <= self.loss_rate <= 1.0,
            "loss_rate must be in [0, 1]",
        )


@dataclass
class _EgressState:
    """Per-sender egress link bookkeeping for queueing delay."""

    # Time at which the link finishes serializing everything accepted so far.
    busy_until: float = 0.0
    # Bytes accepted on this link that have not finished serializing yet.
    queued_bytes: int = 0


class RealisticNetwork:
    """A network model with per-hop latency, bandwidth, jitter, and queueing.

    Determinism is governed entirely by the RNG passed to :meth:`schedule`.
    Within a single run the model keeps per-sender egress state so that
    queueing composes correctly across consecutive sends; build a fresh
    instance for each run.

    Example::

        net = RealisticNetwork(base_latency_ms=10.0, bandwidth_bps=1_000_000)
    """

    def __init__(
        self,
        base_latency_ms: float = 5.0,
        jitter_sigma: float = 0.3,
        bandwidth_bps: float = 100_000_000.0,
        max_queue_bytes: int = 10_000_000,
        loss_rate: float = 0.0,
        links: dict[tuple[str, str], LinkConfig] | None = None,
    ) -> None:
        """Validate and store the network-wide defaults and per-link overrides.

        ``links`` maps ``(sender, target)`` pairs to :class:`LinkConfig`
        overrides; keys are normalized with ``str()``.

        Raises ``ValueError`` if any parameter is out of range or NaN.
        """
        _require(base_latency_ms >= 0, "base_latency_ms must be >= 0")
        _require(jitter_sigma >= 0, "jitter_sigma must be >= 0")
        _require(bandwidth_bps > 0, "bandwidth_bps must be > 0")
        _require(max_queue_bytes >= 0, "max_queue_bytes must be >= 0")
        _require(0.0 <= loss_rate <= 1.0, "loss_rate must be in [0, 1]")

        self._base_latency_ms = base_latency_ms
        self._jitter_sigma = jitter_sigma
        self._bandwidth_bps = bandwidth_bps
        self._max_queue_bytes = max_queue_bytes
        self._loss_rate = loss_rate
        self._links: dict[tuple[str, str], LinkConfig] = {
            (str(s), str(t)): cfg for (s, t), cfg in (links or {}).items()
        }
        self._egress: dict[AgentId, _EgressState] = {}
        # Per-sender FIFO of (finish_time, size) for accepted messages that
        # are still serializing. Finish times are non-decreasing, so the
        # completed prefix can be dropped as the clock advances.
        self._in_flight: dict[AgentId, list[tuple[float, int]]] = {}

        # Diagnostic counters — useful in tests, not part of the public API.
        self.stats: dict[str, int] = {
            "scheduled": 0,
            "dropped_loss": 0,
            "dropped_queue_full": 0,
        }

    @classmethod
    def from_config(cls, config: dict[str, object]) -> RealisticNetwork:
        """Build a network from a plain-dict config (typically scenario YAML).

        Recognized keys: ``base_latency_ms``, ``jitter_sigma``,
        ``bandwidth_bps``, ``max_queue_bytes``, ``loss_rate``, and
        ``links`` (a list of ``{from, to, ...}`` dicts).

        Missing or unparseable scalar values fall back to the constructor
        defaults. Structurally malformed ``links`` entries (non-dicts, or
        entries without string ``from``/``to``) are skipped. Values that
        parse but are out of range raise ``ValueError``.

        Example::

            net = RealisticNetwork.from_config({
                "base_latency_ms": 20,
                "jitter_sigma": 0.5,
                "bandwidth_bps": 1_000_000,
                "links": [{"from": "a", "to": "b", "base_latency_ms": 150}],
            })
        """
        links: dict[tuple[str, str], LinkConfig] = {}
        raw_links = config.get("links")
        if isinstance(raw_links, list):
            # cast: ``config`` is dict[str, object]; ``list`` widens its items
            # back to Unknown for pyright. Treat each item as ``object`` and
            # rely on ``isinstance`` checks below.
            raw_links_list = cast("list[object]", raw_links)
            for item in raw_links_list:
                if not isinstance(item, dict):
                    continue
                item_dict = cast("dict[object, object]", item)
                src = item_dict.get("from")
                dst = item_dict.get("to")
                if not isinstance(src, str) or not isinstance(dst, str):
                    continue
                links[(src, dst)] = LinkConfig(
                    base_latency_ms=_opt_float(item_dict.get("base_latency_ms")),
                    jitter_sigma=_opt_float(item_dict.get("jitter_sigma")),
                    bandwidth_bps=_opt_float(item_dict.get("bandwidth_bps")),
                    loss_rate=_opt_float(item_dict.get("loss_rate")),
                )
        return cls(
            base_latency_ms=_cfg_float(config.get("base_latency_ms"), 5.0),
            jitter_sigma=_cfg_float(config.get("jitter_sigma"), 0.3),
            bandwidth_bps=_cfg_float(config.get("bandwidth_bps"), 100_000_000.0),
            max_queue_bytes=_cfg_int(config.get("max_queue_bytes"), 10_000_000),
            loss_rate=_cfg_float(config.get("loss_rate"), 0.0),
            links=links,
        )

    def _link(self, sender: AgentId, target: AgentId) -> LinkConfig | None:
        """Return the override for ``(sender, target)``, or ``None`` if there is none."""
        return self._links.get((str(sender), str(target)))

    def _resolve(
        self,
        sender: AgentId,
        target: AgentId,
    ) -> tuple[float, float, float, float]:
        """Resolve per-link params, falling back to network defaults.

        Returns ``(base_latency_ms, jitter_sigma, bandwidth_bps, loss_rate)``.
        """
        link = self._link(sender, target)
        if link is None:
            return (
                self._base_latency_ms,
                self._jitter_sigma,
                self._bandwidth_bps,
                self._loss_rate,
            )
        base_ms = self._base_latency_ms if link.base_latency_ms is None else link.base_latency_ms
        sigma = self._jitter_sigma if link.jitter_sigma is None else link.jitter_sigma
        bw = self._bandwidth_bps if link.bandwidth_bps is None else link.bandwidth_bps
        loss = self._loss_rate if link.loss_rate is None else link.loss_rate
        return base_ms, sigma, bw, loss

    def schedule(
        self,
        sender: AgentId,
        target: AgentId,
        payload_size: int,
        t_now: float,
        rng: random.Random,
    ) -> float | None:
        """Compute the delivery time for a single send.

        ``payload_size`` is in bytes and ``t_now`` is the current logical
        time in seconds. ``rng`` is the only source of randomness: it is
        consulted once for the loss decision (when the loss rate is
        non-zero) and once for jitter (when sigma is non-zero).

        Returns the absolute delivery time in seconds, or ``None`` to
        signal a transport-level drop (random loss or queue overflow).

        Example::

            t = net.schedule(AgentId("a1"), AgentId("a2"), 256, 0.0, rng)
        """
        base_ms, sigma, bw, loss = self._resolve(sender, target)

        # 1. Random link-layer loss. Sampled first so the drop decision
        #    does not depend on the queueing state.
        if loss > 0.0 and rng.random() < loss:
            self.stats["dropped_loss"] += 1
            return None

        # 2. Egress queueing. The sender has a virtual link with a finite
        #    service rate. A new message has to wait for whatever is
        #    currently in flight.
        egress = self._egress.setdefault(sender, _EgressState())
        in_flight = self._in_flight.setdefault(sender, [])
        service_time = (payload_size * 8.0) / bw  # seconds

        # Release every message that has finished serializing by ``t_now``.
        # Occupancy must shrink as the link transmits, not only when the
        # link goes fully idle. Otherwise a sender that keeps the link
        # busy accumulates phantom backlog and is eventually drop-tailed
        # even though its real queue is short.
        finished = 0
        for finish_time, size in in_flight:
            if finish_time > t_now:
                break
            egress.queued_bytes -= size
            finished += 1
        del in_flight[:finished]

        # An idle link starts serializing immediately at ``t_now``.
        if egress.busy_until <= t_now:
            egress.busy_until = t_now

        # Drop-tail: refuse the message if it would push the queue past the cap.
        if egress.queued_bytes + payload_size > self._max_queue_bytes:
            self.stats["dropped_queue_full"] += 1
            return None

        # 3. Jitter — lognormal so the tail behaves like a real network.
        #    A sigma of ~0.3 gives a P99/median ratio around 2x;
        #    sigma ~0.6 gives 4x, which is more like cross-region.
        jitter_factor = math.exp(rng.gauss(0.0, sigma)) if sigma > 0 else 1.0
        propagation_s = (base_ms / 1000.0) * jitter_factor

        # 4. Departure = when this message finishes serializing on the link.
        departure = egress.busy_until + service_time
        egress.busy_until = departure
        egress.queued_bytes += payload_size
        in_flight.append((departure, payload_size))

        self.stats["scheduled"] += 1
        return departure + propagation_s

    @property
    def network_model(self) -> NetworkModel:
        """Self-reference so callers can treat a plugin instance uniformly.

        Example::

            sim.set_network_model(net.network_model)
        """
        return self


# Protocol check at import time.
_proto_check: type[NetworkModel] = RealisticNetwork  # noqa: F841


def _opt_float(v: object) -> float | None:
    """Coerce a config value to ``float``; ``None`` if absent or unparseable.

    ``bool`` is rejected explicitly: it is an ``int`` subclass, so without
    the check ``loss_rate: true`` would silently become ``1.0``. This
    matches how ``_cfg_int`` treats booleans.
    """
    if v is None or isinstance(v, bool):
        return None
    if isinstance(v, (int, float, str)):
        try:
            return float(v)
        except (ValueError, OverflowError):
            # ValueError: non-numeric string. OverflowError: an int too
            # large to represent as a float.
            return None
    return None


def _cfg_float(v: object, default: float) -> float:
    """Return ``v`` coerced to ``float``, or ``default`` if it cannot be parsed."""
    parsed = _opt_float(v)
    return default if parsed is None else parsed


def _cfg_int(v: object, default: int) -> int:
    """Return ``v`` coerced to ``int``, or ``default`` if it cannot be parsed.

    Booleans, non-numeric strings, and non-finite floats (NaN, infinity)
    all fall back to ``default``.
    """
    if v is None or isinstance(v, bool):
        return default
    if isinstance(v, int):
        return v
    if isinstance(v, float):
        # int() raises on NaN (ValueError) and infinity (OverflowError).
        return int(v) if math.isfinite(v) else default
    if isinstance(v, str):
        try:
            return int(v)
        except ValueError:
            return default
    return default


# ---------------------------------------------------------------------------
# Standalone transport — usable outside the simulator (e.g. Tier 2 / tests)
# ---------------------------------------------------------------------------


@dataclass
class _Pending:
    """A scheduled delivery awaiting its turn."""

    deliver_at: float
    sender: AgentId
    payload: bytes
    seq: int


# Public type alias so callers (and tests) can name the bus payload type
# without reaching into private symbols.
PendingMessage = _Pending
class RealisticTransport:
    """Standalone transport driven directly by a :class:`RealisticNetwork`.

    For Tier 1, prefer wiring :class:`RealisticNetwork` into the
    :class:`~nest_core.sim.simulator.Simulator` via the ``network_model``
    parameter — that path is push-based and reuses the simulator's event
    queue. This class exists so the plugin satisfies the
    :class:`nest.plugins.transport` entry-point contract (a callable
    ``Transport`` implementation) and can be exercised in unit tests
    without spinning up a simulator.

    Each instance keeps its own logical clock; the caller moves it with
    :meth:`advance`. Instances that should talk to each other must share
    the same ``bus`` dict.

    Example::

        net = RealisticNetwork(base_latency_ms=10.0)
        bus = {}
        t1 = RealisticTransport(AgentId("a1"), net, bus)
        t2 = RealisticTransport(AgentId("a2"), net, bus)
        await t1.send(AgentId("a2"), b"hello")
    """

    # NOTE(review): ``receive`` hands out messages in delivery-time order, and
    # the network applies per-message jitter, so two messages from one sender
    # can be received out of send order when jitter_sigma > 0. Confirm that
    # ``ordered=True`` is the intended claim.
    capabilities = TransportCapabilities(
        supports_streaming=False,
        ordered=True,
        reliable=False,  # because realistic networks drop packets
    )

    def __init__(
        self,
        agent_id: AgentId,
        network: RealisticNetwork,
        bus: dict[AgentId, list[_Pending]] | None = None,
        clock: float = 0.0,
        rng: random.Random | None = None,
    ) -> None:
        """Register ``agent_id`` on ``bus`` and remember the scheduling inputs.

        When ``bus`` is omitted a private one is created; when ``rng`` is
        omitted a ``random.Random(0)`` is used.
        """
        self._agent_id = agent_id
        self._network = network
        self._bus: dict[AgentId, list[_Pending]] = {} if bus is None else bus
        # Make sure this agent has an inbox so broadcasts from peers see it.
        self._bus.setdefault(agent_id, [])
        self._clock = clock
        self._seq = 0
        self._rng = random.Random(0) if rng is None else rng

    @property
    def now(self) -> float:
        """Current logical time used for scheduling decisions.

        Example::

            t = transport.now
        """
        return self._clock

    def advance(self, t: float) -> None:
        """Set the standalone clock to ``t`` (caller-driven; no autopilot here).

        Raises ``ValueError`` if ``t`` is earlier than the current clock.

        Example::

            transport.advance(1.5)
        """
        if t < self._clock:
            msg = "Cannot move clock backwards"
            raise ValueError(msg)
        self._clock = t

    async def send(self, to: AgentId, payload: bytes) -> bool:
        """Schedule a send. Returns ``True`` if accepted, ``False`` if dropped.

        Example::

            ok = await transport.send(AgentId("a2"), b"hi")
        """
        deliver_at = self._network.schedule(
            self._agent_id,
            to,
            len(payload),
            self._clock,
            self._rng,
        )
        if deliver_at is None:
            # The network dropped it (random loss or egress queue overflow).
            return False

        self._seq += 1
        message = _Pending(
            deliver_at=deliver_at,
            sender=self._agent_id,
            payload=payload,
            seq=self._seq,
        )
        self._bus.setdefault(to, []).append(message)
        return True

    async def receive(self) -> tuple[AgentId, bytes]:
        """Pop and return the earliest message deliverable at the current clock.

        Never awaits: raises ``LookupError`` if no message addressed to this
        agent has ``deliver_at <= now`` — call :meth:`advance` first.

        Example::

            sender, payload = await transport.receive()
        """
        inbox = self._bus.get(self._agent_id, [])
        due = [message for message in inbox if message.deliver_at <= self._clock]
        if not due:
            raise LookupError("no message ready at current clock")
        # Earliest-time, then send-order, for deterministic ordering; on a
        # full tie ``min`` keeps the first candidate in inbox order.
        earliest = min(due, key=lambda message: (message.deliver_at, message.seq))
        inbox.remove(earliest)
        return (earliest.sender, earliest.payload)

    async def broadcast(self, payload: bytes) -> int:
        """Send ``payload`` to every other agent registered on the shared bus.

        Returns the number of recipients that accepted the message
        (loss / queue-overflow drops are counted separately on the
        network's ``stats`` dict).

        Example::

            n = await transport.broadcast(b"announcement")
        """
        delivered = 0
        # Iterate over a snapshot of the recipients currently on the bus.
        for recipient in tuple(self._bus):
            if recipient == self._agent_id:
                continue
            if await self.send(recipient, payload):
                delivered += 1
        return delivered
+ """ + return actual is not None and math.isclose(actual, expected, abs_tol=abs_tol) + + +A1 = AgentId("a1") +A2 = AgentId("a2") +A3 = AgentId("a3") + + +# --------------------------------------------------------------------------- +# NetworkModel protocol conformance +# --------------------------------------------------------------------------- + + +class TestProtocolConformance: + def test_zero_latency_implements_protocol(self) -> None: + model: NetworkModel = ZeroLatencyNetworkModel() + t = model.schedule(A1, A2, 100, 0.5, random.Random(0)) + assert t == 0.5 + + def test_realistic_implements_protocol(self) -> None: + model: NetworkModel = RealisticNetwork(base_latency_ms=10.0, jitter_sigma=0.0) + t = model.schedule(A1, A2, 100, 0.0, random.Random(0)) + assert isinstance(t, float) + assert t is not None + assert t > 0.0 + + +# --------------------------------------------------------------------------- +# Validation: bad construction params should be rejected eagerly +# --------------------------------------------------------------------------- + + +class TestValidation: + def test_negative_latency_rejected(self) -> None: + with pytest.raises(ValueError, match="base_latency_ms"): + RealisticNetwork(base_latency_ms=-1.0) + + def test_negative_sigma_rejected(self) -> None: + with pytest.raises(ValueError, match="jitter_sigma"): + RealisticNetwork(jitter_sigma=-0.1) + + def test_zero_bandwidth_rejected(self) -> None: + with pytest.raises(ValueError, match="bandwidth_bps"): + RealisticNetwork(bandwidth_bps=0.0) + + def test_invalid_loss_rejected(self) -> None: + with pytest.raises(ValueError, match="loss_rate"): + RealisticNetwork(loss_rate=1.5) + + def test_invalid_queue_cap_rejected(self) -> None: + with pytest.raises(ValueError, match="max_queue_bytes"): + RealisticNetwork(max_queue_bytes=-1) + + +# --------------------------------------------------------------------------- +# Latency / jitter behavior +# 
--------------------------------------------------------------------------- + + +class TestLatency: + def test_fixed_latency_no_jitter(self) -> None: + net = RealisticNetwork( + base_latency_ms=10.0, + jitter_sigma=0.0, + bandwidth_bps=1e12, # essentially infinite — no serialization delay + loss_rate=0.0, + ) + t = net.schedule(A1, A2, 1, 0.0, random.Random(0)) + assert _close(t, 0.010) + + def test_serialization_delay(self) -> None: + # 1 Mbps, 1000 bytes => 8 ms transmission delay, plus 5 ms base. + net = RealisticNetwork( + base_latency_ms=5.0, + jitter_sigma=0.0, + bandwidth_bps=1_000_000.0, + loss_rate=0.0, + ) + t = net.schedule(A1, A2, 1000, 0.0, random.Random(0)) + assert _close(t, 0.005 + 0.008) + + def test_jitter_produces_spread(self) -> None: + rng = random.Random(123) + # Reset egress state between samples so we only see jitter, not queueing. + samples: list[float] = [] + for _ in range(500): + n = RealisticNetwork( + base_latency_ms=10.0, + jitter_sigma=0.4, + bandwidth_bps=1e12, + loss_rate=0.0, + ) + t = n.schedule(A1, A2, 1, 0.0, rng) + assert t is not None + samples.append(t) + assert min(samples) < 0.010 < max(samples) # both sides of base + # Lognormal with sigma=0.4 has median = base, mean > base. Spread > 0. + assert statistics.stdev(samples) > 0.001 + + def test_per_link_override(self) -> None: + net = RealisticNetwork( + base_latency_ms=5.0, + jitter_sigma=0.0, + bandwidth_bps=1e12, + loss_rate=0.0, + links={(str(A1), str(A2)): LinkConfig(base_latency_ms=200.0)}, + ) + t_slow = net.schedule(A1, A2, 1, 0.0, random.Random(0)) + # Use a fresh network instance so queueing on A1's egress doesn't + # bias the comparison. 
+ net2 = RealisticNetwork( + base_latency_ms=5.0, + jitter_sigma=0.0, + bandwidth_bps=1e12, + loss_rate=0.0, + links={(str(A1), str(A2)): LinkConfig(base_latency_ms=200.0)}, + ) + t_fast = net2.schedule(A1, A3, 1, 0.0, random.Random(0)) + assert _close(t_slow, 0.200) + assert _close(t_fast, 0.005) + + +# --------------------------------------------------------------------------- +# Egress queueing — the load curve +# --------------------------------------------------------------------------- + + +class TestQueueing: + def test_sequential_sends_queue_up(self) -> None: + # 1 Mbps, 1000-byte messages => 8 ms per message of serialization. + # The second message must depart 8 ms after the first. + net = RealisticNetwork( + base_latency_ms=0.0, + jitter_sigma=0.0, + bandwidth_bps=1_000_000.0, + loss_rate=0.0, + ) + rng = random.Random(0) + t1 = net.schedule(A1, A2, 1000, 0.0, rng) + t2 = net.schedule(A1, A2, 1000, 0.0, rng) + t3 = net.schedule(A1, A2, 1000, 0.0, rng) + assert _close(t1, 0.008) + assert _close(t2, 0.016) + assert _close(t3, 0.024) + + def test_idle_link_drains(self) -> None: + # If the simulation clock moves past busy_until, queueing resets. + net = RealisticNetwork( + base_latency_ms=0.0, + jitter_sigma=0.0, + bandwidth_bps=1_000_000.0, + loss_rate=0.0, + ) + rng = random.Random(0) + t1 = net.schedule(A1, A2, 1000, 0.0, rng) + assert _close(t1, 0.008) + # Big gap — link goes idle. Next send should not stack on top. + t2 = net.schedule(A1, A2, 1000, 1.0, rng) + assert _close(t2, 1.008) + + def test_per_sender_queues_are_independent(self) -> None: + # A1 and A3 both target A2, but each has its own egress queue. 
    def test_loss_rate_approximate(self) -> None:
        """Over many sends the observed drop count should be near ``n * loss_rate``."""
        net = RealisticNetwork(loss_rate=0.10, jitter_sigma=0.0, bandwidth_bps=1e12)
        rng = random.Random(42)
        n = 5000
        dropped = sum(1 for _ in range(n) if net.schedule(A1, A2, 1, 0.0, rng) is None)
        # Binomial(n=5000, p=0.1): mean 500, std dev sqrt(450) ~= 21.2, so a 99%
        # interval is roughly 500 +/- 55. The +/-100 bounds below are looser
        # than that.
        assert 400 < dropped < 600
class TestDeterminism:
    """The schedule must be a pure function of the RNG seed and call sequence."""

    def test_same_seed_same_schedule(self) -> None:
        """Two fresh networks fed identically seeded RNGs produce equal schedules."""

        def run() -> list[float | None]:
            net = RealisticNetwork(
                base_latency_ms=10.0,
                jitter_sigma=0.4,
                bandwidth_bps=1_000_000.0,
                loss_rate=0.05,
            )
            rng = random.Random(2024)
            out: list[float | None] = []
            for i in range(500):
                size = 100 + (i % 10) * 50
                out.append(net.schedule(A1, A2, size, i * 0.001, rng))
            return out

        assert run() == run()

    def test_different_seed_diverges(self) -> None:
        """Different seeds must produce different schedules.

        Each run shares ONE RNG across all 100 sends. Building a new
        ``random.Random(seed)`` per call would replay the same first draws
        every time, so the test would never exercise the RNG stream.
        """

        def run(seed: int) -> list[float | None]:
            net = RealisticNetwork(
                base_latency_ms=10.0,
                jitter_sigma=0.4,
                bandwidth_bps=1e6,
                loss_rate=0.05,
            )
            rng = random.Random(seed)
            return [net.schedule(A1, A2, 100, 0.0, rng) for _ in range(100)]

        assert run(1) != run(2)
+ assert t is not None + assert 0.001 < t < 0.050 + + def test_parses_links_list(self) -> None: + net = RealisticNetwork.from_config( + { + "base_latency_ms": 1.0, + "jitter_sigma": 0.0, + "bandwidth_bps": 1e12, + "links": [ + {"from": str(A1), "to": str(A2), "base_latency_ms": 100.0}, + ], + } + ) + t = net.schedule(A1, A2, 1, 0.0, random.Random(0)) + assert _close(t, 0.100) + + def test_ignores_malformed_link(self) -> None: + net = RealisticNetwork.from_config( + { + "base_latency_ms": 2.0, + "jitter_sigma": 0.0, + "bandwidth_bps": 1e12, + "links": ["not a dict", {"from": str(A1)}], + }, # missing 'to' + ) + t = net.schedule(A1, A2, 1, 0.0, random.Random(0)) + assert _close(t, 0.002) + + +# --------------------------------------------------------------------------- +# Standalone RealisticTransport — non-simulator usage +# --------------------------------------------------------------------------- + + +class TestStandaloneTransport: + @pytest.mark.asyncio + async def test_send_then_receive_after_advance(self) -> None: + net = RealisticNetwork( + base_latency_ms=5.0, + jitter_sigma=0.0, + bandwidth_bps=1e12, + loss_rate=0.0, + ) + bus: dict[AgentId, list[PendingMessage]] = {} + t1 = RealisticTransport(A1, net, bus, rng=random.Random(0)) + t2 = RealisticTransport(A2, net, bus, rng=random.Random(0)) + + ok = await t1.send(A2, b"hello") + assert ok is True + + # Not ready at t=0. + with pytest.raises(LookupError): + await t2.receive() + + # Advance past delivery. 
+ t2.advance(0.010) + sender, payload = await t2.receive() + assert sender == A1 + assert payload == b"hello" + + @pytest.mark.asyncio + async def test_drop_returns_false(self) -> None: + net = RealisticNetwork(loss_rate=1.0) + bus: dict[AgentId, list[PendingMessage]] = {} + t1 = RealisticTransport(A1, net, bus, rng=random.Random(0)) + # registering t2 so the bus knows about it + RealisticTransport(A2, net, bus, rng=random.Random(0)) + + ok = await t1.send(A2, b"hello") + assert ok is False + + @pytest.mark.asyncio + async def test_broadcast_excludes_self(self) -> None: + net = RealisticNetwork( + base_latency_ms=0.0, + jitter_sigma=0.0, + bandwidth_bps=1e12, + loss_rate=0.0, + ) + bus: dict[AgentId, list[PendingMessage]] = {} + t1 = RealisticTransport(A1, net, bus, rng=random.Random(0)) + RealisticTransport(A2, net, bus, rng=random.Random(0)) + RealisticTransport(A3, net, bus, rng=random.Random(0)) + + accepted = await t1.broadcast(b"hello") + assert accepted == 2 # A2 and A3, not self + + def test_clock_cannot_go_backwards(self) -> None: + net = RealisticNetwork() + bus: dict[AgentId, list[PendingMessage]] = {} + t1 = RealisticTransport(A1, net, bus, clock=5.0) + with pytest.raises(ValueError, match="backwards"): + t1.advance(3.0) diff --git a/scenarios/marketplace_realistic.yaml b/scenarios/marketplace_realistic.yaml new file mode 100644 index 0000000..ae3aac4 --- /dev/null +++ b/scenarios/marketplace_realistic.yaml @@ -0,0 +1,74 @@ +# SPDX-License-Identifier: Apache-2.0 +# Marketplace scenario with the *realistic* transport plugin. +# +# Demonstrates per-hop latency, egress bandwidth, jitter, and drop-tail +# backpressure on top of the same marketplace task. Compare the trace's +# mean_latency / duration metrics against scenarios/marketplace.yaml +# (which uses the zero-latency in_memory transport). +name: marketplace_realistic +description: "50 buyers + 50 sellers over a realistic per-hop network." 
layers:
  transport: realistic
  # The transport_config block is forwarded to RealisticNetwork.from_config.
  # Units: latency in milliseconds, bandwidth in bits/second, queue size in bytes.
  transport_config:
    base_latency_ms: 5.0         # median propagation delay per hop (before jitter)
    jitter_sigma: 0.4            # lognormal jitter; ~ P99/median = 2.5x
    bandwidth_bps: 10_000_000    # 10 Mbps egress per agent
    max_queue_bytes: 1_000_000   # 1 MB per-agent egress queue; drop-tail
    loss_rate: 0.001             # 0.1% per-hop link-level packet loss
    # Optional per-link overrides — model a slow / flaky link explicitly.
    links:
      - from: buyer-0
        to: seller-0
        base_latency_ms: 80.0    # cross-region hop
        loss_rate: 0.02          # 2% loss for this pair