Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 17 additions & 31 deletions src/core/backend/backend-connector.adapter-selection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,36 +43,8 @@ function mockResolver(adapters: Record<string, BackendAdapter>): AdapterResolver
*/
function createSessionAwareRuntime(session: any) {
return {
attachBackendConnection: vi.fn((params: any) => {
session.backendSession = params.backendSession;
session.backendAbort = params.backendAbort;
if (session.data)
session.data.adapterSupportsSlashPassthrough = params.supportsSlashPassthrough;
session.adapterSlashExecutor = params.slashExecutor;
}),
resetBackendConnectionState: vi.fn(() => {
session.backendSession = null;
session.backendAbort = null;
if (session.data) {
session.data.backendSessionId = undefined;
session.data.adapterSupportsSlashPassthrough = false;
}
session.adapterSlashExecutor = null;
}),
getBackendSession: vi.fn(() => session.backendSession ?? null),
getBackendAbort: vi.fn(() => session.backendAbort ?? null),
drainPendingMessages: vi.fn(() => {
const pending = session.data?.pendingMessages ?? [];
if (session.data) session.data.pendingMessages = [];
return pending;
}),
drainPendingPermissionIds: vi.fn(() => {
const pendingPermissions = session.data?.pendingPermissions ?? new Map();
const ids = Array.from(pendingPermissions.keys());
pendingPermissions.clear();
if (session.data) session.data.pendingPermissions = pendingPermissions;
return ids;
}),
peekPendingPassthrough: vi.fn(() => session.pendingPassthroughs?.[0]),
shiftPendingPassthrough: vi.fn(() => session.pendingPassthroughs?.shift()),
getState: vi.fn(() => {
Expand Down Expand Up @@ -102,9 +74,24 @@ describe("BackendConnector per-session adapter", () => {
return {
logger,
metrics: null,
broadcaster: { broadcast: vi.fn(), sendTo: vi.fn() } as any,
routeUnifiedMessage: vi.fn(),
routeSystemSignal: vi.fn(),
routeSystemSignal: vi.fn((session: any, signal: any) => {
if (signal.kind === "BACKEND_CONNECTED") {
session.backendSession = signal.backendSession;
session.backendAbort = signal.backendAbort;
if (session.data)
session.data.adapterSupportsSlashPassthrough = signal.supportsSlashPassthrough;
session.adapterSlashExecutor = signal.slashExecutor;
} else if (signal.kind === "BACKEND_DISCONNECTED") {
session.backendSession = null;
session.backendAbort = null;
if (session.data) {
session.data.backendSessionId = undefined;
session.data.adapterSupportsSlashPassthrough = false;
}
session.adapterSlashExecutor = null;
}
}),
emitEvent: vi.fn(),
getRuntime: (session: any) => {
if (!runtimeCache.has(session)) {
Expand Down Expand Up @@ -310,7 +297,6 @@ describe("BackendConnector per-session adapter", () => {
adapterResolver: null,
logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() } as any,
metrics: null,
broadcaster: { broadcast: vi.fn(), sendTo: vi.fn() } as any,
routeUnifiedMessage: vi.fn(),
routeSystemSignal: vi.fn(),
emitEvent: vi.fn(),
Expand Down
78 changes: 18 additions & 60 deletions src/core/backend/backend-connector.failure-injection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,8 @@ function makePassthrough(command: string, requestId = "req-1") {

function createMockRuntime(session: any) {
return {
attachBackendConnection: (params: any) => {
session.backendSession = params.backendSession;
session.backendAbort = params.backendAbort;
session.data.adapterSupportsSlashPassthrough = params.supportsSlashPassthrough;
session.adapterSlashExecutor = params.slashExecutor;
},
resetBackendConnectionState: () => {
session.backendSession = null;
session.backendAbort = null;
session.data.backendSessionId = undefined;
session.data.adapterSupportsSlashPassthrough = false;
session.adapterSlashExecutor = null;
},
getBackendSession: () => session.backendSession ?? null,
getBackendAbort: () => session.backendAbort ?? null,
drainPendingMessages: () => {
const p = session.data.pendingMessages;
session.data.pendingMessages = [];
return p;
},
drainPendingPermissionIds: () => {
const ids = Array.from(session.data.pendingPermissions.keys());
session.data.pendingPermissions.clear();
return ids;
},
peekPendingPassthrough: () => session.pendingPassthroughs[0],
shiftPendingPassthrough: () => session.pendingPassthroughs.shift(),
getState: () => session.data.state,
Expand All @@ -74,21 +51,32 @@ function buildConnectorDeps(
adapter: InstanceType<typeof FailureInjectionBackendAdapter>,
session: any,
emitEvent = vi.fn(),
broadcaster = { broadcast: vi.fn(), broadcastToParticipants: vi.fn(), sendTo: vi.fn() } as any,
) {
const routeSystemSignal = vi.fn();
const routeSystemSignal = vi.fn((sess: any, signal: any) => {
if (signal.kind === "BACKEND_CONNECTED") {
sess.backendSession = signal.backendSession;
sess.backendAbort = signal.backendAbort;
sess.data.adapterSupportsSlashPassthrough = signal.supportsSlashPassthrough;
sess.adapterSlashExecutor = signal.slashExecutor;
} else if (signal.kind === "BACKEND_DISCONNECTED") {
sess.backendSession = null;
sess.backendAbort = null;
sess.data.backendSessionId = undefined;
sess.data.adapterSupportsSlashPassthrough = false;
sess.adapterSlashExecutor = null;
}
});
const manager = new BackendConnector({
adapter,
adapterResolver: null,
logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() } as any,
metrics: null,
broadcaster,
routeUnifiedMessage: vi.fn(),
routeSystemSignal,
emitEvent,
getRuntime: () => createMockRuntime(session),
});
return { manager, emitEvent, broadcaster, routeSystemSignal };
return { manager, emitEvent, routeSystemSignal };
}

async function waitForAssertion(assertFn: () => void, timeoutMs = 500): Promise<void> {
Expand All @@ -108,19 +96,9 @@ describe("BackendConnector failure injection", () => {
it("emits disconnect and error events when backend stream fails", async () => {
const adapter = new FailureInjectionBackendAdapter();
const emitEvent = vi.fn();
const broadcaster = {
broadcast: vi.fn(),
broadcastToParticipants: vi.fn(),
sendTo: vi.fn(),
} as any;

const session = createSession("sess-fi");
const { manager, routeSystemSignal } = buildConnectorDeps(
adapter,
session,
emitEvent,
broadcaster,
);
const { manager, routeSystemSignal } = buildConnectorDeps(adapter, session, emitEvent);

await manager.connectBackend(session);

Expand All @@ -145,18 +123,8 @@ describe("BackendConnector failure injection", () => {
it("drains pending passthroughs with slash_command_error when stream fails (lines 594-605)", async () => {
const adapter = new FailureInjectionBackendAdapter();
const emitEvent = vi.fn();
const broadcaster = {
broadcast: vi.fn(),
broadcastToParticipants: vi.fn(),
sendTo: vi.fn(),
} as any;
const session = createSession("sess-drain-fail");
const { manager, routeSystemSignal } = buildConnectorDeps(
adapter,
session,
emitEvent,
broadcaster,
);
const { manager, routeSystemSignal } = buildConnectorDeps(adapter, session, emitEvent);

// Pre-populate pending passthrough entries
session.pendingPassthroughs.push(makePassthrough("/compact", "req-compact"));
Expand All @@ -179,18 +147,8 @@ describe("BackendConnector failure injection", () => {
it("drains pending passthroughs with slash_command_error when stream ends unexpectedly (lines 619-630)", async () => {
const adapter = new FailureInjectionBackendAdapter();
const emitEvent = vi.fn();
const broadcaster = {
broadcast: vi.fn(),
broadcastToParticipants: vi.fn(),
sendTo: vi.fn(),
} as any;
const session = createSession("sess-drain-end");
const { manager, routeSystemSignal } = buildConnectorDeps(
adapter,
session,
emitEvent,
broadcaster,
);
const { manager, routeSystemSignal } = buildConnectorDeps(adapter, session, emitEvent);

session.pendingPassthroughs.push(makePassthrough("/status", "req-status"));

Expand Down
78 changes: 35 additions & 43 deletions src/core/backend/backend-connector.lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,38 +167,8 @@ function tick(ms = 10): Promise<void> {
*/
function createSessionAwareRuntime(session: any) {
return {
attachBackendConnection: vi.fn((params: any) => {
session.backendSession = params.backendSession;
session.backendAbort = params.backendAbort;
if (!session.data) session.data = session;
session.data.adapterSupportsSlashPassthrough = params.supportsSlashPassthrough;
session.adapterSlashExecutor = params.slashExecutor;
}),
resetBackendConnectionState: vi.fn(() => {
session.backendSession = null;
session.backendAbort = null;
if (!session.data) session.data = session;
session.data.backendSessionId = undefined;
session.data.adapterSupportsSlashPassthrough = false;
session.adapterSlashExecutor = null;
}),
getBackendSession: vi.fn(() => session.backendSession ?? null),
getBackendAbort: vi.fn(() => session.backendAbort ?? null),
drainPendingMessages: vi.fn(() => {
const pending = ((session as any).pendingMessages ?? []) as UnifiedMessage[];
(session as any).pendingMessages = [];
if (session.data) session.data.pendingMessages = [];
return pending;
}),
drainPendingPermissionIds: vi.fn(() => {
const pendingPermissions =
((session as any).pendingPermissions as Map<string, unknown> | undefined) ?? new Map();
const ids = Array.from(pendingPermissions.keys());
pendingPermissions.clear();
(session as any).pendingPermissions = pendingPermissions;
if (session.data) session.data.pendingPermissions = pendingPermissions;
return ids;
}),
peekPendingPassthrough: vi.fn(() => (session as any).pendingPassthroughs?.[0]),
shiftPendingPassthrough: vi.fn(() => (session as any).pendingPassthroughs?.shift()),
getState: vi.fn(() => {
Expand Down Expand Up @@ -227,17 +197,31 @@ function createDeps(overrides?: Partial<BackendConnectorDeps>): BackendConnector
// We use a proxy-based getRuntime so each session gets its own runtime instance
// that mutates the session's own fields (mirroring the real SessionRuntime behavior).
const runtimeCache = new WeakMap<object, ReturnType<typeof createSessionAwareRuntime>>();
// routeSystemSignal mock applies BACKEND_CONNECTED and BACKEND_DISCONNECTED handle mutations
// (mirroring the real SessionRuntime post-reducer hook behavior).
const routeSystemSignal = vi.fn((session: any, signal: any) => {
if (signal.kind === "BACKEND_CONNECTED") {
session.backendSession = signal.backendSession;
session.backendAbort = signal.backendAbort;
if (!session.data) session.data = session;
session.data.adapterSupportsSlashPassthrough = signal.supportsSlashPassthrough;
session.adapterSlashExecutor = signal.slashExecutor;
} else if (signal.kind === "BACKEND_DISCONNECTED") {
session.backendSession = null;
session.backendAbort = null;
if (!session.data) session.data = session;
session.data.backendSessionId = undefined;
session.data.adapterSupportsSlashPassthrough = false;
session.adapterSlashExecutor = null;
}
});
return {
adapter: new TestAdapter(),
adapterResolver: null,
logger: noopLogger,
metrics: null,
broadcaster: {
broadcast: vi.fn(),
broadcastToParticipants: vi.fn(),
} as any,
routeUnifiedMessage: vi.fn(),
routeSystemSignal: vi.fn(),
routeSystemSignal,
emitEvent: vi.fn(),
getRuntime: (session) => {
if (!runtimeCache.has(session)) {
Expand Down Expand Up @@ -279,7 +263,10 @@ describe("BackendConnector", () => {
await mgr.connectBackend(session);

expect(session.backendSession).not.toBeNull();
expect(deps.routeSystemSignal).toHaveBeenCalledWith(session, { kind: "BACKEND_CONNECTED" });
expect(deps.routeSystemSignal).toHaveBeenCalledWith(
session,
expect.objectContaining({ kind: "BACKEND_CONNECTED" }),
);
});

it("closes existing backend session on reconnect", async () => {
Expand Down Expand Up @@ -314,7 +301,7 @@ describe("BackendConnector", () => {
);
});

it("flushes pending messages on connect", async () => {
it("routes BACKEND_CONNECTED signal (pending messages are drained via reducer effects)", async () => {
const testSession = new TestBackendSession("sess-1");
const adapter = new TestAdapter();
adapter.nextSession = testSession;
Expand All @@ -328,8 +315,12 @@ describe("BackendConnector", () => {

await mgr.connectBackend(session);

expect(testSession.sentMessages).toEqual([msg1, msg2]);
expect(session.data.pendingMessages).toEqual([]);
// The connector routes BACKEND_CONNECTED — the reducer drains pending messages
// via SEND_TO_BACKEND effects executed by SessionRuntime.
expect(deps.routeSystemSignal).toHaveBeenCalledWith(
session,
expect.objectContaining({ kind: "BACKEND_CONNECTED", backendSession: testSession }),
);
});

it("sets up passthrough handler when session supports it", async () => {
Expand Down Expand Up @@ -463,7 +454,7 @@ describe("BackendConnector", () => {
});

describe("disconnectBackend", () => {
it("disconnects and cancels pending permissions", async () => {
it("disconnects and routes BACKEND_DISCONNECTED signal (permission cancellation handled by reducer)", async () => {
const deps = createDeps();
const mgr = new BackendConnector(deps);
const testSession = new TestBackendSession("sess-1");
Expand All @@ -478,10 +469,11 @@ describe("BackendConnector", () => {
expect(testSession.closed).toBe(true);
expect(session.backendSession).toBeNull();
expect(session.backendAbort).toBeNull();
expect(session.data.pendingPermissions.size).toBe(0);
expect(deps.broadcaster.broadcastToParticipants).toHaveBeenCalledWith(
// Permission cancellation is now handled via BACKEND_DISCONNECTED reducer effects
// (BROADCAST_TO_PARTICIPANTS effects executed by SessionRuntime.executeEffects).
expect(deps.routeSystemSignal).toHaveBeenCalledWith(
session,
expect.objectContaining({ type: "permission_cancelled" }),
expect.objectContaining({ kind: "BACKEND_DISCONNECTED" }),
);
});

Expand Down
10 changes: 0 additions & 10 deletions src/core/backend/backend-connector.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@ import { BackendConnector } from "./backend-connector.js";

function createMockRuntime() {
return {
attachBackendConnection: vi.fn(),
resetBackendConnectionState: vi.fn(),
getBackendSession: vi.fn(() => null),
getBackendAbort: vi.fn(() => null),
drainPendingMessages: vi.fn(() => []),
drainPendingPermissionIds: vi.fn(() => []),
peekPendingPassthrough: vi.fn(() => undefined),
shiftPendingPassthrough: vi.fn(() => undefined),
getState: vi.fn(() => ({ slash_commands: [] })),
Expand Down Expand Up @@ -41,11 +37,6 @@ function createDeps(overrides?: Partial<BackendConnectorDeps>): BackendConnector
adapterResolver: null,
logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() } as any,
metrics: null,
broadcaster: {
broadcast: vi.fn(),
broadcastToParticipants: vi.fn(),
sendTo: vi.fn(),
} as any,
routeUnifiedMessage: vi.fn(),
routeSystemSignal: vi.fn(),
emitEvent: vi.fn(),
Expand Down Expand Up @@ -165,7 +156,6 @@ describe("BackendConnector", () => {
const mockRuntime = createMockRuntime();
mockRuntime.getBackendSession.mockReturnValue(backendSession);
mockRuntime.getBackendAbort.mockReturnValue({ abort: vi.fn() } as any);
mockRuntime.drainPendingPermissionIds.mockReturnValue([]);

const deps = createDeps({
getRuntime: () => mockRuntime as any,
Expand Down
Loading