From e5fac9b4e91dfae3b912a38bfb4dbe21410c2f86 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Mon, 13 Apr 2026 06:11:31 +0000 Subject: [PATCH 1/3] chore: add temp review body file --- .agents/review_body.md | 74 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 .agents/review_body.md diff --git a/.agents/review_body.md b/.agents/review_body.md new file mode 100644 index 0000000000..4995076903 --- /dev/null +++ b/.agents/review_body.md @@ -0,0 +1,74 @@ +## PR Review: `fix(pegboard-runner): clear terminal tunnel routes` + +### Summary + +This PR fixes a resource leak in the pegboard-runner's WebSocket-to-tunnel message handler. Previously, tunnel route authorizations (entries in `authorized_tunnel_routes`) were never removed when processing terminal tunnel messages (final HTTP responses, aborted requests, closed WebSocket connections). After this fix, terminal messages also remove the corresponding route entry, preventing stale route entries from accumulating across the lifetime of a runner connection. + +The fix is applied to both mk2 and mk1 protocol paths via new pure helper functions `should_clear_tunnel_route_mk2` and `should_clear_tunnel_route_mk1`. + +--- + +### Code Quality + +**Positive:** +- Extracting `route` and `clear_route` before consuming `msg` is correct and avoids borrow issues since `msg` is moved into the serialization call. +- `should_clear_tunnel_route_*` helpers are pure functions that clearly express intent. +- Uses `scc::HashMap` async API (`contains_async`, `remove_async`) consistently with the codebase's concurrency model. +- mk1 and mk2 paths are kept at feature parity per the Engine Runner Parity guideline. +- Commit message follows conventional commits format. + +--- + +### Issues + +#### Medium: Test helpers are dead code; clearing behavior is untested + +The test support file adds several helper constructors (`response_start_message_mk2`, `response_chunk_message_mk2`, `response_start_message_mk1`, `response_chunk_message_mk1`, etc.) that are never called in any test. The updated tests only verify that a `WebSocketMessage` (non-terminal) does **not** clear the route. There are no tests verifying the positive case that terminal messages **do** clear the route. + +Missing test cases: +- `ToServerResponseStart` with `stream: false` -> route cleared +- `ToServerResponseStart` with `stream: true` -> route **not** cleared +- `ToServerResponseChunk` with `finish: true` -> route cleared +- `ToServerResponseChunk` with `finish: false` -> route **not** cleared +- `ToServerResponseAbort` -> route cleared +- `ToServerWebSocketClose` -> route cleared +- Symmetric coverage for mk1 variants + +The existing `republishes_issued_mk*_tunnel_message_pairs` tests now only exercise the non-clearing path, so there is no test that sends a terminal message and asserts the route entry is subsequently absent. + +#### Low: Implicit fallthrough in `should_clear_tunnel_route_*` for future variants + +Both `should_clear` functions use `_ => false` as the catch-all. This means any future protocol variant added to the enum would silently default to not clearing the route. Depending on the variant this could be correct (safe default) or a bug. Explicitly enumerating all non-clearing variants or adding a comment would make the intent clear and surface a compile error if a new variant is added without deliberate handling. + +#### Low: `DeprecatedTunnelAck` early-return computes `clear_route` unnecessarily + +In `handle_tunnel_message_mk1`, `clear_route` is computed before the `DeprecatedTunnelAck` early-return check. Since `should_clear_tunnel_route_mk1` returns `false` for that variant anyway, there is no logic issue, but the value is computed and then immediately discarded. Minor ordering cleanup would eliminate the dead computation. + +--- + +### Security + +The fix is directly security-relevant: it enforces the one-request-one-response invariant at the runner level. Without this fix, a route authorization for a completed/aborted request could persist indefinitely, allowing responses to continue being forwarded after the logical request lifecycle has ended. No new concerns are introduced. + +--- + +### Performance + +No concerns. `scc::HashMap::remove_async` is O(1) and does not hold a lock across `.await` points. + +--- + +### Potential Edge Case + +Route clearing only happens after a **successful** publish. If the NATS publish call fails, the error returns before the `if clear_route` block, leaving the route in place. This is pre-existing behavior and arguably correct (caller can retry), but worth a comment in the code to make the intention explicit. + +--- + +### Summary + +| Severity | Finding | +|---|---| +| Medium | Test helpers added but never called; no tests assert terminal messages actually clear the route | +| Low | `_ => false` catchall silently handles unknown future protocol variants | +| Low | `DeprecatedTunnelAck` path computes `clear_route` before the early-return that discards it | +| Info | Failed publish leaves route in place (pre-existing; worth a comment) | From 23cd248a9df2840a700c80b92881755f4439a0a3 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Mon, 13 Apr 2026 06:12:31 +0000 Subject: [PATCH 2/3] chore: remove temp review body file --- .agents/review_body.md | 74 ------------------------------------------ 1 file changed, 74 deletions(-) delete mode 100644 .agents/review_body.md diff --git a/.agents/review_body.md b/.agents/review_body.md deleted file mode 100644 index 4995076903..0000000000 --- a/.agents/review_body.md +++ /dev/null @@ -1,74 +0,0 @@ -## PR Review: `fix(pegboard-runner): clear terminal tunnel routes` - -### Summary - -This PR fixes a resource leak in the pegboard-runner's WebSocket-to-tunnel message handler. Previously, tunnel route authorizations (entries in `authorized_tunnel_routes`) were never removed when processing terminal tunnel messages (final HTTP responses, aborted requests, closed WebSocket connections). After this fix, terminal messages also remove the corresponding route entry, preventing stale route entries from accumulating across the lifetime of a runner connection. - -The fix is applied to both mk2 and mk1 protocol paths via new pure helper functions `should_clear_tunnel_route_mk2` and `should_clear_tunnel_route_mk1`. - ---- - -### Code Quality - -**Positive:** -- Extracting `route` and `clear_route` before consuming `msg` is correct and avoids borrow issues since `msg` is moved into the serialization call. -- `should_clear_tunnel_route_*` helpers are pure functions that clearly express intent. -- Uses `scc::HashMap` async API (`contains_async`, `remove_async`) consistently with the codebase's concurrency model. -- mk1 and mk2 paths are kept at feature parity per the Engine Runner Parity guideline. -- Commit message follows conventional commits format. - ---- - -### Issues - -#### Medium: Test helpers are dead code; clearing behavior is untested - -The test support file adds several helper constructors (`response_start_message_mk2`, `response_chunk_message_mk2`, `response_start_message_mk1`, `response_chunk_message_mk1`, etc.) that are never called in any test. The updated tests only verify that a `WebSocketMessage` (non-terminal) does **not** clear the route. There are no tests verifying the positive case that terminal messages **do** clear the route. - -Missing test cases: -- `ToServerResponseStart` with `stream: false` -> route cleared -- `ToServerResponseStart` with `stream: true` -> route **not** cleared -- `ToServerResponseChunk` with `finish: true` -> route cleared -- `ToServerResponseChunk` with `finish: false` -> route **not** cleared -- `ToServerResponseAbort` -> route cleared -- `ToServerWebSocketClose` -> route cleared -- Symmetric coverage for mk1 variants - -The existing `republishes_issued_mk*_tunnel_message_pairs` tests now only exercise the non-clearing path, so there is no test that sends a terminal message and asserts the route entry is subsequently absent. - -#### Low: Implicit fallthrough in `should_clear_tunnel_route_*` for future variants - -Both `should_clear` functions use `_ => false` as the catch-all. This means any future protocol variant added to the enum would silently default to not clearing the route. Depending on the variant this could be correct (safe default) or a bug. Explicitly enumerating all non-clearing variants or adding a comment would make the intent clear and surface a compile error if a new variant is added without deliberate handling. - -#### Low: `DeprecatedTunnelAck` early-return computes `clear_route` unnecessarily - -In `handle_tunnel_message_mk1`, `clear_route` is computed before the `DeprecatedTunnelAck` early-return check. Since `should_clear_tunnel_route_mk1` returns `false` for that variant anyway, there is no logic issue, but the value is computed and then immediately discarded. Minor ordering cleanup would eliminate the dead computation. - ---- - -### Security - -The fix is directly security-relevant: it enforces the one-request-one-response invariant at the runner level. Without this fix, a route authorization for a completed/aborted request could persist indefinitely, allowing responses to continue being forwarded after the logical request lifecycle has ended. No new concerns are introduced. - ---- - -### Performance - -No concerns. `scc::HashMap::remove_async` is O(1) and does not hold a lock across `.await` points. - ---- - -### Potential Edge Case - -Route clearing only happens after a **successful** publish. If the NATS publish call fails, the error returns before the `if clear_route` block, leaving the route in place. This is pre-existing behavior and arguably correct (caller can retry), but worth a comment in the code to make the intention explicit. - ---- - -### Summary - -| Severity | Finding | -|---|---| -| Medium | Test helpers added but never called; no tests assert terminal messages actually clear the route | -| Low | `_ => false` catchall silently handles unknown future protocol variants | -| Low | `DeprecatedTunnelAck` path computes `clear_route` before the early-return that discards it | -| Info | Failed publish leaves route in place (pre-existing; worth a comment) | From da0a966ff1b6c2ebe6904dafec90e0a2d137ccf4 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sun, 12 Apr 2026 19:13:04 -0700 Subject: [PATCH 3/3] fix(pegboard-gateway): enforce tunnel message state --- engine/packages/pegboard-gateway/src/lib.rs | 20 ++- .../pegboard-gateway/src/shared_state.rs | 126 +++++++++++++++++- 2 files changed, 142 insertions(+), 4 deletions(-) diff --git a/engine/packages/pegboard-gateway/src/lib.rs b/engine/packages/pegboard-gateway/src/lib.rs index a340811ca2..6e0c073d44 100644 --- a/engine/packages/pegboard-gateway/src/lib.rs +++ b/engine/packages/pegboard-gateway/src/lib.rs @@ -27,7 +27,7 @@ use tokio_tungstenite::tungstenite::{ }; use universaldb::utils::IsolationLevel::*; -use crate::shared_state::{InFlightRequestHandle, SharedState}; +use crate::shared_state::{InFlightRequestHandle, InFlightRequestState, SharedState}; mod keepalive_task; mod metrics; @@ -178,7 +178,12 @@ impl PegboardGateway { .. } = self .shared_state - .start_in_flight_request(tunnel_subject, runner_protocol_version, request_id) + .start_in_flight_request( + tunnel_subject, + runner_protocol_version, + request_id, + InFlightRequestState::AwaitingHttpResponseStart, + ) .await; // Start request @@ -304,7 +309,16 @@ impl PegboardGateway { new, } = self .shared_state - .start_in_flight_request(tunnel_subject.clone(), runner_protocol_version, request_id) + .start_in_flight_request( + tunnel_subject.clone(), + runner_protocol_version, + request_id, + if after_hibernation { + InFlightRequestState::ActiveWebSocket + } else { + InFlightRequestState::AwaitingWebSocketOpen + }, + ) .await; ensure!( diff --git a/engine/packages/pegboard-gateway/src/shared_state.rs b/engine/packages/pegboard-gateway/src/shared_state.rs index f2ca560d93..d504f74961 100644 --- a/engine/packages/pegboard-gateway/src/shared_state.rs +++ b/engine/packages/pegboard-gateway/src/shared_state.rs @@ -26,10 +26,60 @@ pub struct InFlightRequestHandle { pub new: bool, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum InFlightRequestState { + AwaitingHttpResponseStart, + AwaitingWebSocketOpen, + ActiveWebSocket, + Closed, +} + +impl InFlightRequestState { + fn accept_message(&mut self, message_kind: &protocol::mk2::ToServerTunnelMessageKind) -> bool { + use protocol::mk2::ToServerTunnelMessageKind; + + match (self, message_kind) { + ( + state @ InFlightRequestState::AwaitingHttpResponseStart, + ToServerTunnelMessageKind::ToServerResponseStart(_) + | ToServerTunnelMessageKind::ToServerResponseAbort, + ) => { + *state = InFlightRequestState::Closed; + true + } + ( + state @ InFlightRequestState::AwaitingWebSocketOpen, + ToServerTunnelMessageKind::ToServerWebSocketOpen(_), + ) => { + *state = InFlightRequestState::ActiveWebSocket; + true + } + ( + state @ InFlightRequestState::AwaitingWebSocketOpen, + ToServerTunnelMessageKind::ToServerWebSocketClose(_), + ) + | ( + state @ InFlightRequestState::ActiveWebSocket, + ToServerTunnelMessageKind::ToServerWebSocketClose(_), + ) => { + *state = InFlightRequestState::Closed; + true + } + ( + InFlightRequestState::ActiveWebSocket, + ToServerTunnelMessageKind::ToServerWebSocketMessage(_) + | ToServerTunnelMessageKind::ToServerWebSocketMessageAck(_), + ) => true, + _ => false, + } + } +} + struct InFlightRequest { /// UPS subject to send messages to for this request. receiver_subject: String, protocol_version: u16, + state: InFlightRequestState, /// Sender for incoming messages to this request. msg_tx: mpsc::Sender, /// Used to check if the request handler has been dropped. @@ -134,6 +184,7 @@ impl SharedState { receiver_subject: String, protocol_version: u16, request_id: protocol::mk2::RequestId, + state: InFlightRequestState, ) -> InFlightRequestHandle { let (msg_tx, msg_rx) = mpsc::channel(128); let (drop_tx, drop_rx) = watch::channel(None); @@ -143,6 +194,7 @@ impl SharedState { entry.insert_entry(InFlightRequest { receiver_subject, protocol_version, + state, msg_tx, drop_tx, opened: false, @@ -159,6 +211,7 @@ impl SharedState { entry.receiver_subject = receiver_subject; entry.msg_tx = msg_tx; entry.drop_tx = drop_tx; + entry.state = state; entry.opened = false; entry.last_pong = util::timestamp::now(); @@ -355,7 +408,7 @@ impl SharedState { Ok(protocol::mk2::ToGateway::ToServerTunnelMessage(msg)) => { let message_id = msg.message_id; - let Some(in_flight) = self + let Some(mut in_flight) = self .in_flight_requests .get_async(&message_id.request_id) .await @@ -369,6 +422,18 @@ impl SharedState { continue; }; + if !in_flight.state.accept_message(&msg.message_kind) { + tracing::warn!( + gateway_id=%protocol::util::id_to_string(&message_id.gateway_id), + request_id=%protocol::util::id_to_string(&message_id.request_id), + message_index=message_id.message_index, + state=?in_flight.state, + message_kind=?msg.message_kind, + "dropping invalid tunnel message for request state" + ); + continue; + } + // Send message to the request handler to emulate the real network action let inner_size = match &msg.message_kind { protocol::mk2::ToServerTunnelMessageKind::ToServerWebSocketMessage( @@ -619,6 +684,65 @@ fn wrapping_gt(a: u16, b: u16) -> bool { a != b && a.wrapping_sub(b) < u16::MAX / 2 } +#[cfg(test)] +mod tests { + use super::InFlightRequestState; + use rivet_runner_protocol as protocol; + + #[test] + fn http_requests_only_accept_http_terminal_messages() { + let mut state = InFlightRequestState::AwaitingHttpResponseStart; + assert!( + state.accept_message(&protocol::mk2::ToServerTunnelMessageKind::ToServerResponseAbort,) + ); + assert_eq!(state, InFlightRequestState::Closed); + + let mut state = InFlightRequestState::AwaitingHttpResponseStart; + assert!(!state.accept_message( + &protocol::mk2::ToServerTunnelMessageKind::ToServerWebSocketMessage( + protocol::mk2::ToServerWebSocketMessage { + data: Vec::new(), + binary: false, + }, + ), + )); + assert_eq!(state, InFlightRequestState::AwaitingHttpResponseStart); + } + + #[test] + fn websockets_must_open_before_streaming() { + let mut state = InFlightRequestState::AwaitingWebSocketOpen; + assert!(!state.accept_message( + &protocol::mk2::ToServerTunnelMessageKind::ToServerWebSocketMessage( + protocol::mk2::ToServerWebSocketMessage { + data: Vec::new(), + binary: false, + }, + ), + )); + assert_eq!(state, InFlightRequestState::AwaitingWebSocketOpen); + + assert!(state.accept_message( + &protocol::mk2::ToServerTunnelMessageKind::ToServerWebSocketOpen( + protocol::mk2::ToServerWebSocketOpen { + can_hibernate: false, + }, + ), + )); + assert_eq!(state, InFlightRequestState::ActiveWebSocket); + } + + #[test] + fn active_websockets_reject_http_messages() { + let mut state = InFlightRequestState::ActiveWebSocket; + assert!( + !state + .accept_message(&protocol::mk2::ToServerTunnelMessageKind::ToServerResponseAbort,) + ); + assert_eq!(state, InFlightRequestState::ActiveWebSocket); + } +} + // fn wrapping_lt(a: u16, b: u16) -> bool { // b.wrapping_sub(a) < u16::MAX / 2 // }