diff --git a/src/lib/__tests__/task-store.test.ts b/src/lib/__tests__/task-store.test.ts new file mode 100644 index 00000000..5fb3197c --- /dev/null +++ b/src/lib/__tests__/task-store.test.ts @@ -0,0 +1,318 @@ +/** + * Tests for NativeTaskStore. + * + * REQ-014 — coexistence check via hasNativeTasks() + * REQ-017 — dispatcher native store: SELECT WHERE status=ready + * REQ-020 — atomic claim, backward-compatible null taskId + */ + +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { mkdtempSync, rmSync } from "node:fs"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import { ForemanStore } from "../store.js"; +import { NativeTaskStore } from "../task-store.js"; +import { randomUUID } from "node:crypto"; + +// ── Helpers ────────────────────────────────────────────────────────────── + +function makeTmpDir() { + return mkdtempSync(join(tmpdir(), "foreman-task-store-test-")); +} + +function makeStores(tmpDir: string) { + const foremanStore = new ForemanStore(join(tmpDir, "foreman.db")); + const taskStore = new NativeTaskStore(foremanStore.getDb()); + return { foremanStore, taskStore }; +} + +function insertTask( + db: ReturnType, + opts: Partial<{ + id: string; + title: string; + status: string; + priority: number; + type: string; + }> = {}, +) { + const id = opts.id ?? randomUUID(); + const now = new Date().toISOString(); + db.prepare( + `INSERT INTO tasks (id, title, description, type, priority, status, created_at, updated_at) + VALUES (?, ?, NULL, ?, ?, ?, ?, ?)`, + ).run( + id, + opts.title ?? "Test Task", + opts.type ?? "task", + opts.priority ?? 2, + opts.status ?? "ready", + now, + now, + ); + return id; +} + +// ── Test suite ─────────────────────────────────────────────────────────── + +describe("NativeTaskStore.hasNativeTasks()", () => { + let tmpDir: string; + let foremanStore: ForemanStore; + let taskStore: NativeTaskStore; + + beforeEach(() => { + tmpDir = makeTmpDir(); + ({ foremanStore, taskStore } = makeStores(tmpDir)); + }); + + afterEach(() => { + foremanStore.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("returns false when tasks table is empty", () => { + expect(taskStore.hasNativeTasks()).toBe(false); + }); + + it("returns true when at least one task exists", () => { + insertTask(foremanStore.getDb()); + expect(taskStore.hasNativeTasks()).toBe(true); + }); + + it("returns false after all tasks are deleted", () => { + const id = insertTask(foremanStore.getDb()); + expect(taskStore.hasNativeTasks()).toBe(true); + foremanStore.getDb().prepare("DELETE FROM tasks WHERE id = ?").run(id); + expect(taskStore.hasNativeTasks()).toBe(false); + }); +}); + +describe("NativeTaskStore.list()", () => { + let tmpDir: string; + let foremanStore: ForemanStore; + let taskStore: NativeTaskStore; + + beforeEach(() => { + tmpDir = makeTmpDir(); + ({ foremanStore, taskStore } = makeStores(tmpDir)); + }); + + afterEach(() => { + foremanStore.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("returns all tasks when no filter is provided", () => { + const db = foremanStore.getDb(); + insertTask(db, { status: "ready" }); + insertTask(db, { status: "in-progress" }); + insertTask(db, { status: "backlog" }); + + const tasks = taskStore.list(); + expect(tasks).toHaveLength(3); + }); + + it("filters by status when opts.status is provided", () => { + const db = foremanStore.getDb(); + insertTask(db, { status: "ready", title: "R1" }); + insertTask(db, { status: "ready", title: "R2" }); + insertTask(db, { status: "in-progress", title: "IP1" }); + + const readyTasks = taskStore.list({ status: "ready" }); + expect(readyTasks).toHaveLength(2); + expect(readyTasks.every((t) => t.status === "ready")).toBe(true); + }); + + it("returns an empty array when no tasks match the filter", () => { + const db = foremanStore.getDb(); + insertTask(db, { status: "backlog" }); + + const readyTasks = taskStore.list({ status: "ready" }); + expect(readyTasks).toHaveLength(0); + }); + + it("maps task rows to Issue objects", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { title: "My Task", status: "ready", priority: 1, type: "bug" }); + + const tasks = taskStore.list({ status: "ready" }); + expect(tasks).toHaveLength(1); + const task = tasks[0]; + expect(task.id).toBe(id); + expect(task.title).toBe("My Task"); + expect(task.status).toBe("ready"); + expect(task.priority).toBe("1"); + expect(task.type).toBe("bug"); + expect(task.assignee).toBeNull(); + expect(task.parent).toBeNull(); + }); + + it("sorts by priority ASC, then created_at ASC", () => { + const db = foremanStore.getDb(); + // Insert in reverse priority order + const idLow = insertTask(db, { priority: 3, title: "Low" }); + const idHigh = insertTask(db, { priority: 0, title: "High" }); + const idMed = insertTask(db, { priority: 2, title: "Med" }); + + const tasks = taskStore.list(); + expect(tasks[0].id).toBe(idHigh); // priority 0 + expect(tasks[1].id).toBe(idMed); // priority 2 + expect(tasks[2].id).toBe(idLow); // priority 3 + }); +}); + +describe("NativeTaskStore.claim()", () => { + let tmpDir: string; + let foremanStore: ForemanStore; + let taskStore: NativeTaskStore; + + beforeEach(() => { + tmpDir = makeTmpDir(); + ({ foremanStore, taskStore } = makeStores(tmpDir)); + }); + + afterEach(() => { + foremanStore.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + /** + * Insert a minimal run record to satisfy the FK constraint on tasks.run_id. + * The project_id is "proj-test" (no FK on projects in this minimal setup). + */ + function insertRun(db: ReturnType, runId: string) { + // Disable FK enforcement temporarily to insert without a matching project. + db.pragma("foreign_keys = OFF"); + const now = new Date().toISOString(); + db.prepare( + `INSERT INTO runs (id, project_id, seed_id, agent_type, status, created_at) + VALUES (?, 'proj-test', 'seed-test', 'claude-code', 'running', ?)`, + ).run(runId, now); + db.pragma("foreign_keys = ON"); + } + + it("sets status to in-progress and records run_id", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "ready" }); + const runId = "run-123"; + insertRun(db, runId); + + taskStore.claim(id, runId); + + const row = db.prepare("SELECT status, run_id FROM tasks WHERE id = ?").get(id) as + | { status: string; run_id: string | null } + | undefined; + expect(row?.status).toBe("in-progress"); + expect(row?.run_id).toBe(runId); + }); + + it("is idempotent when called again with the same runId", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "ready" }); + const runId = "run-123"; + insertRun(db, runId); + + taskStore.claim(id, runId); + // Calling again with the same runId should not throw + expect(() => taskStore.claim(id, runId)).not.toThrow(); + }); + + it("throws when the task is already claimed by a different run", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "ready" }); + insertRun(db, "run-111"); + insertRun(db, "run-222"); + + taskStore.claim(id, "run-111"); + expect(() => taskStore.claim(id, "run-222")).toThrow(/already claimed/); + }); + + it("throws when the task does not exist", () => { + expect(() => taskStore.claim("nonexistent-task", "run-xyz")).toThrow(/not found/); + }); + + it("updates updated_at timestamp", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "ready" }); + const runId = "run-001"; + insertRun(db, runId); + const before = new Date().toISOString(); + + taskStore.claim(id, runId); + + const row = db.prepare("SELECT updated_at FROM tasks WHERE id = ?").get(id) as + | { updated_at: string } + | undefined; + expect(row?.updated_at).toBeDefined(); + expect(row!.updated_at >= before).toBe(true); + }); +}); + +describe("NativeTaskStore.updatePhase()", () => { + let tmpDir: string; + let foremanStore: ForemanStore; + let taskStore: NativeTaskStore; + + beforeEach(() => { + tmpDir = makeTmpDir(); + ({ foremanStore, taskStore } = makeStores(tmpDir)); + }); + + afterEach(() => { + foremanStore.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("updates the task status to the given phase", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "in-progress" }); + + taskStore.updatePhase(id, "developer"); + + const row = db.prepare("SELECT status FROM tasks WHERE id = ?").get(id) as + | { status: string } + | undefined; + expect(row?.status).toBe("developer"); + }); + + it("is a no-op when taskId is null (beads fallback mode)", () => { + // Should not throw and should not touch any rows + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "in-progress" }); + + expect(() => taskStore.updatePhase(null, "developer")).not.toThrow(); + + const row = db.prepare("SELECT status FROM tasks WHERE id = ?").get(id) as + | { status: string } + | undefined; + expect(row?.status).toBe("in-progress"); // unchanged + }); +}); + +describe("NativeTaskStore.updateStatus()", () => { + let tmpDir: string; + let foremanStore: ForemanStore; + let taskStore: NativeTaskStore; + + beforeEach(() => { + tmpDir = makeTmpDir(); + ({ foremanStore, taskStore } = makeStores(tmpDir)); + }); + + afterEach(() => { + foremanStore.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("updates the task status", () => { + const db = foremanStore.getDb(); + const id = insertTask(db, { status: "in-progress" }); + + taskStore.updateStatus(id, "merged"); + + const row = db.prepare("SELECT status FROM tasks WHERE id = ?").get(id) as + | { status: string } + | undefined; + expect(row?.status).toBe("merged"); + }); +}); diff --git a/src/orchestrator/__tests__/dispatcher.test.ts b/src/orchestrator/__tests__/dispatcher.test.ts index 2628f6c7..b87b68c8 100644 --- a/src/orchestrator/__tests__/dispatcher.test.ts +++ b/src/orchestrator/__tests__/dispatcher.test.ts @@ -1047,3 +1047,249 @@ describe("PLAN_STEP_CONFIG", () => { expect(PLAN_STEP_CONFIG.maxTurns).toBe(50); }); }); + +// ── NativeTaskStore integration ────────────────────────────────────────── + +import { NativeTaskStore } from "../../lib/task-store.js"; + +describe("Dispatcher.getReadyTasks() — native vs beads routing", () => { + function makeIssue(id: string): Issue { + return { + id, + title: `Task ${id}`, + status: "ready", + priority: "P2", + type: "task", + assignee: null, + parent: null, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + }; + } + + function makeBeadsClient(issues: Issue[]): ITaskClient { + return { + ready: vi.fn().mockResolvedValue(issues), + show: vi.fn().mockResolvedValue({ status: "open" }), + update: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), + list: vi.fn().mockResolvedValue([]), + }; + } + + function makeStore(): ForemanStore { + return { + getActiveRuns: vi.fn().mockReturnValue([]), + getProjectByPath: vi.fn().mockReturnValue({ id: "proj-1" }), + getRunsForSeed: vi.fn().mockReturnValue([]), + getRunsByStatus: vi.fn().mockReturnValue([]), + } as unknown as ForemanStore; + } + + it("uses beads fallback when no taskStore is injected", async () => { + const beadsIssues = [makeIssue("bd-001")]; + const seedsClient = makeBeadsClient(beadsIssues); + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp"); + + const tasks = await dispatcher.getReadyTasks(); + expect(tasks).toEqual(beadsIssues); + expect(seedsClient.ready).toHaveBeenCalledOnce(); + }); + + it("uses beads fallback when taskStore is injected but has no tasks", async () => { + const beadsIssues = [makeIssue("bd-001")]; + const seedsClient = makeBeadsClient(beadsIssues); + + const mockTaskStore = { + hasNativeTasks: vi.fn().mockReturnValue(false), + list: vi.fn().mockReturnValue([]), + claim: vi.fn(), + updatePhase: vi.fn(), + updateStatus: vi.fn(), + } as unknown as NativeTaskStore; + + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp", null, mockTaskStore); + + const tasks = await dispatcher.getReadyTasks(); + expect(tasks).toEqual(beadsIssues); + expect(seedsClient.ready).toHaveBeenCalledOnce(); + expect(mockTaskStore.list).not.toHaveBeenCalled(); + }); + + it("uses native store when taskStore is injected and has tasks", async () => { + const nativeIssues = [makeIssue("task-001")]; + const seedsClient = makeBeadsClient([makeIssue("bd-001")]); + + const mockTaskStore = { + hasNativeTasks: vi.fn().mockReturnValue(true), + list: vi.fn().mockReturnValue(nativeIssues), + claim: vi.fn(), + updatePhase: vi.fn(), + updateStatus: vi.fn(), + } as unknown as NativeTaskStore; + + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp", null, mockTaskStore); + + const tasks = await dispatcher.getReadyTasks(); + expect(tasks).toEqual(nativeIssues); + expect(mockTaskStore.list).toHaveBeenCalledWith({ status: "ready" }); + expect(seedsClient.ready).not.toHaveBeenCalled(); + }); + + it("forces beads when FOREMAN_TASK_STORE=beads even if native tasks exist", async () => { + const beadsIssues = [makeIssue("bd-001")]; + const seedsClient = makeBeadsClient(beadsIssues); + + const mockTaskStore = { + hasNativeTasks: vi.fn().mockReturnValue(true), + list: vi.fn().mockReturnValue([makeIssue("task-001")]), + claim: vi.fn(), + updatePhase: vi.fn(), + updateStatus: vi.fn(), + } as unknown as NativeTaskStore; + + const orig = process.env.FOREMAN_TASK_STORE; + process.env.FOREMAN_TASK_STORE = "beads"; + try { + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp", null, mockTaskStore); + const tasks = await dispatcher.getReadyTasks(); + expect(tasks).toEqual(beadsIssues); + expect(seedsClient.ready).toHaveBeenCalledOnce(); + expect(mockTaskStore.list).not.toHaveBeenCalled(); + } finally { + if (orig === undefined) { + delete process.env.FOREMAN_TASK_STORE; + } else { + process.env.FOREMAN_TASK_STORE = orig; + } + } + }); + + it("forces native store when FOREMAN_TASK_STORE=native even if no tasks", async () => { + const seedsClient = makeBeadsClient([makeIssue("bd-001")]); + + const mockTaskStore = { + hasNativeTasks: vi.fn().mockReturnValue(false), + list: vi.fn().mockReturnValue([]), + claim: vi.fn(), + updatePhase: vi.fn(), + updateStatus: vi.fn(), + } as unknown as NativeTaskStore; + + const orig = process.env.FOREMAN_TASK_STORE; + process.env.FOREMAN_TASK_STORE = "native"; + try { + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp", null, mockTaskStore); + const tasks = await dispatcher.getReadyTasks(); + expect(tasks).toEqual([]); + expect(mockTaskStore.list).toHaveBeenCalledWith({ status: "ready" }); + expect(seedsClient.ready).not.toHaveBeenCalled(); + } finally { + if (orig === undefined) { + delete process.env.FOREMAN_TASK_STORE; + } else { + process.env.FOREMAN_TASK_STORE = orig; + } + } + }); + + it("ignores invalid FOREMAN_TASK_STORE and uses auto-detection", async () => { + const beadsIssues = [makeIssue("bd-001")]; + const seedsClient = makeBeadsClient(beadsIssues); + + const mockTaskStore = { + hasNativeTasks: vi.fn().mockReturnValue(false), + list: vi.fn().mockReturnValue([]), + claim: vi.fn(), + updatePhase: vi.fn(), + updateStatus: vi.fn(), + } as unknown as NativeTaskStore; + + const orig = process.env.FOREMAN_TASK_STORE; + process.env.FOREMAN_TASK_STORE = "invalid-value"; + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + try { + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp", null, mockTaskStore); + const tasks = await dispatcher.getReadyTasks(); + // Should fall back to beads (hasNativeTasks() = false, so auto → beads) + expect(tasks).toEqual(beadsIssues); + expect(seedsClient.ready).toHaveBeenCalledOnce(); + // Should warn about invalid value + const warnCalls = consoleSpy.mock.calls.map((args) => args.join(" ")); + expect(warnCalls.some((msg) => msg.includes("not recognised"))).toBe(true); + } finally { + if (orig === undefined) { + delete process.env.FOREMAN_TASK_STORE; + } else { + process.env.FOREMAN_TASK_STORE = orig; + } + consoleSpy.mockRestore(); + } + }); + + it("FOREMAN_TASK_STORE=native is no-op when no taskStore injected", async () => { + const beadsIssues = [makeIssue("bd-001")]; + const seedsClient = makeBeadsClient(beadsIssues); + + const orig = process.env.FOREMAN_TASK_STORE; + process.env.FOREMAN_TASK_STORE = "native"; + try { + // No taskStore injected + const dispatcher = new Dispatcher(seedsClient, makeStore(), "/tmp"); + const tasks = await dispatcher.getReadyTasks(); + // Should fall back to beads since taskStore is undefined + expect(tasks).toEqual(beadsIssues); + expect(seedsClient.ready).toHaveBeenCalledOnce(); + } finally { + if (orig === undefined) { + delete process.env.FOREMAN_TASK_STORE; + } else { + process.env.FOREMAN_TASK_STORE = orig; + } + } + }); +}); + +describe("WorkerConfig — taskId field", () => { + it("WorkerConfig type accepts taskId as string or null", () => { + // Type-level test: if this compiles, WorkerConfig correctly has the taskId field. + const config1: import("../dispatcher.js").WorkerConfig = { + runId: "run-1", + projectId: "proj-1", + seedId: "seed-1", + seedTitle: "Test", + model: "anthropic/claude-sonnet-4-6", + worktreePath: "/tmp/wt", + prompt: "Do stuff", + env: {}, + taskId: "native-task-001", // string + }; + expect(config1.taskId).toBe("native-task-001"); + + const config2: import("../dispatcher.js").WorkerConfig = { + runId: "run-2", + projectId: "proj-2", + seedId: "seed-2", + seedTitle: "Test 2", + model: "anthropic/claude-sonnet-4-6", + worktreePath: "/tmp/wt2", + prompt: "Do stuff", + env: {}, + taskId: null, // null (beads fallback) + }; + expect(config2.taskId).toBeNull(); + + const config3: import("../dispatcher.js").WorkerConfig = { + runId: "run-3", + projectId: "proj-3", + seedId: "seed-3", + seedTitle: "Test 3", + model: "anthropic/claude-sonnet-4-6", + worktreePath: "/tmp/wt3", + prompt: "Do stuff", + env: {}, + // taskId not set (undefined — backward-compatible) + }; + expect(config3.taskId).toBeUndefined(); + }); +}); diff --git a/src/orchestrator/dispatcher.ts b/src/orchestrator/dispatcher.ts index d53558a8..a9a5143b 100644 --- a/src/orchestrator/dispatcher.ts +++ b/src/orchestrator/dispatcher.ts @@ -14,6 +14,7 @@ import { installDependencies, runSetupWithCache } from "../lib/git.js"; import { GitBackend } from "../lib/vcs/git-backend.js"; import { extractBranchLabel, isDefaultBranch, applyBranchLabel } from "../lib/branch-label.js"; import { BeadsRustClient } from "../lib/beads-rust.js"; +import { NativeTaskStore } from "../lib/task-store.js"; import { workerAgentMd } from "./templates.js"; import { normalizePriority } from "../lib/priority.js"; import { PLAN_STEP_CONFIG } from "./roles.js"; @@ -91,8 +92,56 @@ export class Dispatcher { private store: ForemanStore, private projectPath: string, private bvClient?: BvClient | null, + private taskStore?: NativeTaskStore, ) {} + /** + * Return the list of ready tasks, routing through the native task store + * when available (REQ-014, REQ-017) or falling back to BeadsRustClient. + * + * Resolution order (highest priority first): + * 1. FOREMAN_TASK_STORE=beads — always use BeadsRustClient + * 2. FOREMAN_TASK_STORE=native — always use NativeTaskStore + * 3. auto (default) — native if NativeTaskStore has rows, else beads + */ + async getReadyTasks(): Promise { + if (this.resolveNativeMode()) { + log("[dispatcher] Using native task store (tasks table)"); + return this.taskStore!.list({ status: "ready" }); + } + + // Beads fallback path (REQ-020 backward compatibility) + log("[dispatcher] Using beads fallback (BeadsRustClient.ready())"); + return this.seeds.ready(); + } + + /** + * Determine whether native-task-store mode is active. + * + * Resolution order (highest priority first): + * 1. FOREMAN_TASK_STORE=beads — always false (beads forced) + * 2. FOREMAN_TASK_STORE=native — always true (native forced) + * 3. auto — true if taskStore injected and has rows + * + * Also validates FOREMAN_TASK_STORE and emits a warning on invalid values. + */ + private resolveNativeMode(): boolean { + const override = process.env.FOREMAN_TASK_STORE; + + // Validate env value; warn on unrecognised values but don't crash. + if (override && override !== "native" && override !== "beads") { + console.error( + `[dispatcher] Warning: FOREMAN_TASK_STORE='${override}' is not recognised ` + + `(expected 'native' or 'beads'). Ignoring and using auto-detection.`, + ); + } + + if (override === "beads") return false; + if (override === "native") return this.taskStore !== undefined; + // Auto-detect: prefer native if NativeTaskStore is injected and has rows. + return this.taskStore !== undefined && this.taskStore.hasNativeTasks(); + } + /** * Query ready seeds, create worktrees, write TASK.md, and record runs. */ @@ -672,6 +721,8 @@ export class Dispatcher { } // 7. Spawn the coding agent + // Pass taskId from native store (null in beads fallback mode — REQ-020). + const taskId = nativeMode ? seed.id : null; const { sessionKey } = await this.spawnAgent( model, worktreePath, @@ -686,6 +737,7 @@ export class Dispatcher { opts?.notifyUrl, vcsBackend, opts?.targetBranch, + taskId, ); // Update run with session key @@ -1013,6 +1065,7 @@ export class Dispatcher { notifyUrl?: string, vcsBackend?: VcsBackend, targetBranch?: string, + taskId?: string | null, ): Promise<{ sessionKey: string }> { const prompt = this.buildSpawnPrompt(seed.id, seed.title); @@ -1044,6 +1097,7 @@ export class Dispatcher { seedLabels: seed.labels, seedPriority: seed.priority, targetBranch, + taskId: taskId ?? null, }); return { sessionKey }; @@ -1082,6 +1136,7 @@ export class Dispatcher { env, resume: sdkSessionId, dbPath: join(this.projectPath, ".foreman", "foreman.db"), + taskId: null, // resume path: taskId not tracked (beads fallback mode) }); return { sessionKey };