Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/agent/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const logger = createLogger("loop");
const MAX_TOOL_CALLS_PER_TURN = 10;
const MAX_CONSECUTIVE_ERRORS = 5;
const MAX_REPETITIVE_TURNS = 3;
// Module-level guard for the stale local-worker task recovery: it starts false
// and is set to true the first time that recovery runs, so the reset of
// 'assigned' local:// tasks happens at most once per process.
let localWorkerRecoveryDone = false;

export interface AgentLoopOptions {
identity: AutomatonIdentity;
Expand Down Expand Up @@ -350,6 +351,25 @@ export async function runAgentLoop(
}
}

// One-time per process: reset tasks that a previous process left assigned to
// local workers. Local workers are in-process async tasks that die with the
// process, so any 'assigned' task with a local:// address is stale after restart.
if (!localWorkerRecoveryDone && hasTable(db.raw, "task_graph")) {
  // Flip the guard before touching the database so the recovery is attempted
  // at most once per process, even if one of the statements below throws.
  localWorkerRecoveryDone = true;
  const staleLocalTasks = db.raw.prepare(
    "SELECT id, assigned_to FROM task_graph WHERE status = 'assigned' AND assigned_to LIKE 'local://%'",
  ).all() as { id: string; assigned_to: string }[];
  if (staleLocalTasks.length > 0) {
    // Prepare the UPDATE once and reuse it for every row instead of
    // re-preparing the same SQL on each loop iteration. It is only prepared
    // when there is something to reset, so the no-stale-task path is unchanged.
    const resetTask = db.raw.prepare(
      "UPDATE task_graph SET status = 'pending', assigned_to = NULL, started_at = NULL WHERE id = ?",
    );
    for (const task of staleLocalTasks) {
      logger.info("Resetting stale local worker task from previous process", {
        taskId: task.id,
        worker: task.assigned_to,
      });
      // Return the task to the pending state with no worker and no start time.
      resetTask.run(task.id);
    }
  }
}

// Set start time
if (!db.getKV("start_time")) {
db.setKV("start_time", new Date().toISOString());
Expand Down
21 changes: 20 additions & 1 deletion src/orchestration/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,12 @@ export class Orchestrator {
const assignedTasks = getTasksByGoal(this.params.db, goal.id)
.filter((t) => t.status === "assigned" && t.assignedTo);
for (const task of assignedTasks) {
// Local workers are in-process async tasks — they either complete
// (calling completeTask/failTask) or die with the process.
// Per-tick recovery only applies to remote sandbox workers.
if (task.assignedTo?.startsWith("local://")) {
continue;
}
const alive = this.params.isWorkerAlive(task.assignedTo!);
if (!alive) {
logger.warn("Recovering stale task from dead worker", {
Expand Down Expand Up @@ -964,13 +970,26 @@ export class Orchestrator {
}

const phase = asPhase(parsed.phase);
return {
const state: OrchestratorState = {
phase: phase ?? DEFAULT_STATE.phase,
goalId: typeof parsed.goalId === "string" ? parsed.goalId : null,
replanCount: typeof parsed.replanCount === "number" ? Math.max(0, Math.floor(parsed.replanCount)) : 0,
failedTaskId: typeof parsed.failedTaskId === "string" ? parsed.failedTaskId : null,
failedError: typeof parsed.failedError === "string" ? parsed.failedError : null,
};

// Validate goalId still points to a goal the orchestrator should track.
// Reset only for user-initiated terminal states (completed, cancelled)
// or missing goals. "failed" goals are kept because the orchestrator's
// own replan/failure flow puts goals into "failed" status.
if (state.goalId) {
const goal = getGoalById(this.params.db, state.goalId);
if (!goal || (["completed", "cancelled"].includes(goal.status) && state.phase !== "complete")) {
return { ...DEFAULT_STATE };
}
}

return state;
}

private saveState(state: OrchestratorState): void {
Expand Down
Loading