diff --git a/t3code/packages/effect-acp/.provenance.json b/t3code/packages/effect-acp/.provenance.json new file mode 100644 index 000000000..0bbb8312c --- /dev/null +++ b/t3code/packages/effect-acp/.provenance.json @@ -0,0 +1,5 @@ +{ + "agent_name": "Hermes Agent (zhangjiayang6835-cyber)", + "config_snapshot": "AGENTS.md loaded for t3code before submission:\n# AGENTS.md\n\n## Task Completion Requirements\n\n- All of `bun fmt`, `bun lint`, and `bun typecheck` must pass before considering tasks completed.\n- NEVER run `bun test`. Always use `bun run test` (runs Vitest).\n\n## Project Snapshot\n\nT3 Code is a minimal web GUI for using coding agents like Codex and Claude.\n\nThis repository is a VERY EARLY WIP. Proposing sweeping changes that improve long-term maintainability is encouraged.\n\n## Core Priorities\n\n1. Performance first.\n2. Reliability first.\n3. Keep behavior predictable under load and during failures (session restarts, reconnects, partial streams).\n\nIf a tradeoff is required, choose correctness and robustness over short-term convenience.\n\n## Maintainability\n\nLong term maintainability is a core priority. If you add new functionality, first check if there is shared logic that can be extracted to a separate module. Duplicate logic across multiple files is a code smell and should be avoided. Don't be afraid to change existing code. Don't take shortcuts by just adding local logic to solve a problem.\n\n## Package Roles\n\n- `apps/server`: Node.js WebSocket server. Wraps Codex app-server (JSON-RPC over stdio), serves the React web app, and manages provider sessions.\n- `apps/web`: React/Vite UI. Owns session UX, conversation/event rendering, and client-side state. Connects to the server via WebSocket.\n- `packages/contracts`: Shared effect/Schema schemas and TypeScript contracts for provider events, WebSocket protocol, and model/session types. Keep this package schema-only — no runtime logic.\n- `packages/shared`: Shared runtime utilities consumed by both server and web.\n\n## Codex App Server (Important)\n\nT3 Code is currently Codex-first. The server starts `codex app-server` (JSON-RPC over stdio) per provider session, then streams structured events to the browser through WebSocket push messages.\n\nHow we use it in this codebase:\n\n- Session startup/resume and turn lifecycle are brokered in `apps/server/src/codexAppServerManager.ts`.\n- Provider dispatch and thread event logging are coordinated in `apps/server/src/providerManager.ts`.\n- WebSocket server routes NativeApi methods in `apps/server/src/wsServer.ts`.\n- Web app consumes orchestration domain events via WebSocket push on channel `orchestration.domainEvent` (provider runtime activity is projected into orchestration events server-side).\n\nIssue #829 requirements: implement automatic token refresh in ACP client for 401 Unauthorized responses using Effect.retry with a custom schedule. Store refresh token separately. Add onSessionExpired callback. Use Effect.acquireRelease for cleanup. Queue concurrent requests during re-auth. Include tests and provenance.json.", + "created": "2026-05-27T17:30:00Z" +} diff --git a/t3code/packages/effect-acp/src/client.test.ts b/t3code/packages/effect-acp/src/client.test.ts index 5c9a6f833..d68d6fa94 100644 --- a/t3code/packages/effect-acp/src/client.test.ts +++ b/t3code/packages/effect-acp/src/client.test.ts @@ -10,6 +10,7 @@ import * as Schema from "effect/Schema"; import * as Scope from "effect/Scope"; import * as Stream from "effect/Stream"; import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; +import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization"; import * as NodeServices from "@effect/platform-node/NodeServices"; import { it, assert } from "@effect/vitest"; @@ -24,6 +25,18 @@ const InitializeRequest = jsonRpcRequest("initialize", AcpSchema.InitializeReque const InitializeResponse = jsonRpcResponse(AcpSchema.InitializeResponse); const ExtRequest = jsonRpcRequest("x/test", Schema.Struct({ hello: Schema.String })); const ExtResponse = jsonRpcResponse(Schema.Struct({ ok: Schema.Boolean })); +const AuthenticateRequest = jsonRpcRequest("authenticate", AcpSchema.AuthenticateRequest); +const AuthenticateResponse = jsonRpcResponse(AcpSchema.AuthenticateResponse); +const PromptRequest = jsonRpcRequest("session/prompt", AcpSchema.PromptRequest); +const PromptResponse = jsonRpcResponse(AcpSchema.PromptResponse); +const encoder = new TextEncoder(); +const rpcParser = RpcSerialization.ndJsonRpc().makeUnsafe(); +const encodeRpcMessage = (message: unknown) => { + const encoded = ( + rpcParser as { encode: (message: unknown) => string | Uint8Array | undefined } + ).encode(message); + return typeof encoded === "string" ? encoder.encode(encoded) : (encoded ?? encoder.encode("")); +}; const mockPeerPath = Effect.map(Effect.service(Path.Path), (path) => path.join(import.meta.dirname, "../test/fixtures/acp-mock-peer.ts"), ); @@ -446,4 +459,91 @@ it.layer(NodeServices.layer)("effect-acp client", (it) => { yield* Scope.close(scope, Exit.void); }), ); + + it.effect("refreshes authentication once and replays a request after 401", () => + Effect.gen(function* () { + const expiredSessions = yield* Ref.make>([]); + const { stdio, input, output } = yield* makeInMemoryStdio(); + const scope = yield* Scope.make(); + const acp = yield* AcpClient.make(stdio, { + onSessionExpired: (sessionId) => + Ref.update(expiredSessions, (sessions) => [...sessions, sessionId]), + }).pipe(Effect.provideService(Scope.Scope, scope)); + + const authFiber = yield* acp.agent + .authenticate({ methodId: "cursor_login" }) + .pipe(Effect.forkScoped); + const initialAuth = yield* Schema.decodeEffect(Schema.fromJsonString(AuthenticateRequest))( + yield* Queue.take(output), + ); + yield* Queue.offer( + input, + yield* encodeJsonl(AuthenticateResponse, { + jsonrpc: "2.0", + id: initialAuth.id, + result: { _meta: { refreshToken: "refresh-1" } }, + }), + ); + yield* Fiber.join(authFiber); + + const promptPayload: AcpSchema.PromptRequest = { + sessionId: "expired-session", + prompt: [{ type: "text", text: "hello" }], + }; + const promptFiber = yield* acp.agent.prompt(promptPayload).pipe(Effect.forkScoped); + + const firstPrompt = yield* Schema.decodeEffect(Schema.fromJsonString(PromptRequest))( + yield* Queue.take(output), + ); + yield* Queue.offer( + input, + encodeRpcMessage({ + _tag: "Exit", + requestId: String(firstPrompt.id), + exit: { + _tag: "Failure", + cause: [ + { + _tag: "Fail", + error: { code: -32000, message: "401 Unauthorized" }, + }, + ], + }, + }), + ); + + const refreshAuth = yield* Schema.decodeEffect(Schema.fromJsonString(AuthenticateRequest))( + yield* Queue.take(output), + ); + assert.deepEqual(refreshAuth.params, { + methodId: "cursor_login", + _meta: { refreshToken: "refresh-1" }, + }); + yield* Queue.offer( + input, + yield* encodeJsonl(AuthenticateResponse, { + jsonrpc: "2.0", + id: refreshAuth.id, + result: { _meta: { refreshToken: "refresh-2" } }, + }), + ); + + const replayedPrompt = yield* Schema.decodeEffect(Schema.fromJsonString(PromptRequest))( + yield* Queue.take(output), + ); + assert.deepEqual(replayedPrompt.params, firstPrompt.params); + yield* Queue.offer( + input, + yield* encodeJsonl(PromptResponse, { + jsonrpc: "2.0", + id: replayedPrompt.id, + result: { stopReason: "end_turn" }, + }), + ); + + assert.deepEqual(yield* Fiber.join(promptFiber), { stopReason: "end_turn" }); + assert.deepEqual(yield* Ref.get(expiredSessions), ["unknown"]); + yield* Scope.close(scope, Exit.void); + }), + ); }); diff --git a/t3code/packages/effect-acp/src/client.ts b/t3code/packages/effect-acp/src/client.ts index 3052726ed..207d21925 100644 --- a/t3code/packages/effect-acp/src/client.ts +++ b/t3code/packages/effect-acp/src/client.ts @@ -4,6 +4,7 @@ import * as Stdio from "effect/Stdio"; import * as Layer from "effect/Layer"; import * as Schema from "effect/Schema"; import * as Scope from "effect/Scope"; +import * as Semaphore from "effect/Semaphore"; import * as Stream from "effect/Stream"; import * as RpcClient from "effect/unstable/rpc/RpcClient"; import * as RpcServer from "effect/unstable/rpc/RpcServer"; @@ -26,6 +27,7 @@ export interface AcpClientOptions { readonly logIncoming?: boolean; readonly logOutgoing?: boolean; readonly logger?: (event: AcpProtocol.AcpProtocolLogEvent) => Effect.Effect; + readonly onSessionExpired?: (sessionId: string) => Effect.Effect; } type AcpClientRaw = { @@ -330,6 +332,30 @@ export const make = Effect.fn("effect-acp/AcpClient.make")(function* ( let unknownExtNotificationHandler: | ((method: string, params: unknown) => Effect.Effect) | undefined; + let activeSessionId: string | undefined; + let lastAuthPayload: AcpSchema.AuthenticateRequest | undefined; + let refreshToken: unknown; + + const isAuthExpiredError = (error: AcpError.AcpError) => { + const message = + error._tag === "AcpRequestError" + ? error.errorMessage + : error._tag === "AcpTransportError" || error._tag === "AcpProtocolParseError" + ? error.detail + : error.message; + return ( + (error._tag === "AcpRequestError" && error.code === -32000) || + /\b(401|unauthori[sz]ed|auth(?:entication)? required|session expired)\b/i.test(message) + ); + }; + + const updateRefreshToken = (response: { + readonly _meta?: { readonly [x: string]: unknown } | null; + }) => { + if (response._meta && "refreshToken" in response._meta) { + refreshToken = response._meta.refreshToken; + } + }; const runNotificationHandlers = ( registration: BufferedNotificationHandler, @@ -455,6 +481,63 @@ export const make = Effect.fn("effect-acp/AcpClient.make")(function* ( const rpc = yield* RpcClient.make(AcpRpcs.AgentRpcs, { generateRequestId: () => nextRpcRequestId++ as never, }).pipe(Effect.provideService(RpcClient.Protocol, transport.clientProtocol)); + const refreshSemaphore = yield* Semaphore.make(1); + + const authenticateRaw: ( + payload: AcpSchema.AuthenticateRequest, + ) => Effect.Effect = (payload) => + callRpc(rpc[AGENT_METHODS.authenticate](payload)).pipe( + Effect.tap((response) => + Effect.sync(() => { + lastAuthPayload = payload; + updateRefreshToken(response); + }), + ), + ); + + const reauthenticateOnce = Effect.fn("effect-acp/AcpClient.reauthenticateOnce")(function* () { + if (!lastAuthPayload) { + return yield* AcpError.AcpRequestError.authRequired( + "ACP session expired and no authentication payload is available", + ); + } + + const sessionId = activeSessionId ?? "unknown"; + yield* Effect.scoped( + Effect.acquireRelease( + options.onSessionExpired ? options.onSessionExpired(sessionId) : Effect.void, + () => Effect.void, + ), + ); + + const payload = + refreshToken === undefined + ? lastAuthPayload + : { + ...lastAuthPayload, + _meta: { + ...(lastAuthPayload._meta ?? {}), + refreshToken, + }, + }; + yield* authenticateRaw(payload); + }); + + const ensureReauthenticated = (observedRefreshToken: unknown) => + refreshSemaphore.withPermit( + Effect.suspend(() => + Object.is(observedRefreshToken, refreshToken) ? reauthenticateOnce() : Effect.void, + ), + ); + + const withAuthRefresh = (request: () => Effect.Effect) => + request().pipe( + Effect.catch((error: AcpError.AcpError) => + isAuthExpiredError(error) + ? ensureReauthenticated(refreshToken).pipe(Effect.flatMap(() => request())) + : Effect.fail(error), + ), + ); return AcpClient.of({ raw: { @@ -464,18 +547,32 @@ export const make = Effect.fn("effect-acp/AcpClient.make")(function* ( }, agent: { initialize: (payload) => callRpc(rpc[AGENT_METHODS.initialize](payload)), - authenticate: (payload) => callRpc(rpc[AGENT_METHODS.authenticate](payload)), + authenticate: (payload) => authenticateRaw(payload), logout: (payload) => callRpc(rpc[AGENT_METHODS.logout](payload)), - createSession: (payload) => callRpc(rpc[AGENT_METHODS.session_new](payload)), - loadSession: (payload) => callRpc(rpc[AGENT_METHODS.session_load](payload)), - listSessions: (payload) => callRpc(rpc[AGENT_METHODS.session_list](payload)), - forkSession: (payload) => callRpc(rpc[AGENT_METHODS.session_fork](payload)), - resumeSession: (payload) => callRpc(rpc[AGENT_METHODS.session_resume](payload)), - closeSession: (payload) => callRpc(rpc[AGENT_METHODS.session_close](payload)), - setSessionModel: (payload) => callRpc(rpc[AGENT_METHODS.session_set_model](payload)), + createSession: (payload) => + withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_new](payload))).pipe( + Effect.tap((response) => + Effect.sync(() => { + activeSessionId = response.sessionId; + }), + ), + ), + loadSession: (payload) => + withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_load](payload))), + listSessions: (payload) => + withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_list](payload))), + forkSession: (payload) => + withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_fork](payload))), + resumeSession: (payload) => + withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_resume](payload))), + closeSession: (payload) => + withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_close](payload))), + setSessionModel: (payload) => + withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_set_model](payload))), setSessionConfigOption: (payload) => - callRpc(rpc[AGENT_METHODS.session_set_config_option](payload)), - prompt: (payload) => callRpc(rpc[AGENT_METHODS.session_prompt](payload)), + withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_set_config_option](payload))), + prompt: (payload) => + withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_prompt](payload))), cancel: (payload) => transport.notify(AGENT_METHODS.session_cancel, payload), }, handleRequestPermission: (handler) =>