Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cli-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ async function handlePrompt(
timeoutMs: globalFlags.timeout,
ttlMs: globalFlags.ttl,
maxQueueDepth: config.queueMaxDepth,
promptRetries: globalFlags.promptRetries,
verbose: globalFlags.verbose,
waitForCompletion: flags.wait !== false,
});
Expand Down Expand Up @@ -364,6 +365,7 @@ async function handleExec(
suppressSdkConsoleErrors: outputPolicy.suppressSdkConsoleErrors,
timeoutMs: globalFlags.timeout,
verbose: globalFlags.verbose,
promptRetries: globalFlags.promptRetries,
sessionOptions: {
model: globalFlags.model,
allowedTools: globalFlags.allowedTools,
Expand Down
15 changes: 15 additions & 0 deletions src/cli/flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export type GlobalFlags = PermissionFlags & {
model?: string;
allowedTools?: string[];
maxTurns?: number;
promptRetries?: number;
};

export type PromptFlags = {
Expand Down Expand Up @@ -153,6 +154,14 @@ export function parseMaxTurns(value: string): number {
return parsed;
}

export function parsePromptRetries(value: string): number {
const parsed = Number(value);
if (!Number.isInteger(parsed) || parsed < 0) {
throw new InvalidArgumentError("Prompt retries must be a non-negative integer");
}
return parsed;
}

export function resolvePermissionMode(
flags: PermissionFlags,
defaultMode: PermissionMode,
Expand Down Expand Up @@ -200,6 +209,11 @@ export function addGlobalFlags(command: Command): Command {
parseAllowedTools,
)
.option("--max-turns <count>", "Maximum turns for the session", parseMaxTurns)
.option(
"--prompt-retries <count>",
"Retry failed prompt turns on transient errors (default: 0)",
parsePromptRetries,
)
.option(
"--json-strict",
"Strict JSON mode: requires --format json and suppresses non-JSON stderr output",
Expand Down Expand Up @@ -285,6 +299,7 @@ export function resolveGlobalFlags(command: Command, config: ResolvedAcpxConfig)
model: typeof opts.model === "string" ? parseNonEmptyValue("Model", opts.model) : undefined,
allowedTools: Array.isArray(opts.allowedTools) ? opts.allowedTools : undefined,
maxTurns: typeof opts.maxTurns === "number" ? opts.maxTurns : undefined,
promptRetries: typeof opts.promptRetries === "number" ? opts.promptRetries : undefined,
approveAll: opts.approveAll ? true : undefined,
approveReads: opts.approveReads ? true : undefined,
denyAll: opts.denyAll ? true : undefined,
Expand Down
43 changes: 43 additions & 0 deletions src/error-normalization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,49 @@ export function normalizeOutputError(
};
}

/**
* Returns true when an error from `client.prompt()` looks transient and
* can reasonably be retried (e.g. model-API 400/500, network hiccups that
* surface as ACP internal errors).
*
* Errors that are definitively non-recoverable (auth, missing session,
* invalid params, timeout, permission) return false.
*/
export function isRetryablePromptError(error: unknown): boolean {
if (error instanceof PermissionDeniedError || error instanceof PermissionPromptUnavailableError) {
return false;
}
if (isTimeoutLike(error) || isNoSessionLike(error) || isUsageLike(error)) {
return false;
}

// Extract ACP payload once and reuse for all subsequent checks.
const acp = extractAcpError(error);
if (!acp) {
// Non-ACP errors (e.g. process crash) are not retried at the prompt level.
return false;
}

// Resource-not-found (session gone) — check using the already-extracted payload.
if (acp.code === -32001 || acp.code === -32002) {
return false;
}

// Auth-required errors are never retryable. Use the same thorough check as normalizeOutputError.
if (isAcpAuthRequiredPayload(acp)) {
return false;
}

// Method-not-found or invalid-params are permanent protocol errors.
if (acp.code === -32601 || acp.code === -32602) {
return false;
}

// ACP internal errors (-32603) typically wrap model-API failures → retryable.
// Parse errors (-32700) can also be transient.
return acp.code === -32603 || acp.code === -32700;
}

export function exitCodeForOutputErrorCode(code: OutputErrorCode): ExitCode {
switch (code) {
case "USAGE":
Expand Down
4 changes: 4 additions & 0 deletions src/queue-owner-env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ export function parseQueueOwnerPayload(raw: string): QueueOwnerRuntimeOptions {
options.maxQueueDepth = Math.max(1, Math.round(record.maxQueueDepth));
}

if (typeof record.promptRetries === "number" && Number.isFinite(record.promptRetries)) {
options.promptRetries = Math.max(0, Math.round(record.promptRetries));
}

return options;
}

Expand Down
153 changes: 100 additions & 53 deletions src/session-runtime.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import fs from "node:fs/promises";
import path from "node:path";
import { AcpClient } from "./client.js";
import { formatErrorMessage, normalizeOutputError } from "./error-normalization.js";
import {
formatErrorMessage,
isRetryablePromptError,
normalizeOutputError,
} from "./error-normalization.js";
import { checkpointPerfMetricsCapture } from "./perf-metrics-capture.js";
import { formatPerfMetric, measurePerf, setPerfGauge, startPerfTimer } from "./perf-metrics.js";
import { refreshQueueOwnerLease } from "./queue-lease-store.js";
Expand Down Expand Up @@ -116,6 +120,7 @@ export type RunOnceOptions = {
suppressSdkConsoleErrors?: boolean;
verbose?: boolean;
sessionOptions?: SessionAgentOptions;
promptRetries?: number;
} & TimedRunOptions;

export type SessionCreateOptions = {
Expand Down Expand Up @@ -147,6 +152,7 @@ export type SessionSendOptions = {
waitForCompletion?: boolean;
ttlMs?: number;
maxQueueDepth?: number;
promptRetries?: number;
} & TimedRunOptions;

export type SessionEnsureOptions = {
Expand Down Expand Up @@ -219,6 +225,7 @@ type RunSessionPromptOptions = {
timeoutMs?: number;
suppressSdkConsoleErrors?: boolean;
verbose?: boolean;
promptRetries?: number;
onClientAvailable?: (controller: ActiveSessionController) => void;
onClientClosed?: () => void;
onPromptActive?: () => Promise<void> | void;
Expand Down Expand Up @@ -399,6 +406,7 @@ async function runQueuedTask(
authCredentials?: Record<string, string>;
authPolicy?: AuthPolicy;
suppressSdkConsoleErrors?: boolean;
promptRetries?: number;
onClientAvailable?: (controller: ActiveSessionController) => void;
onClientClosed?: () => void;
onPromptActive?: () => Promise<void> | void;
Expand All @@ -422,6 +430,7 @@ async function runQueuedTask(
timeoutMs: task.timeoutMs,
suppressSdkConsoleErrors: task.suppressSdkConsoleErrors ?? options.suppressSdkConsoleErrors,
verbose: options.verbose,
promptRetries: options.promptRetries,
onClientAvailable: options.onClientAvailable,
onClientClosed: options.onClientClosed,
onPromptActive: options.onPromptActive,
Expand Down Expand Up @@ -616,63 +625,82 @@ async function runSessionPrompt(options: RunSessionPromptOptions): Promise<Sessi
});
await flushPendingMessages(false);

const maxRetries = options.promptRetries ?? 0;
let response;
try {
const promptStartedAt = Date.now();
const promptPromise = client.prompt(activeSessionId, options.prompt);
if (options.onPromptActive) {
try {
await options.onPromptActive();
} catch (error) {
if (options.verbose) {
process.stderr.write(
"[acpx] onPromptActive hook failed: " + formatErrorMessage(error) + "\n",
);
for (let attempt = 0; ; attempt++) {
try {
const promptStartedAt = Date.now();
const promptPromise = client.prompt(activeSessionId, options.prompt);
if (attempt === 0 && options.onPromptActive) {
try {
await options.onPromptActive();
} catch (error) {
if (options.verbose) {
process.stderr.write(
"[acpx] onPromptActive hook failed: " + formatErrorMessage(error) + "\n",
);
}
}
}
}
response = await measurePerf("runtime.prompt.agent_turn", async () => {
return await withTimeout(promptPromise, options.timeoutMs);
});
if (options.verbose) {
process.stderr.write(
`[acpx] ${formatPerfMetric("prompt.agent_turn", Date.now() - promptStartedAt)}\n`,
);
}
} catch (error) {
const snapshot = client.getAgentLifecycleSnapshot();
applyLifecycleSnapshotToRecord(record, snapshot);
if (snapshot.lastExit?.unexpectedDuringPrompt && options.verbose) {
process.stderr.write(
"[acpx] agent disconnected during prompt (" +
snapshot.lastExit.reason +
", exit=" +
snapshot.lastExit.exitCode +
", signal=" +
(snapshot.lastExit.signal ?? "none") +
")\n",
);
}
response = await measurePerf("runtime.prompt.agent_turn", async () => {
return await withTimeout(promptPromise, options.timeoutMs);
});
if (options.verbose) {
process.stderr.write(
`[acpx] ${formatPerfMetric("prompt.agent_turn", Date.now() - promptStartedAt)}\n`,
);
}
break;
} catch (error) {
const snapshot = client.getAgentLifecycleSnapshot();
const agentCrashed = snapshot.lastExit?.unexpectedDuringPrompt === true;

// Retry if: retries remain, agent is still alive, error is transient.
if (attempt < maxRetries && !agentCrashed && isRetryablePromptError(error)) {
const delayMs = Math.min(1_000 * 2 ** attempt, 10_000);
process.stderr.write(
`[acpx] prompt failed (${formatErrorMessage(error)}), retrying in ${delayMs}ms ` +
`(attempt ${attempt + 1}/${maxRetries})\n`,
);
await waitMs(delayMs);
continue;
}

const normalizedError = normalizeOutputError(error, {
origin: "runtime",
});
applyLifecycleSnapshotToRecord(record, snapshot);
const lastExit = snapshot.lastExit;
if (lastExit?.unexpectedDuringPrompt && options.verbose) {
process.stderr.write(
"[acpx] agent disconnected during prompt (" +
lastExit.reason +
", exit=" +
lastExit.exitCode +
", signal=" +
(lastExit.signal ?? "none") +
")\n",
);
}

await flushPendingMessages(false).catch(() => {
// best effort while bubbling prompt failure
});
const normalizedError = normalizeOutputError(error, {
origin: "runtime",
});

await flushPendingMessages(false).catch(() => {
// best effort while bubbling prompt failure
});

output.flush();
output.flush();

record.lastUsedAt = isoNow();
applyConversation(record, conversation);
record.acpx = acpxState;
record.lastUsedAt = isoNow();
applyConversation(record, conversation);
record.acpx = acpxState;

const propagated = error instanceof Error ? error : new Error(formatErrorMessage(error));
(propagated as { outputAlreadyEmitted?: boolean }).outputAlreadyEmitted = sawAcpMessage;
(propagated as { normalizedOutputError?: unknown }).normalizedOutputError =
normalizedError;
throw propagated;
const propagated =
error instanceof Error ? error : new Error(formatErrorMessage(error));
(propagated as { outputAlreadyEmitted?: boolean }).outputAlreadyEmitted = sawAcpMessage;
(propagated as { normalizedOutputError?: unknown }).normalizedOutputError =
normalizedError;
throw propagated;
}
}

await flushPendingMessages(false);
Expand Down Expand Up @@ -769,9 +797,27 @@ export async function runOnce(options: RunOnceOptions): Promise<RunPromptResult>
sessionId,
});

const response = await measurePerf("runtime.exec.prompt", async () => {
return await withTimeout(client.prompt(sessionId, options.prompt), options.timeoutMs);
});
const maxRetries = options.promptRetries ?? 0;
let response;
for (let attempt = 0; ; attempt++) {
try {
response = await measurePerf("runtime.exec.prompt", async () => {
return await withTimeout(client.prompt(sessionId, options.prompt), options.timeoutMs);
});
break;
} catch (error) {
if (attempt < maxRetries && isRetryablePromptError(error)) {
const delayMs = Math.min(1_000 * 2 ** attempt, 10_000);
process.stderr.write(
`[acpx] prompt failed (${formatErrorMessage(error)}), retrying in ${delayMs}ms ` +
`(attempt ${attempt + 1}/${maxRetries})\n`,
);
await waitMs(delayMs);
continue;
}
throw error;
}
}
output.flush();
return toPromptResult(response.stopReason, sessionId, client);
},
Expand Down Expand Up @@ -1083,6 +1129,7 @@ export async function runSessionQueueOwner(options: QueueOwnerRuntimeOptions): P
authCredentials: options.authCredentials,
authPolicy: options.authPolicy,
suppressSdkConsoleErrors: options.suppressSdkConsoleErrors,
promptRetries: options.promptRetries,
onClientAvailable: setActiveController,
onClientClosed: clearActiveController,
onPromptActive: async () => {
Expand Down
3 changes: 3 additions & 0 deletions src/session-runtime/queue-owner-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export type QueueOwnerRuntimeOptions = {
verbose?: boolean;
ttlMs?: number;
maxQueueDepth?: number;
promptRetries?: number;
};

type SessionSendLike = {
Expand All @@ -31,6 +32,7 @@ type SessionSendLike = {
verbose?: boolean;
ttlMs?: number;
maxQueueDepth?: number;
promptRetries?: number;
};

export function resolveQueueOwnerSpawnArgs(argv: readonly string[] = process.argv): string[] {
Expand All @@ -56,6 +58,7 @@ export function queueOwnerRuntimeOptionsFromSend(
verbose: options.verbose,
ttlMs: options.ttlMs,
maxQueueDepth: options.maxQueueDepth,
promptRetries: options.promptRetries,
};
}

Expand Down
Loading