From aa5028049ed531d73ae82c7c5394983abf42c089 Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 19:04:11 -0500 Subject: [PATCH 01/13] --- src/lib/__tests__/task-store.test.ts | 318 ++++++++++++++++++ src/lib/task-store.ts | 168 +++++++++ src/orchestrator/__tests__/dispatcher.test.ts | 246 ++++++++++++++ src/orchestrator/dispatcher.ts | 90 ++++- 4 files changed, 814 insertions(+), 8 deletions(-) create mode 100644 src/lib/__tests__/task-store.test.ts create mode 100644 src/lib/task-store.ts diff --git a/src/lib/__tests__/task-store.test.ts b/src/lib/__tests__/task-store.test.ts new file mode 100644 index 00000000..5fb3197c --- /dev/null +++ b/src/lib/__tests__/task-store.test.ts @@ -0,0 +1,318 @@ +/** + * Tests for NativeTaskStore. + * + * REQ-014 — coexistence check via hasNativeTasks() + * REQ-017 — dispatcher native store: SELECT WHERE status=ready + * REQ-020 — atomic claim, backward-compatible null taskId + */ + +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { mkdtempSync, rmSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import { ForemanStore } from "../store.js"; +import { NativeTaskStore } from "../task-store.js"; +import { randomUUID } from "node:crypto"; + +// ── Helpers ────────────────────────────────────────────────────────────── + +function makeTmpDir() { + return mkdtempSync(join(tmpdir(), "foreman-task-store-test-")); +} + +function makeStores(tmpDir: string) { + const foremanStore = new ForemanStore(join(tmpDir, "foreman.db")); + const taskStore = new NativeTaskStore(foremanStore.getDb()); + return { foremanStore, taskStore }; +} + +function insertTask( + db: ReturnType, + opts: Partial<{ + id: string; + title: string; + status: string; + priority: number; + type: string; + }> = {}, +) { + const id = opts.id ?? randomUUID(); + const now = new Date().toISOString(); + db.prepare( + `INSERT INTO tasks (id, title, description, type, priority, status, created_at, updated_at) + VALUES (?, ?, NULL, ?, ?, ?, ?, ?)`, + ).run( + id, + opts.title ?? "Test Task", + opts.type ?? "task", + opts.priority ?? 2, + opts.status ?? "ready", + now, + now, + ); + return id; +} + +// ── Test suite ─────────────────────────────────────────────────────────── + +describe("NativeTaskStore.hasNativeTasks()", () => { + let tmpDir: string; + let foremanStore: ForemanStore; + let taskStore: NativeTaskStore; + + beforeEach(() => { + tmpDir = makeTmpDir(); + ({ foremanStore, taskStore } = makeStores(tmpDir)); + }); + + afterEach(() => { + foremanStore.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("returns false when tasks table is empty", () => { + expect(taskStore.hasNativeTasks()).toBe(false); + }); + + it("returns true when at least one task exists", () => { + insertTask(foremanStore.getDb()); + expect(taskStore.hasNativeTasks()).toBe(true); + }); + + it("returns false after all tasks are deleted", () => { + const id = insertTask(foremanStore.getDb()); + expect(taskStore.hasNativeTasks()).toBe(true); + foremanStore.getDb().prepare("DELETE FROM tasks WHERE id = ?").run(id); + expect(taskStore.hasNativeTasks()).toBe(false); + }); +}); + +describe("NativeTaskStore.list()", () => { + let tmpDir: string; + let foremanStore: ForemanStore; + let taskStore: NativeTaskStore; + + beforeEach(() => { + tmpDir = makeTmpDir(); + ({ foremanStore, taskStore } = makeStores(tmpDir)); + }); + + afterEach(() => { + foremanStore.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("returns all tasks when no filter is provided", () => { + const db = foremanStore.getDb(); + insertTask(db, { status: "ready" }); + insertTask(db, { status: "in-progress" }); + insertTask(db, { status: "backlog" }); + + const tasks = taskStore.list(); + expect(tasks).toHaveLength(3); + }); + + it("filters by status when opts.status is provided", () => { + const db = foremanStore.getDb(); + insertTask(db, { status: "ready", title: "R1" }); + insertTask(db, { status: "ready", title: "R2" }); + insertTask(db, { status: "in-progress", title: "IP1" }); + + const readyTasks = taskStore.list({ status: "ready" }); + expect(readyTasks).toHaveLength(2); + expect(readyTasks.every((t) => t.status === "ready")).toBe(true); + }); + + it("returns an empty array when no tasks match the filter", () => { + const db = foremanStore.getDb(); + insertTask(db, { status: "backlog" }); + + const readyTasks = taskStore.list({ status: "ready" }); + expect(readyTasks).toHaveLength(0); + }); + + it("maps task rows to Issue objects", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { title: "My Task", status: "ready", priority: 1, type: "bug" }); + + const tasks = taskStore.list({ status: "ready" }); + expect(tasks).toHaveLength(1); + const task = tasks[0]; + expect(task.id).toBe(id); + expect(task.title).toBe("My Task"); + expect(task.status).toBe("ready"); + expect(task.priority).toBe("1"); + expect(task.type).toBe("bug"); + expect(task.assignee).toBeNull(); + expect(task.parent).toBeNull(); + }); + + it("sorts by priority ASC, then created_at ASC", () => { + const db = foremanStore.getDb(); + // Insert in reverse priority order + const idLow = insertTask(db, { priority: 3, title: "Low" }); + const idHigh = insertTask(db, { priority: 0, title: "High" }); + const idMed = insertTask(db, { priority: 2, title: "Med" }); + + const tasks = taskStore.list(); + expect(tasks[0].id).toBe(idHigh); // priority 0 + expect(tasks[1].id).toBe(idMed); // priority 2 + expect(tasks[2].id).toBe(idLow); // priority 3 + }); +}); + +describe("NativeTaskStore.claim()", () => { + let tmpDir: string; + let foremanStore: ForemanStore; + let taskStore: NativeTaskStore; + + beforeEach(() => { + tmpDir = makeTmpDir(); + ({ foremanStore, taskStore } = makeStores(tmpDir)); + }); + + afterEach(() => { + foremanStore.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + /** + * Insert a minimal run record to satisfy the FK constraint on tasks.run_id. + * The project_id is "proj-test" (no FK on projects in this minimal setup). + */ + function insertRun(db: ReturnType, runId: string) { + // Disable FK enforcement temporarily to insert without a matching project. + db.pragma("foreign_keys = OFF"); + const now = new Date().toISOString(); + db.prepare( + `INSERT INTO runs (id, project_id, seed_id, agent_type, status, created_at) + VALUES (?, 'proj-test', 'seed-test', 'claude-code', 'running', ?)`, + ).run(runId, now); + db.pragma("foreign_keys = ON"); + } + + it("sets status to in-progress and records run_id", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "ready" }); + const runId = "run-123"; + insertRun(db, runId); + + taskStore.claim(id, runId); + + const row = db.prepare("SELECT status, run_id FROM tasks WHERE id = ?").get(id) as + | { status: string; run_id: string | null } + | undefined; + expect(row?.status).toBe("in-progress"); + expect(row?.run_id).toBe(runId); + }); + + it("is idempotent when called again with the same runId", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "ready" }); + const runId = "run-123"; + insertRun(db, runId); + + taskStore.claim(id, runId); + // Calling again with the same runId should not throw + expect(() => taskStore.claim(id, runId)).not.toThrow(); + }); + + it("throws when the task is already claimed by a different run", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "ready" }); + insertRun(db, "run-111"); + insertRun(db, "run-222"); + + taskStore.claim(id, "run-111"); + expect(() => taskStore.claim(id, "run-222")).toThrow(/already claimed/); + }); + + it("throws when the task does not exist", () => { + expect(() => taskStore.claim("nonexistent-task", "run-xyz")).toThrow(/not found/); + }); + + it("updates updated_at timestamp", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "ready" }); + const runId = "run-001"; + insertRun(db, runId); + const before = new Date().toISOString(); + + taskStore.claim(id, runId); + + const row = db.prepare("SELECT updated_at FROM tasks WHERE id = ?").get(id) as + | { updated_at: string } + | undefined; + expect(row?.updated_at).toBeDefined(); + expect(row!.updated_at >= before).toBe(true); + }); +}); + +describe("NativeTaskStore.updatePhase()", () => { + let tmpDir: string; + let foremanStore: ForemanStore; + let taskStore: NativeTaskStore; + + beforeEach(() => { + tmpDir = makeTmpDir(); + ({ foremanStore, taskStore } = makeStores(tmpDir)); + }); + + afterEach(() => { + foremanStore.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("updates the task status to the given phase", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "in-progress" }); + + taskStore.updatePhase(id, "developer"); + + const row = db.prepare("SELECT status FROM tasks WHERE id = ?").get(id) as + | { status: string } + | undefined; + expect(row?.status).toBe("developer"); + }); + + it("is a no-op when taskId is null (beads fallback mode)", () => { + // Should not throw and should not touch any rows + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "in-progress" }); + + expect(() => taskStore.updatePhase(null, "developer")).not.toThrow(); + + const row = db.prepare("SELECT status FROM tasks WHERE id = ?").get(id) as + | { status: string } + | undefined; + expect(row?.status).toBe("in-progress"); // unchanged + }); +}); + +describe("NativeTaskStore.updateStatus()", () => { + let tmpDir: string; + let foremanStore: ForemanStore; + let taskStore: NativeTaskStore; + + beforeEach(() => { + tmpDir = makeTmpDir(); + ({ foremanStore, taskStore } = makeStores(tmpDir)); + }); + + afterEach(() => { + foremanStore.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("updates the task status", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "in-progress" }); + + taskStore.updateStatus(id, "merged"); + + const row = db.prepare("SELECT status FROM tasks WHERE id = ?").get(id) as + | { status: string } + | undefined; + expect(row?.status).toBe("merged"); + }); +}); diff --git a/src/lib/task-store.ts b/src/lib/task-store.ts new file mode 100644 index 00000000..cfbf6389 --- /dev/null +++ b/src/lib/task-store.ts @@ -0,0 +1,168 @@ +/** + * NativeTaskStore — wraps the native `tasks` SQLite table for use as a + * task-tracking back-end inside the Dispatcher. + * + * Implements a subset of ITaskClient focused on the Dispatcher's needs: + * - hasNativeTasks() — coexistence check (REQ-014) + * - list() — query tasks with optional status filter (REQ-017) + * - claim() — atomically claim a task for a run (REQ-020) + * - updatePhase() — update phase column (no-op when taskId is null) + * - updateStatus() — update task status + */ + +import type { Database } from "better-sqlite3"; +import type { Issue } from "./task-client.js"; + +// ── Row type matching TASKS_SCHEMA ─────────────────────────────────────── + +export interface TaskRow { + id: string; + title: string; + description: string | null; + type: string; + priority: number; + status: string; + run_id: string | null; + branch: string | null; + external_id: string | null; + created_at: string; + updated_at: string; + approved_at: string | null; + closed_at: string | null; +} + +// ── Helpers ────────────────────────────────────────────────────────────── + +/** + * Map a numeric priority (0–4) to the string format expected by Issue.priority. + * Stores the value as-is ("0"–"4") so normalizePriority() works correctly. + */ +function rowToIssue(row: TaskRow): Issue { + return { + id: row.id, + title: row.title, + type: row.type, + priority: String(row.priority), + status: row.status, + assignee: null, + parent: null, + created_at: row.created_at, + updated_at: row.updated_at, + description: row.description ?? null, + labels: [], + }; +} + +// ── NativeTaskStore ────────────────────────────────────────────────────── + +/** + * Provides read/write access to the `tasks` table inside the Foreman SQLite + * database. The `db` instance is obtained from `ForemanStore.getDb()`. + * + * Thread-safety: SQLite in WAL mode with busy_timeout=30 000 ms handles + * concurrent readers/writers; the claim() method uses a single synchronous + * transaction so it is effectively atomic within the same process. + */ +export class NativeTaskStore { + constructor(private readonly db: Database) {} + + /** + * Returns true when the `tasks` table contains at least one row. + * + * Used by Dispatcher.getReadyTasks() as a coexistence check: if native + * tasks exist, use the native path; otherwise fall back to BeadsRustClient. + * + * Also guards against the case where the schema migration has not yet run + * by catching SQLite errors (table not found) and returning false. + */ + hasNativeTasks(): boolean { + try { + const row = this.db + .prepare("SELECT COUNT(*) as cnt FROM tasks LIMIT 1") + .get() as { cnt: number } | undefined; + return (row?.cnt ?? 0) > 0; + } catch { + // Table may not exist (migration not yet applied) — treat as empty + return false; + } + } + + /** + * List tasks from the `tasks` table. + * + * @param opts.status — filter by exact status value (e.g. "ready") + */ + list(opts?: { status?: string }): Issue[] { + let sql = "SELECT * FROM tasks"; + const params: string[] = []; + + if (opts?.status) { + sql += " WHERE status = ?"; + params.push(opts.status); + } + + sql += " ORDER BY priority ASC, created_at ASC"; + + const rows = this.db.prepare(sql).all(...params) as TaskRow[]; + return rows.map(rowToIssue); + } + + /** + * Atomically claim a task: set status='in-progress' and run_id=runId + * in a single synchronous transaction. + * + * Throws if the task is already claimed by a different run (concurrent + * dispatch guard) or if the task does not exist. + */ + claim(id: string, runId: string): void { + const now = new Date().toISOString(); + + const tx = this.db.transaction(() => { + const row = this.db + .prepare("SELECT id, status, run_id FROM tasks WHERE id = ?") + .get(id) as { id: string; status: string; run_id: string | null } | undefined; + + if (!row) { + throw new Error(`NativeTaskStore.claim: task '${id}' not found`); + } + + // Allow re-claiming if already claimed by the same run (idempotent) + if (row.run_id && row.run_id !== runId) { + throw new Error( + `NativeTaskStore.claim: task '${id}' already claimed by run '${row.run_id}'`, + ); + } + + this.db + .prepare( + "UPDATE tasks SET status = 'in-progress', run_id = ?, updated_at = ? WHERE id = ?", + ) + .run(runId, now, id); + }); + + tx(); + } + + /** + * Update the phase of a task (used by pipeline-executor to record progress). + * No-op when taskId is null (beads fallback mode — REQ-020). + */ + updatePhase(taskId: string | null, phase: string): void { + if (!taskId) return; // beads fallback — no-op + + const now = new Date().toISOString(); + this.db + .prepare("UPDATE tasks SET status = ?, updated_at = ? WHERE id = ?") + .run(phase, now, taskId); + } + + /** + * Update the status of a task. + */ + updateStatus(taskId: string, status: string): void { + const now = new Date().toISOString(); + this.db + .prepare("UPDATE tasks SET status = ?, updated_at = ? WHERE id = ?") + .run(status, now, taskId); + } +} diff --git a/src/orchestrator/__tests__/dispatcher.test.ts b/src/orchestrator/__tests__/dispatcher.test.ts index 15316fb3..5963b9d1 100644 --- a/src/orchestrator/__tests__/dispatcher.test.ts +++ b/src/orchestrator/__tests__/dispatcher.test.ts @@ -1009,3 +1009,249 @@ describe("PLAN_STEP_CONFIG", () => { expect(PLAN_STEP_CONFIG.maxTurns).toBe(50); }); }); + +// ── NativeTaskStore integration ────────────────────────────────────────── + +import { NativeTaskStore } from "../../lib/task-store.js"; + +describe("Dispatcher.getReadyTasks() — native vs beads routing", () => { + function makeIssue(id: string): Issue { + return { + id, + title: `Task ${id}`, + status: "ready", + priority: "P2", + type: "task", + assignee: null, + parent: null, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + }; + } + + function makeBeadsClient(issues: Issue[]): ITaskClient { + return { + ready: vi.fn().mockResolvedValue(issues), + show: vi.fn().mockResolvedValue({ status: "open" }), + update: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), + list: vi.fn().mockResolvedValue([]), + }; + } + + function makeStore(): ForemanStore { + return { + getActiveRuns: vi.fn().mockReturnValue([]), + getProjectByPath: vi.fn().mockReturnValue({ id: "proj-1" }), + getRunsForSeed: vi.fn().mockReturnValue([]), + getRunsByStatus: vi.fn().mockReturnValue([]), + } as unknown as ForemanStore; + } + + it("uses beads fallback when no taskStore is injected", async () => { + const beadsIssues = [makeIssue("bd-001")]; + const seedsClient = makeBeadsClient(beadsIssues); + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp"); + + const tasks = await dispatcher.getReadyTasks(); + expect(tasks).toEqual(beadsIssues); + expect(seedsClient.ready).toHaveBeenCalledOnce(); + }); + + it("uses beads fallback when taskStore is injected but has no tasks", async () => { + const beadsIssues = [makeIssue("bd-001")]; + const seedsClient = makeBeadsClient(beadsIssues); + + const mockTaskStore = { + hasNativeTasks: vi.fn().mockReturnValue(false), + list: vi.fn().mockReturnValue([]), + claim: vi.fn(), + updatePhase: vi.fn(), + updateStatus: vi.fn(), + } as unknown as NativeTaskStore; + + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp", null, mockTaskStore); + + const tasks = await dispatcher.getReadyTasks(); + expect(tasks).toEqual(beadsIssues); + expect(seedsClient.ready).toHaveBeenCalledOnce(); + expect(mockTaskStore.list).not.toHaveBeenCalled(); + }); + + it("uses native store when taskStore is injected and has tasks", async () => { + const nativeIssues = [makeIssue("task-001")]; + const seedsClient = makeBeadsClient([makeIssue("bd-001")]); + + const mockTaskStore = { + hasNativeTasks: vi.fn().mockReturnValue(true), + list: vi.fn().mockReturnValue(nativeIssues), + claim: vi.fn(), + updatePhase: vi.fn(), + updateStatus: vi.fn(), + } as unknown as NativeTaskStore; + + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp", null, mockTaskStore); + + const tasks = await dispatcher.getReadyTasks(); + expect(tasks).toEqual(nativeIssues); + expect(mockTaskStore.list).toHaveBeenCalledWith({ status: "ready" }); + expect(seedsClient.ready).not.toHaveBeenCalled(); + }); + + it("forces beads when FOREMAN_TASK_STORE=beads even if native tasks exist", async () => { + const beadsIssues = [makeIssue("bd-001")]; + const seedsClient = makeBeadsClient(beadsIssues); + + const mockTaskStore = { + hasNativeTasks: vi.fn().mockReturnValue(true), + list: vi.fn().mockReturnValue([makeIssue("task-001")]), + claim: vi.fn(), + updatePhase: vi.fn(), + updateStatus: vi.fn(), + } as unknown as NativeTaskStore; + + const orig = process.env.FOREMAN_TASK_STORE; + process.env.FOREMAN_TASK_STORE = "beads"; + try { + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp", null, mockTaskStore); + const tasks = await dispatcher.getReadyTasks(); + expect(tasks).toEqual(beadsIssues); + expect(seedsClient.ready).toHaveBeenCalledOnce(); + expect(mockTaskStore.list).not.toHaveBeenCalled(); + } finally { + if (orig === undefined) { + delete process.env.FOREMAN_TASK_STORE; + } else { + process.env.FOREMAN_TASK_STORE = orig; + } + } + }); + + it("forces native store when FOREMAN_TASK_STORE=native even if no tasks", async () => { + const seedsClient = makeBeadsClient([makeIssue("bd-001")]); + + const mockTaskStore = { + hasNativeTasks: vi.fn().mockReturnValue(false), + list: vi.fn().mockReturnValue([]), + claim: vi.fn(), + updatePhase: vi.fn(), + updateStatus: vi.fn(), + } as unknown as NativeTaskStore; + + const orig = process.env.FOREMAN_TASK_STORE; + process.env.FOREMAN_TASK_STORE = "native"; + try { + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp", null, mockTaskStore); + const tasks = await dispatcher.getReadyTasks(); + expect(tasks).toEqual([]); + expect(mockTaskStore.list).toHaveBeenCalledWith({ status: "ready" }); + expect(seedsClient.ready).not.toHaveBeenCalled(); + } finally { + if (orig === undefined) { + delete process.env.FOREMAN_TASK_STORE; + } else { + process.env.FOREMAN_TASK_STORE = orig; + } + } + }); + + it("ignores invalid FOREMAN_TASK_STORE and uses auto-detection", async () => { + const beadsIssues = [makeIssue("bd-001")]; + const seedsClient = makeBeadsClient(beadsIssues); + + const mockTaskStore = { + hasNativeTasks: vi.fn().mockReturnValue(false), + list: vi.fn().mockReturnValue([]), + claim: vi.fn(), + updatePhase: vi.fn(), + updateStatus: vi.fn(), + } as unknown as NativeTaskStore; + + const orig = process.env.FOREMAN_TASK_STORE; + process.env.FOREMAN_TASK_STORE = "invalid-value"; + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + try { + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp", null, mockTaskStore); + const tasks = await dispatcher.getReadyTasks(); + // Should fall back to beads (hasNativeTasks() = false, so auto → beads) + expect(tasks).toEqual(beadsIssues); + expect(seedsClient.ready).toHaveBeenCalledOnce(); + // Should warn about invalid value + const warnCalls = consoleSpy.mock.calls.map((args) => args.join(" ")); + expect(warnCalls.some((msg) => msg.includes("not recognised"))).toBe(true); + } finally { + if (orig === undefined) { + delete process.env.FOREMAN_TASK_STORE; + } else { + process.env.FOREMAN_TASK_STORE = orig; + } + consoleSpy.mockRestore(); + } + }); + + it("FOREMAN_TASK_STORE=native is no-op when no taskStore injected", async () => { + const beadsIssues = [makeIssue("bd-001")]; + const seedsClient = makeBeadsClient(beadsIssues); + + const orig = process.env.FOREMAN_TASK_STORE; + process.env.FOREMAN_TASK_STORE = "native"; + try { + // No taskStore injected + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp"); + const tasks = await dispatcher.getReadyTasks(); + // Should fall back to beads since taskStore is undefined + expect(tasks).toEqual(beadsIssues); + expect(seedsClient.ready).toHaveBeenCalledOnce(); + } finally { + if (orig === undefined) { + delete process.env.FOREMAN_TASK_STORE; + } else { + process.env.FOREMAN_TASK_STORE = orig; + } + } + }); +}); + +describe("WorkerConfig — taskId field", () => { + it("WorkerConfig type accepts taskId as string or null", () => { + // Type-level test: if this compiles, WorkerConfig correctly has the taskId field. + const config1: import("../dispatcher.js").WorkerConfig = { + runId: "run-1", + projectId: "proj-1", + seedId: "seed-1", + seedTitle: "Test", + model: "anthropic/claude-sonnet-4-6", + worktreePath: "/tmp/wt", + prompt: "Do stuff", + env: {}, + taskId: "native-task-001", // string + }; + expect(config1.taskId).toBe("native-task-001"); + + const config2: import("../dispatcher.js").WorkerConfig = { + runId: "run-2", + projectId: "proj-2", + seedId: "seed-2", + seedTitle: "Test 2", + model: "anthropic/claude-sonnet-4-6", + worktreePath: "/tmp/wt2", + prompt: "Do stuff", + env: {}, + taskId: null, // null (beads fallback) + }; + expect(config2.taskId).toBeNull(); + + const config3: import("../dispatcher.js").WorkerConfig = { + runId: "run-3", + projectId: "proj-3", + seedId: "seed-3", + seedTitle: "Test 3", + model: "anthropic/claude-sonnet-4-6", + worktreePath: "/tmp/wt3", + prompt: "Do stuff", + env: {}, + // taskId not set (undefined — backward-compatible) + }; + expect(config3.taskId).toBeUndefined(); + }); +}); diff --git a/src/orchestrator/dispatcher.ts b/src/orchestrator/dispatcher.ts index aff54463..28fac16e 100644 --- a/src/orchestrator/dispatcher.ts +++ b/src/orchestrator/dispatcher.ts @@ -14,6 +14,7 @@ import { installDependencies, runSetupWithCache } from "../lib/git.js"; import { GitBackend } from "../lib/vcs/git-backend.js"; import { extractBranchLabel, isDefaultBranch, applyBranchLabel } from "../lib/branch-label.js"; import { BeadsRustClient } from "../lib/beads-rust.js"; +import { NativeTaskStore } from "../lib/task-store.js"; import { workerAgentMd } from "./templates.js"; import { normalizePriority } from "../lib/priority.js"; import { PLAN_STEP_CONFIG } from "./roles.js"; @@ -44,8 +45,56 @@ export class Dispatcher { private store: ForemanStore, private projectPath: string, private bvClient?: BvClient | null, + private taskStore?: NativeTaskStore, ) {} + /** + * Return the list of ready tasks, routing through the native task store + * when available (REQ-014, REQ-017) or falling back to BeadsRustClient. + * + * Resolution order (highest priority first): + * 1. FOREMAN_TASK_STORE=beads — always use BeadsRustClient + * 2. FOREMAN_TASK_STORE=native — always use NativeTaskStore + * 3. auto (default) — native if NativeTaskStore has rows, else beads + */ + async getReadyTasks(): Promise { + if (this.resolveNativeMode()) { + log("[dispatcher] Using native task store (tasks table)"); + return this.taskStore!.list({ status: "ready" }); + } + + // Beads fallback path (REQ-020 backward compatibility) + log("[dispatcher] Using beads fallback (BeadsRustClient.ready())"); + return this.seeds.ready(); + } + + /** + * Determine whether native-task-store mode is active. + * + * Resolution order (highest priority first): + * 1. FOREMAN_TASK_STORE=beads — always false (beads forced) + * 2. FOREMAN_TASK_STORE=native — always true (native forced) + * 3. auto — true if taskStore injected and has rows + * + * Also validates FOREMAN_TASK_STORE and emits a warning on invalid values. + */ + private resolveNativeMode(): boolean { + const override = process.env.FOREMAN_TASK_STORE; + + // Validate env value; warn on unrecognised values but don't crash. + if (override && override !== "native" && override !== "beads") { + console.error( + `[dispatcher] Warning: FOREMAN_TASK_STORE='${override}' is not recognised ` + + `(expected 'native' or 'beads'). Ignoring and using auto-detection.`, + ); + } + + if (override === "beads") return false; + if (override === "native") return this.taskStore !== undefined; + // Auto-detect: prefer native if NativeTaskStore is injected and has rows. + return this.taskStore !== undefined && this.taskStore.hasNativeTasks(); + } + /** * Query ready seeds, create worktrees, write TASK.md, and record runs. */ @@ -86,7 +135,7 @@ export class Dispatcher { const activeRuns = this.store.getActiveRuns(projectId); const available = Math.max(0, maxAgents - activeRuns.length); - let readySeeds = await this.seeds.ready(); + let readySeeds = await this.getReadyTasks(); // Sort ready seeds using bv triage scores when available, falling back to priority sort. if (!opts?.seedId) { @@ -452,13 +501,25 @@ export class Dispatcher { } // 6. Mark seed as in_progress before spawning agent. - // Non-fatal: br may reject the claim due to stale blocked cache (beads_rust#204). - // The agent can still run — the status update is cosmetic. - try { - await this.seeds.update(seed.id, { status: "in_progress" }); - } catch (claimErr: unknown) { - const claimMsg = claimErr instanceof Error ? claimErr.message : String(claimErr); - console.error(`[dispatch] Warning: br claim failed for ${seed.id} (non-fatal): ${claimMsg.slice(0, 200)}`); + // When native task store is active, use atomic claim() transaction (REQ-020). + // Otherwise fall back to beads update (may fail non-fatally for stale cache). + const nativeMode = this.resolveNativeMode(); + if (nativeMode && this.taskStore) { + try { + this.taskStore.claim(seed.id, run.id); + } catch (claimErr: unknown) { + const claimMsg = claimErr instanceof Error ? claimErr.message : String(claimErr); + console.error(`[dispatch] Warning: native claim failed for ${seed.id} (non-fatal): ${claimMsg.slice(0, 200)}`); + } + } else { + // Non-fatal: br may reject the claim due to stale blocked cache (beads_rust#204). + // The agent can still run — the status update is cosmetic. + try { + await this.seeds.update(seed.id, { status: "in_progress" }); + } catch (claimErr: unknown) { + const claimMsg = claimErr instanceof Error ? claimErr.message : String(claimErr); + console.error(`[dispatch] Warning: br claim failed for ${seed.id} (non-fatal): ${claimMsg.slice(0, 200)}`); + } } // 6a. Send bead-claimed mail so inbox shows bead lifecycle event @@ -475,6 +536,8 @@ export class Dispatcher { } // 7. Spawn the coding agent + // Pass taskId from native store (null in beads fallback mode — REQ-020). + const taskId = nativeMode ? seed.id : null; const { sessionKey } = await this.spawnAgent( model, worktreePath, @@ -489,6 +552,7 @@ export class Dispatcher { opts?.notifyUrl, vcsBackend, opts?.targetBranch, + taskId, ); // Update run with session key @@ -816,6 +880,7 @@ export class Dispatcher { notifyUrl?: string, vcsBackend?: VcsBackend, targetBranch?: string, + taskId?: string | null, ): Promise<{ sessionKey: string }> { const prompt = this.buildSpawnPrompt(seed.id, seed.title); @@ -847,6 +912,7 @@ export class Dispatcher { seedLabels: seed.labels, seedPriority: seed.priority, targetBranch, + taskId: taskId ?? null, }); return { sessionKey }; @@ -885,6 +951,7 @@ export class Dispatcher { env, resume: sdkSessionId, dbPath: join(this.projectPath, ".foreman", "foreman.db"), + taskId: null, // resume path: taskId not tracked (beads fallback mode) }); return { sessionKey }; @@ -1185,6 +1252,13 @@ export interface WorkerConfig { * When set, the agent worker merges into this branch instead of detectDefaultBranch(). */ targetBranch?: string; + /** + * Task ID from the native task store (null when using beads fallback mode). + * Populated by Dispatcher when FOREMAN_TASK_STORE=native or when native tasks + * are auto-detected. Pipeline executor uses this for phase progress updates. + * Null means phase updates are no-ops (REQ-020 backward compatibility). + */ + taskId?: string | null; } // ── Spawn Strategy Pattern ────────────────────────────────────────────── From f6ebeeb80402173c0a9367a4950006c305218294 Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 19:24:27 -0500 Subject: [PATCH 02/13] From 68875f3ed25a2ad73bdac4fda641cb1e437efe90 Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 20:22:31 -0500 Subject: [PATCH 03/13] From ca9e8dc1f376000dffc7c3163477eceded497e73 Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 22:10:01 -0500 Subject: [PATCH 04/13] From 15911954e3e05d2da830dacf6a173fa50df35961 Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 22:23:21 -0500 Subject: [PATCH 05/13] From 4222fa84b174824313a09f81929e2e1cb352a792 Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 22:42:12 -0500 Subject: [PATCH 06/13] From 8105530b499fb02868495ac21f4ef6756800969f Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 22:46:54 -0500 Subject: [PATCH 07/13] From 093d457034e32abfa7b161937f171dbbc659ae4e Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 22:47:04 -0500 Subject: [PATCH 08/13] From c3dc00faeccae9302740557b863a5bf48492a836 Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 22:48:04 -0500 Subject: [PATCH 09/13] From 49e2c1ff866cf555aa3234c5f7c155c8badeb244 Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 22:48:57 -0500 Subject: [PATCH 10/13] From cfc354b41fe4cb18660e903e64b8a797d68cc67f Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 22:50:04 -0500 Subject: [PATCH 11/13] From 34f77f1f96cb35daf87889d51b28fec4789bceb7 Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 23:17:01 -0500 Subject: [PATCH 12/13] From 34da2aff3316e118cfcc24734b5094a6ce01e14b Mon Sep 17 00:00:00 2001 From: Leo D'Angelo Date: Sun, 29 Mar 2026 23:18:42 -0500 Subject: [PATCH 13/13]