Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ resolved by name via entry points or a built-in default.

| # | Layer | Interface | Default plugin |
|---|---|---|---|
| 1 | Transport | `Transport` | `in_memory` (in-process event queue; no network I/O) |
| 1 | Transport | `Transport` | `in_memory` (in-process event queue; no network I/O) — also `realistic` (per-hop latency, jitter, bandwidth, queueing, drop-tail backpressure) |
| 2 | Communication | `CommsProtocol` | `nest_native` (JSON envelope, base64 payload) |
| 3 | Identity | `Identity` | `did_key` (deterministic public-key signatures for simulation; not Ed25519) |
| 4 | Registry | `Registry` | `in_memory` (dict lookup, no persistence) |
Expand Down Expand Up @@ -312,7 +312,11 @@ Two things to know that aren't obvious:
*is correct*. That's what your plugin is for.
- **Single process.** No distributed execution.
- **In-memory transport only out of the box.** No TCP, gRPC, or HTTP
yet.
yet. The bundled `realistic` transport models per-hop latency, jitter,
bandwidth, queueing, and packet loss on top of the same in-process
event queue — useful for stressing protocol timing without leaving
Tier 1. Pick it via `layers.transport: realistic` and tune knobs in
`layers.transport_config` (see `scenarios/marketplace_realistic.yaml`).
- **Tier 2 is non-deterministic.** Don't use it for benchmarks.

