diff --git a/src/core/backend/backend-connector.adapter-selection.test.ts b/src/core/backend/backend-connector.adapter-selection.test.ts index 2328cb4..888cf84 100644 --- a/src/core/backend/backend-connector.adapter-selection.test.ts +++ b/src/core/backend/backend-connector.adapter-selection.test.ts @@ -104,6 +104,7 @@ describe("BackendConnector per-session adapter", () => { metrics: null, broadcaster: { broadcast: vi.fn(), sendTo: vi.fn() } as any, routeUnifiedMessage: vi.fn(), + routeSystemSignal: vi.fn(), emitEvent: vi.fn(), getRuntime: (session: any) => { if (!runtimeCache.has(session)) { @@ -311,6 +312,7 @@ describe("BackendConnector per-session adapter", () => { metrics: null, broadcaster: { broadcast: vi.fn(), sendTo: vi.fn() } as any, routeUnifiedMessage: vi.fn(), + routeSystemSignal: vi.fn(), emitEvent: vi.fn(), getRuntime: () => mockRuntime as any, }); diff --git a/src/core/backend/backend-connector.failure-injection.test.ts b/src/core/backend/backend-connector.failure-injection.test.ts index eed02c0..e709933 100644 --- a/src/core/backend/backend-connector.failure-injection.test.ts +++ b/src/core/backend/backend-connector.failure-injection.test.ts @@ -76,6 +76,7 @@ function buildConnectorDeps( emitEvent = vi.fn(), broadcaster = { broadcast: vi.fn(), broadcastToParticipants: vi.fn(), sendTo: vi.fn() } as any, ) { + const routeSystemSignal = vi.fn(); const manager = new BackendConnector({ adapter, adapterResolver: null, @@ -83,10 +84,11 @@ function buildConnectorDeps( metrics: null, broadcaster, routeUnifiedMessage: vi.fn(), + routeSystemSignal, emitEvent, getRuntime: () => createMockRuntime(session), }); - return { manager, emitEvent, broadcaster }; + return { manager, emitEvent, broadcaster, routeSystemSignal }; } async function waitForAssertion(assertFn: () => void, timeoutMs = 500): Promise { @@ -113,17 +115,22 @@ describe("BackendConnector failure injection", () => { } as any; const session = createSession("sess-fi"); - const { manager } = buildConnectorDeps(adapter, session, emitEvent, broadcaster); + const { manager, routeSystemSignal } = buildConnectorDeps( + adapter, + session, + emitEvent, + broadcaster, + ); await manager.connectBackend(session); adapter.failStream("sess-fi", new Error("Injected stream failure")); await waitForAssertion(() => { - expect(emitEvent).toHaveBeenCalledWith( - "backend:disconnected", - expect.objectContaining({ sessionId: "sess-fi" }), - ); + expect(routeSystemSignal).toHaveBeenCalledWith(session, { + kind: "BACKEND_DISCONNECTED", + reason: "stream ended", + }); }); expect(emitEvent).toHaveBeenCalledWith( @@ -133,7 +140,6 @@ describe("BackendConnector failure injection", () => { sessionId: "sess-fi", }), ); - expect(broadcaster.broadcast).toHaveBeenCalledWith(session, { type: "cli_disconnected" }); }); it("drains pending passthroughs with slash_command_error when stream fails (lines 594-605)", async () => { @@ -145,7 +151,12 @@ describe("BackendConnector failure injection", () => { sendTo: vi.fn(), } as any; const session = createSession("sess-drain-fail"); - const { manager } = buildConnectorDeps(adapter, session, emitEvent, broadcaster); + const { manager, routeSystemSignal } = buildConnectorDeps( + adapter, + session, + emitEvent, + broadcaster, + ); // Pre-populate pending passthrough entries session.pendingPassthroughs.push(makePassthrough("/compact", "req-compact")); @@ -154,16 +165,15 @@ describe("BackendConnector failure injection", () => { adapter.failStream("sess-drain-fail", new Error("Backend crashed")); await waitForAssertion(() => { - expect(broadcaster.broadcast).toHaveBeenCalledWith( + expect(routeSystemSignal).toHaveBeenCalledWith( session, - expect.objectContaining({ type: "slash_command_error", command: "/compact" }), + expect.objectContaining({ + kind: "SLASH_PASSTHROUGH_ERROR", + command: "/compact", + error: "Backend crashed", + }), ); }); - - expect(emitEvent).toHaveBeenCalledWith( - "slash_command:failed", - expect.objectContaining({ command: "/compact", error: "Backend crashed" }), - ); }); it("drains pending passthroughs with slash_command_error when stream ends unexpectedly (lines 619-630)", async () => { @@ -175,7 +185,12 @@ describe("BackendConnector failure injection", () => { sendTo: vi.fn(), } as any; const session = createSession("sess-drain-end"); - const { manager } = buildConnectorDeps(adapter, session, emitEvent, broadcaster); + const { manager, routeSystemSignal } = buildConnectorDeps( + adapter, + session, + emitEvent, + broadcaster, + ); session.pendingPassthroughs.push(makePassthrough("/status", "req-status")); @@ -183,19 +198,14 @@ describe("BackendConnector failure injection", () => { adapter.endStream("sess-drain-end"); await waitForAssertion(() => { - expect(broadcaster.broadcast).toHaveBeenCalledWith( + expect(routeSystemSignal).toHaveBeenCalledWith( session, expect.objectContaining({ - type: "slash_command_error", + kind: "SLASH_PASSTHROUGH_ERROR", command: "/status", error: "Backend stream ended unexpectedly", }), ); }); - - expect(emitEvent).toHaveBeenCalledWith( - "slash_command:failed", - expect.objectContaining({ command: "/status", error: "Backend stream ended unexpectedly" }), - ); }); }); diff --git a/src/core/backend/backend-connector.lifecycle.test.ts b/src/core/backend/backend-connector.lifecycle.test.ts index 3b9173f..5a33738 100644 --- a/src/core/backend/backend-connector.lifecycle.test.ts +++ b/src/core/backend/backend-connector.lifecycle.test.ts @@ -237,6 +237,7 @@ function createDeps(overrides?: Partial): BackendConnector broadcastToParticipants: vi.fn(), } as any, routeUnifiedMessage: vi.fn(), + routeSystemSignal: vi.fn(), emitEvent: vi.fn(), getRuntime: (session) => { if (!runtimeCache.has(session)) { @@ -278,8 +279,7 @@ describe("BackendConnector", () => { await mgr.connectBackend(session); expect(session.backendSession).not.toBeNull(); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith(session, { type: "cli_connected" }); - expect(deps.emitEvent).toHaveBeenCalledWith("backend:connected", { sessionId: "sess-1" }); + expect(deps.routeSystemSignal).toHaveBeenCalledWith(session, { kind: "BACKEND_CONNECTED" }); }); it("closes existing backend session on reconnect", async () => { @@ -355,9 +355,9 @@ describe("BackendConnector", () => { } as CLIMessage); expect(result).toBe(true); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith( + expect(deps.routeSystemSignal).toHaveBeenCalledWith( session, - expect.objectContaining({ type: "slash_command_result", content: "echo result" }), + expect.objectContaining({ kind: "SLASH_PASSTHROUGH_RESULT", content: "echo result" }), ); expect(session.pendingPassthroughs).toHaveLength(0); }); @@ -492,8 +492,11 @@ describe("BackendConnector", () => { await mgr.disconnectBackend(session); - // Should not broadcast or emit events for disconnection - expect(deps.emitEvent).not.toHaveBeenCalledWith("backend:disconnected", expect.anything()); + // Should not route signals for disconnection + expect(deps.routeSystemSignal).not.toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ kind: "BACKEND_DISCONNECTED" }), + ); }); it("records metrics when disconnecting", async () => { @@ -605,12 +608,12 @@ describe("BackendConnector", () => { await tick(); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith( + expect(deps.routeSystemSignal).toHaveBeenCalledWith( session, expect.objectContaining({ - type: "slash_command_result", + kind: "SLASH_PASSTHROUGH_RESULT", command: "/context", - request_id: "req-ctx", + requestId: "req-ctx", content: "Context: 23% used", source: "cli", }), @@ -641,12 +644,12 @@ describe("BackendConnector", () => { await tick(); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith( + expect(deps.routeSystemSignal).toHaveBeenCalledWith( session, expect.objectContaining({ - type: "slash_command_result", + kind: "SLASH_PASSTHROUGH_RESULT", command: "/context", - request_id: "req-ctx", + requestId: "req-ctx", content: "Context summary line", source: "cli", }), @@ -689,12 +692,12 @@ describe("BackendConnector", () => { await tick(); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith( + expect(deps.routeSystemSignal).toHaveBeenCalledWith( session, expect.objectContaining({ - type: "slash_command_result", + kind: "SLASH_PASSTHROUGH_RESULT", command: "/context", - request_id: "req-ctx", + requestId: "req-ctx", content: "Context Usage\nTokens: 43.5k / 200k (22%)", source: "cli", }), @@ -734,12 +737,12 @@ describe("BackendConnector", () => { await tick(); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith( + expect(deps.routeSystemSignal).toHaveBeenCalledWith( session, expect.objectContaining({ - type: "slash_command_error", + kind: "SLASH_PASSTHROUGH_ERROR", command: "/context", - request_id: "req-ctx", + requestId: "req-ctx", }), ); expect(session.pendingPassthroughs).toHaveLength(0); @@ -783,7 +786,7 @@ describe("BackendConnector", () => { `); }); - it("broadcasts cli_disconnected when stream ends unexpectedly", async () => { + it("routes BACKEND_DISCONNECTED signal when stream ends unexpectedly", async () => { const testSession = new TestBackendSession("sess-1"); const adapter = new TestAdapter(); adapter.nextSession = testSession; @@ -799,16 +802,10 @@ describe("BackendConnector", () => { await tick(50); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith(session, { - type: "cli_disconnected", + expect(deps.routeSystemSignal).toHaveBeenCalledWith(session, { + kind: "BACKEND_DISCONNECTED", + reason: "stream ended", }); - expect(deps.emitEvent).toHaveBeenCalledWith( - "backend:disconnected", - expect.objectContaining({ - sessionId: "sess-1", - reason: "stream ended", - }), - ); expect(session.backendSession).toBeNull(); }); @@ -859,15 +856,9 @@ describe("BackendConnector", () => { error: expect.any(Error), }), ); - expect(deps.emitEvent).toHaveBeenCalledWith( - "backend:disconnected", - expect.objectContaining({ - sessionId: "sess-1", - reason: "stream ended", - }), - ); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith(session, { - type: "cli_disconnected", + expect(deps.routeSystemSignal).toHaveBeenCalledWith(session, { + kind: "BACKEND_DISCONNECTED", + reason: "stream ended", }); expect(session.backendSession).toBeNull(); }); @@ -939,7 +930,7 @@ describe("BackendConnector — cliUserEchoToText via passthrough", () => { }, } as unknown as CLIMessage); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith( + expect(deps.routeSystemSignal).toHaveBeenCalledWith( session, expect.objectContaining({ content: "plain string and object" }), ); @@ -965,7 +956,7 @@ describe("BackendConnector — cliUserEchoToText via passthrough", () => { message: { content: { text: "object text" } }, } as unknown as CLIMessage); - expect(deps2.broadcaster.broadcast).toHaveBeenCalledWith( + expect(deps2.routeSystemSignal).toHaveBeenCalledWith( session2, expect.objectContaining({ content: "object text" }), ); @@ -979,7 +970,7 @@ describe("BackendConnector — cliUserEchoToText via passthrough", () => { message: { content: null }, } as unknown as CLIMessage); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith( + expect(deps.routeSystemSignal).toHaveBeenCalledWith( session, expect.objectContaining({ content: "" }), ); @@ -1001,7 +992,7 @@ describe("BackendConnector — cliUserEchoToText via passthrough", () => { message: { content: { notText: "value" } }, } as unknown as CLIMessage); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith( + expect(deps.routeSystemSignal).toHaveBeenCalledWith( session, expect.objectContaining({ content: "" }), ); @@ -1023,7 +1014,7 @@ describe("BackendConnector — cliUserEchoToText via passthrough", () => { message: { content: { text: 42 } }, } as unknown as CLIMessage); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith( + expect(deps.routeSystemSignal).toHaveBeenCalledWith( session, expect.objectContaining({ content: "" }), ); @@ -1045,7 +1036,7 @@ describe("BackendConnector — cliUserEchoToText via passthrough", () => { message: { content: "Context Usage" }, } as unknown as CLIMessage); - expect(deps.broadcaster.broadcast).toHaveBeenCalledWith( + expect(deps.routeSystemSignal).toHaveBeenCalledWith( session, expect.objectContaining({ content: "Context Usage" }), ); diff --git a/src/core/backend/backend-connector.test.ts b/src/core/backend/backend-connector.test.ts index 4995ec1..99147c5 100644 --- a/src/core/backend/backend-connector.test.ts +++ b/src/core/backend/backend-connector.test.ts @@ -47,6 +47,7 @@ function createDeps(overrides?: Partial): BackendConnector sendTo: vi.fn(), } as any, routeUnifiedMessage: vi.fn(), + routeSystemSignal: vi.fn(), emitEvent: vi.fn(), getRuntime: () => mockRuntime as any, ...overrides, diff --git a/src/core/backend/backend-connector.ts b/src/core/backend/backend-connector.ts index 1afe1d7..112f80e 100644 --- a/src/core/backend/backend-connector.ts +++ b/src/core/backend/backend-connector.ts @@ -21,6 +21,7 @@ import { CLI_ADAPTER_NAMES, type CliAdapterName } from "../interfaces/adapter-na import type { AdapterResolver } from "../interfaces/adapter-resolver.js"; import type { BackendAdapter, BackendSession } from "../interfaces/backend-adapter.js"; import { type MessageTracer, noopTracer, type TraceOutcome } from "../messaging/message-tracer.js"; +import type { SystemSignal } from "../session/session-event.js"; import type { Session } from "../session/session-repository.js"; import type { SessionRuntime } from "../session/session-runtime.js"; import type { UnifiedMessage } from "../types/unified-message.js"; @@ -39,6 +40,7 @@ export interface BackendConnectorDeps { metrics: MetricsCollector | null; broadcaster: ConsumerBroadcaster; routeUnifiedMessage: (session: Session, msg: UnifiedMessage) => void; + routeSystemSignal: (session: Session, signal: SystemSignal) => void; emitEvent: EmitEvent; getRuntime: (session: Session) => SessionRuntime; tracer?: MessageTracer; @@ -53,6 +55,7 @@ export class BackendConnector { private metrics: MetricsCollector | null; private broadcaster: ConsumerBroadcaster; private routeUnifiedMessage: (session: Session, msg: UnifiedMessage) => void; + private routeSystemSignal: (session: Session, signal: SystemSignal) => void; private emitEvent: EmitEvent; private runtime: (session: Session) => SessionRuntime; private tracer: MessageTracer; @@ -65,6 +68,7 @@ export class BackendConnector { this.metrics = deps.metrics; this.broadcaster = deps.broadcaster; this.routeUnifiedMessage = deps.routeUnifiedMessage; + this.routeSystemSignal = deps.routeSystemSignal; this.emitEvent = deps.emitEvent; this.runtime = deps.getRuntime; this.tracer = deps.tracer ?? noopTracer; @@ -286,15 +290,10 @@ export class BackendConnector { this.shiftPendingPassthroughEntry(session); this.passthroughTextBuffers.delete(session.id); const error = `Pending passthrough "${pending.command}" produced empty output`; - this.broadcaster.broadcast(session, { - type: "slash_command_error", - command: pending.command, - request_id: pending.requestId, - error, - }); - this.emitEvent("slash_command:failed", { - sessionId: session.id, + this.routeSystemSignal(session, { + kind: "SLASH_PASSTHROUGH_ERROR", command: pending.command, + requestId: pending.requestId, error, }); this.tracer.error("bridge", "slash_command_error", error, { @@ -315,10 +314,10 @@ export class BackendConnector { this.shiftPendingPassthroughEntry(session); this.passthroughTextBuffers.delete(session.id); - this.broadcaster.broadcast(session, { - type: "slash_command_result", + this.routeSystemSignal(session, { + kind: "SLASH_PASSTHROUGH_RESULT", command: pending.command, - request_id: pending.requestId, + requestId: pending.requestId, content, source: "cli", }); @@ -335,12 +334,6 @@ export class BackendConnector { outcome: "success", }, ); - this.emitEvent("slash_command:executed", { - sessionId: session.id, - command: pending.command, - source: "cli", - durationMs: 0, - }); this.emitSlashSummary(session.id, pending, "success", matchedPath); } @@ -409,10 +402,10 @@ export class BackendConnector { this.passthroughTextBuffers.delete(session.id); const content = this.cliUserEchoToText(rawMsg.message.content); - this.broadcaster.broadcast(session, { - type: "slash_command_result", + this.routeSystemSignal(session, { + kind: "SLASH_PASSTHROUGH_RESULT", command: pending.command, - request_id: pending.requestId, + requestId: pending.requestId, content, source: "cli", }); @@ -429,12 +422,6 @@ export class BackendConnector { outcome: "intercepted_user_echo", }, ); - this.emitEvent("slash_command:executed", { - sessionId: session.id, - command: pending.command, - source: "cli", - durationMs: 0, - }); this.emitSlashSummary(session.id, pending, "intercepted_user_echo", "assistant_text"); return true; }); @@ -454,8 +441,7 @@ export class BackendConnector { type: "backend:connected", sessionId: session.id, }); - this.broadcaster.broadcast(session, { type: "cli_connected" }); - this.emitEvent("backend:connected", { sessionId: session.id }); + this.routeSystemSignal(session, { kind: "BACKEND_CONNECTED" }); // Flush any pending messages const pendingMessages = this.drainPendingMessagesQueue(session); @@ -490,12 +476,7 @@ export class BackendConnector { type: "backend:disconnected", sessionId: session.id, }); - this.broadcaster.broadcast(session, { type: "cli_disconnected" }); - this.emitEvent("backend:disconnected", { - sessionId: session.id, - code: 1000, - reason: "normal", - }); + this.routeSystemSignal(session, { kind: "BACKEND_DISCONNECTED", reason: "normal" }); this.cancelPendingPermissions(session); } @@ -560,15 +541,10 @@ export class BackendConnector { while (true) { const pending = this.shiftPendingPassthroughEntry(session); if (!pending) break; - this.broadcaster.broadcast(session, { - type: "slash_command_error", - command: pending.command, - request_id: pending.requestId, - error: errorMsg, - }); - this.emitEvent("slash_command:failed", { - sessionId, + this.routeSystemSignal(session, { + kind: "SLASH_PASSTHROUGH_ERROR", command: pending.command, + requestId: pending.requestId, error: errorMsg, }); this.emitSlashSummary(sessionId, pending, "backend_error", "none", [errorMsg]); @@ -585,27 +561,17 @@ export class BackendConnector { while (true) { const pending = this.shiftPendingPassthroughEntry(session); if (!pending) break; - this.broadcaster.broadcast(session, { - type: "slash_command_error", - command: pending.command, - request_id: pending.requestId, - error: "Backend stream ended unexpectedly", - }); - this.emitEvent("slash_command:failed", { - sessionId, + this.routeSystemSignal(session, { + kind: "SLASH_PASSTHROUGH_ERROR", command: pending.command, + requestId: pending.requestId, error: "Backend stream ended unexpectedly", }); this.emitSlashSummary(sessionId, pending, "backend_error", "none", ["stream ended"]); } this.applyBackendDisconnectedState(session); this.passthroughTextBuffers.delete(session.id); - this.broadcaster.broadcast(session, { type: "cli_disconnected" }); - this.emitEvent("backend:disconnected", { - sessionId, - code: 1000, - reason: "stream ended", - }); + this.routeSystemSignal(session, { kind: "BACKEND_DISCONNECTED", reason: "stream ended" }); this.cancelPendingPermissions(session); } })(); diff --git a/src/core/consumer/consumer-gateway.test.ts b/src/core/consumer/consumer-gateway.test.ts index a36f137..c8fd148 100644 --- a/src/core/consumer/consumer-gateway.test.ts +++ b/src/core/consumer/consumer-gateway.test.ts @@ -28,6 +28,7 @@ function createMockRuntime(session: any) { getPendingPermissions: vi.fn(() => Array.from(session.data.pendingPermissions.values())), getQueuedMessage: vi.fn(() => session.data.queuedMessage), isBackendConnected: vi.fn(() => false), + process: vi.fn(), } as any; } @@ -187,7 +188,10 @@ describe("ConsumerGateway", () => { }), ); expect(sent.some((m) => m.type === "cli_disconnected")).toBe(true); - expect(emitted.some((e) => e.event === "backend:relaunch_needed")).toBe(true); + expect(vi.mocked(mockRuntime.process)).toHaveBeenCalledWith({ + type: "SYSTEM_SIGNAL", + signal: { kind: "BACKEND_RELAUNCH_NEEDED" }, + }); }); it("sends message history, capabilities, pending permissions, and queued message on open", () => { @@ -250,7 +254,7 @@ describe("ConsumerGateway", () => { }); it("sends cli_connected when backend is already connected", () => { - const { gateway, ws, sentToWs, emitted } = createHarness({ + const { gateway, ws, sentToWs, mockRuntime } = createHarness({ backendConnected: true, history: [], pendingPermissions: [], @@ -259,7 +263,11 @@ describe("ConsumerGateway", () => { gateway.handleConsumerOpen(ws, { sessionId: "s1" } as any); expect(sentToWs().some((m) => m.type === "cli_connected")).toBe(true); - expect(emitted.some((e) => e.event === "backend:relaunch_needed")).toBe(false); + expect(vi.mocked(mockRuntime.process)).not.toHaveBeenCalledWith( + expect.objectContaining({ + signal: { kind: "BACKEND_RELAUNCH_NEEDED" }, + }), + ); }); it("authenticator path accepts asynchronously", async () => { diff --git a/src/core/consumer/consumer-gateway.ts b/src/core/consumer/consumer-gateway.ts index 544ee33..df723ad 100644 --- a/src/core/consumer/consumer-gateway.ts +++ b/src/core/consumer/consumer-gateway.ts @@ -268,7 +268,7 @@ export class ConsumerGateway { this.deps.logger.info( `Consumer connected but CLI is dead for session ${sessionId}, requesting relaunch`, ); - this.deps.emit("backend:relaunch_needed", { sessionId }); + rt.process({ type: "SYSTEM_SIGNAL", signal: { kind: "BACKEND_RELAUNCH_NEEDED" } }); } } diff --git a/src/core/session-coordinator.ts b/src/core/session-coordinator.ts index cc735cf..061e645 100644 --- a/src/core/session-coordinator.ts +++ b/src/core/session-coordinator.ts @@ -158,19 +158,13 @@ export class SessionCoordinator extends TypedEventEmitter this.getOrCreateRuntime(s).process({ type: "BACKEND_MESSAGE", message: msg }), ), + routeSystemSignal: ( + session: Session, + signal: import("./session/session-event.js").SystemSignal, + ) => + this.withMutableSession(session.id, "routeSystemSignal", (s) => + this.getOrCreateRuntime(s).process({ type: "SYSTEM_SIGNAL", signal }), + ), emitEvent: this.emitEvent as ( type: keyof BridgeEventMap, payload: BridgeEventMap[keyof BridgeEventMap], @@ -772,7 +773,7 @@ export class SessionCoordinator extends TypedEventEmitter { diff --git a/src/core/session/session-event.ts b/src/core/session/session-event.ts index 807ed52..4156668 100644 --- a/src/core/session/session-event.ts +++ b/src/core/session/session-event.ts @@ -41,6 +41,10 @@ export type SystemSignal = | { kind: "RECONNECT_TIMEOUT" } /** Capabilities did not arrive within the timeout window. */ | { kind: "CAPABILITIES_TIMEOUT" } + /** Backend needs to be relaunched (e.g. consumer connected but CLI is dead). */ + | { kind: "BACKEND_RELAUNCH_NEEDED" } + /** Session is closing — initiated by coordinator before teardown. */ + | { kind: "SESSION_CLOSING" } /** Explicit session close initiated by coordinator. */ | { kind: "SESSION_CLOSED" } /** Merge a partial SessionState patch into data.state (no lifecycle change). */ @@ -50,7 +54,17 @@ export type SystemSignal = /** Set queuedMessage (managed by MessageQueueHandler). */ | { kind: "QUEUED_MESSAGE_UPDATED"; message: QueuedMessage | null } /** Optimistic model update with session_update broadcast (used by sendSetModel). */ - | { kind: "MODEL_UPDATED"; model: string }; + | { kind: "MODEL_UPDATED"; model: string } + /** Slash passthrough command completed successfully. */ + | { + kind: "SLASH_PASSTHROUGH_RESULT"; + command: string; + requestId?: string; + content: string; + source: "cli" | "emulated"; + } + /** Slash passthrough command failed. */ + | { kind: "SLASH_PASSTHROUGH_ERROR"; command: string; requestId?: string; error: string }; /** * Discriminated union of all events that SessionRuntime.process() accepts. diff --git a/src/core/session/session-reducer.ts b/src/core/session/session-reducer.ts index f41b234..179d3cd 100644 --- a/src/core/session/session-reducer.ts +++ b/src/core/session/session-reducer.ts @@ -113,6 +113,89 @@ function reduceSystemSignal(data: SessionData, signal: SystemSignal): [SessionDa } } + // Signals with custom effects but no lifecycle change + if (signal.kind === "BACKEND_RELAUNCH_NEEDED") { + return [data, [ + { type: "EMIT_EVENT", eventType: "backend:relaunch_needed", payload: {} }, + { type: "BROADCAST", message: { type: "cli_disconnected" } } + ]]; + } + + if (signal.kind === "SLASH_PASSTHROUGH_RESULT") { + return [ + data, + [ + { + type: "BROADCAST", + message: { + type: "slash_command_result", + command: signal.command, + request_id: signal.requestId, + content: signal.content, + source: signal.source, + }, + }, + { + type: "EMIT_EVENT", + eventType: "slash_command:executed", + payload: { command: signal.command, source: signal.source, durationMs: 0 }, + }, + ], + ]; + } + + if (signal.kind === "SLASH_PASSTHROUGH_ERROR") { + return [ + data, + [ + { + type: "BROADCAST", + message: { + type: "slash_command_error", + command: signal.command, + request_id: signal.requestId, + error: signal.error, + }, + }, + { + type: "EMIT_EVENT", + eventType: "slash_command:failed", + payload: { command: signal.command, error: signal.error }, + }, + ], + ]; + } + + // Signals with custom effects AND lifecycle transitions + if (signal.kind === "BACKEND_CONNECTED") { + const next: LifecycleState = "active"; + const effects: Effect[] = [ + { type: "BROADCAST", message: { type: "cli_connected" } }, + { type: "EMIT_EVENT", eventType: "backend:connected", payload: {} }, + ]; + if (isLifecycleTransitionAllowed(data.lifecycle, next)) { + return [{ ...data, lifecycle: next }, effects]; + } + return [data, effects]; + } + + if (signal.kind === "BACKEND_DISCONNECTED") { + const next: LifecycleState | null = + data.lifecycle === "active" || data.lifecycle === "idle" ? "degraded" : null; + const effects: Effect[] = [ + { type: "BROADCAST", message: { type: "cli_disconnected" } }, + { + type: "EMIT_EVENT", + eventType: "backend:disconnected", + payload: { code: 1000, reason: signal.reason }, + }, + ]; + if (next && isLifecycleTransitionAllowed(data.lifecycle, next)) { + return [{ ...data, lifecycle: next }, effects]; + } + return [data, effects]; + } + // Lifecycle-only signals const next = lifecycleForSignal(data.lifecycle, signal); if (!next || !isLifecycleTransitionAllowed(data.lifecycle, next)) { @@ -122,24 +205,29 @@ function reduceSystemSignal(data: SessionData, signal: SystemSignal): [SessionDa } /** Map a SystemSignal kind to the target lifecycle state, or null if no transition. */ -function lifecycleForSignal(current: LifecycleState, signal: SystemSignal): LifecycleState | null { +function lifecycleForSignal(_current: LifecycleState, signal: SystemSignal): LifecycleState | null { switch (signal.kind) { + // BACKEND_CONNECTED and BACKEND_DISCONNECTED are handled in reduceSystemSignal() directly case "BACKEND_CONNECTED": - return "active"; case "BACKEND_DISCONNECTED": - return current === "active" || current === "idle" ? "degraded" : null; + return null; + case "SESSION_CLOSING": + return "closing"; case "SESSION_CLOSED": return "closed"; case "RECONNECT_TIMEOUT": return "degraded"; case "IDLE_REAP": return "closing"; + case "BACKEND_RELAUNCH_NEEDED": + case "SLASH_PASSTHROUGH_RESULT": + case "SLASH_PASSTHROUGH_ERROR": case "CAPABILITIES_TIMEOUT": case "CONSUMER_CONNECTED": case "CONSUMER_DISCONNECTED": case "GIT_INFO_RESOLVED": case "CAPABILITIES_READY": - // No pure data change for these — handled by runtime orchestration. + // No pure data change for these — handled by runtime orchestration or reduceSystemSignal(). return null; // Data-patch signals handled above — not lifecycle transitions case "STATE_PATCHED": diff --git a/src/core/session/session-runtime.test.ts b/src/core/session/session-runtime.test.ts index 127d405..e490615 100644 --- a/src/core/session/session-runtime.test.ts +++ b/src/core/session/session-runtime.test.ts @@ -123,7 +123,8 @@ describe("SessionRuntime", () => { const runtime = new SessionRuntime(session, deps); const ws = createTestSocket(); - expect(runtime.transitionLifecycle("closed", "test:force-close")).toBe(true); + runtime.process({ type: "SYSTEM_SIGNAL", signal: { kind: "SESSION_CLOSING" } }); + runtime.process({ type: "SYSTEM_SIGNAL", signal: { kind: "SESSION_CLOSED" } }); runtime.process({ type: "INBOUND_COMMAND", @@ -731,22 +732,18 @@ describe("SessionRuntime", () => { expect(deps.slashService.executeProgrammatic).toHaveBeenCalledWith(session, "/status"); }); - it("reports invalid lifecycle transitions via callback", () => { + it("rejects invalid lifecycle transitions (closed → active)", () => { const session = createMockSession({ id: "s1" }); const deps = makeDeps(); const runtime = new SessionRuntime(session, deps); - expect(runtime.transitionLifecycle("closed", "force-close")).toBe(true); - expect(runtime.transitionLifecycle("active", "invalid-reopen")).toBe(false); + // Transition to closed via SYSTEM_SIGNAL + runtime.process({ type: "SYSTEM_SIGNAL", signal: { kind: "SESSION_CLOSING" } }); + runtime.process({ type: "SYSTEM_SIGNAL", signal: { kind: "SESSION_CLOSED" } }); + expect(runtime.getLifecycleState()).toBe("closed"); - expect(deps.logger.warn).toHaveBeenCalledWith( - "Session lifecycle invalid transition", - expect.objectContaining({ - sessionId: "s1", - current: "closed", - next: "active", - }), - ); + // Attempt to reopen — should stay closed (reducer rejects invalid transitions) + runtime.process({ type: "SYSTEM_SIGNAL", signal: { kind: "BACKEND_CONNECTED" } }); expect(runtime.getLifecycleState()).toBe("closed"); }); diff --git a/src/core/session/session-runtime.ts b/src/core/session/session-runtime.ts index d28bc6a..47d09c1 100644 --- a/src/core/session/session-runtime.ts +++ b/src/core/session/session-runtime.ts @@ -411,7 +411,7 @@ export class SessionRuntime { return limiter.tryConsume(); } - transitionLifecycle(next: LifecycleState, reason: string): boolean { + private transitionLifecycle(next: LifecycleState, reason: string): boolean { const current = this.session.data.lifecycle; if (current === next) return true; if (!isLifecycleTransitionAllowed(current, next)) { @@ -721,8 +721,6 @@ export class SessionRuntime { } this.emitTeamEvents(prevData.state.team); - - this.applyLifecycleFromBackendMessage(msg); } private orchestrateSessionInit(msg: UnifiedMessage): void { @@ -826,30 +824,6 @@ export class SessionRuntime { this.deps.slashService.handleInbound(this.session, msg); } - private applyLifecycleFromBackendMessage(msg: UnifiedMessage): void { - if (msg.type === "status_change") { - const status = typeof msg.metadata.status === "string" ? msg.metadata.status : null; - if (status === "idle") { - this.transitionLifecycle("idle", "backend:status_change:idle"); - } else if (status === "running" || status === "compacting") { - this.transitionLifecycle("active", `backend:status_change:${status}`); - } - return; - } - - if (msg.type === "result") { - this.transitionLifecycle("idle", "backend:result"); - return; - } - - if (msg.type === "stream_event") { - const event = msg.metadata.event as { type?: unknown } | undefined; - if (event?.type === "message_start" && !msg.metadata.parent_tool_use_id) { - this.transitionLifecycle("active", "backend:stream_event:message_start"); - } - } - } - private touchActivity(): void { this.session.lastActivity = Date.now(); } diff --git a/src/testing/adapter-test-helpers.ts b/src/testing/adapter-test-helpers.ts index d8c5435..82438ed 100644 --- a/src/testing/adapter-test-helpers.ts +++ b/src/testing/adapter-test-helpers.ts @@ -39,7 +39,6 @@ import { import { GitInfoTracker } from "../core/session/git-info-tracker.js"; import { MessageQueueHandler } from "../core/session/message-queue-handler.js"; import type { SessionData } from "../core/session/session-data.js"; -import type { SystemSignal } from "../core/session/session-event.js"; import { InMemorySessionLeaseCoordinator } from "../core/session/session-lease-coordinator.js"; import type { Session } from "../core/session/session-repository.js"; import { SessionRepository } from "../core/session/session-repository.js"; @@ -250,24 +249,20 @@ export function createBridgeWithAdapter(options?: { const runtimes = new Map(); // emitEvent: forwards to emitter, with lifecycle signal dispatch + // Note: backend:connected and backend:disconnected are now routed via routeSystemSignal + // in BackendConnector, so only session:closed needs interception here. const emitEvent = (type: string, payload: unknown): void => { if ( payload && typeof payload === "object" && "sessionId" in payload && - (type === "backend:connected" || type === "backend:disconnected" || type === "session:closed") + type === "session:closed" ) { const sessionId = (payload as { sessionId?: unknown }).sessionId; if (typeof sessionId === "string") { const runtime = runtimes.get(sessionId); if (runtime) { - const signal: SystemSignal = - type === "backend:connected" - ? { kind: "BACKEND_CONNECTED" } - : type === "backend:disconnected" - ? { kind: "BACKEND_DISCONNECTED", reason: "bridge-event" } - : { kind: "SESSION_CLOSED" }; - runtime.process({ type: "SYSTEM_SIGNAL", signal }); + runtime.process({ type: "SYSTEM_SIGNAL", signal: { kind: "SESSION_CLOSED" } }); } } } @@ -420,6 +415,10 @@ export function createBridgeWithAdapter(options?: { withMutableSession(session.id, "handleBackendMessage", (s) => getOrCreateRuntime(s).process({ type: "BACKEND_MESSAGE", message: msg }), ), + routeSystemSignal: (session, signal) => + withMutableSession(session.id, "routeSystemSignal", (s) => + getOrCreateRuntime(s).process({ type: "SYSTEM_SIGNAL", signal }), + ), emitEvent: emitEvent as ( type: keyof BridgeEventMap, payload: BridgeEventMap[keyof BridgeEventMap], @@ -465,7 +464,7 @@ export function createBridgeWithAdapter(options?: { const session = store.get(sessionId); if (!session) return; const runtime = getOrCreateRuntime(session); - runtime.transitionLifecycle("closing", "session:close"); + runtime.process({ type: "SYSTEM_SIGNAL", signal: { kind: "SESSION_CLOSING" } }); capabilitiesPolicy.cancelPendingInitialize(session); if (runtime.getBackendSession()) { await runtime.closeBackendConnection().catch(() => {});