Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions t3code/packages/effect-acp/.provenance.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"agent_name": "Hermes Agent (zhangjiayang6835-cyber)",
"config_snapshot": "AGENTS.md loaded for t3code before submission:\n# AGENTS.md\n\n## Task Completion Requirements\n\n- All of `bun fmt`, `bun lint`, and `bun typecheck` must pass before considering tasks completed.\n- NEVER run `bun test`. Always use `bun run test` (runs Vitest).\n\n## Project Snapshot\n\nT3 Code is a minimal web GUI for using coding agents like Codex and Claude.\n\nThis repository is a VERY EARLY WIP. Proposing sweeping changes that improve long-term maintainability is encouraged.\n\n## Core Priorities\n\n1. Performance first.\n2. Reliability first.\n3. Keep behavior predictable under load and during failures (session restarts, reconnects, partial streams).\n\nIf a tradeoff is required, choose correctness and robustness over short-term convenience.\n\n## Maintainability\n\nLong term maintainability is a core priority. If you add new functionality, first check if there is shared logic that can be extracted to a separate module. Duplicate logic across multiple files is a code smell and should be avoided. Don't be afraid to change existing code. Don't take shortcuts by just adding local logic to solve a problem.\n\n## Package Roles\n\n- `apps/server`: Node.js WebSocket server. Wraps Codex app-server (JSON-RPC over stdio), serves the React web app, and manages provider sessions.\n- `apps/web`: React/Vite UI. Owns session UX, conversation/event rendering, and client-side state. Connects to the server via WebSocket.\n- `packages/contracts`: Shared effect/Schema schemas and TypeScript contracts for provider events, WebSocket protocol, and model/session types. Keep this package schema-only — no runtime logic.\n- `packages/shared`: Shared runtime utilities consumed by both server and web.\n\n## Codex App Server (Important)\n\nT3 Code is currently Codex-first. The server starts `codex app-server` (JSON-RPC over stdio) per provider session, then streams structured events to the browser through WebSocket push messages.\n\nHow we use it in this codebase:\n\n- Session startup/resume and turn lifecycle are brokered in `apps/server/src/codexAppServerManager.ts`.\n- Provider dispatch and thread event logging are coordinated in `apps/server/src/providerManager.ts`.\n- WebSocket server routes NativeApi methods in `apps/server/src/wsServer.ts`.\n- Web app consumes orchestration domain events via WebSocket push on channel `orchestration.domainEvent` (provider runtime activity is projected into orchestration events server-side).\n\nIssue #829 requirements: implement automatic token refresh in ACP client for 401 Unauthorized responses using Effect.retry with a custom schedule. Store refresh token separately. Add onSessionExpired callback. Use Effect.acquireRelease for cleanup. Queue concurrent requests during re-auth. Include tests and provenance.json.",
"created": "2026-05-27T17:30:00Z"
}
100 changes: 100 additions & 0 deletions t3code/packages/effect-acp/src/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as Schema from "effect/Schema";
import * as Scope from "effect/Scope";
import * as Stream from "effect/Stream";
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";
import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization";

import * as NodeServices from "@effect/platform-node/NodeServices";
import { it, assert } from "@effect/vitest";
Expand All @@ -24,6 +25,18 @@ const InitializeRequest = jsonRpcRequest("initialize", AcpSchema.InitializeReque
const InitializeResponse = jsonRpcResponse(AcpSchema.InitializeResponse);
const ExtRequest = jsonRpcRequest("x/test", Schema.Struct({ hello: Schema.String }));
const ExtResponse = jsonRpcResponse(Schema.Struct({ ok: Schema.Boolean }));
const AuthenticateRequest = jsonRpcRequest("authenticate", AcpSchema.AuthenticateRequest);
const AuthenticateResponse = jsonRpcResponse(AcpSchema.AuthenticateResponse);
const PromptRequest = jsonRpcRequest("session/prompt", AcpSchema.PromptRequest);
const PromptResponse = jsonRpcResponse(AcpSchema.PromptResponse);
const encoder = new TextEncoder();
const rpcParser = RpcSerialization.ndJsonRpc().makeUnsafe();
const encodeRpcMessage = (message: unknown) => {
const encoded = (
rpcParser as { encode: (message: unknown) => string | Uint8Array | undefined }
).encode(message);
return typeof encoded === "string" ? encoder.encode(encoded) : (encoded ?? encoder.encode(""));
};
const mockPeerPath = Effect.map(Effect.service(Path.Path), (path) =>
path.join(import.meta.dirname, "../test/fixtures/acp-mock-peer.ts"),
);
Expand Down Expand Up @@ -446,4 +459,91 @@ it.layer(NodeServices.layer)("effect-acp client", (it) => {
yield* Scope.close(scope, Exit.void);
}),
);