---
Expand Down
1 change: 1 addition & 0 deletions packages/nest-core/nest_core/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
_REF = "nest_plugins_reference"
_BUILTINS: dict[tuple[str, str], str] = {
("transport", "in_memory"): f"{_REF}.transport.in_memory:StandaloneInMemoryTransport",
("transport", "realistic"): f"{_REF}.transport.realistic:RealisticTransport",
("comms", "nest_native"): f"{_REF}.comms.nest_native:NestNativeComms",
("identity", "did_key"): f"{_REF}.identity.did_key:DidKeyIdentity",
("registry", "in_memory"): f"{_REF}.registry.in_memory:InMemoryRegistry",
Expand Down
22 changes: 22 additions & 0 deletions packages/nest-core/nest_core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,25 @@ def _create_shell_agents(self, plugins: dict[str, Any]) -> dict[AgentId, Any]:
raise KeyError(msg)
return factory_fn(self._config, plugins, backend=backend)

def _build_network_model(self) -> Any:
    """Return the network model selected by the scenario, or ``None``.

    Only ``layers.transport == "realistic"`` produces a model: it is built
    by ``RealisticNetwork.from_config`` (from
    ``nest_plugins_reference.transport.realistic``) using
    ``layers.transport_config``, or an empty dict when that is falsy.
    Any other transport name yields ``None``, in which case the simulator
    installs its default zero-latency model.

    Example::

        model = runner._build_network_model()
    """
    layer_cfg = self._config.layers
    if layer_cfg.transport == "realistic":
        # The import only runs when the realistic transport is selected.
        from nest_plugins_reference.transport.realistic import RealisticNetwork

        settings = layer_cfg.transport_config or {}
        return RealisticNetwork.from_config(settings)
    return None

async def run(self) -> Path:
"""Run the scenario and return the trace file path.

Expand All @@ -159,13 +178,16 @@ async def run(self) -> Path:
raw_groups = failures.network_partition.get("groups")
partition_groups = _parse_partition_groups(raw_groups)

network_model = self._build_network_model()

sim = Simulator(
seed=self._config.seed,
trace_path=trace_path,
message_drop_rate=failures.message_drop,
byzantine_fraction=failures.byzantine_agents,
partition_groups=partition_groups,
plugins=plugins,
network_model=network_model,
)

agents = self._create_agents(plugins)
Expand Down
5 changes: 5 additions & 0 deletions packages/nest-core/nest_core/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ def _count_non_negative(cls, value: int) -> int:
class LayerConfig(BaseModel):
"""Plugin selection for each of the 12 layers.

``transport_config`` is an optional dict forwarded to the transport
plugin (e.g. ``RealisticNetwork.from_config``). Most plugins accept
no config; the field is ignored when empty.

Example::

layers = LayerConfig(transport="in_memory", comms="nest_native")
"""

transport: str = "in_memory"
transport_config: dict[str, Any] = Field(default_factory=dict)
comms: str = "nest_native"
identity: str = "did_key"
registry: str = "in_memory"
Expand Down
4 changes: 4 additions & 0 deletions packages/nest-core/nest_core/sim/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from nest_core.sim.clock import VirtualClock as VirtualClock
from nest_core.sim.events import Event as Event
from nest_core.sim.events import EventQueue as EventQueue
from nest_core.sim.network import NetworkModel as NetworkModel
from nest_core.sim.network import ZeroLatencyNetworkModel as ZeroLatencyNetworkModel
from nest_core.sim.simulator import Simulator as Simulator
from nest_core.sim.trace import TraceWriter as TraceWriter
from nest_core.sim.transport import InMemoryTransport as InMemoryTransport
Expand All @@ -20,8 +22,10 @@
"Event",
"EventQueue",
"InMemoryTransport",
"NetworkModel",
"Simulator",
"StateMachineAgent",
"TraceWriter",
"VirtualClock",
"ZeroLatencyNetworkModel",
]
95 changes: 95 additions & 0 deletions packages/nest-core/nest_core/sim/network.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# SPDX-License-Identifier: Apache-2.0
"""Network model abstraction for the discrete-event simulator.

The default ``in_memory`` transport delivers messages at ``time = now`` — fine
for correctness testing, but useless for any property that depends on
latency, queueing, or backpressure. ``NetworkModel`` is the seam: given a
send event, it returns the virtual time at which the message should be
delivered (or ``None`` to drop the message).

The default :class:`ZeroLatencyNetworkModel` reproduces the existing
zero-latency semantics so simulations that don't configure a network model
behave exactly as before.

The scenario runner builds the model for the bundled ``realistic`` transport
from ``layers.transport_config`` and passes it to the simulator. Any other
:class:`NetworkModel` implementation can be supplied directly via
``Simulator(network_model=...)`` or ``Simulator.set_network_model``.

Example::

class FixedDelayNetwork(NetworkModel):
def schedule(self, sender, target, payload_size, t_now, rng):
return t_now + 0.010 # 10 ms per hop, no jitter, no drops

sim = Simulator(seed=42, network_model=FixedDelayNetwork())
"""

from __future__ import annotations

import random
from typing import Protocol, runtime_checkable

from nest_core.types import AgentId


@runtime_checkable
class NetworkModel(Protocol):
    """Pluggable per-hop scheduling for the simulator's transport.

    Implementations must be deterministic given the supplied ``rng``. The
    simulator hands the transport a dedicated network RNG seeded from its
    master RNG (kept separate from the failure-injection RNG), so traces
    remain reproducible across runs with the same seed and latency
    sampling does not perturb failure-injection draws.

    The protocol is structural: any object with a compatible ``schedule``
    method qualifies, no inheritance required.

    Example::

        class MyNetwork:
            def schedule(self, sender, target, payload_size, t_now, rng):
                return t_now + rng.uniform(0.001, 0.005)
    """

    def schedule(
        self,
        sender: AgentId,
        target: AgentId,
        payload_size: int,
        t_now: float,
        rng: random.Random,
    ) -> float | None:
        """Return the delivery time for a message, or ``None`` to drop it.

        Args:
            sender: Agent that is sending the message.
            target: Agent the message is addressed to.
            payload_size: Size of the payload in bytes.
            t_now: Current virtual time at which the send happens.
            rng: Random source to use for every stochastic decision.

        Returns:
            The virtual delivery time, which must be ``>= t_now``, or
            ``None`` to signal a transport-level drop. On a drop the
            simulator records a ``dropped`` event with reason
            ``"network"``.

        Example::

            t_deliver = model.schedule(a1, a2, 128, ctx.time, rng)
        """
        ...


class ZeroLatencyNetworkModel:
    """Network model that adds no delay and never drops a message.

    Used as the simulator's default so that simulations which do not
    configure a network model keep the zero-latency delivery that existing
    traces and validators assume.

    Example::

        sim = Simulator(seed=0, network_model=ZeroLatencyNetworkModel())
    """

    def schedule(
        self,
        sender: AgentId,  # noqa: ARG002 — part of the protocol surface
        target: AgentId,  # noqa: ARG002
        payload_size: int,  # noqa: ARG002
        t_now: float,
        rng: random.Random,  # noqa: ARG002
    ) -> float | None:
        """Return ``t_now`` unchanged: deliver immediately, never drop.

        All other arguments are accepted only to match the
        ``NetworkModel.schedule`` signature and are ignored.

        Example::

            assert ZeroLatencyNetworkModel().schedule(a1, a2, 64, 1.0, rng) == 1.0
        """
        delivery_time = t_now
        return delivery_time


# Static conformance check: lets type checkers verify that the default model
# satisfies the protocol. The annotation has no effect at runtime.
_proto_check: type[NetworkModel] = ZeroLatencyNetworkModel # noqa: F841
99 changes: 91 additions & 8 deletions packages/nest-core/nest_core/sim/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from nest_core.sim.agent import AgentContext, StateMachineAgent
from nest_core.sim.clock import VirtualClock
from nest_core.sim.events import Event, EventQueue
from nest_core.sim.network import NetworkModel, ZeroLatencyNetworkModel
from nest_core.sim.trace import TraceWriter
from nest_core.sim.transport import InMemoryTransport
from nest_core.types import AgentId, CorrelationId
Expand Down Expand Up @@ -99,7 +100,20 @@ async def send(self, to: AgentId, payload: bytes) -> None:
"corr": str(cid),
}
)
await self._transport.send(to, payload, correlation_id=cid)
_, accepted = await self._transport.send(to, payload, correlation_id=cid)
if not accepted and self._trace:
self._trace.record(
{
"ts": self._clock.now,
"agent": str(to),
"kind": "dropped",
"from": str(self._agent_id),
"size": len(payload),
"msg": payload.decode("utf-8", errors="replace"),
"corr": str(cid),
"reason": "network",
}
)

async def broadcast(self, payload: bytes) -> None:
cid = self._corr.next()
Expand All @@ -114,7 +128,22 @@ async def broadcast(self, payload: bytes) -> None:
"corr": str(cid),
}
)
await self._transport.broadcast(payload, correlation_id=cid)
results = await self._transport.broadcast(payload, correlation_id=cid)
if self._trace:
for target, _t, accepted in results:
if not accepted:
self._trace.record(
{
"ts": self._clock.now,
"agent": str(target),
"kind": "dropped",
"from": str(self._agent_id),
"size": len(payload),
"msg": payload.decode("utf-8", errors="replace"),
"corr": str(cid),
"reason": "network",
}
)

async def schedule(self, delay: float, payload: bytes) -> None:
self._queue.push(
Expand Down Expand Up @@ -150,6 +179,7 @@ def __init__(
byzantine_fraction: float = 0.0,
partition_groups: list[list[str]] | None = None,
plugins: dict[str, Any] | None = None,
network_model: NetworkModel | None = None,
) -> None:
if not 0.0 <= message_drop_rate <= 1.0:
msg = f"message_drop_rate must be between 0 and 1: {message_drop_rate}"
Expand All @@ -175,6 +205,10 @@ def __init__(
self._byzantine_agents: set[AgentId] = set()
self._partition_map: dict[AgentId, int] = {}
self._failure_rng = random.Random(self._master_rng.randint(0, 2**63))
# Separate RNG for the network model so byzantine/partition draws are
# not perturbed by per-hop latency sampling.
self._network_rng = random.Random(self._master_rng.randint(0, 2**63))
self._network_model: NetworkModel = network_model or ZeroLatencyNetworkModel()
self._plugins: dict[str, Any] = plugins or {}
self._agent_plugins: dict[AgentId, dict[str, Any]] = {}

Expand Down Expand Up @@ -227,7 +261,14 @@ def add_agent(self, agent_id: AgentId, agent: StateMachineAgent) -> None:
"""
agent_rng = random.Random(self._master_rng.randint(0, 2**63))
all_ids = [aid for aid in self._agents]
transport = InMemoryTransport(agent_id, self._queue, self._clock, all_ids)
transport = InMemoryTransport(
agent_id,
self._queue,
self._clock,
all_ids,
network_model=self._network_model,
network_rng=self._network_rng,
)
self._agents[agent_id] = _AgentSlot(
agent=agent,
transport=transport,
Expand All @@ -250,17 +291,32 @@ def _init_failures(self) -> None:
if aid in self._agents:
self._partition_map[aid] = group_idx

def _should_drop(self, sender: AgentId, target: AgentId) -> bool:
def _drop_reason(self, sender: AgentId, target: AgentId) -> str | None:
"""Return a reason string if the message should be dropped, else None.

Failure-injection drop and network-partition drop are distinguished
in the trace so downstream metrics can attribute them correctly.
"""
if self._message_drop_rate > 0 and self._failure_rng.random() < self._message_drop_rate:
return True
return "failure_injection"

if self._partition_map:
s_group = self._partition_map.get(sender, -1)
t_group = self._partition_map.get(target, -2)
if s_group >= 0 and t_group >= 0 and s_group != t_group:
return True
return "partition"

return None

return False
def _should_drop(self, sender: AgentId, target: AgentId) -> bool:
    """Report whether ``_drop_reason`` yields a reason for this pair.

    Backwards-compatible boolean wrapper: returns ``True`` exactly when
    ``self._drop_reason(sender, target)`` is not ``None``.

    Example::

        if sim._should_drop(sender, target):
            ...
    """
    reason = self._drop_reason(sender, target)
    return reason is not None

async def run(self, max_ticks: int = 100_000, max_time: float | None = None) -> None:
"""Run the simulation until events are exhausted or limits are reached.
Expand Down Expand Up @@ -314,7 +370,8 @@ async def run(self, max_ticks: int = 100_000, max_time: float | None = None) ->
if target_slot is None:
continue

if self._should_drop(event.target_id, event.agent_id):
drop_reason = self._drop_reason(event.target_id, event.agent_id)
if drop_reason is not None:
self._dropped_count += 1
if self._trace:
drop_rec: dict[str, Any] = {
Expand All @@ -324,6 +381,7 @@ async def run(self, max_ticks: int = 100_000, max_time: float | None = None) ->
"from": str(event.target_id),
"size": len(event.payload),
"msg": event.payload.decode("utf-8", errors="replace"),
"reason": drop_reason,
}
if event.correlation_id is not None:
drop_rec["corr"] = str(event.correlation_id)
Expand Down Expand Up @@ -368,6 +426,31 @@ async def run(self, max_ticks: int = 100_000, max_time: float | None = None) ->
if self._trace:
self._trace.close()

@property
def network_model(self) -> NetworkModel:
    """Read-only access to the network model installed on this simulator.

    Example::

        sim.network_model
    """
    installed = self._network_model
    return installed

def set_network_model(self, model: NetworkModel) -> None:
    """Install ``model`` as the simulator's network model.

    The model is stored on the simulator, so agents added afterwards via
    :meth:`add_agent` are wired to it. Every already-registered agent's
    transport is re-pointed at the new model as well, together with the
    simulator's network RNG, which makes it safe to call right before
    :meth:`run`.

    Example::

        sim.set_network_model(RealisticNetwork(...))
    """
    self._network_model = model
    shared_rng = self._network_rng
    for agent_slot in self._agents.values():
        agent_slot.transport.set_network(model, shared_rng)

def set_agent_plugins(self, agent_id: AgentId, overrides: dict[str, Any]) -> None:
"""Set per-agent plugin overrides (merged on top of shared plugins).

Expand Down
Loading
Loading