From 86945b82dc64ee00481550b259137adfaacbd953 Mon Sep 17 00:00:00 2001 From: modelsbridgeaicom-ship-it <2496049374@qq.com> Date: Sat, 6 Jun 2026 08:31:02 +0800 Subject: [PATCH] Add ZAP wire v1 session semantics --- docs/zap-wire-spec-v1.0.md | 128 +++++++++ switchboard/zap_transport.py | 268 +++++++++++++++++- tests/conformance/go.mod | 3 + tests/conformance/zap_wire_v1_vectors.json | 34 +++ tests/conformance/zap_wire_v1_vectors_test.go | 74 +++++ tests/test_zap_wire_session.py | 130 +++++++++ 6 files changed, 636 insertions(+), 1 deletion(-) create mode 100644 docs/zap-wire-spec-v1.0.md create mode 100644 tests/conformance/go.mod create mode 100644 tests/conformance/zap_wire_v1_vectors.json create mode 100644 tests/conformance/zap_wire_v1_vectors_test.go create mode 100644 tests/test_zap_wire_session.py diff --git a/docs/zap-wire-spec-v1.0.md b/docs/zap-wire-spec-v1.0.md new file mode 100644 index 0000000..45b0b9c --- /dev/null +++ b/docs/zap-wire-spec-v1.0.md @@ -0,0 +1,128 @@ +# ZAP Wire v1.0 + +Status: v1.0 reference contract for switchboard peer-to-peer ZAP sessions. + +This document defines the connection-level semantics that sit around the +existing ZAP `PaymentOffer` and `PaymentProof` payload codecs in +`switchboard/zap_transport.py`. The payload codec still owns the payment schema. +ZAP Wire v1.0 owns session open, sequencing, ACK, retry, idempotency, and close. + +## Goals + +- Make session startup deterministic across Python and Go implementations. +- Add explicit sequence numbers so dropped frames and duplicates are observable. +- Keep the wire envelope small and independent from `zap_py`, HTTP, JSON, or a + specific transport such as TCP, QUIC, or a message queue. +- Preserve the existing ZAP payload bytes unchanged inside DATA frames. + +## Frame Layout + +All multi-byte integers are big-endian. + +| Offset | Size | Field | Notes | +| --- | ---: | --- | --- | +| 0 | 4 | magic | ASCII `ZAP!` | +| 4 | 1 | version | `0x01` for this spec | +| 5 | 1 | frame_type | See frame type table | +| 6 | 2 | capabilities | Negotiated bitmask on HELLO/HELLO_ACK; zero otherwise | +| 8 | 8 | seq | Unsigned sequence number | +| 16 | 4 | payload_len | Length in bytes, maximum 16 KiB | +| 20 | N | payload | Empty for HELLO/HELLO_ACK/FIN unless noted | + +Frame type values: + +| Value | Name | Payload | +| ---: | --- | --- | +| `0x01` | `HELLO` | Empty | +| `0x02` | `HELLO_ACK` | Empty | +| `0x03` | `DATA` | Existing ZAP payload bytes | +| `0x04` | `ACK` | Four-byte big-endian sequence echo | +| `0x05` | `FIN` | Empty | +| `0x06` | `ERROR` | UTF-8 diagnostic string | + +Capability bits: + +| Bit | Name | Meaning | +| ---: | --- | --- | +| `0x0001` | `PQ_ENVELOPE` | Payment payloads may carry post-quantum signature fields | +| `0x0002` | `NESTED_X402` | DATA payloads may carry nested x402 envelopes | +| `0x0004` | `MPP_SESSION` | Peers may use multi-party payment session metadata | + +## Handshake + +The initiator sends a `HELLO` frame with `seq = 0` and its supported capability +bitmask. The responder intersects those bits with its local supported bitmask +and returns `HELLO_ACK` with `seq = 0`. + +If either peer receives an unsupported version, invalid magic, unknown frame +type, unknown capability, or oversized payload, it must reject the session before +processing the payload. + +## Sequence Semantics + +DATA frames use monotonically increasing unsigned 64-bit sequence numbers, +starting at `1` for the first application payload. Receivers track the highest +accepted sequence number for the session: + +- `seq > highest_seen_seq`: accept the frame, update `highest_seen_seq`, and ACK. +- `seq <= highest_seen_seq`: treat as duplicate or stale, do not reprocess the + payload, and ACK the received sequence. + +The ACK frame payload is a four-byte big-endian echo of the accepted or duplicate +sequence. v1.0 therefore requires live ACKed sequences to fit `uint32`; higher +sequence ranges are reserved for a future v1.1 ACK extension. + +## Retry Policy + +Senders retain each unacknowledged DATA frame until an ACK arrives or retry +attempts are exhausted. The reference policy is: + +- first retry timeout: 200 ms; +- timeout for attempt `n`: `min(5000 ms, 200 ms * 2^n)`; +- maximum retry attempts: 5; +- after the final failed retry, drop the session and surface the unacknowledged + sequence to the application for resubmit. + +The timer source is intentionally transport-specific. The reference Python state +machine returns the next frame and delay but does not sleep. + +## Idempotency + +Every `PaymentOffer` should carry an application request id. In the current +Python model this is the `PaymentOffer.nonce` field. Receivers maintain a +per-session set of request ids: + +- first occurrence: process normally and ACK; +- duplicate request id: do not process the payment twice, but ACK the frame so + the sender can stop retrying. + +This rule is independent from the sequence check. A retry can arrive with the +same sequence and same request id; a transport replay can arrive with a new +sequence but an already-seen request id. + +## Close + +Either side may send `FIN` to close the session. On receiving `FIN`, the peer +marks the session closed and surfaces the sorted list of still-unacknowledged +outbound sequence numbers. The application can resubmit those payloads on a new +session if they are still valid and idempotent. + +## Reference Vectors + +The reference conformance vectors live in +`tests/conformance/zap_wire_v1_vectors.json`. Implementations in Go or any other +language should decode those exact bytes, verify the header fields, and re-encode +to the same lowercase hexadecimal strings before attempting live interop. + +The repository also includes a standalone Go vector consumer in +`tests/conformance/zap_wire_v1_vectors_test.go`; run it from the repository root +with `cd tests/conformance && go test .`. + +The Python reference tests cover: + +- HELLO / HELLO_ACK capability intersection; +- DATA sequence acceptance and duplicate ACK; +- request-id idempotency; +- exponential retry calculation and attempt cap; +- FIN orphaned-sequence surfacing; +- static byte-for-byte frame vectors. diff --git a/switchboard/zap_transport.py b/switchboard/zap_transport.py index acaba01..e8c4d22 100644 --- a/switchboard/zap_transport.py +++ b/switchboard/zap_transport.py @@ -31,7 +31,9 @@ from __future__ import annotations -from dataclasses import replace +import struct +from dataclasses import dataclass, field, replace +from enum import IntEnum, IntFlag from .x402_middleware import PaymentOffer, PaymentProof, PaymentScheme @@ -48,6 +50,15 @@ "ZapNotAvailable", "OFFER_SCHEMA", "PROOF_SCHEMA", + "ZAP_WIRE_MAGIC", + "ZAP_WIRE_VERSION", + "ZapRetryPolicy", + "ZapWireCapability", + "ZapWireFrame", + "ZapWireFrameType", + "ZapWireReceiveResult", + "ZapWireSession", + "ZapWireSessionError", "encode_offer", "decode_offer", "encode_proof", @@ -60,6 +71,10 @@ class ZapNotAvailable(RuntimeError): """zap_py is not installed; ZAP transport is unavailable.""" +class ZapWireSessionError(ValueError): + """Raised when a ZAP v1.0 session frame violates the wire contract.""" + + # ─── Wire constants ────────────────────────────────────────────────────────── # # Both schemas use uint256 amount-as-bytes (32 big-endian bytes) so we don't truncate @@ -88,6 +103,257 @@ class ZapNotAvailable(RuntimeError): } _TAG_TO_PQ_ALG = {v: k for k, v in _PQ_ALG_TO_TAG.items()} +ZAP_WIRE_MAGIC = b"ZAP!" +ZAP_WIRE_VERSION = 1 +_ZAP_WIRE_HEADER = struct.Struct(">4sBBHQI") +_ACK_ECHO = struct.Struct(">I") +_MAX_PAYLOAD_LEN = 16 * 1024 + + +class ZapWireFrameType(IntEnum): + """Connection-level frame types for the ZAP v1.0 session layer.""" + + HELLO = 0x01 + HELLO_ACK = 0x02 + DATA = 0x03 + ACK = 0x04 + FIN = 0x05 + ERROR = 0x06 + + +class ZapWireCapability(IntFlag): + """Capability bits negotiated during the ZAP v1.0 handshake.""" + + NONE = 0 + PQ_ENVELOPE = 1 << 0 + NESTED_X402 = 1 << 1 + MPP_SESSION = 1 << 2 + + +_KNOWN_CAPABILITIES = ( + int(ZapWireCapability.PQ_ENVELOPE) + | int(ZapWireCapability.NESTED_X402) + | int(ZapWireCapability.MPP_SESSION) +) + + +def _check_u64(name: str, value: int) -> None: + if value < 0 or value > 0xFFFFFFFFFFFFFFFF: + raise ZapWireSessionError(f"{name} must fit uint64") + + +def _check_u16(name: str, value: int) -> None: + if value < 0 or value > 0xFFFF: + raise ZapWireSessionError(f"{name} must fit uint16") + + +def _check_payload(payload: bytes) -> None: + if len(payload) > _MAX_PAYLOAD_LEN: + raise ZapWireSessionError("ZAP v1.0 frame payload exceeds 16 KiB") + + +@dataclass(frozen=True) +class ZapWireFrame: + """A ZAP v1.0 connection-level frame. + + The frame wraps existing ZAP payload bytes and adds session semantics: + magic/version negotiation, explicit sequence numbers, ACK/FIN control + frames, and a negotiated capability bitmask. + """ + + frame_type: ZapWireFrameType + seq: int = 0 + payload: bytes = b"" + capabilities: ZapWireCapability = ZapWireCapability.NONE + version: int = ZAP_WIRE_VERSION + + def encode(self) -> bytes: + _check_u64("seq", self.seq) + _check_u16("capabilities", int(self.capabilities)) + _check_payload(self.payload) + header = _ZAP_WIRE_HEADER.pack( + ZAP_WIRE_MAGIC, + self.version, + int(self.frame_type), + int(self.capabilities), + self.seq, + len(self.payload), + ) + return header + self.payload + + @classmethod + def decode(cls, wire: bytes) -> "ZapWireFrame": + if len(wire) < _ZAP_WIRE_HEADER.size: + raise ZapWireSessionError("ZAP v1.0 frame is shorter than the header") + magic, version, frame_type, capabilities, seq, payload_len = _ZAP_WIRE_HEADER.unpack( + wire[: _ZAP_WIRE_HEADER.size] + ) + if magic != ZAP_WIRE_MAGIC: + raise ZapWireSessionError("invalid ZAP v1.0 magic") + if version != ZAP_WIRE_VERSION: + raise ZapWireSessionError(f"unsupported ZAP v1.0 version {version}") + if payload_len > _MAX_PAYLOAD_LEN: + raise ZapWireSessionError("ZAP v1.0 frame payload exceeds 16 KiB") + expected_len = _ZAP_WIRE_HEADER.size + payload_len + if len(wire) != expected_len: + raise ZapWireSessionError( + f"ZAP v1.0 frame length mismatch: expected {expected_len}, got {len(wire)}" + ) + if capabilities & ~_KNOWN_CAPABILITIES: + raise ZapWireSessionError("unknown ZAP v1.0 capability bit") + try: + parsed_type = ZapWireFrameType(frame_type) + parsed_capabilities = ZapWireCapability(capabilities) + except ValueError as exc: + raise ZapWireSessionError("unknown ZAP v1.0 frame type or capability") from exc + return cls( + frame_type=parsed_type, + seq=seq, + payload=wire[_ZAP_WIRE_HEADER.size :], + capabilities=parsed_capabilities, + version=version, + ) + + @classmethod + def hello(cls, capabilities: ZapWireCapability, seq: int = 0) -> "ZapWireFrame": + return cls(ZapWireFrameType.HELLO, seq=seq, capabilities=capabilities) + + @classmethod + def hello_ack(cls, capabilities: ZapWireCapability, seq: int = 0) -> "ZapWireFrame": + return cls(ZapWireFrameType.HELLO_ACK, seq=seq, capabilities=capabilities) + + @classmethod + def ack(cls, acked_seq: int, seq: int = 0) -> "ZapWireFrame": + if acked_seq < 0 or acked_seq > 0xFFFFFFFF: + raise ZapWireSessionError("ACK echo must fit uint32") + return cls(ZapWireFrameType.ACK, seq=seq, payload=_ACK_ECHO.pack(acked_seq)) + + def acked_seq(self) -> int: + if self.frame_type != ZapWireFrameType.ACK or len(self.payload) != _ACK_ECHO.size: + raise ZapWireSessionError("frame is not a valid ZAP v1.0 ACK") + return _ACK_ECHO.unpack(self.payload)[0] + + +@dataclass(frozen=True) +class ZapRetryPolicy: + """Retry timing for unacknowledged ZAP v1.0 DATA frames.""" + + initial_timeout_ms: int = 200 + max_timeout_ms: int = 5_000 + max_attempts: int = 5 + + def timeout_ms_for_attempt(self, attempt: int) -> int: + if attempt < 0: + raise ValueError("attempt must be non-negative") + return min(self.max_timeout_ms, self.initial_timeout_ms * (2**attempt)) + + +@dataclass +class _PendingZapFrame: + frame: ZapWireFrame + request_id: str | None = None + attempts: int = 0 + + +@dataclass(frozen=True) +class ZapWireReceiveResult: + """Result of accepting or rejecting an inbound DATA frame.""" + + status: str + ack: ZapWireFrame | None = None + payload: bytes = b"" + request_id: str | None = None + + +@dataclass +class ZapWireSession: + """Minimal state machine for ZAP v1.0 peer-to-peer session semantics.""" + + capabilities: ZapWireCapability = ( + ZapWireCapability.PQ_ENVELOPE + | ZapWireCapability.NESTED_X402 + | ZapWireCapability.MPP_SESSION + ) + retry_policy: ZapRetryPolicy = field(default_factory=ZapRetryPolicy) + next_seq: int = 1 + highest_seen_seq: int = 0 + peer_capabilities: ZapWireCapability = ZapWireCapability.NONE + closed: bool = False + _pending: dict[int, _PendingZapFrame] = field(default_factory=dict) + _seen_request_ids: set[str] = field(default_factory=set) + + def make_hello(self) -> ZapWireFrame: + return ZapWireFrame.hello(self.capabilities) + + def accept_hello(self, frame: ZapWireFrame) -> ZapWireFrame: + if frame.frame_type not in {ZapWireFrameType.HELLO, ZapWireFrameType.HELLO_ACK}: + raise ZapWireSessionError("expected HELLO or HELLO_ACK frame") + self.peer_capabilities = self.capabilities & frame.capabilities + if frame.frame_type == ZapWireFrameType.HELLO: + return ZapWireFrame.hello_ack(self.peer_capabilities) + return ZapWireFrame.ack(frame.seq) + + def make_data(self, payload: bytes, request_id: str | None = None) -> ZapWireFrame: + if self.closed: + raise ZapWireSessionError("cannot send DATA on a closed ZAP session") + seq = self.next_seq + self.next_seq += 1 + frame = ZapWireFrame(ZapWireFrameType.DATA, seq=seq, payload=payload) + self._pending[seq] = _PendingZapFrame(frame=frame, request_id=request_id) + return frame + + def receive_data( + self, frame: ZapWireFrame, request_id: str | None = None + ) -> ZapWireReceiveResult: + if frame.frame_type != ZapWireFrameType.DATA: + raise ZapWireSessionError("expected DATA frame") + if frame.seq <= self.highest_seen_seq or ( + request_id is not None and request_id in self._seen_request_ids + ): + return ZapWireReceiveResult( + status="duplicate", + ack=ZapWireFrame.ack(frame.seq), + payload=frame.payload, + request_id=request_id, + ) + self.highest_seen_seq = frame.seq + if request_id is not None: + self._seen_request_ids.add(request_id) + return ZapWireReceiveResult( + status="accepted", + ack=ZapWireFrame.ack(frame.seq), + payload=frame.payload, + request_id=request_id, + ) + + def receive_ack(self, frame: ZapWireFrame) -> int: + acked_seq = frame.acked_seq() + self._pending.pop(acked_seq, None) + return acked_seq + + def next_retry(self, seq: int) -> tuple[ZapWireFrame, int] | None: + pending = self._pending.get(seq) + if pending is None: + return None + if pending.attempts >= self.retry_policy.max_attempts: + self._pending.pop(seq, None) + raise ZapWireSessionError(f"ZAP frame {seq} exceeded retry attempts") + timeout_ms = self.retry_policy.timeout_ms_for_attempt(pending.attempts) + pending.attempts += 1 + return pending.frame, timeout_ms + + def make_fin(self) -> ZapWireFrame: + self.closed = True + seq = self.next_seq + self.next_seq += 1 + return ZapWireFrame(ZapWireFrameType.FIN, seq=seq) + + def receive_fin(self, frame: ZapWireFrame) -> list[int]: + if frame.frame_type != ZapWireFrameType.FIN: + raise ZapWireSessionError("expected FIN frame") + self.closed = True + return sorted(self._pending) + def _build_offer_schema(): if not HAS_ZAP_PY: diff --git a/tests/conformance/go.mod b/tests/conformance/go.mod new file mode 100644 index 0000000..96416d4 --- /dev/null +++ b/tests/conformance/go.mod @@ -0,0 +1,3 @@ +module switchboard-zap-wire-v1-conformance + +go 1.22 diff --git a/tests/conformance/zap_wire_v1_vectors.json b/tests/conformance/zap_wire_v1_vectors.json new file mode 100644 index 0000000..3898639 --- /dev/null +++ b/tests/conformance/zap_wire_v1_vectors.json @@ -0,0 +1,34 @@ +[ + { + "name": "hello-all-capabilities", + "frame_type": "HELLO", + "seq": 0, + "capabilities": 7, + "payload_hex": "", + "wire_hex": "5a41502101010007000000000000000000000000" + }, + { + "name": "data-seq-one", + "frame_type": "DATA", + "seq": 1, + "capabilities": 0, + "payload_hex": "010203", + "wire_hex": "5a41502101030000000000000000000100000003010203" + }, + { + "name": "ack-seq-one", + "frame_type": "ACK", + "seq": 0, + "capabilities": 0, + "payload_hex": "00000001", + "wire_hex": "5a4150210104000000000000000000000000000400000001" + }, + { + "name": "fin-seq-two", + "frame_type": "FIN", + "seq": 2, + "capabilities": 0, + "payload_hex": "", + "wire_hex": "5a41502101050000000000000000000200000000" + } +] diff --git a/tests/conformance/zap_wire_v1_vectors_test.go b/tests/conformance/zap_wire_v1_vectors_test.go new file mode 100644 index 0000000..d9d5dd9 --- /dev/null +++ b/tests/conformance/zap_wire_v1_vectors_test.go @@ -0,0 +1,74 @@ +package conformance + +import ( + "encoding/binary" + "encoding/hex" + "encoding/json" + "os" + "testing" +) + +type vector struct { + Name string `json:"name"` + FrameType string `json:"frame_type"` + Seq uint64 `json:"seq"` + Capabilities uint16 `json:"capabilities"` + PayloadHex string `json:"payload_hex"` + WireHex string `json:"wire_hex"` +} + +var frameTypes = map[string]byte{ + "HELLO": 0x01, + "HELLO_ACK": 0x02, + "DATA": 0x03, + "ACK": 0x04, + "FIN": 0x05, + "ERROR": 0x06, +} + +func encodeVector(v vector) ([]byte, error) { + payload, err := hex.DecodeString(v.PayloadHex) + if err != nil { + return nil, err + } + wire := make([]byte, 20+len(payload)) + copy(wire[0:4], []byte("ZAP!")) + wire[4] = 0x01 + wire[5] = frameTypes[v.FrameType] + binary.BigEndian.PutUint16(wire[6:8], v.Capabilities) + binary.BigEndian.PutUint64(wire[8:16], v.Seq) + binary.BigEndian.PutUint32(wire[16:20], uint32(len(payload))) + copy(wire[20:], payload) + return wire, nil +} + +func TestZapWireV1Vectors(t *testing.T) { + body, err := os.ReadFile("zap_wire_v1_vectors.json") + if err != nil { + t.Fatal(err) + } + var vectors []vector + if err := json.Unmarshal(body, &vectors); err != nil { + t.Fatal(err) + } + for _, v := range vectors { + t.Run(v.Name, func(t *testing.T) { + wire, err := encodeVector(v) + if err != nil { + t.Fatal(err) + } + if got := hex.EncodeToString(wire); got != v.WireHex { + t.Fatalf("wire mismatch\nwant %s\n got %s", v.WireHex, got) + } + if string(wire[0:4]) != "ZAP!" { + t.Fatal("invalid magic") + } + if wire[4] != 0x01 { + t.Fatalf("unexpected version %d", wire[4]) + } + if binary.BigEndian.Uint64(wire[8:16]) != v.Seq { + t.Fatal("sequence mismatch") + } + }) + } +} diff --git a/tests/test_zap_wire_session.py b/tests/test_zap_wire_session.py new file mode 100644 index 0000000..cf7b856 --- /dev/null +++ b/tests/test_zap_wire_session.py @@ -0,0 +1,130 @@ +"""Tests for ZAP v1.0 connection-level session semantics.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from switchboard import zap_transport as zt + + +def test_wire_frame_roundtrips_without_zap_py(): + frame = zt.ZapWireFrame( + zt.ZapWireFrameType.DATA, + seq=7, + payload=b"offer-wire", + capabilities=zt.ZapWireCapability.NESTED_X402, + ) + + out = zt.ZapWireFrame.decode(frame.encode()) + + assert out == frame + + +def test_wire_frame_rejects_bad_magic_and_payload_size(): + frame = zt.ZapWireFrame(zt.ZapWireFrameType.HELLO) + wire = bytearray(frame.encode()) + wire[0:4] = b"NOPE" + + with pytest.raises(zt.ZapWireSessionError, match="magic"): + zt.ZapWireFrame.decode(bytes(wire)) + + with pytest.raises(zt.ZapWireSessionError, match="16 KiB"): + zt.ZapWireFrame(zt.ZapWireFrameType.DATA, payload=b"x" * (16 * 1024 + 1)).encode() + + +def test_wire_frame_rejects_unknown_capability_bits(): + wire = bytearray(zt.ZapWireFrame.hello(zt.ZapWireCapability.PQ_ENVELOPE).encode()) + wire[6:8] = (0x8000).to_bytes(2, "big") + + with pytest.raises(zt.ZapWireSessionError, match="capability"): + zt.ZapWireFrame.decode(bytes(wire)) + + +def test_handshake_negotiates_capability_intersection(): + client = zt.ZapWireSession( + capabilities=zt.ZapWireCapability.PQ_ENVELOPE | zt.ZapWireCapability.NESTED_X402 + ) + server = zt.ZapWireSession( + capabilities=zt.ZapWireCapability.NESTED_X402 | zt.ZapWireCapability.MPP_SESSION + ) + + hello_ack = server.accept_hello(client.make_hello()) + final_ack = client.accept_hello(hello_ack) + + assert hello_ack.frame_type == zt.ZapWireFrameType.HELLO_ACK + assert hello_ack.capabilities == zt.ZapWireCapability.NESTED_X402 + assert client.peer_capabilities == zt.ZapWireCapability.NESTED_X402 + assert server.peer_capabilities == zt.ZapWireCapability.NESTED_X402 + assert final_ack.frame_type == zt.ZapWireFrameType.ACK + + +def test_data_ack_and_request_id_duplicate_handling(): + sender = zt.ZapWireSession() + receiver = zt.ZapWireSession() + + data = sender.make_data(b"payment-offer-wire", request_id="req-1") + first = receiver.receive_data(data, request_id="req-1") + duplicate = receiver.receive_data(data, request_id="req-1") + + assert first.status == "accepted" + assert first.payload == b"payment-offer-wire" + assert first.ack is not None + assert sender.receive_ack(first.ack) == data.seq + assert sender.next_retry(data.seq) is None + + assert duplicate.status == "duplicate" + assert duplicate.ack is not None + assert duplicate.ack.acked_seq() == data.seq + + +def test_retry_policy_backoff_and_attempt_cap(): + session = zt.ZapWireSession( + retry_policy=zt.ZapRetryPolicy(initial_timeout_ms=100, max_timeout_ms=250, max_attempts=3) + ) + data = session.make_data(b"payment-proof-wire", request_id="req-2") + + assert session.next_retry(data.seq) == (data, 100) + assert session.next_retry(data.seq) == (data, 200) + assert session.next_retry(data.seq) == (data, 250) + + with pytest.raises(zt.ZapWireSessionError, match="exceeded retry"): + session.next_retry(data.seq) + + +def test_fin_surfaces_unacked_sequences(): + session = zt.ZapWireSession() + first = session.make_data(b"one") + second = session.make_data(b"two") + + orphaned = session.receive_fin(zt.ZapWireFrame(zt.ZapWireFrameType.FIN, seq=99)) + + assert orphaned == [first.seq, second.seq] + assert session.closed is True + + +def test_static_zap_wire_v1_conformance_vectors(): + vector_path = Path(__file__).parent / "conformance" / "zap_wire_v1_vectors.json" + vectors = json.loads(vector_path.read_text()) + + for vector in vectors: + frame = zt.ZapWireFrame( + frame_type=zt.ZapWireFrameType[vector["frame_type"]], + seq=vector["seq"], + capabilities=zt.ZapWireCapability(vector["capabilities"]), + payload=bytes.fromhex(vector["payload_hex"]), + ) + assert frame.encode().hex() == vector["wire_hex"], vector["name"] + assert zt.ZapWireFrame.decode(bytes.fromhex(vector["wire_hex"])) == frame + + +def test_ack_payload_is_four_byte_sequence_echo(): + ack = zt.ZapWireFrame.ack(0x01020304) + + assert ack.payload.hex() == "01020304" + assert ack.acked_seq() == 0x01020304 + + with pytest.raises(zt.ZapWireSessionError, match="uint32"): + zt.ZapWireFrame.ack(0x1_0000_0000)