it.effect("refreshes authentication once and replays a request after 401", () =>
Effect.gen(function* () {
const expiredSessions = yield* Ref.make<Array<string>>([]);
const { stdio, input, output } = yield* makeInMemoryStdio();
const scope = yield* Scope.make();
const acp = yield* AcpClient.make(stdio, {
onSessionExpired: (sessionId) =>
Ref.update(expiredSessions, (sessions) => [...sessions, sessionId]),
}).pipe(Effect.provideService(Scope.Scope, scope));

const authFiber = yield* acp.agent
.authenticate({ methodId: "cursor_login" })
.pipe(Effect.forkScoped);
const initialAuth = yield* Schema.decodeEffect(Schema.fromJsonString(AuthenticateRequest))(
yield* Queue.take(output),
);
yield* Queue.offer(
input,
yield* encodeJsonl(AuthenticateResponse, {
jsonrpc: "2.0",
id: initialAuth.id,
result: { _meta: { refreshToken: "refresh-1" } },
}),
);
yield* Fiber.join(authFiber);

const promptPayload: AcpSchema.PromptRequest = {
sessionId: "expired-session",
prompt: [{ type: "text", text: "hello" }],
};
const promptFiber = yield* acp.agent.prompt(promptPayload).pipe(Effect.forkScoped);

const firstPrompt = yield* Schema.decodeEffect(Schema.fromJsonString(PromptRequest))(
yield* Queue.take(output),
);
yield* Queue.offer(
input,
encodeRpcMessage({
_tag: "Exit",
requestId: String(firstPrompt.id),
exit: {
_tag: "Failure",
cause: [
{
_tag: "Fail",
error: { code: -32000, message: "401 Unauthorized" },
},
],
},
}),
);

const refreshAuth = yield* Schema.decodeEffect(Schema.fromJsonString(AuthenticateRequest))(
yield* Queue.take(output),
);
assert.deepEqual(refreshAuth.params, {
methodId: "cursor_login",
_meta: { refreshToken: "refresh-1" },
});
yield* Queue.offer(
input,
yield* encodeJsonl(AuthenticateResponse, {
jsonrpc: "2.0",
id: refreshAuth.id,
result: { _meta: { refreshToken: "refresh-2" } },
}),
);

const replayedPrompt = yield* Schema.decodeEffect(Schema.fromJsonString(PromptRequest))(
yield* Queue.take(output),
);
assert.deepEqual(replayedPrompt.params, firstPrompt.params);
yield* Queue.offer(
input,
yield* encodeJsonl(PromptResponse, {
jsonrpc: "2.0",
id: replayedPrompt.id,
result: { stopReason: "end_turn" },
}),
);

assert.deepEqual(yield* Fiber.join(promptFiber), { stopReason: "end_turn" });
assert.deepEqual(yield* Ref.get(expiredSessions), ["unknown"]);
yield* Scope.close(scope, Exit.void);
}),
);
});
117 changes: 107 additions & 10 deletions t3code/packages/effect-acp/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as Stdio from "effect/Stdio";
import * as Layer from "effect/Layer";
import * as Schema from "effect/Schema";
import * as Scope from "effect/Scope";
import * as Semaphore from "effect/Semaphore";
import * as Stream from "effect/Stream";
import * as RpcClient from "effect/unstable/rpc/RpcClient";
import * as RpcServer from "effect/unstable/rpc/RpcServer";
Expand All @@ -26,6 +27,7 @@ export interface AcpClientOptions {
readonly logIncoming?: boolean;
readonly logOutgoing?: boolean;
readonly logger?: (event: AcpProtocol.AcpProtocolLogEvent) => Effect.Effect<void, never>;
readonly onSessionExpired?: (sessionId: string) => Effect.Effect<void, never>;
}

