Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/api/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ export async function handleChatStream(
event.type === "tool.preparing.done" ||
event.type === "tool.start" ||
event.type === "tool.done" ||
event.type === "llm.done" ||
event.type === "data.changed"
event.type === "llm.done"
) {
if (event.type === "chat.start") {
broadcastUserMessageOnce();
Expand Down
254 changes: 128 additions & 126 deletions src/bundles/conversations/src/index-cache.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/**
* In-memory index of conversation metadata for fast listing, searching, and filtering.
* Pull-on-demand conversation index.
*
* Built on startup by scanning all JSONL file headers. Invalidated by fs.watch()
* on the conversations directory (debounced 500ms).
*
* Types are defined locally — no imports from the runtime codebase.
* Each call reconciles in-memory state with the directory: stat every
* conversation file, re-read headers only for new or mtime-changed files,
* drop entries for files that have disappeared. There is no `fs.watch`, no
* debounce, and no broadcast-vs-debounce race — `list` and `get` always
* return disk-truth.
*/

import { type FSWatcher, watch } from "node:fs";
import { basename, join } from "node:path";
import { statSync } from "node:fs";
import { listConversationFiles, readConversationHeader } from "./jsonl-reader.ts";

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -43,78 +43,37 @@ export interface ListResult {
totalCount: number;
}

interface CachedEntry {
entry: IndexEntry;
mtimeMs: number;
}

// ---------------------------------------------------------------------------
// ConversationIndex
// ---------------------------------------------------------------------------

export class ConversationIndex {
private entries: Map<string, IndexEntry> = new Map();
/** Maps filename (e.g. "conv_abc.jsonl") to conversation ID for fast fs.watch lookups. */
private fileToId: Map<string, string> = new Map();
/** filePath → cached entry + last-seen mtime. */
private cache: Map<string, CachedEntry> = new Map();
/** id → filePath. Enables `get(id)` to stat one file instead of walking. */
private byId: Map<string, string> = new Map();
private dir: string | null = null;
private watcher: FSWatcher | null = null;
private debounceTimer: ReturnType<typeof setTimeout> | null = null;
private pendingFiles: Set<string> = new Set();

/** Build index by scanning all .jsonl files in dir. Reads only headers (line 1 + preview). */
async build(dir: string): Promise<void> {
this.dir = dir;
this.entries.clear();
this.fileToId.clear();

const files = listConversationFiles(dir);

for (const filePath of files) {
await this.indexFile(filePath);
}
}

/** Start fs.watch on dir. On change, debounce 500ms, then re-read affected file header. */
startWatching(dir: string): void {
this.stopWatching();
/** Point the index at a directory. No I/O until the first list/get. */
init(dir: string): void {
this.dir = dir;

this.watcher = watch(dir, (_eventType, filename) => {
if (!filename?.endsWith(".jsonl")) return;

this.pendingFiles.add(filename);

if (this.debounceTimer) {
clearTimeout(this.debounceTimer);
}

this.debounceTimer = setTimeout(() => {
this.processPendingFiles();
}, 500);
});
}

/** Stop watching. */
stopWatching(): void {
if (this.watcher) {
this.watcher.close();
this.watcher = null;
}
if (this.debounceTimer) {
clearTimeout(this.debounceTimer);
this.debounceTimer = null;
}
this.pendingFiles.clear();
}

/** List conversations with pagination, sorting, date filtering, search. */
list(options?: ListOptions): ListResult {
let items = [...this.entries.values()];
async list(options?: ListOptions): Promise<ListResult> {
let items = await this.reconcile();

// Search filter: case-insensitive substring on title + preview
if (options?.search) {
const q = options.search.toLowerCase();
items = items.filter(
(e) => (e.title?.toLowerCase().includes(q) ?? false) || e.preview.toLowerCase().includes(q),
);
}

// Date filtering
if (options?.dateFrom) {
const from = options.dateFrom;
items = items.filter((e) => e.createdAt >= from);
Expand All @@ -124,19 +83,14 @@ export class ConversationIndex {
items = items.filter((e) => e.createdAt <= to);
}

// Sorting (descending — newest first)
const sortBy = options?.sortBy ?? "updated";
const sortKey = sortBy === "created" ? "createdAt" : "updatedAt";
items.sort((a, b) => b[sortKey].localeCompare(a[sortKey]));

const totalCount = items.length;

// Cursor pagination: skip entries up to and including the cursor ID
if (options?.cursor) {
const idx = items.findIndex((e) => e.id === options.cursor);
if (idx >= 0) {
items = items.slice(idx + 1);
}
if (idx >= 0) items = items.slice(idx + 1);
}

const limit = options?.limit ?? 20;
Expand All @@ -147,55 +101,34 @@ export class ConversationIndex {
return { conversations: page, nextCursor, totalCount };
}

/** Get a single entry by ID. */
get(id: string): IndexEntry | undefined {
return this.entries.get(id);
}

/** Total conversation count. */
get size(): number {
return this.entries.size;
}

// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------

private async indexFile(filePath: string): Promise<void> {
const header = await readConversationHeader(filePath);
if (!header) return;

const entry: IndexEntry = {
id: header.meta.id,
title: header.meta.title,
createdAt: header.meta.createdAt,
updatedAt: header.meta.updatedAt,
messageCount: header.messageCount,
totalInputTokens: header.meta.totalInputTokens,
totalOutputTokens: header.meta.totalOutputTokens,
lastModel: header.meta.lastModel,
preview: header.preview,
filePath,
};

this.entries.set(entry.id, entry);
this.fileToId.set(basename(filePath), entry.id);
}

private async processPendingFiles(): Promise<void> {
if (!this.dir) return;

const files = [...this.pendingFiles];
this.pendingFiles.clear();

for (const filename of files) {
const filePath = join(this.dir, filename);

// Try to read the header. If the file was deleted, readConversationHeader returns null.
const header = await readConversationHeader(filePath);

if (header) {
// File exists — update/add entry
/**
* Get a single entry by ID.
*
* Fast path: when the id has been seen before, stat one file and reuse
* the cached entry if its mtime is unchanged (or re-read just that
* header on a change). Avoids the full directory walk for the common
* case where `handleGet`/`handleFork`/`handleUpdate` operate on a
* conversation the index already knows about.
*
* Slow path: id unknown (first encounter, or the cached file
* disappeared) — full reconcile, then a Map lookup.
*/
async get(id: string): Promise<IndexEntry | undefined> {
if (!this.dir) return undefined;

const knownPath = this.byId.get(id);
if (knownPath) {
try {
const mtimeMs = statSync(knownPath).mtimeMs;
const cached = this.cache.get(knownPath);
if (cached && cached.mtimeMs === mtimeMs) return cached.entry;

const header = await readConversationHeader(knownPath);
if (!header) {
this.cache.delete(knownPath);
this.byId.delete(id);
return undefined;
}
const entry: IndexEntry = {
id: header.meta.id,
title: header.meta.title,
Expand All @@ -206,18 +139,87 @@ export class ConversationIndex {
totalOutputTokens: header.meta.totalOutputTokens,
lastModel: header.meta.lastModel,
preview: header.preview,
filePath,
filePath: knownPath,
};
this.entries.set(entry.id, entry);
this.fileToId.set(filename, entry.id);
} else {
// File was deleted or became unreadable — remove from index
const id = this.fileToId.get(filename);
if (id) {
this.entries.delete(id);
this.fileToId.delete(filename);
}
this.cache.set(knownPath, { entry, mtimeMs });
this.byId.set(entry.id, knownPath);
return entry;
} catch {
// File gone since we cached it. Forget the stale mapping and fall
// through to a full reconcile in case the id moved.
this.cache.delete(knownPath);
this.byId.delete(id);
}
}

await this.reconcile();
const path = this.byId.get(id);
return path ? this.cache.get(path)?.entry : undefined;
}

// ---------------------------------------------------------------------------
// Internal
// ---------------------------------------------------------------------------

/**
* Reconcile cached state with the directory:
* - stat every conversation file (sync, cheap)
* - reuse cached entry if mtime is unchanged
* - re-read header for new or modified files
* - drop entries whose files no longer exist
*
* Concurrent callers do their own reconcile; cache writes are idempotent
* by file path, so the worst case is a redundant header read.
*/
private async reconcile(): Promise<IndexEntry[]> {
if (!this.dir) return [];

const files = listConversationFiles(this.dir);
const present = new Set(files);
const entries: IndexEntry[] = [];

for (const filePath of files) {
let mtimeMs: number;
try {
mtimeMs = statSync(filePath).mtimeMs;
} catch {
// Race: file removed between listing and stat. Skip.
continue;
}

const cached = this.cache.get(filePath);
if (cached && cached.mtimeMs === mtimeMs) {
entries.push(cached.entry);
continue;
}

const header = await readConversationHeader(filePath);
if (!header) continue;

const entry: IndexEntry = {
id: header.meta.id,
title: header.meta.title,
createdAt: header.meta.createdAt,
updatedAt: header.meta.updatedAt,
messageCount: header.messageCount,
totalInputTokens: header.meta.totalInputTokens,
totalOutputTokens: header.meta.totalOutputTokens,
lastModel: header.meta.lastModel,
preview: header.preview,
filePath,
};
this.cache.set(filePath, { entry, mtimeMs });
this.byId.set(entry.id, filePath);
entries.push(entry);
}

for (const [cachedPath, cached] of this.cache) {
if (!present.has(cachedPath)) {
this.cache.delete(cachedPath);
this.byId.delete(cached.entry.id);
}
}

return entries;
}
}
10 changes: 3 additions & 7 deletions src/bundles/conversations/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,10 @@ async function routeToolCall(
async function main(): Promise<void> {
log(`Starting with conversations dir: ${CONVERSATIONS_DIR}`);

// Build the in-memory index from JSONL file headers
// Pull-on-demand index: each tool call reconciles with disk via mtime cache.
const index = new ConversationIndex();
await index.build(CONVERSATIONS_DIR);
log(`Indexed ${index.size} conversations`);

// Start watching for file changes
index.startWatching(CONVERSATIONS_DIR);
index.init(CONVERSATIONS_DIR);
log(`Index initialized at ${CONVERSATIONS_DIR}`);

// Create MCP server
const server = new Server(
Expand Down Expand Up @@ -245,7 +242,6 @@ async function main(): Promise<void> {
// Clean shutdown
const shutdown = async () => {
log("Shutting down...");
index.stopWatching();
await server.close();
process.exit(0);
};
Expand Down
2 changes: 1 addition & 1 deletion src/bundles/conversations/src/tools/export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ function exportMarkdown(
}

export async function handleExport(input: ExportInput, index: ConversationIndex): Promise<object> {
const entry = index.get(input.id);
const entry = await index.get(input.id);
if (!entry) {
throw new Error(`Conversation not found: ${input.id}`);
}
Expand Down
2 changes: 1 addition & 1 deletion src/bundles/conversations/src/tools/fork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export interface ForkInput {
}

export async function handleFork(input: ForkInput, index: ConversationIndex): Promise<object> {
const entry = index.get(input.id);
const entry = await index.get(input.id);
if (!entry) {
throw new Error(`Conversation not found: ${input.id}`);
}
Expand Down
2 changes: 1 addition & 1 deletion src/bundles/conversations/src/tools/get.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ function selectByCharCap(messages: unknown[], cap: number): GetMessagesResult {
}

export async function handleGet(input: GetInput, index: ConversationIndex): Promise<object> {
const entry = index.get(input.id);
const entry = await index.get(input.id);
if (!entry) {
throw new Error(`Conversation not found: ${input.id}`);
}
Expand Down
2 changes: 1 addition & 1 deletion src/bundles/conversations/src/tools/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export async function handleSearch(input: SearchInput, index: ConversationIndex)
const limit = input.limit ?? DEFAULT_LIMIT;
const queryLower = query.toLowerCase();

const allConversations = index.list({ limit: index.size || 1 });
const allConversations = await index.list({ limit: Number.MAX_SAFE_INTEGER });
const results: SearchResult[] = [];

for (const entry of allConversations.conversations) {
Expand Down
4 changes: 2 additions & 2 deletions src/bundles/conversations/src/tools/stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ export async function handleStats(
const untilIso = now.toISOString();

// Get all conversations matching the date range using the index
const listResult = index.list({
limit: 999999,
const listResult = await index.list({
limit: Number.MAX_SAFE_INTEGER,
dateFrom: sinceIso || undefined,
dateTo: untilIso,
sortBy: "created",
Expand Down
Loading