From d266adddbcc43edb16b72b861d00a25215b65a03 Mon Sep 17 00:00:00 2001 From: Mason <31372737+Ovaculos@users.noreply.github.com> Date: Wed, 20 May 2026 11:55:59 -0500 Subject: [PATCH 1/2] refactor(conversations): switch ConversationIndex to pull-on-demand MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the fs.watch + 500ms-debounce model with a pull-on-demand index. Each `list` / `get` / `search` reconciles in-memory state with the directory: stat every conversation file, reuse cached entries when mtime is unchanged, re-read the header for new or modified files, drop entries whose files have disappeared. Removes a class of timing races: `fs.watch` delivery is variable (especially under FSEvents on macOS), the 500ms debounce window let write-then-list refetches return stale data, and the prior `flushPending` shortcut had a concurrent-caller gap (caller B could share caller A's in-flight snapshot and miss a filename queued mid-flush). `get(id)` keeps an `id → filePath` map so the common callers (`handleGet`, `handleFork`, `handleUpdate`) stat one file instead of walking the directory. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/bundles/conversations/src/index-cache.ts | 254 +++++++++--------- src/bundles/conversations/src/server.ts | 10 +- src/bundles/conversations/src/tools/export.ts | 2 +- src/bundles/conversations/src/tools/fork.ts | 2 +- src/bundles/conversations/src/tools/get.ts | 2 +- src/bundles/conversations/src/tools/search.ts | 2 +- src/bundles/conversations/src/tools/stats.ts | 4 +- src/bundles/conversations/src/tools/update.ts | 2 +- src/tools/platform/conversations.ts | 3 +- .../bundles/conversations/index-cache.test.ts | 160 +++++------ .../conversations/tools/export.test.ts | 5 +- .../bundles/conversations/tools/fork.test.ts | 4 +- .../bundles/conversations/tools/get.test.ts | 21 +- .../bundles/conversations/tools/list.test.ts | 24 +- .../conversations/tools/search.test.ts | 23 +- .../bundles/conversations/tools/stats.test.ts | 26 +- .../conversations/tools/update.test.ts | 12 +- 17 files changed, 275 insertions(+), 281 deletions(-) diff --git a/src/bundles/conversations/src/index-cache.ts b/src/bundles/conversations/src/index-cache.ts index b5255391..4cb2f9fe 100644 --- a/src/bundles/conversations/src/index-cache.ts +++ b/src/bundles/conversations/src/index-cache.ts @@ -1,14 +1,14 @@ /** - * In-memory index of conversation metadata for fast listing, searching, and filtering. + * Pull-on-demand conversation index. * - * Built on startup by scanning all JSONL file headers. Invalidated by fs.watch() - * on the conversations directory (debounced 500ms). - * - * Types are defined locally — no imports from the runtime codebase. + * Each call reconciles in-memory state with the directory: stat every + * conversation file, re-read headers only for new or mtime-changed files, + * drop entries for files that have disappeared. There is no `fs.watch`, no + * debounce, and no broadcast-vs-debounce race — `list` and `get` always + * return disk-truth. */ -import { type FSWatcher, watch } from "node:fs"; -import { basename, join } from "node:path"; +import { statSync } from "node:fs"; import { listConversationFiles, readConversationHeader } from "./jsonl-reader.ts"; // --------------------------------------------------------------------------- @@ -43,78 +43,37 @@ export interface ListResult { totalCount: number; } +interface CachedEntry { + entry: IndexEntry; + mtimeMs: number; +} + // --------------------------------------------------------------------------- // ConversationIndex // --------------------------------------------------------------------------- export class ConversationIndex { - private entries: Map = new Map(); - /** Maps filename (e.g. "conv_abc.jsonl") to conversation ID for fast fs.watch lookups. */ - private fileToId: Map = new Map(); + /** filePath → cached entry + last-seen mtime. */ + private cache: Map = new Map(); + /** id → filePath. Enables `get(id)` to stat one file instead of walking. */ + private byId: Map = new Map(); private dir: string | null = null; - private watcher: FSWatcher | null = null; - private debounceTimer: ReturnType | null = null; - private pendingFiles: Set = new Set(); - - /** Build index by scanning all .jsonl files in dir. Reads only headers (line 1 + preview). */ - async build(dir: string): Promise { - this.dir = dir; - this.entries.clear(); - this.fileToId.clear(); - - const files = listConversationFiles(dir); - - for (const filePath of files) { - await this.indexFile(filePath); - } - } - /** Start fs.watch on dir. On change, debounce 500ms, then re-read affected file header. */ - startWatching(dir: string): void { - this.stopWatching(); + /** Point the index at a directory. No I/O until the first list/get. */ + init(dir: string): void { this.dir = dir; - - this.watcher = watch(dir, (_eventType, filename) => { - if (!filename?.endsWith(".jsonl")) return; - - this.pendingFiles.add(filename); - - if (this.debounceTimer) { - clearTimeout(this.debounceTimer); - } - - this.debounceTimer = setTimeout(() => { - this.processPendingFiles(); - }, 500); - }); - } - - /** Stop watching. */ - stopWatching(): void { - if (this.watcher) { - this.watcher.close(); - this.watcher = null; - } - if (this.debounceTimer) { - clearTimeout(this.debounceTimer); - this.debounceTimer = null; - } - this.pendingFiles.clear(); } /** List conversations with pagination, sorting, date filtering, search. */ - list(options?: ListOptions): ListResult { - let items = [...this.entries.values()]; + async list(options?: ListOptions): Promise { + let items = await this.reconcile(); - // Search filter: case-insensitive substring on title + preview if (options?.search) { const q = options.search.toLowerCase(); items = items.filter( (e) => (e.title?.toLowerCase().includes(q) ?? false) || e.preview.toLowerCase().includes(q), ); } - - // Date filtering if (options?.dateFrom) { const from = options.dateFrom; items = items.filter((e) => e.createdAt >= from); @@ -124,19 +83,14 @@ export class ConversationIndex { items = items.filter((e) => e.createdAt <= to); } - // Sorting (descending — newest first) const sortBy = options?.sortBy ?? "updated"; const sortKey = sortBy === "created" ? "createdAt" : "updatedAt"; items.sort((a, b) => b[sortKey].localeCompare(a[sortKey])); const totalCount = items.length; - - // Cursor pagination: skip entries up to and including the cursor ID if (options?.cursor) { const idx = items.findIndex((e) => e.id === options.cursor); - if (idx >= 0) { - items = items.slice(idx + 1); - } + if (idx >= 0) items = items.slice(idx + 1); } const limit = options?.limit ?? 20; @@ -147,55 +101,34 @@ export class ConversationIndex { return { conversations: page, nextCursor, totalCount }; } - /** Get a single entry by ID. */ - get(id: string): IndexEntry | undefined { - return this.entries.get(id); - } - - /** Total conversation count. */ - get size(): number { - return this.entries.size; - } - - // --------------------------------------------------------------------------- - // Internal helpers - // --------------------------------------------------------------------------- - - private async indexFile(filePath: string): Promise { - const header = await readConversationHeader(filePath); - if (!header) return; - - const entry: IndexEntry = { - id: header.meta.id, - title: header.meta.title, - createdAt: header.meta.createdAt, - updatedAt: header.meta.updatedAt, - messageCount: header.messageCount, - totalInputTokens: header.meta.totalInputTokens, - totalOutputTokens: header.meta.totalOutputTokens, - lastModel: header.meta.lastModel, - preview: header.preview, - filePath, - }; - - this.entries.set(entry.id, entry); - this.fileToId.set(basename(filePath), entry.id); - } - - private async processPendingFiles(): Promise { - if (!this.dir) return; - - const files = [...this.pendingFiles]; - this.pendingFiles.clear(); - - for (const filename of files) { - const filePath = join(this.dir, filename); - - // Try to read the header. If the file was deleted, readConversationHeader returns null. - const header = await readConversationHeader(filePath); - - if (header) { - // File exists — update/add entry + /** + * Get a single entry by ID. + * + * Fast path: when the id has been seen before, stat one file and reuse + * the cached entry if its mtime is unchanged (or re-read just that + * header on a change). Avoids the full directory walk for the common + * case where `handleGet`/`handleFork`/`handleUpdate` operate on a + * conversation the index already knows about. + * + * Slow path: id unknown (first encounter, or the cached file + * disappeared) — full reconcile, then a Map lookup. + */ + async get(id: string): Promise { + if (!this.dir) return undefined; + + const knownPath = this.byId.get(id); + if (knownPath) { + try { + const mtimeMs = statSync(knownPath).mtimeMs; + const cached = this.cache.get(knownPath); + if (cached && cached.mtimeMs === mtimeMs) return cached.entry; + + const header = await readConversationHeader(knownPath); + if (!header) { + this.cache.delete(knownPath); + this.byId.delete(id); + return undefined; + } const entry: IndexEntry = { id: header.meta.id, title: header.meta.title, @@ -206,18 +139,87 @@ export class ConversationIndex { totalOutputTokens: header.meta.totalOutputTokens, lastModel: header.meta.lastModel, preview: header.preview, - filePath, + filePath: knownPath, }; - this.entries.set(entry.id, entry); - this.fileToId.set(filename, entry.id); - } else { - // File was deleted or became unreadable — remove from index - const id = this.fileToId.get(filename); - if (id) { - this.entries.delete(id); - this.fileToId.delete(filename); - } + this.cache.set(knownPath, { entry, mtimeMs }); + this.byId.set(entry.id, knownPath); + return entry; + } catch { + // File gone since we cached it. Forget the stale mapping and fall + // through to a full reconcile in case the id moved. + this.cache.delete(knownPath); + this.byId.delete(id); + } + } + + await this.reconcile(); + const path = this.byId.get(id); + return path ? this.cache.get(path)?.entry : undefined; + } + + // --------------------------------------------------------------------------- + // Internal + // --------------------------------------------------------------------------- + + /** + * Reconcile cached state with the directory: + * - stat every conversation file (sync, cheap) + * - reuse cached entry if mtime is unchanged + * - re-read header for new or modified files + * - drop entries whose files no longer exist + * + * Concurrent callers do their own reconcile; cache writes are idempotent + * by file path, so the worst case is a redundant header read. + */ + private async reconcile(): Promise { + if (!this.dir) return []; + + const files = listConversationFiles(this.dir); + const present = new Set(files); + const entries: IndexEntry[] = []; + + for (const filePath of files) { + let mtimeMs: number; + try { + mtimeMs = statSync(filePath).mtimeMs; + } catch { + // Race: file removed between listing and stat. Skip. + continue; + } + + const cached = this.cache.get(filePath); + if (cached && cached.mtimeMs === mtimeMs) { + entries.push(cached.entry); + continue; } + + const header = await readConversationHeader(filePath); + if (!header) continue; + + const entry: IndexEntry = { + id: header.meta.id, + title: header.meta.title, + createdAt: header.meta.createdAt, + updatedAt: header.meta.updatedAt, + messageCount: header.messageCount, + totalInputTokens: header.meta.totalInputTokens, + totalOutputTokens: header.meta.totalOutputTokens, + lastModel: header.meta.lastModel, + preview: header.preview, + filePath, + }; + this.cache.set(filePath, { entry, mtimeMs }); + this.byId.set(entry.id, filePath); + entries.push(entry); } + + for (const [cachedPath, cached] of this.cache) { + if (!present.has(cachedPath)) { + this.cache.delete(cachedPath); + this.byId.delete(cached.entry.id); + } + } + + return entries; } } diff --git a/src/bundles/conversations/src/server.ts b/src/bundles/conversations/src/server.ts index e8ad71ad..78534c3b 100644 --- a/src/bundles/conversations/src/server.ts +++ b/src/bundles/conversations/src/server.ts @@ -153,13 +153,10 @@ async function routeToolCall( async function main(): Promise { log(`Starting with conversations dir: ${CONVERSATIONS_DIR}`); - // Build the in-memory index from JSONL file headers + // Pull-on-demand index: each tool call reconciles with disk via mtime cache. const index = new ConversationIndex(); - await index.build(CONVERSATIONS_DIR); - log(`Indexed ${index.size} conversations`); - - // Start watching for file changes - index.startWatching(CONVERSATIONS_DIR); + index.init(CONVERSATIONS_DIR); + log(`Index initialized at ${CONVERSATIONS_DIR}`); // Create MCP server const server = new Server( @@ -245,7 +242,6 @@ async function main(): Promise { // Clean shutdown const shutdown = async () => { log("Shutting down..."); - index.stopWatching(); await server.close(); process.exit(0); }; diff --git a/src/bundles/conversations/src/tools/export.ts b/src/bundles/conversations/src/tools/export.ts index b0770799..b97a70ba 100644 --- a/src/bundles/conversations/src/tools/export.ts +++ b/src/bundles/conversations/src/tools/export.ts @@ -84,7 +84,7 @@ function exportMarkdown( } export async function handleExport(input: ExportInput, index: ConversationIndex): Promise { - const entry = index.get(input.id); + const entry = await index.get(input.id); if (!entry) { throw new Error(`Conversation not found: ${input.id}`); } diff --git a/src/bundles/conversations/src/tools/fork.ts b/src/bundles/conversations/src/tools/fork.ts index 8bd7c108..9b41bbdd 100644 --- a/src/bundles/conversations/src/tools/fork.ts +++ b/src/bundles/conversations/src/tools/fork.ts @@ -18,7 +18,7 @@ export interface ForkInput { } export async function handleFork(input: ForkInput, index: ConversationIndex): Promise { - const entry = index.get(input.id); + const entry = await index.get(input.id); if (!entry) { throw new Error(`Conversation not found: ${input.id}`); } diff --git a/src/bundles/conversations/src/tools/get.ts b/src/bundles/conversations/src/tools/get.ts index 3e7befd5..e75b26be 100644 --- a/src/bundles/conversations/src/tools/get.ts +++ b/src/bundles/conversations/src/tools/get.ts @@ -67,7 +67,7 @@ function selectByCharCap(messages: unknown[], cap: number): GetMessagesResult { } export async function handleGet(input: GetInput, index: ConversationIndex): Promise { - const entry = index.get(input.id); + const entry = await index.get(input.id); if (!entry) { throw new Error(`Conversation not found: ${input.id}`); } diff --git a/src/bundles/conversations/src/tools/search.ts b/src/bundles/conversations/src/tools/search.ts index b2507f71..2ae286c9 100644 --- a/src/bundles/conversations/src/tools/search.ts +++ b/src/bundles/conversations/src/tools/search.ts @@ -48,7 +48,7 @@ export async function handleSearch(input: SearchInput, index: ConversationIndex) const limit = input.limit ?? DEFAULT_LIMIT; const queryLower = query.toLowerCase(); - const allConversations = index.list({ limit: index.size || 1 }); + const allConversations = await index.list({ limit: Number.MAX_SAFE_INTEGER }); const results: SearchResult[] = []; for (const entry of allConversations.conversations) { diff --git a/src/bundles/conversations/src/tools/stats.ts b/src/bundles/conversations/src/tools/stats.ts index d628d943..c6ec3f3a 100644 --- a/src/bundles/conversations/src/tools/stats.ts +++ b/src/bundles/conversations/src/tools/stats.ts @@ -61,8 +61,8 @@ export async function handleStats( const untilIso = now.toISOString(); // Get all conversations matching the date range using the index - const listResult = index.list({ - limit: 999999, + const listResult = await index.list({ + limit: Number.MAX_SAFE_INTEGER, dateFrom: sinceIso || undefined, dateTo: untilIso, sortBy: "created", diff --git a/src/bundles/conversations/src/tools/update.ts b/src/bundles/conversations/src/tools/update.ts index d88298d8..337bd69a 100644 --- a/src/bundles/conversations/src/tools/update.ts +++ b/src/bundles/conversations/src/tools/update.ts @@ -15,7 +15,7 @@ export interface UpdateInput { } export async function handleUpdate(input: UpdateInput, index: ConversationIndex): Promise { - const entry = index.get(input.id); + const entry = await index.get(input.id); if (!entry) { throw new Error(`Conversation not found: ${input.id}`); } diff --git a/src/tools/platform/conversations.ts b/src/tools/platform/conversations.ts index b25abc74..758f8ea0 100644 --- a/src/tools/platform/conversations.ts +++ b/src/tools/platform/conversations.ts @@ -48,8 +48,7 @@ export async function createConversationsSource( let index = indexCache.get(cacheKey); if (!index) { index = new ConversationIndex(); - await index.build(dir); - index.startWatching(dir); + index.init(dir); indexCache.set(cacheKey, index); } return { index, dir }; diff --git a/test/unit/bundles/conversations/index-cache.test.ts b/test/unit/bundles/conversations/index-cache.test.ts index 87e2db32..f7757d23 100644 --- a/test/unit/bundles/conversations/index-cache.test.ts +++ b/test/unit/bundles/conversations/index-cache.test.ts @@ -78,11 +78,11 @@ afterEach(() => { }); // --------------------------------------------------------------------------- -// build() +// init + initial read // --------------------------------------------------------------------------- -describe("build", () => { - test("builds index from a directory with 3 JSONL files", async () => { +describe("init", () => { + test("reads a directory with 3 JSONL files", async () => { writeConvFile({ id: "aaa", createdAt: "2025-01-01T00:00:00.000Z", @@ -122,11 +122,11 @@ describe("build", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); - expect(index.size).toBe(3); + expect((await index.list()).totalCount).toBe(3); - const a = index.get("aaa"); + const a = await index.get("aaa"); expect(a).toBeDefined(); expect(a!.title).toBe("First conversation"); expect(a!.messageCount).toBe(2); @@ -135,24 +135,23 @@ describe("build", () => { expect(a!.lastModel).toBe("claude-sonnet-4-5-20250929"); expect(a!.preview).toBe("Hello world"); - const b = index.get("bbb"); + const b = await index.get("bbb"); expect(b).toBeDefined(); expect(b!.messageCount).toBe(1); expect(b!.preview).toBe("How does MCP work?"); - const c = index.get("ccc"); + const c = await index.get("ccc"); expect(c).toBeDefined(); expect(c!.title).toBeNull(); expect(c!.messageCount).toBe(3); expect(c!.preview).toBe("Deploy to production"); }); - test("empty directory results in size 0", async () => { + test("empty directory results in totalCount 0", async () => { const index = new ConversationIndex(); - await index.build(TMP_DIR); - expect(index.size).toBe(0); + index.init(TMP_DIR); - const result = index.list(); + const result = await index.list(); expect(result.conversations).toEqual([]); expect(result.nextCursor).toBeNull(); expect(result.totalCount).toBe(0); @@ -174,9 +173,9 @@ describe("build", () => { writeFileSync(join(TMP_DIR, "conv_broken.jsonl"), "this is not valid json\n"); const index = new ConversationIndex(); - await index.build(TMP_DIR); - expect(index.size).toBe(1); - expect(index.get("good")).toBeDefined(); + index.init(TMP_DIR); + expect((await index.list()).totalCount).toBe(1); + expect(await index.get("good")).toBeDefined(); }); }); @@ -209,16 +208,16 @@ describe("list pagination", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); // First page - const page1 = index.list({ limit: 2 }); + const page1 = await index.list({ limit: 2 }); expect(page1.conversations).toHaveLength(2); expect(page1.totalCount).toBe(3); expect(page1.nextCursor).not.toBeNull(); // Second page using cursor - const page2 = index.list({ limit: 2, cursor: page1.nextCursor! }); + const page2 = await index.list({ limit: 2, cursor: page1.nextCursor! }); expect(page2.conversations).toHaveLength(1); expect(page2.nextCursor).toBeNull(); expect(page2.totalCount).toBe(3); @@ -261,26 +260,26 @@ describe("list search", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); // Search by title - const r1 = index.list({ search: "kubernetes" }); + const r1 = await index.list({ search: "kubernetes" }); expect(r1.conversations).toHaveLength(2); expect(r1.totalCount).toBe(2); const ids1 = r1.conversations.map((c) => c.id).sort(); expect(ids1).toEqual(["s1", "s3"]); // Search by preview content - const r2 = index.list({ search: "postgres" }); + const r2 = await index.list({ search: "postgres" }); expect(r2.conversations).toHaveLength(1); expect(r2.conversations[0]!.id).toBe("s2"); // Case insensitive - const r3 = index.list({ search: "DEPLOY" }); + const r3 = await index.list({ search: "DEPLOY" }); expect(r3.conversations).toHaveLength(2); // No match - const r4 = index.list({ search: "nonexistent" }); + const r4 = await index.list({ search: "nonexistent" }); expect(r4.conversations).toHaveLength(0); expect(r4.totalCount).toBe(0); }); @@ -315,20 +314,20 @@ describe("list date filtering", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); // Only February onwards - const r1 = index.list({ dateFrom: "2025-02-01T00:00:00.000Z" }); + const r1 = await index.list({ dateFrom: "2025-02-01T00:00:00.000Z" }); expect(r1.totalCount).toBe(2); expect(r1.conversations.map((c) => c.id).sort()).toEqual(["d2", "d3"]); // Only up to February - const r2 = index.list({ dateTo: "2025-02-28T00:00:00.000Z" }); + const r2 = await index.list({ dateTo: "2025-02-28T00:00:00.000Z" }); expect(r2.totalCount).toBe(2); expect(r2.conversations.map((c) => c.id).sort()).toEqual(["d1", "d2"]); // Exact range: February only - const r3 = index.list({ + const r3 = await index.list({ dateFrom: "2025-02-01T00:00:00.000Z", dateTo: "2025-02-28T23:59:59.999Z", }); @@ -371,14 +370,14 @@ describe("list sorting", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); // Sort by created (desc): sort3, sort2, sort1 - const byCreated = index.list({ sortBy: "created" }); + const byCreated = await index.list({ sortBy: "created" }); expect(byCreated.conversations.map((c) => c.id)).toEqual(["sort3", "sort2", "sort1"]); // Sort by updated (desc): sort1, sort2, sort3 - const byUpdated = index.list({ sortBy: "updated" }); + const byUpdated = await index.list({ sortBy: "updated" }); expect(byUpdated.conversations.map((c) => c.id)).toEqual(["sort1", "sort2", "sort3"]); }); @@ -399,9 +398,9 @@ describe("list sorting", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); - const result = index.list(); + const result = await index.list(); // def1 has later updatedAt, so comes first expect(result.conversations[0]!.id).toBe("def1"); }); @@ -422,54 +421,41 @@ describe("get", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); - expect(index.get("get1")).toBeDefined(); - expect(index.get("get1")!.title).toBe("Get test"); - expect(index.get("nonexistent")).toBeUndefined(); + expect(await index.get("get1")).toBeDefined(); + expect((await index.get("get1"))!.title).toBe("Get test"); + expect(await index.get("nonexistent")).toBeUndefined(); }); }); // --------------------------------------------------------------------------- -// fs.watch integration +// Directory reconciliation — each list/get reads fresh from disk // --------------------------------------------------------------------------- -describe("fs.watch integration", () => { - test("indexes a new file when processPendingFiles runs", async () => { - // Tests the deterministic post-debounce path. We do NOT exercise - // fs.watch here because macOS FSEvents is unreliable for new-file - // creation under parallel-test load — it occasionally drops the - // event entirely, producing a flake. The sibling deletion test - // uses the same private-method pattern. The actual fs.watch → - // debounce → processPendingFiles wiring is exercised end-to-end - // in the integration suite where retries / longer timeouts apply. +describe("directory reconciliation", () => { + test("picks up a file written after init", async () => { const index = new ConversationIndex(); - await index.build(TMP_DIR); - expect(index.size).toBe(0); + index.init(TMP_DIR); + expect((await index.list()).totalCount).toBe(0); writeConvFile({ - id: "watch1", + id: "fresh", createdAt: "2025-01-01T00:00:00.000Z", updatedAt: "2025-01-01T00:00:00.000Z", - title: "Watched file", - messages: [{ role: "user", content: "new message", timestamp: "2025-01-01T00:01:00.000Z" }], + title: "Written after init", + messages: [{ role: "user", content: "hi", timestamp: "2025-01-01T00:01:00.000Z" }], }); - // Drive the debounce-flush path directly. Mirrors what fs.watch's - // 500ms timer eventually invokes. - const priv = index as any; - priv.dir = TMP_DIR; - priv.pendingFiles.add("conv_watch1.jsonl"); - await priv.processPendingFiles(); - - expect(index.size).toBe(1); - expect(index.get("watch1")).toBeDefined(); - expect(index.get("watch1")!.title).toBe("Watched file"); + expect((await index.list()).totalCount).toBe(1); + const entry = await index.get("fresh"); + expect(entry).toBeDefined(); + expect(entry!.title).toBe("Written after init"); }); - test("removes deleted file from index when processPendingFiles runs", async () => { + test("drops a file that has been deleted", async () => { const filePath = writeConvFile({ - id: "watch_del", + id: "doomed", createdAt: "2025-01-01T00:00:00.000Z", updatedAt: "2025-01-01T00:00:00.000Z", title: "Will be deleted", @@ -477,43 +463,39 @@ describe("fs.watch integration", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); - expect(index.size).toBe(1); - expect(index.get("watch_del")).toBeDefined(); + index.init(TMP_DIR); + expect(await index.get("doomed")).toBeDefined(); - // Delete the file, then simulate a watch event by directly invoking - // processPendingFiles. This tests the cleanup logic without depending - // on fs.watch event delivery timing, which is unreliable under CI load. rmSync(filePath); - // Access private pendingFiles + processPendingFiles via the instance - const priv = index as any; - priv.dir = TMP_DIR; - priv.pendingFiles.add("conv_watch_del.jsonl"); - await priv.processPendingFiles(); - - expect(index.size).toBe(0); - expect(index.get("watch_del")).toBeUndefined(); + expect(await index.get("doomed")).toBeUndefined(); + expect((await index.list()).totalCount).toBe(0); }); - test("stopWatching cleans up", async () => { - const index = new ConversationIndex(); - await index.build(TMP_DIR); + test("re-reads a file when its content (and mtime) changes", async () => { + writeConvFile({ + id: "mutable", + createdAt: "2025-01-01T00:00:00.000Z", + updatedAt: "2025-01-01T00:00:00.000Z", + title: "Original", + messages: [{ role: "user", content: "first", timestamp: "2025-01-01T00:01:00.000Z" }], + }); - index.startWatching(TMP_DIR); - index.stopWatching(); + const index = new ConversationIndex(); + index.init(TMP_DIR); + expect((await index.get("mutable"))!.title).toBe("Original"); - // Write a file after stopping — should NOT be indexed + // Bump mtime so the mtime cache invalidates. Some filesystems quantize + // mtime to ms, so a same-ms rewrite can be invisible — wait a hair. + await new Promise((resolve) => setTimeout(resolve, 25)); writeConvFile({ - id: "after_stop", + id: "mutable", createdAt: "2025-01-01T00:00:00.000Z", updatedAt: "2025-01-01T00:00:00.000Z", - title: "Should not appear", - messages: [{ role: "user", content: "ignored", timestamp: "2025-01-01T00:01:00.000Z" }], + title: "Updated", + messages: [{ role: "user", content: "first", timestamp: "2025-01-01T00:01:00.000Z" }], }); - await new Promise((resolve) => setTimeout(resolve, 800)); - - expect(index.size).toBe(0); + expect((await index.get("mutable"))!.title).toBe("Updated"); }); }); diff --git a/test/unit/bundles/conversations/tools/export.test.ts b/test/unit/bundles/conversations/tools/export.test.ts index ef966dcb..efef6484 100644 --- a/test/unit/bundles/conversations/tools/export.test.ts +++ b/test/unit/bundles/conversations/tools/export.test.ts @@ -49,7 +49,6 @@ beforeEach(async () => { }); afterEach(() => { - index.stopWatching(); rmSync(TMP_DIR, { recursive: true, force: true }); }); @@ -62,7 +61,7 @@ async function setupConversation( const fname = filename ?? `${id}.jsonl`; const lines = [JSON.stringify(meta), ...messages.map((m) => JSON.stringify(m))]; writeTmpFile(fname, lines); - await index.build(TMP_DIR); + index.init(TMP_DIR); } // --------------------------------------------------------------------------- @@ -262,7 +261,7 @@ describe("handleExport — json", () => { describe("handleExport — errors", () => { test("throws for non-existent conversation ID", async () => { - await index.build(TMP_DIR); + index.init(TMP_DIR); expect( handleExport({ id: "conv_nonexistent", format: "markdown" }, index), diff --git a/test/unit/bundles/conversations/tools/fork.test.ts b/test/unit/bundles/conversations/tools/fork.test.ts index ef74fb6d..655f9d09 100644 --- a/test/unit/bundles/conversations/tools/fork.test.ts +++ b/test/unit/bundles/conversations/tools/fork.test.ts @@ -75,9 +75,9 @@ function writeSourceConversation(): string { return writeTmpFile(`${SOURCE_ID}.jsonl`, lines); } -async function buildIndex(): Promise { +function buildIndex(): ConversationIndex { const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); return index; } diff --git a/test/unit/bundles/conversations/tools/get.test.ts b/test/unit/bundles/conversations/tools/get.test.ts index 6dbcf263..a644c43c 100644 --- a/test/unit/bundles/conversations/tools/get.test.ts +++ b/test/unit/bundles/conversations/tools/get.test.ts @@ -79,7 +79,6 @@ describe("conversations__get", () => { }); afterEach(() => { - index.stopWatching(); rmSync(dir, { recursive: true, force: true }); }); @@ -97,7 +96,7 @@ describe("conversations__get", () => { { role: "user", content: "Message 3", timestamp: ts }, ], }); - await index.build(dir); + index.init(dir); const result = (await handleGet({ id: "conv-5msgs" }, index)) as { metadata: Record; @@ -129,7 +128,7 @@ describe("conversations__get", () => { { role: "user", content: "Fifth", timestamp: ts }, ], }); - await index.build(dir); + index.init(dir); const result = (await handleGet({ id: "conv-limited", limit: 2 }, index)) as { messages: Array<{ role: string; content: string }>; @@ -143,7 +142,7 @@ describe("conversations__get", () => { }); it("returns isError response for non-existent conversation ID", async () => { - await index.build(dir); + index.init(dir); await expect(handleGet({ id: "does-not-exist" }, index)).rejects.toThrow( "Conversation not found: does-not-exist", @@ -181,7 +180,7 @@ describe("conversations__get", () => { }, ], }); - await index.build(dir); + index.init(dir); const result = (await handleGet({ id: "conv-meta" }, index)) as { messages: Array<{ @@ -222,7 +221,7 @@ describe("conversations__get", () => { { role: "assistant", content: "a2", timestamp: ts }, ], }); - await index.build(dir); + index.init(dir); const result = (await handleGet( { id: "conv-meta-only", expand: "metadata" }, @@ -248,7 +247,7 @@ describe("conversations__get", () => { timestamp: ts, })); writeConversation(dir, "conv-many", { messages }); - await index.build(dir); + index.init(dir); const result = (await handleGet({ id: "conv-many" }, index)) as { messages: Array<{ content: string }>; @@ -275,7 +274,7 @@ describe("conversations__get", () => { timestamp: ts, })); writeConversation(dir, "conv-full", { messages }); - await index.build(dir); + index.init(dir); const result = (await handleGet({ id: "conv-full", expand: "full" }, index)) as { messages: unknown[]; @@ -297,7 +296,7 @@ describe("conversations__get", () => { timestamp: ts, })); writeConversation(dir, "conv-bigwindow", { messages }); - await index.build(dir); + index.init(dir); const result = (await handleGet({ id: "conv-bigwindow" }, index)) as { messages: Array<{ content: string }>; @@ -325,7 +324,7 @@ describe("conversations__get", () => { { role: "assistant", content: overCap, timestamp: ts }, ], }); - await index.build(dir); + index.init(dir); const result = (await handleGet({ id: "conv-onehuge" }, index)) as { messages: Array<{ content: string }>; @@ -365,7 +364,7 @@ describe("conversations__get", () => { usage: { inputTokens: 100, outputTokens: 50, model: "claude-sonnet-4-6", llmMs: 1200 }, })); writeConversation(dir, "conv-fits-under-engine-cap", { messages }); - await index.build(dir); + index.init(dir); const result = await handleGet({ id: "conv-fits-under-engine-cap" }, index); const serialized = JSON.stringify(result, null, 2); diff --git a/test/unit/bundles/conversations/tools/list.test.ts b/test/unit/bundles/conversations/tools/list.test.ts index 7306049e..cdb48e4e 100644 --- a/test/unit/bundles/conversations/tools/list.test.ts +++ b/test/unit/bundles/conversations/tools/list.test.ts @@ -44,12 +44,12 @@ function writeConvFile(spec: ConvSpec): string { return path; } -async function buildIndex(specs: ConvSpec[]): Promise { +function buildIndex(specs: ConvSpec[]): ConversationIndex { for (const spec of specs) { writeConvFile(spec); } const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); return index; } @@ -240,7 +240,7 @@ describe("handleList", () => { test("empty directory returns empty array and totalCount 0", async () => { const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleList({}, index); expect(result.conversations).toEqual([]); @@ -279,6 +279,24 @@ describe("handleList", () => { } }); + test("handleList reflects a file written after init (issue #155)", async () => { + // Pull-on-demand: each handleList call reconciles with disk. No + // debounce, no flush plumbing — the file just shows up. + const index = new ConversationIndex(); + index.init(TMP_DIR); + + writeConvFile({ + id: "fresh", + createdAt: "2025-05-01T00:00:00.000Z", + updatedAt: "2025-05-01T00:00:00.000Z", + title: "Written after init", + messages: [{ role: "user", content: "hello", timestamp: "2025-05-01T00:01:00.000Z" }], + }); + + const result = await handleList({}, index); + expect(result.conversations.find((c) => c.id === "fresh")).toBeDefined(); + }); + test("passes through all input fields to the index", async () => { const index = await buildIndex(CONVS); diff --git a/test/unit/bundles/conversations/tools/search.test.ts b/test/unit/bundles/conversations/tools/search.test.ts index f5fc6d54..f82ec05a 100644 --- a/test/unit/bundles/conversations/tools/search.test.ts +++ b/test/unit/bundles/conversations/tools/search.test.ts @@ -56,7 +56,6 @@ describe("conversations__search", () => { }); afterEach(() => { - index.stopWatching(); rmSync(dir, { recursive: true, force: true }); }); @@ -68,7 +67,7 @@ describe("conversations__search", () => { { role: "assistant", content: "Kubernetes is an orchestration platform for containers." }, ], }); - await index.build(dir); + index.init(dir); const result = (await handleSearch({ query: "orchestration" }, index)) as { results: Array<{ id: string; matches: Array<{ snippet: string }> }>; @@ -86,7 +85,7 @@ describe("conversations__search", () => { { role: "assistant", content: "Here is the Auth configuration." }, ], }); - await index.build(dir); + index.init(dir); const result = (await handleSearch({ query: "Auth" }, index)) as { results: Array<{ id: string; matches: Array<{ messageIndex: number }> }>; @@ -107,7 +106,7 @@ describe("conversations__search", () => { { role: "user", content: "deploy step 5" }, ], }); - await index.build(dir); + index.init(dir); const result = (await handleSearch({ query: "deploy" }, index)) as { results: Array<{ id: string; matches: Array<{ messageIndex: number }> }>; @@ -124,7 +123,7 @@ describe("conversations__search", () => { writeConversation(dir, "conv-b", { messages: [{ role: "user", content: "database backup" }], }); - await index.build(dir); + index.init(dir); const result = (await handleSearch({ query: "database", limit: 1 }, index)) as { results: Array<{ id: string }>; @@ -134,7 +133,7 @@ describe("conversations__search", () => { }); it("throws error for empty query", async () => { - await index.build(dir); + index.init(dir); await expect(handleSearch({ query: "" }, index)).rejects.toThrow( "query is required and cannot be empty", @@ -142,7 +141,7 @@ describe("conversations__search", () => { }); it("throws error for whitespace-only query", async () => { - await index.build(dir); + index.init(dir); await expect(handleSearch({ query: " " }, index)).rejects.toThrow( "query is required and cannot be empty", @@ -156,7 +155,7 @@ describe("conversations__search", () => { { role: "assistant", content: "Greetings!" }, ], }); - await index.build(dir); + index.init(dir); const result = (await handleSearch({ query: "xyznonexistent" }, index)) as { results: Array; @@ -173,7 +172,7 @@ describe("conversations__search", () => { writeConversation(dir, "conv-ctx", { messages: [{ role: "user", content }], }); - await index.build(dir); + index.init(dir); const result = (await handleSearch({ query: "target_word" }, index)) as { results: Array<{ matches: Array<{ snippet: string }> }>; @@ -196,7 +195,7 @@ describe("conversations__search", () => { title: "My Important Chat", messages: [{ role: "user", content: "special keyword here" }], }); - await index.build(dir); + index.init(dir); const result = (await handleSearch({ query: "special keyword" }, index)) as { results: Array<{ id: string; title: string | null; matches: Array<{ snippet: string }> }>; @@ -209,7 +208,7 @@ describe("conversations__search", () => { }); it("returns empty results for empty directory", async () => { - await index.build(dir); + index.init(dir); const result = (await handleSearch({ query: "anything" }, index)) as { results: Array; @@ -231,7 +230,7 @@ describe("conversations__search", () => { title: "Chat Z", messages: [{ role: "user", content: "no match here" }], }); - await index.build(dir); + index.init(dir); const result = (await handleSearch({ query: "shared term" }, index)) as { results: Array<{ id: string }>; diff --git a/test/unit/bundles/conversations/tools/stats.test.ts b/test/unit/bundles/conversations/tools/stats.test.ts index d01e790d..7c7c6fa5 100644 --- a/test/unit/bundles/conversations/tools/stats.test.ts +++ b/test/unit/bundles/conversations/tools/stats.test.ts @@ -92,7 +92,7 @@ afterEach(() => { describe("handleStats", () => { test("returns zeros for empty directory", async () => { const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "all" }, index); @@ -119,7 +119,7 @@ describe("handleStats", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "all" }, index); @@ -153,7 +153,7 @@ describe("handleStats", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "all" }, index); @@ -205,7 +205,7 @@ describe("handleStats", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "all" }, index); @@ -242,7 +242,7 @@ describe("handleStats", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "all" }, index); @@ -269,7 +269,7 @@ describe("handleStats", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "day" }, index); @@ -296,7 +296,7 @@ describe("handleStats", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "week" }, index); @@ -322,7 +322,7 @@ describe("handleStats", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); // No period specified — should default to "week" const result = await handleStats({}, index); @@ -333,7 +333,7 @@ describe("handleStats", () => { test("period.since and period.until are ISO strings", async () => { const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "week" }, index); @@ -350,7 +350,7 @@ describe("handleStats", () => { test("period 'all' has empty since string", async () => { const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "all" }, index); @@ -382,7 +382,7 @@ describe("handleStats", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "all" }, index); @@ -391,7 +391,7 @@ describe("handleStats", () => { test("does not include USD cost in output", async () => { const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "all" }, index); @@ -413,7 +413,7 @@ describe("handleStats", () => { }); const index = new ConversationIndex(); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = await handleStats({ period: "all" }, index); diff --git a/test/unit/bundles/conversations/tools/update.test.ts b/test/unit/bundles/conversations/tools/update.test.ts index 9776a978..db5c153e 100644 --- a/test/unit/bundles/conversations/tools/update.test.ts +++ b/test/unit/bundles/conversations/tools/update.test.ts @@ -69,7 +69,7 @@ describe("handleUpdate", () => { const lines = [JSON.stringify(meta), ...messages.map((m) => JSON.stringify(m))]; writeTmpFile("conv_test001.jsonl", lines); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = (await handleUpdate({ id: "conv_test001", title: "New title" }, index)) as Record; @@ -90,7 +90,7 @@ describe("handleUpdate", () => { const lines = [JSON.stringify(meta), ...messages.map((m) => JSON.stringify(m))]; writeTmpFile("conv_test001.jsonl", lines); - await index.build(TMP_DIR); + index.init(TMP_DIR); await handleUpdate({ id: "conv_test001", title: "Updated" }, index); @@ -113,7 +113,7 @@ describe("handleUpdate", () => { const lines = [JSON.stringify(meta), ...messages.map((m) => JSON.stringify(m))]; writeTmpFile("conv_test001.jsonl", lines); - await index.build(TMP_DIR); + index.init(TMP_DIR); await handleUpdate({ id: "conv_test001", title: "Integrity check" }, index); @@ -144,7 +144,7 @@ describe("handleUpdate", () => { const lines = [JSON.stringify(meta), ...messages.map((m) => JSON.stringify(m))]; writeTmpFile("conv_test001.jsonl", lines); - await index.build(TMP_DIR); + index.init(TMP_DIR); await handleUpdate({ id: "conv_test001", title: "After" }, index); @@ -159,7 +159,7 @@ describe("handleUpdate", () => { // --------------------------------------------------------------------------- test("throws error for non-existent conversation ID", async () => { - await index.build(TMP_DIR); + index.init(TMP_DIR); await expect( handleUpdate({ id: "conv_nonexistent", title: "Nope" }, index), @@ -174,7 +174,7 @@ describe("handleUpdate", () => { const meta = makeMeta({ id: "conv_empty" }); writeTmpFile("conv_empty.jsonl", [JSON.stringify(meta)]); - await index.build(TMP_DIR); + index.init(TMP_DIR); const result = (await handleUpdate({ id: "conv_empty", title: "Empty conv" }, index)) as Record; From b43ca7d7bbcd4cb75779fe05d5e64c7e0feb30ef Mon Sep 17 00:00:00 2001 From: Mason <31372737+Ovaculos@users.noreply.github.com> Date: Wed, 20 May 2026 11:56:16 -0500 Subject: [PATCH 2/2] fix(conversations): broadcast new chat to list live A brand-new conversation did not appear in the Conversations list until the user refreshed or switched tabs (#155). The runtime emitted the data.changed signal onto the per-request chat sink, which never reaches the /v1/events SSE channel that useDataSync consumes. Route the emit through the runtime's default sink (the one api/server.ts wraps for /v1/events). Fire once right after the user message is persisted so the conversation surfaces with its first-message preview as the label, and once more after auto-title generation settles so the label flips to the generated title. Two adjacent issues surfaced during investigation and are filed separately: - #253: auto-generated titles often contain assistant response content instead of a short summary (pre-existing prompt-shape bug) - #254: mid-turn conversation switch bleeds the streaming response into the destination chat (pre-existing client-side state contamination) Supporting hardening in this commit, in service of the broadcast both reaching its destination and surviving along the way: - FaultIsolatedSink wraps each sink in the defaultEvents fan-out so a logger throw can't abort the SSE-broadcast wrap downstream. Engine-time sink chain stays loud. - generateTitle uses AbortSignal.timeout to hard-cap a hung fast-model call so the title-block .finally always fires. - Dead chat-stream data.changed branch removed from handlers.ts (nothing relays it now; confirmed via grep). Closes #155 Co-Authored-By: Claude Opus 4.7 (1M context) --- src/api/handlers.ts | 3 +- src/conversation/auto-title.ts | 4 + src/runtime/runtime.ts | 70 +++++++--- .../runtime/chat-start-event.test.ts | 128 ++++++++++++------ 4 files changed, 141 insertions(+), 64 deletions(-) diff --git a/src/api/handlers.ts b/src/api/handlers.ts index 90dd5a5a..d9d21609 100644 --- a/src/api/handlers.ts +++ b/src/api/handlers.ts @@ -163,8 +163,7 @@ export async function handleChatStream( event.type === "tool.preparing.done" || event.type === "tool.start" || event.type === "tool.done" || - event.type === "llm.done" || - event.type === "data.changed" + event.type === "llm.done" ) { if (event.type === "chat.start") { broadcastUserMessageOnce(); diff --git a/src/conversation/auto-title.ts b/src/conversation/auto-title.ts index 2330ac92..aa723900 100644 --- a/src/conversation/auto-title.ts +++ b/src/conversation/auto-title.ts @@ -1,5 +1,8 @@ import type { LanguageModelV3 } from "@ai-sdk/provider"; +/** Hard cap so a hung fast-model call can't leave the caller's promise pending. */ +const TITLE_TIMEOUT_MS = 10_000; + /** * Generate a short conversation title using the provided model. * Non-blocking — call fire-and-forget after first turn. @@ -28,6 +31,7 @@ export async function generateTitle( }, ], maxOutputTokens: 30, + abortSignal: AbortSignal.timeout(TITLE_TIMEOUT_MS), }); const textBlock = result.content.find((b) => b.type === "text"); if (textBlock?.type === "text") { diff --git a/src/runtime/runtime.ts b/src/runtime/runtime.ts index 6d3a6739..f2692d3d 100644 --- a/src/runtime/runtime.ts +++ b/src/runtime/runtime.ts @@ -133,6 +133,24 @@ class MultiEventSink implements EventSink { } } +/** + * Wraps a sink so a throw in `emit` is logged instead of propagating. Used + * only at the `defaultEvents` fan-out (`buildEventSink`) — server.ts wraps + * defaultEvents.emit to chain the SSE broadcast after the log writer; an + * unhandled throw in the log writer would otherwise abort the broadcast. + * Engine-time sink chains stay loud (a throwing engine sink is a bug). + */ +class FaultIsolatedSink implements EventSink { + constructor(private inner: EventSink) {} + emit(event: EngineEvent): void { + try { + this.inner.emit(event); + } catch (err) { + console.error("[runtime] event sink threw on emit:", err); + } + } +} + /** * Tracks parent engine run state for delegate context. * Listens to engine events to maintain current runId and iteration count. @@ -1071,22 +1089,21 @@ export class Runtime { }; engineConfig.toolPromotion = this.buildToolPromotionFactory(); - // Emit chat.start so the client knows the conversation ID immediately - // and conversation list UIs can refresh + // Emit chat.start so the client knows the conversation ID immediately. if (requestSink) { requestSink.emit({ type: "chat.start", data: { conversationId: conversation.id }, }); - // Notify conversation browser UIs that a new conversation exists - if (!request.conversationId) { - requestSink.emit({ - type: "data.changed", - data: { server: "conversations", tool: "list" }, - }); - } } + // Surface a new conversation the moment its file exists — the user + // message is already on disk by this point, so the list shows the + // conversation with the message preview as its label. Once title + // generation settles, the title-block `.finally` fires a second + // broadcast that flips the label to the generated title (#155). + if (!request.conversationId) this.notifyConversationsChanged(); + const result = await runWithRequestContext(reqCtx, () => engine.run(engineConfig, systemPrompt, messages, tools), ); @@ -1121,18 +1138,19 @@ export class Runtime { }); } - // Fire-and-forget title generation on first turn (use "fast" slot for cost savings) + // Fire-and-forget title generation on first turn (use "fast" slot for cost savings). + // The post-turn broadcast above already surfaced this conversation in the + // list (labelled with its message preview); broadcasting again from the + // `.finally` updates that label to the generated title. if (conversation.title === null) { const titleModel = this.resolveModelFn(this.getModelSlot("fast")); const titleInput = request.message || `[Uploaded: ${request.fileRefs?.map((f) => f.filename).join(", ") || "files"}]`; - void generateTitle(titleModel, titleInput, result.output).then( - (title) => { - void store.update(conversation.id, { title }); - }, - (err) => console.error("[runtime] title generation failed:", err), - ); + void generateTitle(titleModel, titleInput, result.output) + .then((title) => store.update(conversation.id, { title })) + .catch((err) => console.error("[runtime] title generation or persist failed:", err)) + .finally(() => this.notifyConversationsChanged()); } return { @@ -1491,6 +1509,20 @@ export class Runtime { return this.defaultEvents; } + /** + * Broadcast a conversations-list refresh on the runtime's default sink — + * the one api/server.ts wraps to drive the `/v1/events` SSE broadcast that + * `useDataSync` consumes. The per-request chat sink never reaches that + * channel, so a conversation created mid-chat would otherwise stay absent + * from the list until a manual refresh (#155). + */ + private notifyConversationsChanged(): void { + this.defaultEvents.emit({ + type: "data.changed", + data: { server: "conversations", tool: "list" }, + }); + } + /** * Get a per-workdir `InstructionsStore` for the org / workspace overlays. * Per-bundle instructions are NOT stored here — bundles own their storage @@ -2306,7 +2338,11 @@ function buildEventSink(config: RuntimeConfig): { logLevel: config.logging?.level ?? "normal", }); } - const events: EventSink = sinks.length > 0 ? new MultiEventSink(sinks) : new NoopEventSink(); + // Isolate each sink in the default fan-out: a logging throw must not abort + // later sinks in the chain (notably the SSE broadcast wrap in api/server.ts). + const isolated = sinks.map((s) => new FaultIsolatedSink(s)); + const events: EventSink = + isolated.length > 0 ? new MultiEventSink(isolated) : new NoopEventSink(); return { events, eventStore }; } diff --git a/test/integration/runtime/chat-start-event.test.ts b/test/integration/runtime/chat-start-event.test.ts index c46fa7b1..80d8cdf8 100644 --- a/test/integration/runtime/chat-start-event.test.ts +++ b/test/integration/runtime/chat-start-event.test.ts @@ -40,7 +40,7 @@ describe("chat.start event", () => { await runtime.shutdown(); }); - it("emits data.changed when a new conversation is created", async () => { + it("emits data.changed on the default sink when a new conversation is created", async () => { const workDir = join(testDir, "chat-start-new-conv"); mkdirSync(workDir, { recursive: true }); @@ -51,24 +51,50 @@ describe("chat.start event", () => { }); await provisionTestWorkspace(runtime); - const events: EngineEvent[] = []; - const sink: EventSink = { emit: (e) => events.push(e) }; - - // No conversationId provided — triggers new conversation creation - await runtime.chat({ message: "Hello", workspaceId: TEST_WORKSPACE_ID }, sink); - - const dataChangedEvents = events.filter((e) => e.type === "data.changed"); - expect(dataChangedEvents.length).toBeGreaterThanOrEqual(1); - - const convListChange = dataChangedEvents.find( - (e) => e.data.server === "conversations" && e.data.tool === "list", - ); - expect(convListChange).toBeDefined(); - - await runtime.shutdown(); + // Capture events on the runtime's default sink — the one api/server.ts + // wraps to drive the `/v1/events` SSE broadcast (`useDataSync`). A + // data.changed that only reaches the per-request chat sink never gets + // there, so conversation-list iframes stay stale (issue #155). + const sseEvents: EngineEvent[] = []; + const defaultSink = runtime.getEventSink(); + const origEmit = defaultSink.emit.bind(defaultSink); + defaultSink.emit = (e) => { + sseEvents.push(e); + origEmit(e); + }; + + try { + // No conversationId provided — triggers new conversation creation. + await runtime.chat({ message: "Hello", workspaceId: TEST_WORKSPACE_ID }); + + const isConvListChange = (e: EngineEvent) => + e.type === "data.changed" && + e.data.server === "conversations" && + e.data.tool === "list"; + + // Two broadcasts per new conversation: one in the post-turn `finally` + // (surfaces it immediately, labelled with the message preview) and + // one after fire-and-forget title generation settles (updates the + // label to the generated title). + const deadline = Date.now() + 5000; + while (Date.now() < deadline && sseEvents.filter(isConvListChange).length < 2) { + await new Promise((resolve) => setTimeout(resolve, 50)); + } + // Let any further (regression) broadcasts land before asserting the count. + await new Promise((resolve) => setTimeout(resolve, 400)); + + // >=2 not ===2: the assertion guards against the conversation-list + // signal regressing to nothing, but stays robust to unrelated + // data.changed emits (e.g. non-nb tool calls fanning out via + // api/server.ts) that may legitimately arrive during a real turn. + expect(sseEvents.filter(isConvListChange).length).toBeGreaterThanOrEqual(2); + } finally { + defaultSink.emit = origEmit; + await runtime.shutdown(); + } }); - it("does NOT emit data.changed when resuming an existing conversation", async () => { + it("does NOT emit data.changed on the default sink when resuming an existing conversation", async () => { const workDir = join(testDir, "chat-start-resume"); mkdirSync(workDir, { recursive: true }); @@ -79,39 +105,51 @@ describe("chat.start event", () => { }); await provisionTestWorkspace(runtime); - // First chat — creates a new conversation - const first = await runtime.chat({ - message: "First message", - workspaceId: TEST_WORKSPACE_ID, - }); - - // Second chat — resume existing conversation, capture events - const events: EngineEvent[] = []; - const sink: EventSink = { emit: (e) => events.push(e) }; + // Wrap before the first chat so the title-gen broadcast is observable. + // A fixed pre-second-chat sleep would race the first turn's async title + // generation under CI load — poll for both first-turn broadcasts instead. + const sseEvents: EngineEvent[] = []; + const defaultSink = runtime.getEventSink(); + const origEmit = defaultSink.emit.bind(defaultSink); + defaultSink.emit = (e) => { + sseEvents.push(e); + origEmit(e); + }; + + try { + const isConvListChange = (e: EngineEvent) => + e.type === "data.changed" && + e.data.server === "conversations" && + e.data.tool === "list"; - await runtime.chat( - { + // First chat — creates a new conversation; emits twice (post-turn + post-title). + const first = await runtime.chat({ + message: "First message", + workspaceId: TEST_WORKSPACE_ID, + }); + + const deadline = Date.now() + 5000; + while (Date.now() < deadline && sseEvents.filter(isConvListChange).length < 2) { + await new Promise((resolve) => setTimeout(resolve, 50)); + } + const countAfterFirst = sseEvents.filter(isConvListChange).length; + expect(countAfterFirst).toBeGreaterThanOrEqual(2); + + // Second chat — resume existing conversation. Not new, title set → + // neither the post-turn nor the title-gen path should fire. + await runtime.chat({ message: "Second message", conversationId: first.conversationId, workspaceId: TEST_WORKSPACE_ID, - }, - sink, - ); + }); - // chat.start should still be emitted - const chatStartEvents = events.filter((e) => e.type === "chat.start"); - expect(chatStartEvents).toHaveLength(1); - expect(chatStartEvents[0]!.data.conversationId).toBe(first.conversationId); - - // data.changed with server=conversations should NOT be emitted - const convListChange = events.filter( - (e) => - e.type === "data.changed" && - e.data.server === "conversations" && - e.data.tool === "list", - ); - expect(convListChange).toHaveLength(0); + // Allow any straggler broadcast to land before asserting unchanged. + await new Promise((resolve) => setTimeout(resolve, 400)); - await runtime.shutdown(); + expect(sseEvents.filter(isConvListChange).length).toBe(countAfterFirst); + } finally { + defaultSink.emit = origEmit; + await runtime.shutdown(); + } }); });