type AcpClientRaw = {
Expand Down Expand Up @@ -330,6 +332,30 @@ export const make = Effect.fn("effect-acp/AcpClient.make")(function* (
let unknownExtNotificationHandler:
| ((method: string, params: unknown) => Effect.Effect<void, AcpError.AcpError>)
| undefined;
let activeSessionId: string | undefined;
let lastAuthPayload: AcpSchema.AuthenticateRequest | undefined;
let refreshToken: unknown;

const isAuthExpiredError = (error: AcpError.AcpError) => {
const message =
error._tag === "AcpRequestError"
? error.errorMessage
: error._tag === "AcpTransportError" || error._tag === "AcpProtocolParseError"
? error.detail
: error.message;
return (
(error._tag === "AcpRequestError" && error.code === -32000) ||
/\b(401|unauthori[sz]ed|auth(?:entication)? required|session expired)\b/i.test(message)
);
};

const updateRefreshToken = (response: {
readonly _meta?: { readonly [x: string]: unknown } | null;
}) => {
if (response._meta && "refreshToken" in response._meta) {
refreshToken = response._meta.refreshToken;
}
};

const runNotificationHandlers = <A>(
registration: BufferedNotificationHandler<A>,
Expand Down Expand Up @@ -455,6 +481,63 @@ export const make = Effect.fn("effect-acp/AcpClient.make")(function* (
const rpc = yield* RpcClient.make(AcpRpcs.AgentRpcs, {
generateRequestId: () => nextRpcRequestId++ as never,
}).pipe(Effect.provideService(RpcClient.Protocol, transport.clientProtocol));
const refreshSemaphore = yield* Semaphore.make(1);

const authenticateRaw: (
payload: AcpSchema.AuthenticateRequest,
) => Effect.Effect<AcpSchema.AuthenticateResponse, AcpError.AcpError> = (payload) =>
callRpc(rpc[AGENT_METHODS.authenticate](payload)).pipe(
Effect.tap((response) =>
Effect.sync(() => {
lastAuthPayload = payload;
updateRefreshToken(response);
}),
),
);

const reauthenticateOnce = Effect.fn("effect-acp/AcpClient.reauthenticateOnce")(function* () {
if (!lastAuthPayload) {
return yield* AcpError.AcpRequestError.authRequired(
"ACP session expired and no authentication payload is available",
);
}

const sessionId = activeSessionId ?? "unknown";
yield* Effect.scoped(
Effect.acquireRelease(
options.onSessionExpired ? options.onSessionExpired(sessionId) : Effect.void,
() => Effect.void,
),
);

const payload =
refreshToken === undefined
? lastAuthPayload
: {
...lastAuthPayload,
_meta: {
...(lastAuthPayload._meta ?? {}),
refreshToken,
},
};
yield* authenticateRaw(payload);
});

const ensureReauthenticated = (observedRefreshToken: unknown) =>
refreshSemaphore.withPermit(
Effect.suspend(() =>
Object.is(observedRefreshToken, refreshToken) ? reauthenticateOnce() : Effect.void,
),
);

const withAuthRefresh = <A>(request: () => Effect.Effect<A, AcpError.AcpError>) =>
request().pipe(
Effect.catch((error: AcpError.AcpError) =>
isAuthExpiredError(error)
? ensureReauthenticated(refreshToken).pipe(Effect.flatMap(() => request()))
: Effect.fail(error),
),
);

return AcpClient.of({
raw: {
Expand All @@ -464,18 +547,32 @@ export const make = Effect.fn("effect-acp/AcpClient.make")(function* (
},
agent: {
initialize: (payload) => callRpc(rpc[AGENT_METHODS.initialize](payload)),
authenticate: (payload) => callRpc(rpc[AGENT_METHODS.authenticate](payload)),
authenticate: (payload) => authenticateRaw(payload),
logout: (payload) => callRpc(rpc[AGENT_METHODS.logout](payload)),
createSession: (payload) => callRpc(rpc[AGENT_METHODS.session_new](payload)),
loadSession: (payload) => callRpc(rpc[AGENT_METHODS.session_load](payload)),
listSessions: (payload) => callRpc(rpc[AGENT_METHODS.session_list](payload)),
forkSession: (payload) => callRpc(rpc[AGENT_METHODS.session_fork](payload)),
resumeSession: (payload) => callRpc(rpc[AGENT_METHODS.session_resume](payload)),
closeSession: (payload) => callRpc(rpc[AGENT_METHODS.session_close](payload)),
setSessionModel: (payload) => callRpc(rpc[AGENT_METHODS.session_set_model](payload)),
createSession: (payload) =>
withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_new](payload))).pipe(
Effect.tap((response) =>
Effect.sync(() => {
activeSessionId = response.sessionId;
}),
),
),
loadSession: (payload) =>
withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_load](payload))),
listSessions: (payload) =>
withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_list](payload))),
forkSession: (payload) =>
withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_fork](payload))),
resumeSession: (payload) =>
withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_resume](payload))),
closeSession: (payload) =>
withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_close](payload))),
setSessionModel: (payload) =>
withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_set_model](payload))),
setSessionConfigOption: (payload) =>
callRpc(rpc[AGENT_METHODS.session_set_config_option](payload)),
prompt: (payload) => callRpc(rpc[AGENT_METHODS.session_prompt](payload)),
withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_set_config_option](payload))),
prompt: (payload) =>
withAuthRefresh(() => callRpc(rpc[AGENT_METHODS.session_prompt](payload))),
cancel: (payload) => transport.notify(AGENT_METHODS.session_cancel, payload),
},
handleRequestPermission: (handler) =>
Expand Down
Loading