diff --git a/AGENTS.md b/AGENTS.md index 883d0152..bea8f109 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -363,6 +363,8 @@ Routing requests to the process owning a session's transport is the **load balan 3. `sessionStore.type: "redis"`. Each tenant gets its own Redis instance in its own namespace (see `infra/CLAUDE.md` per-tenant Redis pattern). Default `nb:mcp:session:` keyPrefix is correct under that model. 4. `platform.strategy.type: RollingUpdate`. Only after (1). +**Known limitation under `replicas > 1`: RunBus is single-process.** Chat turn replay/resume (the SSE-stream-backed viewer attaches to a per-conversation event log) lives in-memory on the pod that started the turn. A viewer landing on a different pod sees `isActive:false` for an in-flight turn elsewhere and the live frames don't fan out cross-pod. Sticky routing on `Mcp-Session-Id` (prereq #2) mitigates for the active tab; a pod restart or any cross-pod viewer (other tab/device) still drops resume mid-turn. The clustered Redis-backed RunBus is deferred work, tracked in `src/runtime/run-bus.ts` — `serve` warns at boot when `sessionStore.type === "redis"` so the gap is visible. + **TTL units: seconds at the surface, ms internally.** Operator-facing: `MCP_SESSION_TTL_SECONDS` env (highest priority) > `sessionStore.ttlSeconds` config > 8h default. Conversion to ms happens in `Runtime.getSessionStoreTtlMs()` only — registry constructors and the host's idle sweep both take ms from there. Don't add mixed-unit code elsewhere. ## MCP App Bridge Rules diff --git a/scripts/check-code-style.ts b/scripts/check-code-style.ts index 6b07af62..935f7259 100644 --- a/scripts/check-code-style.ts +++ b/scripts/check-code-style.ts @@ -58,6 +58,13 @@ function checkNoInlineTypeImports(): CheckResult { // this skip the check passes in CI but fails on a developer's machine. 
if (file.split(/[\\/]/).includes("node_modules")) continue; const rel = relative(ROOT, file); + // Skip bundle subtrees (their UIs have their own conventions, per the + // doc comment) and vendored deps. `bun run build:bundles` installs + // node_modules under each bundle's UI, so an unfiltered walk picks + // up thousands of vendored `.d.ts` violations that have nothing to + // do with our source. + if (rel.includes("/node_modules/")) continue; + if (rel.startsWith("src/bundles/")) continue; const content = readFileSync(file, "utf-8"); const source = ts.createSourceFile(file, content, ts.ScriptTarget.Latest, true); diff --git a/src/adapters/structured-log-sink.ts b/src/adapters/structured-log-sink.ts index 0d37a270..acecbaf6 100644 --- a/src/adapters/structured-log-sink.ts +++ b/src/adapters/structured-log-sink.ts @@ -39,6 +39,10 @@ export class StructuredLogSink implements EventSink { private conversationId: string | undefined; private userId: string | undefined; private workspaceId: string | undefined; + /** True after a write failure surfaced a console.warn; reset on the next + * successful write so a recurring failure (after intermittent recovery) + * warns again. Avoids spamming during a sustained outage. */ + private writeWarned = false; constructor(config: StructuredLogConfig) { this.dir = config.dir; @@ -92,7 +96,23 @@ export class StructuredLogSink implements EventSink { private writeLine(record: Record): void { const today = new Date().toISOString().slice(0, 10); // YYYY-MM-DD const filename = `nimblebrain-${today}.jsonl`; - appendFileSync(join(this.dir, filename), `${JSON.stringify(record)}\n`); + try { + appendFileSync(join(this.dir, filename), `${JSON.stringify(record)}\n`); + this.writeWarned = false; + } catch (err) { + // Best-effort logging: a write failure (disk full, perms, or a detached + // turn emitting after the workdir was torn down) must never throw into + // the event-emit path and crash the caller. 
Surface the first failure of + // an episode so operators see disk/perms incidents; suppress until a + // subsequent success re-arms. + if (!this.writeWarned) { + this.writeWarned = true; + console.warn( + `[structured-log-sink] write to ${this.dir} failed (further failures suppressed until recovery):`, + err instanceof Error ? err.message : err, + ); + } + } } /** Remove log files older than the retention threshold. */ diff --git a/src/adapters/workspace-log-sink.ts b/src/adapters/workspace-log-sink.ts index 788b60d6..353b79d1 100644 --- a/src/adapters/workspace-log-sink.ts +++ b/src/adapters/workspace-log-sink.ts @@ -44,6 +44,8 @@ export interface WorkspaceLogConfig { */ export class WorkspaceLogSink implements EventSink { private dir: string; + /** First-failure-of-an-episode flag — see structured-log-sink. */ + private writeWarned = false; constructor(config: WorkspaceLogConfig) { this.dir = join(config.dir, "workspace"); @@ -65,7 +67,23 @@ export class WorkspaceLogSink implements EventSink { const today = new Date().toISOString().slice(0, 10); // YYYY-MM-DD const filename = `${today}.jsonl`; - appendFileSync(join(this.dir, filename), `${JSON.stringify(record)}\n`); + try { + appendFileSync(join(this.dir, filename), `${JSON.stringify(record)}\n`); + this.writeWarned = false; + } catch (err) { + // Best-effort logging: a write failure (disk full, perms, or a detached + // turn emitting after the workdir was torn down) must never throw into + // the event-emit path and crash the caller. Surface the first failure of + // an episode so operators see disk/perms incidents; suppress until a + // subsequent success re-arms. + if (!this.writeWarned) { + this.writeWarned = true; + console.warn( + `[workspace-log-sink] write to ${this.dir} failed (further failures suppressed until recovery):`, + err instanceof Error ? err.message : err, + ); + } + } } /** No-op — kept for API compatibility. Writes are synchronous. 
*/ diff --git a/src/api/conversation-events.ts b/src/api/conversation-events.ts index 368110e6..95b41aa2 100644 --- a/src/api/conversation-events.ts +++ b/src/api/conversation-events.ts @@ -8,6 +8,8 @@ * Separate from SseEventManager which handles workspace-level events. */ +import type { BufferedRunEvent } from "../runtime/run-bus.ts"; + /** A subscriber watching a specific conversation's events. */ interface ConversationSubscriber { id: string; @@ -19,6 +21,13 @@ interface ConversationSubscriber { const encoder = new TextEncoder(); +/** Format an SSE frame. `seq`, when present, is sent as the `id:` line so a + * reconnecting viewer can resume from its last-seen sequence number. */ +function frame(eventType: string, data: unknown, seq?: number): Uint8Array { + const idLine = seq != null ? `id: ${seq}\n` : ""; + return encoder.encode(`event: ${eventType}\n${idLine}data: ${JSON.stringify(data)}\n\n`); +} + export class ConversationEventManager { private subscribers = new Map(); private heartbeatTimer: ReturnType | null = null; @@ -73,6 +82,8 @@ export class ConversationEventManager { addSubscriber( conversationId: string, userId: string, + replay?: BufferedRunEvent[], + meta?: { isActive: boolean; activeSeq: number }, ): { stream: ReadableStream; subscriberId: string } { const id = crypto.randomUUID(); let sub: ConversationSubscriber; @@ -80,9 +91,24 @@ export class ConversationEventManager { const stream = new ReadableStream({ start: (controller) => { sub = { id, userId, conversationId, controller, closed: false }; + // The subscribed frame tells the client whether a turn is in flight + // (so it can trim a stale in-flight turn from disk history before the + // RunBus replay rebuilds it) and its current seq. + controller.enqueue( + frame("subscribed", { + subscriberId: id, + isActive: meta?.isActive ?? false, + activeSeq: meta?.activeSeq ?? 0, + }), + ); + // Replay the in-flight turn (if any) BEFORE registering for live + // fan-out. 
start() runs synchronously and we add to the subscribers + // map only after replaying, so no live event can interleave ahead of + // the replay — viewers never see out-of-order deltas. + if (replay) { + for (const e of replay) controller.enqueue(frame(e.type, e.data, e.seq)); + } this.subscribers.set(id, sub); - const subscribedMsg = `event: subscribed\ndata: ${JSON.stringify({ subscriberId: id })}\n\n`; - controller.enqueue(encoder.encode(subscribedMsg)); }, cancel: () => { this.removeSubscriber(id); @@ -92,6 +118,29 @@ export class ConversationEventManager { return { stream, subscriberId: id }; } + /** + * Fan out a live run event (with its sequence number) to every subscriber + * of the conversation. The seq lets viewers de-duplicate against replay and + * resume after a reconnect. + */ + publishEvent(conversationId: string, event: BufferedRunEvent): void { + const encoded = frame(event.type, event.data, event.seq); + for (const [id, sub] of this.subscribers) { + if (sub.closed) { + this.subscribers.delete(id); + continue; + } + if (sub.conversationId !== conversationId) continue; + try { + sub.controller.enqueue(encoded); + } catch (err) { + console.warn("[conversation-events] SSE write failed:", err); + this.closeSub(sub); + this.subscribers.delete(id); + } + } + } + /** Remove a specific subscriber. */ removeSubscriber(subscriberId: string): void { const sub = this.subscribers.get(subscriberId); @@ -121,6 +170,11 @@ export class ConversationEventManager { * explicit policy gates; until then, this is the only exclusion * shape needed. * + * Seq-less: unlike {@link publishEvent} (the RunBus path), these frames carry + * no `id:` sequence. A seq-tracking `conversation-stream` viewer applies them + * live but can't replay/resume them. Only `/v1/chat` + `/v1/chat/stream` use + * this; the web shell is RunBus-only. + * * @param conversationId - Target conversation * @param eventType - SSE event type (e.g. 
"text.delta", "user.message") * @param data - Event data payload diff --git a/src/api/events.ts b/src/api/events.ts index 235da077..e2f96572 100644 --- a/src/api/events.ts +++ b/src/api/events.ts @@ -59,6 +59,11 @@ const SSE_ROUTES: Partial> = { // existing "broadcast to all clients in this process" behavior to avoid // silently breaking iframe refresh. Revisit when payload grows wsId. "data.changed": { scope: "global" }, + // Live conversation-title update (auto-title generation completes after the + // turn). Workspace-scoped via the conversation's workspaceId breadcrumb so + // it doesn't leak across tenants. The shell routes it to the matching + // conversation slice by `conversationId`. + "conversation.title": { scope: "workspace", wsIdField: "wsId" }, // Org-level config (model preferences, feature flags). Affects every // workspace; broadcast to all. "config.changed": { scope: "global" }, diff --git a/src/api/handlers.ts b/src/api/handlers.ts index 9d458734..4b9cd33c 100644 --- a/src/api/handlers.ts +++ b/src/api/handlers.ts @@ -7,6 +7,7 @@ import type { EngineEvent, EventSink } from "../engine/types.ts"; import { ingestFiles, isAllowedMime, type UploadedFile } from "../files/ingest.ts"; import type { FileEntry } from "../files/types.ts"; import type { IdentityProvider, UserIdentity } from "../identity/provider.ts"; +import { DEV_IDENTITY } from "../identity/providers/dev.ts"; import { ConversationAccessDeniedError, ConversationCorruptedError, @@ -154,6 +155,88 @@ function runInProgressResponse(conversationId: string): Response { ); } +/** + * Handle POST /v1/chat/start — kick off a detached, server-authoritative turn + * and return the conversation id immediately. The turn runs to completion on + * the server regardless of this request's lifecycle (closing the tab does NOT + * cancel it). Clients watch the turn via GET /v1/conversations/:id/events, + * which replays the in-flight turn then tails live. 
+ */ +export async function handleChatStart( + request: Request, + runtime: Runtime, + features: ResolvedFeatures, + identity?: UserIdentity, + workspaceId?: string, +): Promise { + const parsed = await parseChatBody(request, runtime, features, identity, workspaceId); + if (parsed instanceof Response) return parsed; + try { + const { conversationId } = await runtime.startTurn(parsed); + return Response.json({ conversationId }); + } catch (err) { + if (err instanceof RunInProgressError) { + return runInProgressResponse(parsed.conversationId ?? ""); + } + if (err instanceof ConversationAccessDeniedError) { + return apiError( + 403, + "conversation_access_denied", + "You do not have access to this conversation.", + { conversationId: parsed.conversationId }, + ); + } + // startTurn → store.load can throw on a pre-migration (ownerless) + // conversation. Map to 422 — parity with handleChat / handleChatCancel — + // instead of leaking a raw 500. + if (err instanceof ConversationCorruptedError) { + return conversationCorruptedResponse(err); + } + throw err; + } +} + +/** + * Handle POST /v1/conversations/:id/cancel — the explicit Stop button. The + * ONLY thing that aborts generation; client disconnect does not. Ownership is + * enforced (same posture as the events route). + */ +export async function handleChatCancel( + conversationId: string, + runtime: Runtime, + identity?: UserIdentity, +): Promise { + const callerId = identity?.id ?? (runtime.getIdentityProvider() ? 
null : DEV_IDENTITY.id); + if (!callerId) { + return apiError(401, "authentication_required", "Authentication required."); + } + const conversation = await runtime.findConversation(conversationId).catch((err) => { + if (err instanceof ConversationCorruptedError) return err; + throw err; + }); + if (conversation instanceof ConversationCorruptedError) { + return apiError(422, "conversation_corrupted", conversation.message, { + conversationId: conversation.conversationId, + reason: conversation.reason, + }); + } + if (!conversation) { + return apiError(404, "not_found", "Conversation not found"); + } + if (conversation.ownerId !== callerId) { + return apiError( + 403, + "conversation_access_denied", + "You do not have access to this conversation.", + { + conversationId, + }, + ); + } + const cancelled = runtime.cancelTurn(conversationId); + return Response.json({ cancelled }); +} + function conversationAccessDeniedResponse(conversationId: string): Response { return apiError( 403, diff --git a/src/api/routes/chat.ts b/src/api/routes/chat.ts index cb4d2201..cc350e2c 100644 --- a/src/api/routes/chat.ts +++ b/src/api/routes/chat.ts @@ -1,5 +1,5 @@ import { Hono } from "hono"; -import { handleChat, handleChatStream } from "../handlers.ts"; +import { handleChat, handleChatCancel, handleChatStart, handleChatStream } from "../handlers.ts"; import { requireAuth } from "../middleware/auth.ts"; import { bodyLimit } from "../middleware/body-limit.ts"; import { errorLog } from "../middleware/error-log.ts"; @@ -23,28 +23,40 @@ export function chatRoutes(ctx: AppContext) { // value flows through `handleChat` into `ChatRequest.workspaceId` as the // *focused* workspace, scoping the prompt briefing (installed apps + // house rules). See `handlers.ts::parseChatBody`. 
- return new Hono() - .use("*", requireAuth(ctx.authOptions)) - .use("*", optionalWorkspace(ctx.workspaceStore)) - .use("*", errorLog(ctx)) - .post("/v1/chat", chatBodyLimit, rl, (c) => - handleChat( - c.req.raw, - ctx.runtime, - ctx.features, - c.var.identity, - c.var.workspaceId, - ctx.conversationEventManager, - ), - ) - .post("/v1/chat/stream", chatBodyLimit, rl, (c) => - handleChatStream( - c.req.raw, - ctx.runtime, - ctx.features, - c.var.identity, - c.var.workspaceId, - ctx.conversationEventManager, - ), - ); + return ( + new Hono() + .use("*", requireAuth(ctx.authOptions)) + .use("*", optionalWorkspace(ctx.workspaceStore)) + .use("*", errorLog(ctx)) + .post("/v1/chat", chatBodyLimit, rl, (c) => + handleChat( + c.req.raw, + ctx.runtime, + ctx.features, + c.var.identity, + c.var.workspaceId, + ctx.conversationEventManager, + ), + ) + .post("/v1/chat/stream", chatBodyLimit, rl, (c) => + handleChatStream( + c.req.raw, + ctx.runtime, + ctx.features, + c.var.identity, + c.var.workspaceId, + ctx.conversationEventManager, + ), + ) + // Server-authoritative entry point: starts a detached turn and returns + // the conversation id immediately. The client then watches via + // GET /v1/conversations/:id/events. Generation survives client disconnect. + .post("/v1/chat/start", chatBodyLimit, rl, (c) => + handleChatStart(c.req.raw, ctx.runtime, ctx.features, c.var.identity, c.var.workspaceId), + ) + // Explicit Stop — the only way to abort an in-flight turn. + .post("/v1/conversations/:id/cancel", (c) => + handleChatCancel(c.req.param("id"), ctx.runtime, c.var.identity), + ) + ); } diff --git a/src/api/routes/conversation-events.ts b/src/api/routes/conversation-events.ts index 0b1c775b..0a73430f 100644 --- a/src/api/routes/conversation-events.ts +++ b/src/api/routes/conversation-events.ts @@ -102,13 +102,28 @@ export function conversationEventRoutes(ctx: AppContext) { ); } - // Create SSE stream for this subscriber. 
The first frame - // (event: subscribed) carries the server-generated subscriberId - // so the client can pass it back as `X-Origin-Subscriber-Id` on - // any chat-stream POST it originates — that prevents the - // chat-stream's broadcast from echoing back to this same - // subscription. - const { stream } = ctx.conversationEventManager.addSubscriber(conversationId, callerId); + // Resume point: the client passes the highest sequence number it has + // already rendered (0 / absent = full replay of the in-flight turn). + const afterSeqRaw = c.req.query("afterSeq"); + const afterSeq = afterSeqRaw ? Number.parseInt(afterSeqRaw, 10) : 0; + const replay = ctx.runtime.getTurnReplay( + conversationId, + Number.isFinite(afterSeq) ? afterSeq : 0, + ); + + // Create the SSE stream. The manager replays the buffered in-flight turn + // (events with seq > afterSeq) before registering for live fan-out, so a + // page refresh reconstructs the in-progress assistant message and then + // tails the rest with no gap or duplication. + const { stream } = ctx.conversationEventManager.addSubscriber( + conversationId, + callerId, + replay, + { + isActive: ctx.runtime.isTurnActive(conversationId), + activeSeq: ctx.runtime.turnSeq(conversationId), + }, + ); return new Response(stream, { headers: { diff --git a/src/api/server.ts b/src/api/server.ts index 8a5ce362..ca42d8ac 100644 --- a/src/api/server.ts +++ b/src/api/server.ts @@ -73,6 +73,13 @@ export function startServer(options: ServerOptions): ServerHandle { const conversationEventManager = new ConversationEventManager(); conversationEventManager.start(); + // Bridge detached-turn events (RunBus) to the per-conversation SSE manager + // so connected viewers tail live. Replay-on-connect is sourced separately + // from the RunBus buffer (see the conversation-events route). 
+ runtime.onTurnEvent = (conversationId, event) => { + conversationEventManager.publishEvent(conversationId, event); + }; + // Login rate limiter — per-IP brute-force protection const rateLimiter = new LoginRateLimiter(); rateLimiter.start(); diff --git a/src/bundles/conversations/CHANGELOG.md b/src/bundles/conversations/CHANGELOG.md index cb2721be..ba8315aa 100644 --- a/src/bundles/conversations/CHANGELOG.md +++ b/src/bundles/conversations/CHANGELOG.md @@ -1,5 +1,22 @@ # Changelog +## 0.4.0 + +### Changed + +- List no longer flickers on live updates. `data.changed` refreshes now run in + the background (no skeleton swap) and only for conversation changes — other + apps' `data.changed` are ignored. Initial load and view switches still show + the skeleton. + +## 0.3.0 + +### Added + +- Live streaming indicator: a pulsing dot marks any conversation with an + in-flight assistant turn. Driven by host-pushed `streamingConversationIds` + (`useHostContext`), so it reflects real-time tab state without polling. + ## 0.2.0 **Breaking — tool output shape.** The bundle now returns a display-oriented diff --git a/src/bundles/conversations/src/index-cache.ts b/src/bundles/conversations/src/index-cache.ts index c937e2dd..cb4d7d30 100644 --- a/src/bundles/conversations/src/index-cache.ts +++ b/src/bundles/conversations/src/index-cache.ts @@ -105,6 +105,29 @@ export class ConversationIndex { }); } + /** + * Bring the index up to date NOW, bypassing the fs.watch debounce. + * + * Processes any queued watch events immediately, then scans the directory + * for files not yet indexed (a just-created conversation whose watch event + * hasn't fired or debounced yet). Called on the read path so a + * `data.changed`-driven list refresh reflects a brand-new conversation + * deterministically, instead of racing the 500ms watch debounce. 
+ */ + async flushPending(): Promise { + if (this.debounceTimer) { + clearTimeout(this.debounceTimer); + this.debounceTimer = null; + } + await this.processPendingFiles(); + if (!this.dir) return; + for (const filePath of listConversationFiles(this.dir)) { + if (!this.fileToId.has(basename(filePath))) { + await this.indexFile(filePath); + } + } + } + /** Stop watching. */ stopWatching(): void { if (this.watcher) { diff --git a/src/bundles/conversations/src/jsonl-reader.ts b/src/bundles/conversations/src/jsonl-reader.ts index f15a8ab6..faaf4dcf 100644 --- a/src/bundles/conversations/src/jsonl-reader.ts +++ b/src/bundles/conversations/src/jsonl-reader.ts @@ -47,6 +47,13 @@ export interface DisplayMessage { files?: DisplayFile[]; /** Non-"complete" run terminations bubble up here ("max_iterations", "error"). */ stopReason?: string; + /** + * True when this assistant turn has no terminal event yet (no run.done / + * run.error) — i.e. the run was still in flight when the file was read. Lets + * a live viewer tell a partial disk snapshot from a finished turn and decide + * whether to reconcile against the server's replay. + */ + pending?: boolean; } export type DisplayBlock = @@ -452,6 +459,10 @@ function collectRun( let endTs = events[start]?.ts ?? ""; let stopReason: string | undefined; + // A run is "terminated" only when we see its run.done/run.error. If the loop + // runs out of events first, the turn was still in flight when the file was + // read → mark the message pending. + let terminated = false; let i = start + 1; while (i < events.length) { @@ -459,12 +470,14 @@ function collectRun( if (isRunDone(inner) && inner.runId === runId) { endTs = inner.ts; stopReason = inner.stopReason; + terminated = true; i++; break; } if (isRunError(inner) && inner.runId === runId) { endTs = inner.ts; stopReason = "error"; + terminated = true; i++; break; } @@ -588,6 +601,7 @@ function collectRun( ...(flatToolCalls.length > 0 ? 
{ toolCalls: flatToolCalls } : {}), usage, ...(stopReason && stopReason !== "complete" ? { stopReason } : {}), + ...(terminated ? {} : { pending: true }), }; return [msg, i]; } diff --git a/src/bundles/conversations/ui/src/App.tsx b/src/bundles/conversations/ui/src/App.tsx index 2b23c1bb..d0e86196 100644 --- a/src/bundles/conversations/ui/src/App.tsx +++ b/src/bundles/conversations/ui/src/App.tsx @@ -3,7 +3,7 @@ import { Dashboard } from "./Dashboard"; export function App() { return ( - + ); diff --git a/src/bundles/conversations/ui/src/ConversationList.tsx b/src/bundles/conversations/ui/src/ConversationList.tsx index 8a7f433c..23a9e305 100644 --- a/src/bundles/conversations/ui/src/ConversationList.tsx +++ b/src/bundles/conversations/ui/src/ConversationList.tsx @@ -10,6 +10,8 @@ interface Props { groups: DateGroup[]; activeFilter: FilterKey; totalConversations: number; + /** Conversation ids with an in-flight assistant turn (host-pushed). */ + streamingIds?: Set; onOpen: (id: string) => void; } @@ -18,6 +20,7 @@ export function ConversationList({ groups, activeFilter, totalConversations, + streamingIds, onOpen, }: Props) { if (loading) { @@ -60,10 +63,21 @@ export function ConversationList({ {showSectionLabels &&
{group.label}
} {group.items.map((c) => { const title = c.title || c.preview || c.id; + const isStreaming = streamingIds?.has(c.id) ?? false; return ( - + {disabled && onStop ? ( + + ) : ( + + )} diff --git a/web/src/components/MessageList.tsx b/web/src/components/MessageList.tsx index e2871d86..fae1e220 100644 --- a/web/src/components/MessageList.tsx +++ b/web/src/components/MessageList.tsx @@ -180,15 +180,22 @@ function useSmartScroll(messages: ChatMessage[]) { prevConversationKeyRef.current = conversationKey; prevMessageCountRef.current = messages.length; - // Conversation loaded (different conversation or first load with history) + // Conversation loaded (different conversation or first load with history): + // land at the bottom (most recent turn), like ChatGPT/Claude. if (conversationKey !== prevKey && messages.length > 1) { - // Use double-rAF to ensure DOM has rendered the messages + // Use double-rAF to ensure the DOM has rendered the messages. Scroll the + // last real message to the viewport bottom (not the trailing 60vh + // spacer / bottomRef, which would leave the last turn off-screen). 
requestAnimationFrame(() => { requestAnimationFrame(() => { - scrollRef.current?.scrollTo({ top: 0, behavior: "instant" }); + const container = scrollRef.current; + const inner = container?.firstElementChild; + const lastMsg = inner?.children[messages.length - 1] as HTMLElement | undefined; + if (lastMsg) lastMsg.scrollIntoView({ behavior: "instant", block: "end" }); + else container?.scrollTo({ top: container.scrollHeight, behavior: "instant" }); }); }); - setIsAtBottom(false); + setIsAtBottom(true); return; } diff --git a/web/src/components/SlotRenderer.tsx b/web/src/components/SlotRenderer.tsx index 4ac72025..a5102677 100644 --- a/web/src/components/SlotRenderer.tsx +++ b/web/src/components/SlotRenderer.tsx @@ -1,4 +1,4 @@ -import { useEffect, useRef } from "react"; +import { useEffect, useRef, useSyncExternalStore } from "react"; import { getResources, uiPathFromUri } from "../api/client"; import type { BridgeHandle } from "../bridge/bridge"; import { createBridge } from "../bridge/bridge"; @@ -7,6 +7,7 @@ import { createAppIframe } from "../bridge/iframe"; import type { UiChatContext } from "../bridge/types"; import { useTheme } from "../context/ThemeContext"; import { useWorkspaceContext } from "../context/WorkspaceContext"; +import { chatStore } from "../hooks/chat-store"; import type { PlacementEntry } from "../types"; interface SlotRendererProps { @@ -61,6 +62,17 @@ export function SlotRenderer({ const forceRefreshRef = useRef(forceRefresh); forceRefreshRef.current = forceRefresh; + // Conversations currently streaming an assistant turn in this tab. Pushed + // into hostContext so the conversations list can show a per-row indicator. + // The store identity is stable between membership changes, so this only + // re-pushes when a conversation starts/stops streaming — not per delta. 
+ const streamingIds = useSyncExternalStore( + chatStore.subscribeStreamingIds, + chatStore.getStreamingIds, + ); + const streamingIdsRef = useRef(streamingIds); + streamingIdsRef.current = streamingIds; + const filtered = routeFilter ? placements.filter((p) => p.route === routeFilter) : placements; // Stable key: only re-mount iframes when the actual placements change @@ -111,7 +123,11 @@ export function SlotRenderer({ onNavigate: (...args) => onNavigateRef.current?.(...args), onPromptAction: (...args) => onPromptActionRef.current?.(...args), getHostExtensions: () => - buildHostExtensions(workspaceRef.current, forceRefreshRef.current), + buildHostExtensions( + workspaceRef.current, + forceRefreshRef.current, + streamingIdsRef.current, + ), }); bridges.push(bridge); } catch (err) { @@ -140,11 +156,11 @@ export function SlotRenderer({ // mounted; apps that observe `useHostContext()` (or `useTheme()`) re-render // and refetch workspace-scoped data without losing local state. useEffect(() => { - const ctx = buildHostContext(mode, activeWorkspace); + const ctx = buildHostContext(mode, activeWorkspace, streamingIds); for (const bridge of bridgesRef.current) { bridge.setHostContext(ctx); } - }, [mode, activeWorkspace]); + }, [mode, activeWorkspace, streamingIds]); if (filtered.length === 0) return null; diff --git a/web/src/components/chat/Composer.tsx b/web/src/components/chat/Composer.tsx index 85cca768..b0ff31b5 100644 --- a/web/src/components/chat/Composer.tsx +++ b/web/src/components/chat/Composer.tsx @@ -24,6 +24,8 @@ export interface ComposerProps { disabled: boolean; onNewConversation?: () => void; streamingState?: StreamingState; + /** Stop the in-flight turn — wired to the Stop button in MessageInput. */ + onStop?: () => void; /** * Hide the footer (e.g. for embedded / popover compositions where * vertical space is at a premium). Default: show. 
@@ -36,6 +38,7 @@ export function Composer({ disabled, onNewConversation, streamingState, + onStop, hideFooter = false, }: ComposerProps) { return ( @@ -45,6 +48,7 @@ export function Composer({ disabled={disabled} onNewConversation={onNewConversation} streamingState={streamingState} + onStop={onStop} /> {!hideFooter && } diff --git a/web/src/context/ChatContext.tsx b/web/src/context/ChatContext.tsx index 01ac29a0..39ff3ea2 100644 --- a/web/src/context/ChatContext.tsx +++ b/web/src/context/ChatContext.tsx @@ -1,10 +1,18 @@ import type { ReactNode } from "react"; -import { createContext, useCallback, useContext, useEffect, useMemo, useState } from "react"; +import { + createContext, + useCallback, + useContext, + useEffect, + useMemo, + useRef, + useState, +} from "react"; import { useLocation } from "react-router-dom"; import { callTool } from "../api/client"; +import { chatStore } from "../hooks/chat-store"; import type { UseChatReturn } from "../hooks/useChat"; import { useChat } from "../hooks/useChat"; -import { useConversationEvents } from "../hooks/useConversationEvents"; import type { AppContext, ConfigInfo } from "../types"; import { useWorkspaceContext } from "./WorkspaceContext"; @@ -72,6 +80,19 @@ export function ChatProvider({ : null; const chat = useChat(initialConversationId, currentUserId, focusWorkspaceId); + // Drop every cached conversation slice when the signed-in user changes + // (logout → login as someone else in the same tab). Conversations are + // user-scoped, so a workspace switch must NOT reset — only an identity + // change. The store is a module singleton that outlives this provider's + // remounts, so stale slices would otherwise leak across users. 
+ const prevUserRef = useRef(currentUserId); + useEffect(() => { + if (prevUserRef.current !== currentUserId) { + chatStore.reset(); + prevUserRef.current = currentUserId; + } + }, [currentUserId]); + // Dev helper: window.__nb.simulateError("some error message") useEffect(() => { if (!import.meta.env.DEV) return; @@ -165,21 +186,10 @@ export function ChatProvider({ } }, []); - // Same-user cross-tab sync (Stage 1 single-owner). Stage 4 widens - // the audience when sharing returns. - useConversationEvents(chat.conversationId, { - onRemoteUserMessage: (data) => { - chat.injectRemoteUserMessage(data.userId, data.displayName, data.content); - }, - onRemoteStreamEvent: (type, data) => { - chat.processRemoteStreamEvent(type, data); - }, - onReconnect: () => { - if (chat.conversationId) { - chat.loadConversation(chat.conversationId); - } - }, - }); + // Cross-tab / refresh sync is now handled by the per-conversation turn + // stream itself (server-authoritative): every viewer attaches to + // GET /v1/conversations/:id/events, which replays the in-flight turn and + // tails live. No separate remote-event bridge needed. 
const wrappedSendMessage = useCallback( (text: string, appContext?: AppContext, files?: File[]) => { diff --git a/web/src/hooks/chat-store.ts b/web/src/hooks/chat-store.ts new file mode 100644 index 00000000..429d0445 --- /dev/null +++ b/web/src/hooks/chat-store.ts @@ -0,0 +1,1016 @@ +import { + callTool, + cancelChatTurn, + getAuthToken, + startChatTurn, + startChatTurnMultipart, +} from "../api/client"; +import { + type ConversationStreamConnection, + connectConversationStream, +} from "../api/conversation-stream"; +import { formatSendError } from "../api/format-error"; +import type { + AppContext, + ChatRequest, + ChatResult, + LlmDoneEvent, + ReasoningDeltaEvent, + StreamErrorEvent, + TextDeltaEvent, + ToolDoneEvent, + ToolPreparingEvent, + ToolStartEvent, +} from "../types"; + +// =========================================================================== +// Public display types (shared across the chat UI). These live here — not in +// useChat — because the slice store is the lowest layer that owns them and +// `useChat` re-exports them for backward-compatible imports. +// =========================================================================== + +export type StreamingState = + | null + | "thinking" + | "streaming" + | "preparing" + | "working" + | "analyzing"; + +/** Identifies the tool the model is currently building a call for. */ +export interface PreparingTool { + id: string; + name: string; +} + +/** Typed tool result shape forwarded through the bridge. */ +export interface ToolResultForUI { + content: Array<{ type: string; text?: string; [key: string]: unknown }>; + structuredContent?: Record; + isError: boolean; +} + +/** Tool call with UI state for streaming display. 
*/ +export interface ToolCallDisplay { + id: string; + name: string; + status: "running" | "done" | "error"; + ok?: boolean; + ms?: number; + resourceUri?: string; + resourceLinks?: Array<{ + uri: string; + name?: string; + mimeType?: string; + description?: string; + }>; + result?: ToolResultForUI; + input?: Record; + appName?: string; +} + +/** A block in the assistant message stream — text, reasoning, or tool group. */ +export type ContentBlock = + | { type: "text"; text: string } + | { type: "reasoning"; text: string } + | { type: "tool"; toolCalls: ToolCallDisplay[] }; + +/** Live iteration progress during streaming. */ +export interface IterationProgress { + n: number; + inputTokens: number; + outputTokens: number; +} + +/** File metadata attached to a message. */ +export interface MessageFileAttachment { + id: string; + filename: string; + mimeType: string; + size: number; + extracted: boolean; +} + +/** A chat message with ordered content blocks for display. */ +export interface ChatMessage { + role: "user" | "assistant"; + content: string; + blocks?: ContentBlock[]; + toolCalls?: ToolCallDisplay[]; + iteration?: IterationProgress; + timestamp?: string; + userId?: string; + files?: MessageFileAttachment[]; + stopReason?: string; + /** Loaded-from-disk turn with no terminal event yet (run still in flight when + * read). Drives the resume reconcile — a partial snapshot vs a finished turn. */ + pending?: boolean; + error?: string; + usage?: { + inputTokens: number; + outputTokens: number; + cacheReadTokens?: number; + cacheWriteTokens?: number; + reasoningTokens?: number; + model: string; + llmMs: number; + }; +} + +/** Conversation-level metadata (Stage 1: single-owner only). */ +export interface LoadedConversationMeta { + ownerId?: string; +} + +// =========================================================================== +// Snapshot — the immutable view a React component renders for one conversation. 
+// =========================================================================== + +export interface ChatSnapshot { + conversationId: string | null; + /** Server-generated conversation title (null until generated/loaded). */ + title: string | null; + messages: ChatMessage[]; + isStreaming: boolean; + streamingState: StreamingState; + preparingTool: PreparingTool | null; + meta: LoadedConversationMeta | null; + error: string | null; +} + +const EMPTY_MESSAGES: ChatMessage[] = []; +const EMPTY_SNAPSHOT: ChatSnapshot = { + conversationId: null, + title: null, + messages: EMPTY_MESSAGES, + isStreaming: false, + streamingState: null, + preparingTool: null, + meta: null, + error: null, +}; + +// =========================================================================== +// Slice — mutable per-conversation viewer state. +// +// The server is authoritative: a turn runs to completion server-side and its +// events are published to a per-conversation stream. This slice is a *view* +// over that stream plus the persisted history. Switching away / refreshing +// just detaches; re-attaching replays the in-flight turn (issue #254 + +// server-authoritative streaming follow-up). +// =========================================================================== + +interface ConversationSlice { + keys: Set; + conversationId: string | null; + title: string | null; + messages: ChatMessage[]; + isStreaming: boolean; + streamingState: StreamingState; + preparingTool: PreparingTool | null; + meta: LoadedConversationMeta | null; + error: string | null; + // streaming scratch + blocks: ContentBlock[]; + toolCalls: ToolCallDisplay[]; + iteration?: IterationProgress; + // live subscription to the server turn stream (null when detached) + connection: ConversationStreamConnection | null; + /** The next streamed `user.message` echoes a turn we optimistically added — + * consume it instead of appending a duplicate. 
*/ + pendingEcho: boolean; + /** First `subscribed` frame of a resume should trim a stale in-flight turn + * from disk history (the replay rebuilds it). */ + resumeOnSubscribe: boolean; + /** True once full history is loaded (loadConversation) or the conversation + * was authored in this session (sendTurn / new draft). A dot-only probe + * leaves it false so opening the conversation still fetches full history. */ + hydrated: boolean; + lastActiveAt: number; + snapshot: ChatSnapshot; +} + +export interface StartTurnHooks { + onConversationId?: (id: string) => void; +} + +export interface StartTurnParams { + text: string; + appContext?: AppContext; + model?: string; + files?: File[]; + currentUserId?: string; +} + +// --------------------------------------------------------------------------- +// Pure helpers +// --------------------------------------------------------------------------- + +function cloneBlocks(blocks: ContentBlock[]): ContentBlock[] { + return blocks.map((b) => { + if (b.type === "tool") return { ...b, toolCalls: [...b.toolCalls] }; + return { ...b }; + }); +} + +function textFromBlocks(blocks: ContentBlock[]): string { + return blocks + .filter((b): b is ContentBlock & { type: "text" } => b.type === "text") + .map((b) => b.text) + .join(""); +} + +function wrapStringResult(text: string, isError = false): ToolResultForUI { + return { content: [{ type: "text", text }], isError }; +} + +const updateTool = + (evt: ToolDoneEvent) => + (tc: ToolCallDisplay): ToolCallDisplay => + tc.id === evt.id + ? { + ...tc, + status: evt.ok ? ("done" as const) : ("error" as const), + ok: evt.ok, + ms: evt.ms, + resourceUri: tc.resourceUri ?? evt.resourceUri, + resourceLinks: + evt.resourceLinks != null && evt.resourceLinks.length > 0 + ? evt.resourceLinks + : tc.resourceLinks, + result: evt.result != null ? 
(evt.result as ToolResultForUI) : tc.result, + } + : tc; + +// --------------------------------------------------------------------------- +// Key helpers +// --------------------------------------------------------------------------- + +const DRAFT_PREFIX = "draft:"; +let draftCounter = 0; + +export function freshDraftKey(): string { + draftCounter += 1; + return `${DRAFT_PREFIX}${draftCounter}`; +} + +export function isDraftKey(key: string): boolean { + return key.startsWith(DRAFT_PREFIX); +} + +// --------------------------------------------------------------------------- +// Store +// --------------------------------------------------------------------------- + +const MAX_SLICES = 30; + +export interface ChatStore { + ensureSlice(key: string, opts?: { conversationId?: string | null }): void; + getSnapshot(key: string): ChatSnapshot; + subscribeSlice(key: string, cb: () => void): () => void; + getStreamingIds(): string[]; + subscribeStreamingIds(cb: () => void): () => void; + markActive(key: string): void; + markInactive(key: string): void; + /** Send a message: start a server turn, then watch its stream. */ + sendTurn(key: string, params: StartTurnParams, hooks?: StartTurnHooks): Promise; + /** Load persisted history and attach to any in-flight turn. */ + loadConversation(id: string): Promise; + /** Probe whether a conversation is generating (restores dots on reload), + * without fetching message history. */ + probeConversation(id: string): void; + /** Set a conversation's title (from the live `conversation.title` SSE). + * No-op if the conversation has no slice in this tab. */ + setTitle(conversationId: string, title: string): void; + /** Stop an in-flight turn (the only thing that aborts generation). 
*/ + cancelTurn(key: string): void; + retryLastMessage(key: string): string | null; + simulateError(key: string, message: string): void; + reset(): void; + sliceCount(): number; +} + +export function createChatStore(): ChatStore { + const byKey = new Map(); + const allSlices = new Set(); + const listeners = new Map void>>(); + const activeCounts = new Map(); + + let streamingIds: string[] = []; + const streamingListeners = new Set<() => void>(); + + // -- snapshot + notification -- + + function buildSnapshot(slice: ConversationSlice): ChatSnapshot { + return { + conversationId: slice.conversationId, + title: slice.title, + messages: slice.messages, + isStreaming: slice.isStreaming, + streamingState: slice.streamingState, + preparingTool: slice.preparingTool, + meta: slice.meta, + error: slice.error, + }; + } + + function notifyKey(key: string): void { + const set = listeners.get(key); + if (!set) return; + for (const cb of set) cb(); + } + + function recomputeStreamingIds(): void { + const ids = new Set(); + for (const slice of allSlices) { + if (slice.isStreaming && slice.conversationId) ids.add(slice.conversationId); + } + const next = [...ids].sort(); + if (next.length !== streamingIds.length || next.some((id, i) => id !== streamingIds[i])) { + streamingIds = next; + for (const cb of streamingListeners) cb(); + } + } + + function commit(slice: ConversationSlice): void { + slice.snapshot = buildSnapshot(slice); + for (const key of slice.keys) notifyKey(key); + recomputeStreamingIds(); + } + + // -- slice lifecycle -- + + function isActive(slice: ConversationSlice): boolean { + for (const key of slice.keys) { + if ((activeCounts.get(key) ?? 
0) > 0) return true; + } + return false; + } + + function removeSlice(slice: ConversationSlice): void { + slice.connection?.close(); + slice.connection = null; + for (const key of slice.keys) byKey.delete(key); + allSlices.delete(slice); + } + + function evict(): void { + if (allSlices.size <= MAX_SLICES) return; + const idle = [...allSlices] + .filter((s) => !s.isStreaming && !isActive(s)) + .sort((a, b) => a.lastActiveAt - b.lastActiveAt); + let over = allSlices.size - MAX_SLICES; + for (const s of idle) { + if (over <= 0) break; + removeSlice(s); + over -= 1; + } + } + + function createSlice(key: string, conversationId: string | null): ConversationSlice { + const slice: ConversationSlice = { + keys: new Set([key]), + conversationId, + title: null, + messages: [], + isStreaming: false, + streamingState: null, + preparingTool: null, + meta: null, + error: null, + blocks: [], + toolCalls: [], + iteration: undefined, + connection: null, + pendingEcho: false, + resumeOnSubscribe: false, + // A fresh draft is fully "loaded" (empty IS its full history); a slice + // keyed by a real conversation id starts unhydrated until fetched. + hydrated: isDraftKey(key), + lastActiveAt: Date.now(), + snapshot: EMPTY_SNAPSHOT, + }; + slice.snapshot = buildSnapshot(slice); + byKey.set(key, slice); + allSlices.add(slice); + evict(); + return slice; + } + + function ensureSlice(key: string, opts?: { conversationId?: string | null }): void { + const existing = byKey.get(key); + if (existing) { + existing.lastActiveAt = Date.now(); + return; + } + const convId = + opts && "conversationId" in opts + ? (opts.conversationId ?? null) + : isDraftKey(key) + ? 
null + : key; + createSlice(key, convId); + } + + function aliasSlice(slice: ConversationSlice, conversationId: string): void { + if (slice.keys.has(conversationId)) return; + slice.keys.add(conversationId); + byKey.set(conversationId, slice); + } + + // -- streaming scratch -- + + function resetScratch(slice: ConversationSlice): void { + slice.blocks = []; + slice.toolCalls = []; + slice.iteration = undefined; + } + + function assistantFromScratch(slice: ConversationSlice): ChatMessage { + return { + role: "assistant", + content: textFromBlocks(slice.blocks), + blocks: cloneBlocks(slice.blocks), + toolCalls: [...slice.toolCalls], + iteration: slice.iteration ? { ...slice.iteration } : undefined, + }; + } + + function flush(slice: ConversationSlice): void { + const updated = [...slice.messages]; + updated[updated.length - 1] = assistantFromScratch(slice); + slice.messages = updated; + commit(slice); + } + + /** Drop the trailing in-flight turn (last user message + anything after). */ + function trimTrailingTurn(slice: ConversationSlice): void { + for (let i = slice.messages.length - 1; i >= 0; i--) { + if (slice.messages[i].role === "user") { + slice.messages = slice.messages.slice(0, i); + return; + } + } + } + + /** True when the loaded disk tail is an unfinished turn — a trailing user + * message (no assistant yet) or an assistant flagged `pending` (read before + * its run.done). Distinguishes a partial snapshot from a finished turn. 
*/ + function hasPendingTail(slice: ConversationSlice): boolean { + const last = slice.messages[slice.messages.length - 1]; + if (!last) return false; + return last.role === "user" || last.pending === true; + } + + // -- subscription -- + + function closeConnection(slice: ConversationSlice): void { + slice.connection?.close(); + slice.connection = null; + } + + function openConnection(slice: ConversationSlice, conversationId: string, resume: boolean): void { + closeConnection(slice); + slice.resumeOnSubscribe = resume; + // When a resume finds no active turn, the server may still replay the most + // recent (already-finished) turn from its grace buffer. Those events would + // re-append a turn that's already in the loaded disk history → duplicate. + // Drop them once we know this connection isn't watching a live turn. + let dropEvents = false; + slice.connection = connectConversationStream({ + conversationId, + token: getAuthToken() ?? undefined, + afterSeq: 0, + onSubscribed: (info) => { + if (slice.resumeOnSubscribe) { + slice.resumeOnSubscribe = false; + const pendingTail = hasPendingTail(slice); + if (info.isActive || (pendingTail && info.activeSeq > 0)) { + // Either a live turn, OR a turn that finished in the load→subscribe + // window but is still retained in the RunBus grace buffer (partial + // disk tail + a retained run). Either way the replay carries the + // full trailing turn — trim the stale/partial disk copy so the + // replay rebuilds it without duplicating. For a live turn, reflect + // the streaming indicator immediately; for a just-finished one the + // replayed `done` finalizes it. + trimTrailingTurn(slice); + resetScratch(slice); + if (info.isActive) { + slice.isStreaming = true; + if (!slice.streamingState) slice.streamingState = "thinking"; + } + commit(slice); + return; + } + if (pendingTail) { + // Partial disk tail but the run is gone (grace GC'd) — no replay can + // complete it. Refetch the now-complete transcript. 
+ dropEvents = true; + closeConnection(slice); + void loadConversation(conversationId); + return; + } + if (!slice.isStreaming) { + // Complete disk tail (or idle) — ignore any stray grace-buffer + // replay; it would duplicate (and flicker) a turn already fully on + // disk. + dropEvents = true; + closeConnection(slice); + return; + } + } + // Server-authoritative reconcile: the server says no turn is running, + // but we still think we're streaming. Happens when a viewer reconnects + // after the turn ended while disconnected past the RunBus grace window + // — the terminal frame was GC'd, so it will never replay and the spinner + // would hang forever. Clear it. A terminal frame still within grace + // arrives in the replay that follows and finalizes the content; if it + // was GC'd, the slice keeps its last-seen partial (a reload fetches the + // final) — either way we stop hanging. + if (!info.isActive && slice.isStreaming) { + slice.isStreaming = false; + slice.streamingState = null; + slice.preparingTool = null; + commit(slice); + } + }, + onEvent: (type, data) => { + if (dropEvents) return; + applyStreamEvent(slice, type, data); + }, + onError: () => { + // Leave the slice intact; the persisted history still renders. + }, + }); + } + + // -- stream reducer -- + + function applyStreamEvent(slice: ConversationSlice, type: string, data: unknown): void { + switch (type) { + case "user.message": { + const evt = data as { content: string; userId?: string; timestamp?: string }; + resetScratch(slice); + if (slice.pendingEcho) { + // Our optimistic user message + assistant placeholder are already in + // place; the deltas will fill the placeholder. + slice.pendingEcho = false; + } else { + const userMsg: ChatMessage = { + role: "user", + content: evt.content, + ...(evt.timestamp ? { timestamp: evt.timestamp } : {}), + ...(evt.userId ? 
{ userId: evt.userId } : {}), + }; + const assistantMsg: ChatMessage = { + role: "assistant", + content: "", + blocks: [], + toolCalls: [], + timestamp: new Date().toISOString(), + }; + slice.messages = [...slice.messages, userMsg, assistantMsg]; + } + slice.isStreaming = true; + slice.streamingState = "thinking"; + commit(slice); + break; + } + case "chat.start": { + const evt = data as { conversationId: string }; + if (evt.conversationId && slice.conversationId !== evt.conversationId) { + slice.conversationId = evt.conversationId; + aliasSlice(slice, evt.conversationId); + commit(slice); + } + break; + } + case "text.delta": { + const evt = data as TextDeltaEvent; + slice.streamingState = "streaming"; + slice.preparingTool = null; + const last = slice.blocks[slice.blocks.length - 1]; + if (last && last.type === "text") last.text += evt.text; + else slice.blocks.push({ type: "text", text: evt.text }); + flush(slice); + break; + } + case "reasoning.delta": { + const evt = data as ReasoningDeltaEvent; + slice.streamingState = "streaming"; + slice.preparingTool = null; + const last = slice.blocks[slice.blocks.length - 1]; + if (last && last.type === "reasoning") last.text += evt.text; + else slice.blocks.push({ type: "reasoning", text: evt.text }); + flush(slice); + break; + } + case "tool.preparing": { + const evt = data as ToolPreparingEvent; + slice.streamingState = "preparing"; + slice.preparingTool = { id: evt.id, name: evt.name }; + commit(slice); + break; + } + case "tool.preparing.done": + break; + case "tool.start": { + const evt = data as ToolStartEvent; + slice.streamingState = "working"; + slice.preparingTool = null; + const separatorIdx = evt.name.indexOf("__"); + const newTool: ToolCallDisplay = { + id: evt.id, + name: evt.name, + status: "running", + resourceUri: evt.resourceUri, + input: evt.input, + appName: separatorIdx !== -1 ? 
evt.name.slice(0, separatorIdx) : undefined, + }; + slice.toolCalls = [...slice.toolCalls, newTool]; + const last = slice.blocks[slice.blocks.length - 1]; + if (last && last.type === "tool") last.toolCalls = [...last.toolCalls, newTool]; + else slice.blocks.push({ type: "tool", toolCalls: [newTool] }); + flush(slice); + break; + } + case "tool.done": { + const evt = data as ToolDoneEvent; + const updater = updateTool(evt); + slice.toolCalls = slice.toolCalls.map(updater); + for (const block of slice.blocks) { + if (block.type === "tool") block.toolCalls = block.toolCalls.map(updater); + } + const anyRunning = slice.toolCalls.some((tc) => tc.status === "running"); + slice.streamingState = anyRunning ? "working" : "analyzing"; + flush(slice); + break; + } + case "llm.done": { + const evt = data as LlmDoneEvent; + slice.iteration = { + n: (slice.iteration?.n ?? 0) + 1, + inputTokens: (slice.iteration?.inputTokens ?? 0) + (evt.usage?.inputTokens ?? 0), + outputTokens: (slice.iteration?.outputTokens ?? 0) + (evt.usage?.outputTokens ?? 0), + }; + flush(slice); + break; + } + case "done": { + const result = data as ChatResult; + slice.streamingState = null; + slice.preparingTool = null; + slice.isStreaming = false; + + if (result.toolCalls) { + const outputMap = new Map(result.toolCalls.map((tc) => [tc.id, tc.output])); + const backfill = (tc: ToolCallDisplay): ToolCallDisplay => { + if (tc.result != null) return tc; + const output = outputMap.get(tc.id); + return output != null ? { ...tc, result: wrapStringResult(output) } : tc; + }; + for (const block of slice.blocks) { + if (block.type === "tool") block.toolCalls = block.toolCalls.map(backfill); + } + slice.toolCalls = slice.toolCalls.map(backfill); + } + + const finalBlocks = cloneBlocks(slice.blocks); + const finalTools = slice.toolCalls.length > 0 ? [...slice.toolCalls] : undefined; + const usage = result.usage + ? 
{ + inputTokens: result.usage.inputTokens, + outputTokens: result.usage.outputTokens, + cacheReadTokens: result.usage.cacheReadTokens, + cacheWriteTokens: result.usage.cacheWriteTokens, + reasoningTokens: result.usage.reasoningTokens, + model: result.usage.model, + llmMs: result.usage.llmMs, + } + : undefined; + // Cast: `files` is attached to the done payload by the server but isn't + // on the typed ChatResult — read it defensively. + const resultFiles = (result as unknown as Record).files as + | MessageFileAttachment[] + | undefined; + + const updated = [...slice.messages]; + if (updated.length > 0 && updated[updated.length - 1].role === "assistant") { + updated[updated.length - 1] = { + role: "assistant", + content: result.response, + blocks: finalBlocks, + toolCalls: finalTools, + usage, + ...(result.stopReason && result.stopReason !== "complete" + ? { stopReason: result.stopReason } + : {}), + ...(resultFiles && resultFiles.length > 0 ? { files: resultFiles } : {}), + }; + slice.messages = updated; + } + resetScratch(slice); + commit(slice); + closeConnection(slice); + break; + } + case "error": { + const evt = data as StreamErrorEvent; + slice.streamingState = null; + slice.preparingTool = null; + slice.isStreaming = false; + const updated = [...slice.messages]; + const last = updated[updated.length - 1]; + if (last?.role === "assistant") { + updated[updated.length - 1] = { ...last, error: evt.message }; + slice.messages = updated; + } else { + slice.error = evt.message; + } + commit(slice); + closeConnection(slice); + break; + } + case "cancelled": { + slice.streamingState = null; + slice.preparingTool = null; + slice.isStreaming = false; + commit(slice); + closeConnection(slice); + break; + } + } + } + + // -- send (start a server turn, then watch it) -- + + async function sendTurn( + key: string, + params: StartTurnParams, + hooks?: StartTurnHooks, + ): Promise { + ensureSlice(key); + const slice = byKey.get(key); + if (!slice || slice.isStreaming) 
return; + + slice.error = null; + slice.isStreaming = true; + slice.streamingState = "thinking"; + slice.pendingEcho = true; + // Authoring a turn means the full conversation lives in memory. + slice.hydrated = true; + resetScratch(slice); + + // Optimistic user message + assistant placeholder for snappy UX. The + // streamed `user.message` echo is consumed (pendingEcho), not duplicated. + const userFiles: MessageFileAttachment[] | undefined = params.files?.map((f) => ({ + id: `pending_${f.name}_${f.size}`, + filename: f.name, + mimeType: f.type || "application/octet-stream", + size: f.size, + extracted: false, + })); + const userMsg: ChatMessage = { + role: "user", + content: params.text, + timestamp: new Date().toISOString(), + ...(params.currentUserId ? { userId: params.currentUserId } : {}), + ...(userFiles && userFiles.length > 0 ? { files: userFiles } : {}), + }; + const assistantMsg: ChatMessage = { + role: "assistant", + content: "", + blocks: [], + toolCalls: [], + timestamp: new Date().toISOString(), + }; + slice.messages = [...slice.messages, userMsg, assistantMsg]; + commit(slice); + + const req: ChatRequest = { + message: params.text, + ...(slice.conversationId ? { conversationId: slice.conversationId } : {}), + ...(params.appContext ? { appContext: params.appContext } : {}), + ...(params.model ? { model: params.model } : {}), + }; + + let conversationId: string; + try { + const result = + params.files && params.files.length > 0 + ? 
await startChatTurnMultipart(req, params.files) + : await startChatTurn(req); + conversationId = result.conversationId; + } catch (err) { + handleTurnError(slice, err); + slice.isStreaming = false; + slice.streamingState = null; + slice.pendingEcho = false; + commit(slice); + return; + } + + if (slice.conversationId !== conversationId) { + slice.conversationId = conversationId; + aliasSlice(slice, conversationId); + hooks?.onConversationId?.(conversationId); + commit(slice); + } + + // Watch the turn we just started (fresh turn — not a resume). + openConnection(slice, conversationId, false); + } + + function handleTurnError(slice: ConversationSlice, err: unknown): void { + // Drop the optimistic user+assistant placeholders on a hard start failure. + slice.messages = slice.messages.slice(0, -2); + slice.error = formatSendError(err); + } + + // -- load from disk + attach -- + + async function loadConversation(id: string): Promise { + const existing = byKey.get(id); + // Already fully loaded and live — keep the stream, don't refetch. A + // dot-only probe (connection but not hydrated) falls through so opening + // the conversation fetches its full history. + if (existing?.hydrated && (existing.isStreaming || existing.connection)) { + existing.lastActiveAt = Date.now(); + return; + } + ensureSlice(id, { conversationId: id }); + const slice = byKey.get(id); + if (slice) slice.error = null; + try { + const res = await callTool("conversations", "get", { id, expand: "full" }); + const current = byKey.get(id); + if (!current) return; + if (res.isError) { + const errText = res.content + ?.map((b) => b.text ?? 
"") + .filter(Boolean) + .join("\n"); + throw new Error(errText || "Failed to load conversation"); + } + let raw: unknown = res.structuredContent; + if (!raw && res.content?.[0]?.text) { + try { + raw = JSON.parse(res.content[0].text); + } catch { + raw = {}; + } + } + const parsed = raw as { + metadata: { id: string; ownerId?: string; title?: string | null }; + messages: ChatMessage[]; + }; + current.conversationId = parsed.metadata.id; + aliasSlice(current, parsed.metadata.id); + current.meta = { ownerId: parsed.metadata.ownerId }; + current.title = parsed.metadata.title ?? null; + current.messages = parsed.messages ?? []; + current.hydrated = true; + commit(current); + // Attach to any in-flight turn (resume — trims a stale in-flight turn + // from the loaded history if the server says one is active). + openConnection(current, parsed.metadata.id, true); + } catch (err) { + const slc = byKey.get(id); + if (slc) { + slc.error = err instanceof Error ? err.message : "Failed to load conversation"; + commit(slc); + } + } + } + + function cancelTurn(key: string): void { + const slice = byKey.get(key); + if (!slice?.conversationId) return; + void cancelChatTurn(slice.conversationId); + // The server emits a terminal `cancelled` event which finalizes the slice; + // no optimistic mutation needed. + } + + /** + * Lightweight "is this conversation generating?" probe — used on reload to + * restore background streaming dots without fetching message history. Opens + * a resume subscription: if the server says the turn is active, the slice + * flips to streaming (→ `getStreamingIds` → dot) and tails live; if not, the + * connection closes and the slice stays idle. Leaves `hydrated` false so a + * later open still loads full history. 
+ */ + function probeConversation(id: string): void { + const existing = byKey.get(id); + if (existing?.isStreaming || existing?.connection) return; // already live/probed + ensureSlice(id, { conversationId: id }); + const slice = byKey.get(id); + if (slice) openConnection(slice, id, true); + } + + // -- retry / simulate -- + + function retryLastMessage(key: string): string | null { + const slice = byKey.get(key); + if (!slice) return null; + let text: string | null = null; + for (let i = slice.messages.length - 1; i >= 0; i--) { + if (slice.messages[i].role === "user") { + text = slice.messages[i].content; + slice.messages = slice.messages.slice(0, i); + break; + } + } + slice.error = null; + slice.isStreaming = false; + slice.streamingState = null; + slice.preparingTool = null; + commit(slice); + return text; + } + + function simulateError(key: string, message: string): void { + const slice = byKey.get(key); + if (!slice || slice.messages.length === 0) return; + const updated = [...slice.messages]; + const last = updated[updated.length - 1]; + if (last?.role === "assistant") { + updated[updated.length - 1] = { ...last, error: message }; + } else { + updated.push({ role: "assistant", content: "", error: message }); + } + slice.messages = updated; + slice.streamingState = null; + slice.preparingTool = null; + slice.isStreaming = false; + commit(slice); + } + + function reset(): void { + for (const slice of allSlices) slice.connection?.close(); + byKey.clear(); + allSlices.clear(); + activeCounts.clear(); + streamingIds = []; + for (const set of listeners.values()) { + for (const cb of set) cb(); + } + for (const cb of streamingListeners) cb(); + } + + return { + ensureSlice, + getSnapshot(key) { + return byKey.get(key)?.snapshot ?? 
EMPTY_SNAPSHOT; + }, + subscribeSlice(key, cb) { + let set = listeners.get(key); + if (!set) { + set = new Set(); + listeners.set(key, set); + } + set.add(cb); + return () => { + const s = listeners.get(key); + if (!s) return; + s.delete(cb); + if (s.size === 0) listeners.delete(key); + }; + }, + getStreamingIds() { + return streamingIds; + }, + subscribeStreamingIds(cb) { + streamingListeners.add(cb); + return () => streamingListeners.delete(cb); + }, + markActive(key) { + activeCounts.set(key, (activeCounts.get(key) ?? 0) + 1); + const slice = byKey.get(key); + if (slice) slice.lastActiveAt = Date.now(); + }, + markInactive(key) { + const n = (activeCounts.get(key) ?? 0) - 1; + if (n <= 0) activeCounts.delete(key); + else activeCounts.set(key, n); + }, + sendTurn, + loadConversation, + probeConversation, + setTitle(conversationId, title) { + const slice = byKey.get(conversationId); + if (!slice || slice.title === title) return; + slice.title = title; + commit(slice); + }, + cancelTurn, + retryLastMessage, + simulateError, + reset, + sliceCount() { + return allSlices.size; + }, + }; +} + +/** Module-singleton store. 
*/ +export const chatStore = createChatStore(); diff --git a/web/src/hooks/useChat.ts b/web/src/hooks/useChat.ts index 4be3da27..e1e485a8 100644 --- a/web/src/hooks/useChat.ts +++ b/web/src/hooks/useChat.ts @@ -1,136 +1,27 @@ -import { useCallback, useEffect, useRef, useState } from "react"; -import { ApiClientError, callTool, streamChat, streamChatMultipart } from "../api/client"; -import { formatSendError } from "../api/format-error"; +import { useCallback, useEffect, useMemo, useState, useSyncExternalStore } from "react"; import { captureEvent } from "../telemetry"; +import type { AppContext } from "../types"; import type { - AppContext, - ChatRequest, - ChatResult, - ChatStreamEventMap, - ChatStreamEventType, - LlmDoneEvent, - ReasoningDeltaEvent, - StreamErrorEvent, - TextDeltaEvent, - ToolDoneEvent, - ToolPreparingEvent, - ToolStartEvent, -} from "../types"; - -/** - * Streaming state machine: - * - * null → thinking → streaming ↔ preparing → working → analyzing → streaming → null - * ↘ working (next tool.start) - * - * `analyzing` fills the gap between the last tool.done (all tools finished) - * and the next text.delta / tool.start, when the model is inferring on tool - * results but the UI would otherwise look frozen. - * - * `preparing` fills the model-side gap: after text/reasoning has streamed - * within an iteration, the model may continue emitting a large tool-call - * input block (45 KB+ for full-document writes). No deltas fire during - * that window — without `preparing`, the indicator goes dark for as long - * as it takes the LLM to emit the args. `tool.preparing` fires on - * `tool-input-start` from the AI SDK; `tool.start` follows once the - * iteration finishes and the engine begins execution. - * - * Any `tool.start` can re-enter `working` from a non-terminal state. 
- */ -export type StreamingState = - | null - | "thinking" - | "streaming" - | "preparing" - | "working" - | "analyzing"; - -/** Identifies the tool the model is currently building a call for. */ -export interface PreparingTool { - id: string; - name: string; -} - -/** Typed tool result shape forwarded through the bridge. */ -export interface ToolResultForUI { - content: Array<{ type: string; text?: string; [key: string]: unknown }>; - structuredContent?: Record; - isError: boolean; -} - -/** Tool call with UI state for streaming display. */ -export interface ToolCallDisplay { - id: string; - name: string; - status: "running" | "done" | "error"; - ok?: boolean; - ms?: number; - resourceUri?: string; - /** MCP `resource_link` blocks returned by the tool result, if any. */ - resourceLinks?: Array<{ - uri: string; - name?: string; - mimeType?: string; - description?: string; - }>; - result?: ToolResultForUI; - input?: Record; - appName?: string; -} - -/** A block in the assistant message stream — text, reasoning, or tool call group, in temporal order. */ -export type ContentBlock = - | { type: "text"; text: string } - | { type: "reasoning"; text: string } - | { type: "tool"; toolCalls: ToolCallDisplay[] }; - -/** Live iteration progress during streaming. */ -export interface IterationProgress { - n: number; - inputTokens: number; - outputTokens: number; -} - -/** File metadata attached to a message. */ -export interface MessageFileAttachment { - id: string; - filename: string; - mimeType: string; - size: number; - extracted: boolean; -} - -/** A chat message with ordered content blocks for display. */ -export interface ChatMessage { - role: "user" | "assistant"; - content: string; - blocks?: ContentBlock[]; - toolCalls?: ToolCallDisplay[]; - iteration?: IterationProgress; - timestamp?: string; - userId?: string; - files?: MessageFileAttachment[]; - stopReason?: string; - /** Set when the engine errors mid-stream — renders inline on the message. 
*/ - error?: string; - usage?: { - inputTokens: number; - outputTokens: number; - cacheReadTokens?: number; - cacheWriteTokens?: number; - reasoningTokens?: number; - model: string; - llmMs: number; - }; -} - -/** - * Conversation-level metadata. Stage 1: single-owner only — sharing - * was removed (returns in Stage 4 with policy gating). - */ -export interface LoadedConversationMeta { - ownerId?: string; -} + ChatMessage, + LoadedConversationMeta, + PreparingTool, + StreamingState, +} from "./chat-store"; +import { chatStore, freshDraftKey } from "./chat-store"; + +// Re-export the display types so existing `from "../hooks/useChat"` imports +// keep working — the slice store now owns the definitions. +export type { + ChatMessage, + ContentBlock, + IterationProgress, + LoadedConversationMeta, + MessageFileAttachment, + PreparingTool, + StreamingState, + ToolCallDisplay, + ToolResultForUI, +} from "./chat-store"; export interface UseChatReturn { messages: ChatMessage[]; @@ -139,6 +30,8 @@ export interface UseChatReturn { /** Set while streamingState === "preparing"; null otherwise. */ preparingTool: PreparingTool | null; conversationId: string | null; + /** Server-generated title; null until generated/loaded. */ + title: string | null; conversationMeta: LoadedConversationMeta | null; error: string | null; sendMessage: ( @@ -149,700 +42,128 @@ export interface UseChatReturn { ) => Promise; newConversation: () => void; loadConversation: (id: string) => Promise; - /** Inject a user message from another participant (remote stream). */ - injectRemoteUserMessage: (userId: string, displayName: string, content: string) => void; - /** Process a streaming event from a remote participant's assistant response. */ - processRemoteStreamEvent: (type: string, data: unknown) => void; + /** Stop the in-flight turn (the only thing that aborts generation). */ + stop: () => void; /** Retry the last failed message (removes errored pair and re-sends). 
*/ retryLastMessage: () => void; /** Inject a synthetic error for demoing the error UX (dev only). */ simulateError: (message: string) => void; } -/** Deep-copy blocks for immutable state updates. */ -function cloneBlocks(blocks: ContentBlock[]): ContentBlock[] { - return blocks.map((b) => { - if (b.type === "tool") return { ...b, toolCalls: [...b.toolCalls] }; - return { ...b }; // text or reasoning — both shaped { type, text } - }); -} - -/** Derive full visible text from blocks. Reasoning is NOT included - * (it's collapsed-by-default UI and shouldn't pollute the message body). */ -function textFromBlocks(blocks: ContentBlock[]): string { - return blocks - .filter((b): b is ContentBlock & { type: "text" } => b.type === "text") - .map((b) => b.text) - .join(""); -} - -/** Wrap a plain string result into a ToolResultForUI. */ -function wrapStringResult(text: string, isError = false): ToolResultForUI { - return { content: [{ type: "text", text }], isError }; -} - -const updateTool = - (evt: ToolDoneEvent) => - (tc: ToolCallDisplay): ToolCallDisplay => - tc.id === evt.id - ? { - ...tc, - status: evt.ok ? ("done" as const) : ("error" as const), - ok: evt.ok, - ms: evt.ms, - resourceUri: tc.resourceUri ?? evt.resourceUri, - resourceLinks: - evt.resourceLinks != null && evt.resourceLinks.length > 0 - ? evt.resourceLinks - : tc.resourceLinks, - result: evt.result != null ? (evt.result as ToolResultForUI) : tc.result, - } - : tc; - +/** + * Per-conversation chat state, backed by the module-singleton {@link chatStore}. + * + * `activeKey` selects which conversation's slice this hook renders. A stream + * started for one conversation writes only into that conversation's slice + * (captured at send time), so switching conversations mid-turn never bleeds + * the in-flight response into the destination chat (issue #254). Switching + * back shows the still-arriving response because the background stream kept + * filling its origin slice. 
+ * + * `focusWorkspaceId` is the workspace the chat is FOCUSED on (the `/w/:slug` + * the user is viewing). Currently unused on this path — the chat-store + * dispatches via `startChatTurn`, which picks up the global active + * workspace from `headers()`. Accepted as a param for API compatibility + * with the ChatProvider; route-derived focus override is tracked as a + * follow-up. + */ export function useChat( initialConversationId?: string, currentUserId?: string, - focusWorkspaceId?: string | null, + _focusWorkspaceId?: string | null, ): UseChatReturn { - const [messages, setMessages] = useState([]); - const [isStreaming, setIsStreaming] = useState(false); - const [conversationId, setConversationId] = useState( - initialConversationId ?? null, - ); - const [error, setError] = useState(null); - const [streamingState, setStreamingState] = useState(null); - const [preparingTool, setPreparingTool] = useState(null); - const [conversationMeta, setConversationMeta] = useState(null); - - // The chat's FOCUSED workspace is what the user is currently VIEWING (the - // `/w/:slug`), supplied by the provider from the route. It's situational - // context — the agent reasons about which workspace/app is on screen — and - // the source of the workspace briefing (apps + house rules). NOT tool scope. - // Null on home / identity routes, so the agent correctly sees "no current - // workspace". Read via a ref so `sendMessage` doesn't re-memoize on nav. - const focusWorkspaceIdRef = useRef(focusWorkspaceId ?? null); - focusWorkspaceIdRef.current = focusWorkspaceId ?? null; + const [activeKey, setActiveKey] = useState(() => { + const key = initialConversationId ?? freshDraftKey(); + chatStore.ensureSlice( + key, + initialConversationId ? { conversationId: initialConversationId } : undefined, + ); + return key; + }); - // Refs for building the current assistant message during streaming. 
- const blocksRef = useRef([]); - const toolCallsRef = useRef([]); - const iterationRef = useRef(undefined); + const subscribe = useCallback( + (cb: () => void) => chatStore.subscribeSlice(activeKey, cb), + [activeKey], + ); + const getSnapshot = useCallback(() => chatStore.getSnapshot(activeKey), [activeKey]); + const snap = useSyncExternalStore(subscribe, getSnapshot); - /** Push current refs into the last assistant message. */ - function flushToMessage() { - const currentBlocks = cloneBlocks(blocksRef.current); - const currentText = textFromBlocks(blocksRef.current); - const currentTools = [...toolCallsRef.current]; - const currentIteration = iterationRef.current ? { ...iterationRef.current } : undefined; - setMessages((prev) => { - const updated = [...prev]; - updated[updated.length - 1] = { - role: "assistant", - content: currentText, - blocks: currentBlocks, - toolCalls: currentTools, - iteration: currentIteration, - }; - return updated; - }); - } + // Mark the active slice so the LRU never evicts what the user is viewing. + useEffect(() => { + chatStore.markActive(activeKey); + return () => chatStore.markInactive(activeKey); + }, [activeKey]); const sendMessage = useCallback( async (text: string, appContext?: AppContext, model?: string, files?: File[]) => { - if (isStreaming) return; - - setError(null); - setIsStreaming(true); - setStreamingState("thinking"); - blocksRef.current = []; - toolCallsRef.current = []; - iterationRef.current = undefined; - - // Add user message (with file previews if attached) - const userFiles: MessageFileAttachment[] | undefined = files?.map((f) => ({ - id: `pending_${f.name}_${f.size}`, - filename: f.name, - mimeType: f.type || "application/octet-stream", - size: f.size, - extracted: false, - })); - const userMsg: ChatMessage = { - role: "user", - content: text, - timestamp: new Date().toISOString(), - ...(currentUserId ? { userId: currentUserId } : {}), - ...(userFiles && userFiles.length > 0 ? 
{ files: userFiles } : {}), - }; - setMessages((prev) => [...prev, userMsg]); - - // Add placeholder assistant message - const assistantMsg: ChatMessage = { - role: "assistant", - content: "", - blocks: [], - toolCalls: [], - timestamp: new Date().toISOString(), - }; - setMessages((prev) => [...prev, assistantMsg]); - - // Enrich appContext with latest app state from the bridge (Synapse Feature 2) + // Enrich appContext with the latest app state from the bridge + // (Synapse Feature 2). Kept here — the store stays bridge-agnostic. let enrichedContext = appContext; if (appContext) { const { getAppState } = await import("../bridge/bridge"); const appStateEntry = getAppState(appContext.serverName); - if (appStateEntry) { - enrichedContext = { ...appContext, appState: appStateEntry }; - } - } - - const req: ChatRequest = { - message: text, - ...(conversationId ? { conversationId } : {}), - ...(enrichedContext ? { appContext: enrichedContext } : {}), - ...(model ? { model } : {}), - }; - - try { - const onEvent = (type: K, data: ChatStreamEventMap[K]) => { - switch (type) { - case "chat.start": { - const evt = data as { conversationId: string }; - if (evt.conversationId) { - setConversationId(evt.conversationId); - } - break; - } - case "text.delta": { - const evt = data as TextDeltaEvent; - setStreamingState((prev) => (prev !== "streaming" ? "streaming" : prev)); - // Defensive: keeps `preparingTool` paired with the - // `"preparing"` streamingState. Render sites gate on the - // state, so stale data never shows today, but a future - // caller reading `preparingTool` directly would otherwise - // see a tool name from a long-finished iteration. 
- setPreparingTool(null); - // Append to last text block or create a new one - const blocks = blocksRef.current; - const lastBlock = blocks[blocks.length - 1]; - if (lastBlock && lastBlock.type === "text") { - lastBlock.text += evt.text; - } else { - blocks.push({ type: "text", text: evt.text }); - } - flushToMessage(); - break; - } - case "reasoning.delta": { - const evt = data as ReasoningDeltaEvent; - setStreamingState((prev) => (prev !== "streaming" ? "streaming" : prev)); - setPreparingTool(null); - const blocks = blocksRef.current; - const lastBlock = blocks[blocks.length - 1]; - if (lastBlock && lastBlock.type === "reasoning") { - lastBlock.text += evt.text; - } else { - blocks.push({ type: "reasoning", text: evt.text }); - } - flushToMessage(); - break; - } - case "tool.preparing": { - const evt = data as ToolPreparingEvent; - setStreamingState("preparing"); - setPreparingTool({ id: evt.id, name: evt.name }); - break; - } - case "tool.preparing.done": { - // No state change — `tool.start` follows once the iteration - // ends and the engine begins execution. Holding `preparing` - // through the gap keeps the indicator stable. - break; - } - case "tool.start": { - const evt = data as ToolStartEvent; - setStreamingState("working"); - setPreparingTool(null); - const separatorIdx = evt.name.indexOf("__"); - const newTool: ToolCallDisplay = { - id: evt.id, - name: evt.name, - status: "running", - resourceUri: evt.resourceUri, - input: evt.input, - appName: separatorIdx !== -1 ? 
evt.name.slice(0, separatorIdx) : undefined, - }; - // Flat ref - toolCallsRef.current = [...toolCallsRef.current, newTool]; - // Blocks — group consecutive tool calls - const blocks = blocksRef.current; - const lastBlock = blocks[blocks.length - 1]; - if (lastBlock && lastBlock.type === "tool") { - lastBlock.toolCalls = [...lastBlock.toolCalls, newTool]; - } else { - blocks.push({ type: "tool", toolCalls: [newTool] }); - } - flushToMessage(); - break; - } - case "tool.done": { - const evt = data as ToolDoneEvent; - const updater = updateTool(evt); - // Update flat ref - toolCallsRef.current = toolCallsRef.current.map(updater); - // Update in blocks - for (const block of blocksRef.current) { - if (block.type === "tool") { - block.toolCalls = block.toolCalls.map(updater); - } - } - // Hold `working` while other parallel tools are still running; - // only flip to `analyzing` when the last tool in the batch lands, - // so the indicator reflects "model is inferring on results." - const anyRunning = toolCallsRef.current.some((tc) => tc.status === "running"); - setStreamingState(anyRunning ? "working" : "analyzing"); - flushToMessage(); - break; - } - case "llm.done": { - const evt = data as LlmDoneEvent; - iterationRef.current = { - n: (iterationRef.current?.n ?? 0) + 1, - inputTokens: - (iterationRef.current?.inputTokens ?? 0) + (evt.usage?.inputTokens ?? 0), - outputTokens: - (iterationRef.current?.outputTokens ?? 0) + (evt.usage?.outputTokens ?? 0), - }; - flushToMessage(); - break; - } - case "done": { - const result = data as ChatResult; - setStreamingState(null); - setPreparingTool(null); - setConversationId(result.conversationId); - - // Backfill tool results from done event - if (result.toolCalls) { - const outputMap = new Map(result.toolCalls.map((tc) => [tc.id, tc.output])); - const backfill = (tc: ToolCallDisplay): ToolCallDisplay => { - if (tc.result != null) return tc; - const output = outputMap.get(tc.id); - return output != null ? 
{ ...tc, result: wrapStringResult(output) } : tc; - }; - for (const block of blocksRef.current) { - if (block.type === "tool") { - block.toolCalls = block.toolCalls.map(backfill); - } - } - toolCallsRef.current = toolCallsRef.current.map(backfill); - } - - const finalBlocks = cloneBlocks(blocksRef.current); - const finalTools = - toolCallsRef.current.length > 0 ? [...toolCallsRef.current] : undefined; - const usage = result.usage - ? { - inputTokens: result.usage.inputTokens, - outputTokens: result.usage.outputTokens, - cacheReadTokens: result.usage.cacheReadTokens, - cacheWriteTokens: result.usage.cacheWriteTokens, - reasoningTokens: result.usage.reasoningTokens, - model: result.usage.model, - llmMs: result.usage.llmMs, - } - : undefined; - // Parse file attachments from done event metadata - const resultFiles = (result as unknown as Record).files as - | MessageFileAttachment[] - | undefined; - - setMessages((prev) => { - const updated = [...prev]; - updated[updated.length - 1] = { - role: "assistant", - content: result.response, - blocks: finalBlocks, - toolCalls: finalTools, - usage, - ...(result.stopReason && result.stopReason !== "complete" - ? { stopReason: result.stopReason } - : {}), - ...(resultFiles && resultFiles.length > 0 ? { files: resultFiles } : {}), - }; - return updated; - }); - break; - } - case "error": { - const evt = data as StreamErrorEvent; - setStreamingState(null); - setPreparingTool(null); - // Stamp the error on the last assistant message so it renders - // inline — not as a disconnected banner at the top. 
- setMessages((prev) => { - const updated = [...prev]; - const last = updated[updated.length - 1]; - if (last?.role === "assistant") { - updated[updated.length - 1] = { ...last, error: evt.message }; - } else { - // No assistant message to attach to — fall back to banner - setError(evt.message); - } - return updated; - }); - break; - } - } - }; - if (files && files.length > 0) { - await streamChatMultipart(req, files, onEvent, focusWorkspaceIdRef.current); - } else { - await streamChat(req, onEvent, focusWorkspaceIdRef.current); - } - captureEvent("web.chat_sent", { - is_resume: !!conversationId, - has_app_context: !!appContext, - }); - } catch (err) { - if (err instanceof ApiClientError && err.code === "run_in_progress") { - // Server rejected because a previous run is still in flight. - // Drop the optimistic user+assistant placeholders so the failed - // message doesn't stick in history as if it had succeeded. - setMessages((prev) => prev.slice(0, -2)); - captureEvent("web.chat_run_in_progress", { - conversation_id: conversationId ?? null, - has_app_context: !!appContext, - }); - // Banner only — nothing in this turn to mark inline - setError(formatSendError(err)); - return; - } - const msg = formatSendError(err); - // Stamp on the last assistant message if one exists; - // only fall back to banner when there's no message to attach to. 
- setMessages((prev) => { - const last = prev[prev.length - 1]; - if (last?.role === "assistant") { - const updated = [...prev]; - updated[updated.length - 1] = { ...last, error: msg }; - return updated; - } - // No assistant message — fall back to banner - setError(msg); - return prev; - }); - } finally { - setIsStreaming(false); - setStreamingState(null); - setPreparingTool(null); + if (appStateEntry) enrichedContext = { ...appContext, appState: appStateEntry }; } + const hadConversation = !!chatStore.getSnapshot(activeKey).conversationId; + await chatStore.sendTurn( + activeKey, + { text, appContext: enrichedContext, model, files, currentUserId }, + { onConversationId: (id) => setActiveKey(id) }, + ); + captureEvent("web.chat_sent", { + is_resume: hadConversation, + has_app_context: !!appContext, + }); }, - // biome-ignore lint/correctness/useExhaustiveDependencies: sendMessage captures streaming/conversation state via refs - [isStreaming, conversationId, currentUserId, flushToMessage], + [activeKey, currentUserId], ); const newConversation = useCallback(() => { - setMessages([]); - setConversationId(null); - setConversationMeta(null); - setError(null); - setIsStreaming(false); - setStreamingState(null); - setPreparingTool(null); - blocksRef.current = []; - toolCallsRef.current = []; - iterationRef.current = undefined; + const key = freshDraftKey(); + chatStore.ensureSlice(key); + setActiveKey(key); }, []); const loadConversation = useCallback(async (id: string) => { - setError(null); - try { - // expand:"full" — the shell is rendering the entire chat, not - // sampling for an LLM. The bounded default (`expand:"messages"`, - // last 20) exists to keep agent tool-results small; the trusted - // web shell needs every turn or the UI silently shows only the tail. - const res = await callTool("conversations", "get", { id, expand: "full" }); - if (res.isError) { - const errText = res.content - ?.map((b) => b.text ?? 
"") - .filter(Boolean) - .join("\n"); - throw new Error(errText || "Failed to load conversation"); - } - // Prefer structuredContent; fall back to parsing first text block. - let raw: unknown = res.structuredContent; - if (!raw && res.content?.[0]?.text) { - try { - raw = JSON.parse(res.content[0].text); - } catch { - raw = {}; - } - } - // The API already returns DisplayMessage[] in the exact shape ChatMessage - // expects — one message per turn, blocks in iteration order, tool calls - // hydrated with status+result. No reshaping needed here. - const data = raw as { - metadata: { - id: string; - ownerId?: string; - }; - messages: ChatMessage[]; - }; - setConversationId(data.metadata.id); - setConversationMeta({ ownerId: data.metadata.ownerId }); - setMessages(data.messages); - } catch (err) { - const msg = err instanceof Error ? err.message : "Failed to load conversation"; - setError(msg); - } + setActiveKey(id); + await chatStore.loadConversation(id); }, []); - // --- Remote participant event injection --- + const stop = useCallback(() => { + chatStore.cancelTurn(activeKey); + }, [activeKey]); - const injectRemoteUserMessage = useCallback( - (userId: string, _displayName: string, content: string) => { - // Reset streaming refs for the incoming remote assistant response - blocksRef.current = []; - toolCallsRef.current = []; - iterationRef.current = undefined; + const retryLastMessage = useCallback(() => { + const text = chatStore.retryLastMessage(activeKey); + if (text != null) void sendMessage(text); + }, [activeKey, sendMessage]); - const userMsg: ChatMessage = { - role: "user", - content, - timestamp: new Date().toISOString(), - userId, - }; - const assistantMsg: ChatMessage = { - role: "assistant", - content: "", - blocks: [], - toolCalls: [], - timestamp: new Date().toISOString(), - }; - setMessages((prev) => [...prev, userMsg, assistantMsg]); - setIsStreaming(true); - setStreamingState("thinking"); + const simulateError = useCallback( + (message: string) => 
{ + chatStore.simulateError(activeKey, message); }, - [], + [activeKey], ); - const processRemoteStreamEvent = useCallback( - (type: string, data: unknown) => { - switch (type) { - case "text.delta": { - const evt = data as TextDeltaEvent; - setStreamingState((prev) => (prev !== "streaming" ? "streaming" : prev)); - setPreparingTool(null); - const blocks = blocksRef.current; - const lastBlock = blocks[blocks.length - 1]; - if (lastBlock && lastBlock.type === "text") { - lastBlock.text += evt.text; - } else { - blocks.push({ type: "text", text: evt.text }); - } - flushToMessage(); - break; - } - case "reasoning.delta": { - const evt = data as ReasoningDeltaEvent; - setStreamingState((prev) => (prev !== "streaming" ? "streaming" : prev)); - setPreparingTool(null); - const blocks = blocksRef.current; - const lastBlock = blocks[blocks.length - 1]; - if (lastBlock && lastBlock.type === "reasoning") { - lastBlock.text += evt.text; - } else { - blocks.push({ type: "reasoning", text: evt.text }); - } - flushToMessage(); - break; - } - case "tool.preparing": { - const evt = data as ToolPreparingEvent; - setStreamingState("preparing"); - setPreparingTool({ id: evt.id, name: evt.name }); - break; - } - case "tool.preparing.done": { - break; - } - case "tool.start": { - const evt = data as ToolStartEvent; - setStreamingState("working"); - setPreparingTool(null); - const separatorIdx = evt.name.indexOf("__"); - const newTool: ToolCallDisplay = { - id: evt.id, - name: evt.name, - status: "running", - resourceUri: evt.resourceUri, - input: evt.input, - appName: separatorIdx !== -1 ? 
evt.name.slice(0, separatorIdx) : undefined, - }; - toolCallsRef.current = [...toolCallsRef.current, newTool]; - const blocks = blocksRef.current; - const lastBlock = blocks[blocks.length - 1]; - if (lastBlock && lastBlock.type === "tool") { - lastBlock.toolCalls = [...lastBlock.toolCalls, newTool]; - } else { - blocks.push({ type: "tool", toolCalls: [newTool] }); - } - flushToMessage(); - break; - } - case "tool.done": { - const evt = data as ToolDoneEvent; - const updater = updateTool(evt); - toolCallsRef.current = toolCallsRef.current.map(updater); - for (const block of blocksRef.current) { - if (block.type === "tool") { - block.toolCalls = block.toolCalls.map(updater); - } - } - const anyRunning = toolCallsRef.current.some((tc) => tc.status === "running"); - setStreamingState(anyRunning ? "working" : "analyzing"); - flushToMessage(); - break; - } - case "llm.done": { - const evt = data as LlmDoneEvent; - iterationRef.current = { - n: (iterationRef.current?.n ?? 0) + 1, - inputTokens: (iterationRef.current?.inputTokens ?? 0) + (evt.usage?.inputTokens ?? 0), - outputTokens: - (iterationRef.current?.outputTokens ?? 0) + (evt.usage?.outputTokens ?? 0), - }; - flushToMessage(); - break; - } - case "done": { - const result = data as ChatResult; - setStreamingState(null); - setPreparingTool(null); - setIsStreaming(false); - - if (result.toolCalls) { - const outputMap = new Map(result.toolCalls.map((tc) => [tc.id, tc.output])); - const backfill = (tc: ToolCallDisplay): ToolCallDisplay => { - if (tc.result != null) return tc; - const output = outputMap.get(tc.id); - return output != null ? { ...tc, result: wrapStringResult(output) } : tc; - }; - for (const block of blocksRef.current) { - if (block.type === "tool") { - block.toolCalls = block.toolCalls.map(backfill); - } - } - toolCallsRef.current = toolCallsRef.current.map(backfill); - } - - const finalBlocks = cloneBlocks(blocksRef.current); - const finalTools = - toolCallsRef.current.length > 0 ? 
[...toolCallsRef.current] : undefined; - const usage = result.usage - ? { - inputTokens: result.usage.inputTokens, - outputTokens: result.usage.outputTokens, - cacheReadTokens: result.usage.cacheReadTokens, - cacheWriteTokens: result.usage.cacheWriteTokens, - reasoningTokens: result.usage.reasoningTokens, - model: result.usage.model, - llmMs: result.usage.llmMs, - } - : undefined; - - setMessages((prev) => { - const updated = [...prev]; - updated[updated.length - 1] = { - role: "assistant", - content: result.response, - blocks: finalBlocks, - toolCalls: finalTools, - usage, - ...(result.stopReason && result.stopReason !== "complete" - ? { stopReason: result.stopReason } - : {}), - }; - return updated; - }); - break; - } - } - // biome-ignore lint/correctness/useExhaustiveDependencies: SSE handler intentionally captures only flushToMessage - }, - [flushToMessage], + return useMemo( + () => ({ + messages: snap.messages, + isStreaming: snap.isStreaming, + streamingState: snap.streamingState, + preparingTool: snap.preparingTool, + // Drafts carry a null conversationId on the slice, so this is null until + // the server assigns a real id on chat.start. 
+ conversationId: snap.conversationId, + title: snap.title, + conversationMeta: snap.meta, + error: snap.error, + sendMessage, + newConversation, + loadConversation, + stop, + retryLastMessage, + simulateError, + }), + [snap, sendMessage, newConversation, loadConversation, stop, retryLastMessage, simulateError], ); - - // Pending retry text — set by retryLastMessage, consumed by an effect - const retryRef = useRef(null); - - const retryLastMessage = useCallback(() => { - // Find the last user message, stash its text, remove the failed pair - setMessages((prev) => { - for (let i = prev.length - 1; i >= 0; i--) { - if (prev[i].role === "user") { - retryRef.current = prev[i].content; - return prev.slice(0, i); - } - } - return prev; - }); - // Clear error + streaming state so sendMessage's guard passes - setError(null); - setIsStreaming(false); - setStreamingState(null); - setPreparingTool(null); - }, []); - - // Effect: once isStreaming is false and there's a pending retry, fire it. - // We can't call sendMessage synchronously from retryLastMessage because - // sendMessage is memoized with isStreaming in its dep list — the closure - // still sees isStreaming=true until React re-renders with the new state. - // This effect fires after React flushes the state updates, at which point - // sendMessage has been recreated with isStreaming=false. - // NOTE: this depends on sendMessage's identity changing when isStreaming - // changes (via flushToMessage in its dep list). Do not memoize - // flushToMessage without verifying this still fires. 
- useEffect(() => { - if (!isStreaming && retryRef.current) { - const text = retryRef.current; - retryRef.current = null; - sendMessage(text); - } - }, [isStreaming, sendMessage]); - - const simulateError = useCallback((message: string) => { - setMessages((prev) => { - if (prev.length === 0) return prev; - const updated = [...prev]; - const last = updated[updated.length - 1]; - if (last?.role === "assistant") { - updated[updated.length - 1] = { ...last, error: message }; - } else { - // Append a synthetic assistant message with the error - updated.push({ role: "assistant", content: "", error: message }); - } - return updated; - }); - setStreamingState(null); - setPreparingTool(null); - setIsStreaming(false); - }, []); - - return { - messages, - isStreaming, - streamingState, - preparingTool, - conversationId, - conversationMeta, - error, - sendMessage, - newConversation, - loadConversation, - injectRemoteUserMessage, - processRemoteStreamEvent, - retryLastMessage, - simulateError, - }; } diff --git a/web/src/hooks/useConversationEvents.ts b/web/src/hooks/useConversationEvents.ts deleted file mode 100644 index d701b820..00000000 --- a/web/src/hooks/useConversationEvents.ts +++ /dev/null @@ -1,81 +0,0 @@ -/** - * React hook for subscribing to per-conversation SSE events. - * - * Stage 1: conversations are single-owner; the broadcast still fires - * server-side so this subscription is the same-user cross-tab sync - * path (one user, multiple browser tabs/devices on the same - * conversation). Stage 4 reintroduces multi-user sharing and this - * hook's audience widens. - * - * Disconnects on cleanup or when the conversation changes. On - * reconnect, triggers a full conversation reload to catch missed - * messages. 
- */ - -import { useEffect, useRef } from "react"; -import { getAuthToken } from "../api/client"; -import { type ConversationSseConnection, connectConversationEvents } from "../api/conversation-sse"; - -export interface ConversationEventCallbacks { - /** A user message arrived from another participant. */ - onRemoteUserMessage: (data: { - userId: string; - displayName: string; - content: string; - timestamp: string; - }) => void; - /** A streaming event arrived from the assistant (responding to another user's message). */ - onRemoteStreamEvent: (type: string, data: unknown) => void; - /** Connection was re-established — reload the conversation to catch missed messages. */ - onReconnect: () => void; -} - -export function useConversationEvents( - conversationId: string | null, - callbacks: ConversationEventCallbacks, -): void { - // Keep callbacks in a ref so we don't reconnect on every render - const callbacksRef = useRef(callbacks); - callbacksRef.current = callbacks; - - useEffect(() => { - // Subscribe whenever a conversation is open — same-user cross-tab - // sync (and, post-Stage-4, multi-user sharing). - if (!conversationId) return; - - const token = getAuthToken(); - let connection: ConversationSseConnection | null = null; - - connection = connectConversationEvents({ - conversationId, - token: token ?? 
undefined, - onEvent: (type, data) => { - if (type === "user.message") { - callbacksRef.current.onRemoteUserMessage( - data as { - userId: string; - displayName: string; - content: string; - timestamp: string; - }, - ); - } else if (type === "heartbeat") { - // Ignore heartbeats - } else { - // text.delta, tool.start, tool.done, llm.done, done - callbacksRef.current.onRemoteStreamEvent(type, data); - } - }, - onReconnect: () => { - callbacksRef.current.onReconnect(); - }, - onError: (err) => { - console.warn("[conversation-sse] Error:", err.message); - }, - }); - - return () => { - connection?.close(); - }; - }, [conversationId]); -} diff --git a/web/src/hooks/useEvents.ts b/web/src/hooks/useEvents.ts index 2920c586..b7c7cc4b 100644 --- a/web/src/hooks/useEvents.ts +++ b/web/src/hooks/useEvents.ts @@ -4,6 +4,7 @@ import { connectEvents } from "../api/sse"; import type { ConfigChangedEvent, ConnectionStateChangedEvent, + ConversationTitleEvent, DataChangedEvent, SseEventMap, SseEventType, @@ -14,6 +15,8 @@ export interface UseEventsOptions { onDataChanged?: (event: DataChangedEvent) => void; /** Called when a config.changed SSE event is received. */ onConfigChanged?: (event: ConfigChangedEvent) => void; + /** Called when an auto-generated conversation title arrives. */ + onConversationTitle?: (event: ConversationTitleEvent) => void; /** Called when a per-Connection state transition fires (URL bundles). 
*/ onConnectionStateChanged?: (event: ConnectionStateChangedEvent) => void; /** @@ -41,6 +44,8 @@ export function useEvents( onDataChangedRef.current = options?.onDataChanged; const onConfigChangedRef = useRef(options?.onConfigChanged); onConfigChangedRef.current = options?.onConfigChanged; + const onConversationTitleRef = useRef(options?.onConversationTitle); + onConversationTitleRef.current = options?.onConversationTitle; const onConnectionStateChangedRef = useRef(options?.onConnectionStateChanged); onConnectionStateChangedRef.current = options?.onConnectionStateChanged; const onBundleLifecycleChangedRef = useRef(options?.onBundleLifecycleChanged); @@ -59,6 +64,9 @@ export function useEvents( if (type === "config.changed") { onConfigChangedRef.current?.(data as ConfigChangedEvent); } + if (type === "conversation.title") { + onConversationTitleRef.current?.(data as ConversationTitleEvent); + } if (type === "connection.state_changed") { onConnectionStateChangedRef.current?.(data as ConnectionStateChangedEvent); } diff --git a/web/src/lib/active-conversation-storage.ts b/web/src/lib/active-conversation-storage.ts new file mode 100644 index 00000000..cf5c9d7d --- /dev/null +++ b/web/src/lib/active-conversation-storage.ts @@ -0,0 +1,60 @@ +/** + * Per-tab persistence of the last-viewed conversation id. + * + * Stored in `sessionStorage` so it is: + * - site-scoped (per-origin) and never sent to the server, + * - per-tab (two tabs don't clobber each other's active conversation), + * - cleared automatically when the tab closes (no stale-id cleanup). + * + * On a fresh page load the chat panel reads this and re-opens the + * conversation, which re-subscribes to the server turn stream — so an + * in-flight turn's streaming indicator resumes (the actual stream lives + * server-side; only the id needs remembering). The `/chat/:id` route + * restores from the URL instead and doesn't use this. 
+ */ + +const KEY = "nb:activeConversationId"; +const STREAMING_KEY = "nb:streamingConversationIds"; + +export function getSavedConversationId(): string | null { + try { + return sessionStorage.getItem(KEY); + } catch { + // sessionStorage can throw in private-mode / sandboxed contexts. + return null; + } +} + +export function setSavedConversationId(id: string | null): void { + try { + if (id) sessionStorage.setItem(KEY, id); + else sessionStorage.removeItem(KEY); + } catch { + // Best-effort — persistence is an enhancement, not a correctness path. + } +} + +/** + * Conversation ids that had an in-flight turn when the page was last alive. + * On reload these are re-probed against the server (`isActive`) to restore the + * list's streaming dots; finished ones self-heal (probe → not active → no dot). + */ +export function getSavedStreamingIds(): string[] { + try { + const raw = sessionStorage.getItem(STREAMING_KEY); + if (!raw) return []; + const parsed: unknown = JSON.parse(raw); + return Array.isArray(parsed) ? parsed.filter((x): x is string => typeof x === "string") : []; + } catch { + return []; + } +} + +export function setSavedStreamingIds(ids: string[]): void { + try { + if (ids.length > 0) sessionStorage.setItem(STREAMING_KEY, JSON.stringify(ids)); + else sessionStorage.removeItem(STREAMING_KEY); + } catch { + // Best-effort. + } +} diff --git a/web/src/lib/forward-conversation-title.ts b/web/src/lib/forward-conversation-title.ts new file mode 100644 index 00000000..9f65733b --- /dev/null +++ b/web/src/lib/forward-conversation-title.ts @@ -0,0 +1,37 @@ +/** + * Forward a live `conversation.title` SSE event to the conversations-list + * iframe via postMessage. + * + * The conversations bundle's Dashboard listens for `synapse/conversation-title` + * and patches the matching row's title in-place. This is the cheap path: a + * full `data.changed` would force a list refetch, which is what the runtime + * used to fire on title resolve. 
Sending the (conversationId, title) tuple + * directly is one postMessage and an in-place state update. + * + * Targets only iframes whose `data-app` matches the conversations bundle + * name (`@nimblebraininc/conversations`). Unrelated iframes never see the + * message. No-op when the conversations panel isn't currently mounted — + * the next mount loads from disk where the title is already persisted, so + * there's no race. + * + * @param conversationId Conversation whose title was just generated. + * @param title The generated title. + */ +const CONVERSATIONS_APP = "@nimblebraininc/conversations"; + +export function forwardConversationTitleToIframes(conversationId: string, title: string): void { + const iframes = document.querySelectorAll( + `iframe[data-app="${CONVERSATIONS_APP}"]`, + ); + if (iframes.length === 0) return; + const message = { + jsonrpc: "2.0", + method: "synapse/conversation-title", + params: { conversationId, title }, + }; + for (const iframe of iframes) { + // Srcdoc iframes have the opaque "null" origin; targetOrigin must be "*" + // (matches useDataSync's path — same constraint). + iframe.contentWindow?.postMessage(message, "*"); + } +} diff --git a/web/src/types.ts b/web/src/types.ts index c3f014ab..f17be2b4 100644 --- a/web/src/types.ts +++ b/web/src/types.ts @@ -163,6 +163,13 @@ export interface ConfigChangedEvent { timestamp: string; } +/** Live auto-generated conversation title (routed to a slice by conversationId). */ +export interface ConversationTitleEvent { + conversationId: string; + title: string; + wsId?: string; +} + /** SSE event type to payload mapping. 
*/ export interface SseEventMap { "bundle.installed": BundleInstalledEvent; @@ -172,6 +179,7 @@ export interface SseEventMap { "bundle.dead": BundleDeadEvent; "connection.state_changed": ConnectionStateChangedEvent; "data.changed": DataChangedEvent; + "conversation.title": ConversationTitleEvent; "config.changed": ConfigChangedEvent; heartbeat: HeartbeatEvent; } diff --git a/web/test/active-conversation-storage.test.ts b/web/test/active-conversation-storage.test.ts new file mode 100644 index 00000000..ff30a483 --- /dev/null +++ b/web/test/active-conversation-storage.test.ts @@ -0,0 +1,26 @@ +import { beforeEach, describe, expect, it } from "bun:test"; +import { + getSavedConversationId, + setSavedConversationId, +} from "../src/lib/active-conversation-storage"; + +describe("active-conversation-storage", () => { + beforeEach(() => { + sessionStorage.clear(); + }); + + it("returns null when nothing is saved", () => { + expect(getSavedConversationId()).toBeNull(); + }); + + it("round-trips a conversation id", () => { + setSavedConversationId("conv_abc123"); + expect(getSavedConversationId()).toBe("conv_abc123"); + }); + + it("clears the saved id when set to null (new/draft chat)", () => { + setSavedConversationId("conv_abc123"); + setSavedConversationId(null); + expect(getSavedConversationId()).toBeNull(); + }); +}); diff --git a/web/test/chat-store.test.ts b/web/test/chat-store.test.ts new file mode 100644 index 00000000..dca8a29b --- /dev/null +++ b/web/test/chat-store.test.ts @@ -0,0 +1,346 @@ +import { beforeEach, describe, expect, it, mock } from "bun:test"; +import type { ChatMessage } from "../src/hooks/chat-store.ts"; + +// --------------------------------------------------------------------------- +// Drive the chat store directly (no React). The store is now a *viewer* over +// the server turn stream: sendTurn → startChatTurn (POST) → subscribe via +// connectConversationStream. 
We mock both seams and drive the captured +// stream callback to simulate server events. +// +// Bun module mocks are process-global; we spread the real client and override +// only the turn transport so sibling suites keep the real exports. +// --------------------------------------------------------------------------- + +interface CapturedStream { + conversationId: string; + onEvent: (type: string, data: unknown, seq: number) => void; + onSubscribed?: (info: { isActive: boolean; activeSeq: number }) => void; + closed: boolean; +} +let streams: CapturedStream[] = []; +let convCounter = 0; + +const LOADED: ChatMessage[] = [ + { role: "user", content: "loaded-q" }, + { role: "assistant", content: "loaded-a", blocks: [{ type: "text", text: "loaded-a" }] }, +]; + +// A partial disk snapshot: the trailing assistant has no terminal event yet. +const PENDING_LOADED: ChatMessage[] = [ + { role: "user", content: "loaded-q" }, + { role: "assistant", content: "part", blocks: [{ type: "text", text: "part" }], pending: true }, +]; + +mock.module("../src/api/conversation-stream", () => ({ + connectConversationStream: (opts: { + conversationId: string; + onEvent: (type: string, data: unknown, seq: number) => void; + onSubscribed?: (info: { isActive: boolean; activeSeq: number }) => void; + }) => { + const entry: CapturedStream = { + conversationId: opts.conversationId, + onEvent: opts.onEvent, + onSubscribed: opts.onSubscribed, + closed: false, + }; + streams.push(entry); + return { + close() { + entry.closed = true; + }, + }; + }, +})); + +const actualClient = await import("../src/api/client"); +mock.module("../src/api/client", () => ({ + ...actualClient, + startChatTurn: (req: { conversationId?: string }) => { + convCounter += 1; + return Promise.resolve({ conversationId: req.conversationId ?? `conv_${convCounter}` }); + }, + startChatTurnMultipart: (req: { conversationId?: string }) => { + convCounter += 1; + return Promise.resolve({ conversationId: req.conversationId ?? 
`conv_${convCounter}` }); + }, + cancelChatTurn: () => Promise.resolve(), + callTool: (_server: string, _action: string, args?: Record) => + Promise.resolve({ + isError: false, + structuredContent: { + metadata: { id: args?.id }, + messages: args?.id === "conv_pending" ? PENDING_LOADED : LOADED, + }, + }), +})); + +import { createChatStore, freshDraftKey } from "../src/hooks/chat-store.ts"; + +function lastAssistant(messages: ChatMessage[]): ChatMessage | undefined { + for (let i = messages.length - 1; i >= 0; i--) { + if (messages[i].role === "assistant") return messages[i]; + } + return undefined; +} + +/** The most-recently-opened stream (the one a just-sent turn subscribed to). */ +function latestStream(): CapturedStream { + return streams[streams.length - 1]; +} + +describe("chat-store viewer", () => { + beforeEach(() => { + streams = []; + convCounter = 0; + }); + + it("renders a sent turn from the server stream (echo consumed, no dup)", async () => { + const store = createChatStore(); + await store.sendTurn("draft-1", { text: "hello" }); + const s = latestStream(); + + // Server echoes the user message (consumed by the optimistic placeholder), + // then streams the assistant. 
+ s.onEvent("user.message", { content: "hello" }, 1); + s.onEvent("text.delta", { text: "hi " }, 2); + s.onEvent("text.delta", { text: "there" }, 3); + + const snap = store.getSnapshot("draft-1"); + const users = snap.messages.filter((m) => m.role === "user"); + expect(users).toHaveLength(1); // not duplicated + expect(users[0].content).toBe("hello"); + expect(lastAssistant(snap.messages)?.content).toBe("hi there"); + }); + + it("isolates concurrent turns into their own slices", async () => { + const store = createChatStore(); + await store.sendTurn("kA", { text: "a" }); + const aStream = latestStream(); + await store.sendTurn("kB", { text: "b" }); + const bStream = latestStream(); + + aStream.onEvent("user.message", { content: "a" }, 1); + aStream.onEvent("text.delta", { text: "a1" }, 2); + bStream.onEvent("user.message", { content: "b" }, 1); + bStream.onEvent("text.delta", { text: "b1" }, 2); + aStream.onEvent("text.delta", { text: "a2" }, 3); + + expect(lastAssistant(store.getSnapshot("kA").messages)?.content).toBe("a1a2"); + expect(lastAssistant(store.getSnapshot("kB").messages)?.content).toBe("b1"); + }); + + it("remaps a draft to the real conversation id", async () => { + const store = createChatStore(); + const draft = freshDraftKey(); + const seen: string[] = []; + await store.sendTurn(draft, { text: "hi" }, { onConversationId: (id) => seen.push(id) }); + expect(seen).toEqual(["conv_1"]); + expect(store.getSnapshot(draft).conversationId).toBe("conv_1"); + // The real id resolves to the same live slice. 
+ latestStream().onEvent("user.message", { content: "hi" }, 1); + latestStream().onEvent("text.delta", { text: "yo" }, 2); + expect(lastAssistant(store.getSnapshot("conv_1").messages)?.content).toBe("yo"); + }); + + it("enforces per-slice single-flight", async () => { + const store = createChatStore(); + const p1 = store.sendTurn("kA", { text: "first" }); + const p2 = store.sendTurn("kA", { text: "second" }); // ignored — already streaming + await Promise.all([p1, p2]); + // Only one turn started → one stream opened. + expect(streams).toHaveLength(1); + }); + + it("finalizes on the terminal done event and closes the stream", async () => { + const store = createChatStore(); + await store.sendTurn("kA", { text: "go" }); + const s = latestStream(); + s.onEvent("user.message", { content: "go" }, 1); + s.onEvent("text.delta", { text: "partial" }, 2); + s.onEvent("done", { response: "final answer", conversationId: "conv_1" }, 3); + + const snap = store.getSnapshot("kA"); + expect(snap.isStreaming).toBe(false); + expect(lastAssistant(snap.messages)?.content).toBe("final answer"); + expect(s.closed).toBe(true); + }); + + it("does not clobber a slice that is streaming on loadConversation", async () => { + const store = createChatStore(); + await store.sendTurn("conv_1", { text: "go" }); + latestStream().onEvent("user.message", { content: "go" }, 1); + latestStream().onEvent("text.delta", { text: "streaming-text" }, 2); + + await store.loadConversation("conv_1"); + expect(lastAssistant(store.getSnapshot("conv_1").messages)?.content).toBe("streaming-text"); + }); + + it("loads persisted history into an idle slice and trims a stale in-flight turn on resume", async () => { + const store = createChatStore(); + await store.loadConversation("conv_X"); + const s = latestStream(); + // Server says a turn is in flight → the stale in-flight turn (last user + // message + after) is trimmed, then replay rebuilds it. 
+ s.onSubscribed?.({ isActive: true, activeSeq: 2 }); + // After trim, the loaded "loaded-q"/"loaded-a" pair: "loaded-q" is the last + // user message, so it + the trailing assistant are dropped. + expect(store.getSnapshot("conv_X").messages).toHaveLength(0); + // Server says a turn is active → the streaming indicator shows immediately, + // before any replayed event arrives. + expect(store.getSnapshot("conv_X").isStreaming).toBe(true); + + s.onEvent("user.message", { content: "loaded-q" }, 1); + s.onEvent("text.delta", { text: "fresh" }, 2); + expect(lastAssistant(store.getSnapshot("conv_X").messages)?.content).toBe("fresh"); + }); + + it("closes an idle resume connection when nothing is in flight", async () => { + const store = createChatStore(); + await store.loadConversation("conv_Y"); + const s = latestStream(); + s.onSubscribed?.({ isActive: false, activeSeq: 0 }); + expect(s.closed).toBe(true); + // History still present. + expect(store.getSnapshot("conv_Y").messages).toHaveLength(LOADED.length); + }); + + it("tracks streaming ids and clears on terminal", async () => { + const store = createChatStore(); + await store.sendTurn(freshDraftKey(), { text: "a" }); + latestStream().onEvent("user.message", { content: "a" }, 1); + expect(store.getStreamingIds()).toEqual(["conv_1"]); + latestStream().onEvent("done", { response: "x", conversationId: "conv_1" }, 2); + expect(store.getStreamingIds()).toEqual([]); + }); + + it("caps idle slices via LRU but keeps streaming ones", async () => { + const store = createChatStore(); + await store.sendTurn(freshDraftKey(), { text: "go" }); + latestStream().onEvent("user.message", { content: "go" }, 1); + for (let i = 0; i < 60; i++) store.ensureSlice(`idle-${i}`); + expect(store.sliceCount()).toBeLessThanOrEqual(30); + expect(store.getSnapshot("conv_1").isStreaming).toBe(true); + }); + + it("reset drops every slice and closes streams", async () => { + const store = createChatStore(); + await store.sendTurn("kA", { text: "a" }); 
+ const s = latestStream(); + expect(store.sliceCount()).toBeGreaterThan(0); + store.reset(); + expect(store.sliceCount()).toBe(0); + expect(s.closed).toBe(true); + expect(store.getSnapshot("conv_1").messages).toEqual([]); + }); + + it("probeConversation lights a dot for an active conversation (no history fetch)", () => { + const store = createChatStore(); + store.probeConversation("conv_live"); + latestStream().onSubscribed?.({ isActive: true, activeSeq: 3 }); + + expect(store.getStreamingIds()).toEqual(["conv_live"]); + // No message history was fetched — only the probe subscription. + expect(store.getSnapshot("conv_live").messages).toEqual([]); + }); + + it("probeConversation closes and shows no dot for an inactive conversation", () => { + const store = createChatStore(); + store.probeConversation("conv_done"); + const s = latestStream(); + s.onSubscribed?.({ isActive: false, activeSeq: 0 }); + + expect(store.getStreamingIds()).toEqual([]); + expect(s.closed).toBe(true); + }); + + it("opening a probed conversation still loads full history", async () => { + const store = createChatStore(); + store.probeConversation("conv_x"); + latestStream().onSubscribed?.({ isActive: true, activeSeq: 3 }); + // Probe left it unhydrated despite streaming — loadConversation must fetch. 
+ await store.loadConversation("conv_x"); + expect(lastAssistant(store.getSnapshot("conv_x").messages)?.content).toBe("loaded-a"); + }); + + it("setTitle updates a conversation's slice title (live conversation.title SSE)", async () => { + const store = createChatStore(); + await store.sendTurn("kA", { text: "a" }); + latestStream().onEvent("chat.start", { conversationId: "A" }, 1); + expect(store.getSnapshot("A").title).toBeNull(); + + store.setTitle("A", "Library Paranoia Joke"); + expect(store.getSnapshot("A").title).toBe("Library Paranoia Joke"); + }); + + it("setTitle is a no-op for a conversation with no slice in this tab", () => { + const store = createChatStore(); + store.setTitle("conv_absent", "Whatever"); + expect(store.getSnapshot("conv_absent").title).toBeNull(); + }); + + it("clears a stuck stream when a reconnect reports the turn already ended", async () => { + const store = createChatStore(); + await store.sendTurn("kA", { text: "a" }); + const s = latestStream(); + s.onEvent("chat.start", { conversationId: "A" }, 1); + s.onEvent("text.delta", { text: "partial" }, 2); + expect(store.getSnapshot("A").isStreaming).toBe(true); + + // Reconnect after the turn ended while disconnected past the grace window: + // server says not active and the terminal frame was GC'd (no replay). + s.onSubscribed?.({ isActive: false, activeSeq: 0 }); + + expect(store.getSnapshot("A").isStreaming).toBe(false); + expect(store.getSnapshot("A").streamingState).toBeNull(); + // Last-seen partial is retained (a reload would fetch the final transcript). 
+ expect(lastAssistant(store.getSnapshot("A").messages)?.content).toBe("partial"); + }); + + it("completes a partial disk tail from the grace replay on resume (no dup)", async () => { + const store = createChatStore(); + await store.loadConversation("conv_pending"); + expect(lastAssistant(store.getSnapshot("conv_pending").messages)?.content).toBe("part"); + + // Turn finished in the load→subscribe window but is still graced: not + // active, retained run (activeSeq>0). The replay carries the full turn. + const s = latestStream(); + s.onSubscribed?.({ isActive: false, activeSeq: 5 }); + s.onEvent("user.message", { content: "loaded-q" }, 1); + s.onEvent("text.delta", { text: "full answer" }, 2); + s.onEvent("done", { conversationId: "conv_pending", response: "full answer" }, 3); + + const msgs = store.getSnapshot("conv_pending").messages; + expect(lastAssistant(msgs)?.content).toBe("full answer"); + expect(msgs.filter((m) => m.role === "user").length).toBe(1); // no duplicate turn + }); + + it("keeps a complete disk tail intact when a graced replay is available (no flicker)", async () => { + const store = createChatStore(); + await store.loadConversation("conv_done2"); + const s = latestStream(); + // Complete tail + retained run: the grace replay must be ignored (not + // trimmed+rebuilt) so the just-opened turn doesn't blink out and back. + s.onSubscribed?.({ isActive: false, activeSeq: 9 }); + s.onEvent("user.message", { content: "loaded-q" }, 1); + s.onEvent("done", { conversationId: "conv_done2", response: "loaded-a" }, 2); + expect(store.getSnapshot("conv_done2").messages).toEqual(LOADED); + }); + + it("does not duplicate a finished turn whose grace-buffer replay still arrives", async () => { + const store = createChatStore(); + // Disk already has the completed turn. 
+ await store.loadConversation("conv_done"); + expect(store.getSnapshot("conv_done").messages).toEqual(LOADED); + + // Resume finds no active turn, but the server still replays the recently + // finished turn from its grace buffer. Those events must be dropped, not + // re-appended on top of the disk history. + const s = latestStream(); + s.onSubscribed?.({ isActive: false, activeSeq: 0 }); + s.onEvent("user.message", { content: "loaded-q" }, 1); + s.onEvent("text.delta", { text: "loaded-a" }, 2); + s.onEvent("done", { conversationId: "conv_done", response: "loaded-a" }, 3); + + expect(store.getSnapshot("conv_done").messages).toEqual(LOADED); + }); +}); diff --git a/web/test/chatBleed.test.tsx b/web/test/chatBleed.test.tsx new file mode 100644 index 00000000..34df0097 --- /dev/null +++ b/web/test/chatBleed.test.tsx @@ -0,0 +1,134 @@ +import { act, renderHook } from "@testing-library/react"; +import { beforeEach, describe, expect, it, mock } from "bun:test"; +import type { ReactNode } from "react"; +import { MemoryRouter } from "react-router-dom"; +import { ChatProvider, useChatContext } from "../src/context/ChatContext.tsx"; +import type { ChatMessage } from "../src/hooks/useChat.ts"; + +// --------------------------------------------------------------------------- +// Regression for #254 under the server-authoritative model: a turn streaming +// in conversation A must NOT bleed into B when the user switches mid-turn. +// Each conversation is a viewer over its own server stream; switching away +// keeps A's stream filling A's slice in the background. 
+// --------------------------------------------------------------------------- + +type StreamCb = (type: string, data: unknown, seq: number) => void; +const streamsByConv = new Map<string, StreamCb>(); +const subscribedByConv = new Map<string, (info: { isActive: boolean; activeSeq: number }) => void>(); + +const B_MESSAGES: ChatMessage[] = [ + { role: "user", content: "b-question" }, + { role: "assistant", content: "b-answer", blocks: [{ type: "text", text: "b-answer" }] }, +]; + +mock.module("../src/api/conversation-stream", () => ({ + connectConversationStream: (opts: { + conversationId: string; + onEvent: StreamCb; + onSubscribed?: (info: { isActive: boolean; activeSeq: number }) => void; + }) => { + streamsByConv.set(opts.conversationId, opts.onEvent); + if (opts.onSubscribed) subscribedByConv.set(opts.conversationId, opts.onSubscribed); + return { + close() { + streamsByConv.delete(opts.conversationId); + subscribedByConv.delete(opts.conversationId); + }, + }; + }, +})); + +const actualClient = await import("../src/api/client"); +mock.module("../src/api/client", () => ({ + ...actualClient, + startChatTurn: () => Promise.resolve({ conversationId: "conv-A" }), + startChatTurnMultipart: () => Promise.resolve({ conversationId: "conv-A" }), + cancelChatTurn: () => Promise.resolve(), + callTool: (server: string, action: string, args?: Record) => { + if (server === "conversations" && action === "get") { + return Promise.resolve({ + isError: false, + structuredContent: { metadata: { id: args?.id }, messages: B_MESSAGES }, + }); + } + return Promise.resolve({ isError: false, structuredContent: {} }); + }, +})); + +function wrapper({ children }: { children: ReactNode }) { + return ( + + {children} + + ); +} + +function lastAssistant(messages: ChatMessage[]): ChatMessage | undefined { + for (let i = messages.length - 1; i >= 0; i--) { + if (messages[i].role === "assistant") return messages[i]; + } + return undefined; +} + +describe("#254 mid-turn conversation switch (server-authoritative)", () => { + beforeEach(() => { + streamsByConv.clear(); +
subscribedByConv.clear(); + }); + + it("does not bleed A's streaming deltas into conversation B", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + + await act(async () => { + await result.current.sendMessage("a-question"); + }); + + act(() => { + streamsByConv.get("conv-A")?.("user.message", { content: "a-question" }, 1); + streamsByConv.get("conv-A")?.("text.delta", { text: "A-part1" }, 2); + }); + + await act(async () => { + await result.current.loadConversation("conv-B"); + }); + act(() => { + subscribedByConv.get("conv-B")?.({ isActive: false, activeSeq: 0 }); + }); + + expect(result.current.conversationId).toBe("conv-B"); + expect(lastAssistant(result.current.messages)?.content).toBe("b-answer"); + + // A keeps streaming in the background. + act(() => { + streamsByConv.get("conv-A")?.("text.delta", { text: "A-part2" }, 3); + }); + + expect(lastAssistant(result.current.messages)?.content).toBe("b-answer"); + }); + + it("keeps A's background stream so switching back shows the response", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + + await act(async () => { + await result.current.sendMessage("a-question"); + }); + act(() => { + streamsByConv.get("conv-A")?.("user.message", { content: "a-question" }, 1); + streamsByConv.get("conv-A")?.("text.delta", { text: "A1" }, 2); + }); + + await act(async () => { + await result.current.loadConversation("conv-B"); + }); + act(() => { + subscribedByConv.get("conv-B")?.({ isActive: false, activeSeq: 0 }); + streamsByConv.get("conv-A")?.("text.delta", { text: "A2" }, 3); + }); + + await act(async () => { + await result.current.loadConversation("conv-A"); + }); + expect(result.current.conversationId).toBe("conv-A"); + expect(lastAssistant(result.current.messages)?.content).toBe("A1A2"); + }); +}); diff --git a/web/test/inlineError.test.tsx b/web/test/inlineError.test.tsx index b9c45184..a91d13d7 100644 --- a/web/test/inlineError.test.tsx +++ 
b/web/test/inlineError.test.tsx @@ -1,195 +1,129 @@ -import { describe, expect, it, mock, beforeEach } from "bun:test"; +import { act, renderHook } from "@testing-library/react"; +import { beforeEach, describe, expect, it, mock } from "bun:test"; import { realClient } from "./setup"; -import { renderHook, act } from "@testing-library/react"; import type { ReactNode } from "react"; import { MemoryRouter } from "react-router-dom"; import { ChatProvider, useChatContext } from "../src/context/ChatContext.tsx"; // --------------------------------------------------------------------------- -// Mock streamChat so we can control SSE events in tests +// Inline error UX under the server-authoritative path. We capture the turn +// stream's onEvent and push synthetic server events. // --------------------------------------------------------------------------- -type StreamCallback = (type: string, data: unknown) => void; +type StreamCb = (type: string, data: unknown, seq: number) => void; +let capturedOnEvent: StreamCb | null = null; -let capturedCallback: StreamCallback | null = null; -let resolveStream: (() => void) | null = null; -let rejectStream: ((err: Error) => void) | null = null; +mock.module("../src/api/conversation-stream", () => ({ + connectConversationStream: (opts: { onEvent: StreamCb }) => { + capturedOnEvent = opts.onEvent; + return { close() {} }; + }, +})); // Spread the preload's real-module snapshot (see web/test/setup.ts) so this -// whole-module mock exposes every api/client export; only the two below are +// whole-module mock exposes every api/client export; only the three below are // overridden. Bun's mock.module registry is process-global, so an incomplete // stub leaking into another suite's module graph is what crashed bridge tests // with "Export named 'getActiveWorkspaceId' not found". 
mock.module("../src/api/client", () => ({ - ...realClient, - streamChat: (_req: unknown, cb: StreamCallback) => { - capturedCallback = cb; - return new Promise((resolve, reject) => { - resolveStream = resolve; - rejectStream = reject; - }); - }, - getConversationHistory: mock(() => - Promise.resolve({ conversationId: "c1", messages: [] }), - ), + ...realClient, + startChatTurn: () => Promise.resolve({ conversationId: "c1" }), + startChatTurnMultipart: () => Promise.resolve({ conversationId: "c1" }), + cancelChatTurn: () => Promise.resolve(), })); -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - function wrapper({ children }: { children: ReactNode }) { - return ( - - {children} - - ); + return ( + + {children} + + ); } -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- +let seq = 0; +function emit(type: string, data: unknown): void { + seq += 1; + capturedOnEvent?.(type, data, seq); +} describe("inline error UX", () => { - beforeEach(() => { - capturedCallback = null; - resolveStream = null; - rejectStream = null; - }); - - it("stream error event stamps error on last assistant message, not banner", async () => { - const { result } = renderHook(() => useChatContext(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - - // Simulate some streaming content first - act(() => { - capturedCallback?.("text.delta", { text: "Here is my response" }); - }); - - // Fire SSE error event - act(() => { - capturedCallback?.("error", { - error: "json_parse", - message: "JSON Parse error: Unable to parse JSON string", - }); - }); - - // Error should be on the last assistant message - const lastMsg = result.current.messages[result.current.messages.length - 1]; - 
expect(lastMsg?.role).toBe("assistant"); - expect(lastMsg?.error).toBe("JSON Parse error: Unable to parse JSON string"); - - // Banner error should NOT be set - expect(result.current.error).toBeNull(); - }); - - it("simulateError is a no-op when there are no messages", () => { - const { result } = renderHook(() => useChatContext(), { wrapper }); - - expect(result.current.messages).toHaveLength(0); - - act(() => { - result.current.simulateError("Something broke"); - }); - - // No messages to stamp on — stays empty - expect(result.current.messages).toHaveLength(0); - expect(result.current.isStreaming).toBe(false); - expect(result.current.streamingState).toBeNull(); - }); - - it("simulateError stamps on existing assistant message", async () => { - const { result } = renderHook(() => useChatContext(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - - act(() => { - capturedCallback?.("text.delta", { text: "response text" }); - }); - - // Complete the stream so isStreaming is false - act(() => { - capturedCallback?.("done", { - response: "response text", - conversationId: "c1", - toolCalls: [], - inputTokens: 10, - outputTokens: 5, - stopReason: "complete", - }); - }); - act(() => { - resolveStream?.(); - }); - await act(async () => {}); - - const msgCountBefore = result.current.messages.length; - - act(() => { - result.current.simulateError("Simulated crash"); - }); - - // Should stamp on existing message, not add a new one - expect(result.current.messages).toHaveLength(msgCountBefore); - const lastMsg = result.current.messages[result.current.messages.length - 1]; - expect(lastMsg?.role).toBe("assistant"); - expect(lastMsg?.error).toBe("Simulated crash"); - }); - - it("retryLastMessage removes failed pair and re-sends", async () => { - const { result } = renderHook(() => useChatContext(), { wrapper }); - - // Send a message - act(() => { - result.current.sendMessage("try this"); - }); - await act(async () => {}); 
- - // Simulate some content then error - act(() => { - capturedCallback?.("text.delta", { text: "partial" }); - }); - act(() => { - capturedCallback?.("error", { - error: "crash", - message: "Engine crashed", - }); - }); - - // Complete the stream promise so isStreaming clears - act(() => { - resolveStream?.(); - }); - await act(async () => {}); - - // Should have user + errored assistant - expect(result.current.messages).toHaveLength(2); - expect(result.current.messages[1].error).toBe("Engine crashed"); - - // Reset mocks for the retry - capturedCallback = null; - resolveStream = null; - - // Retry - act(() => { - result.current.retryLastMessage(); - }); - await act(async () => {}); - - // retryLastMessage removes the failed pair and triggers a new send. - // After retry fires, we should have a new user + assistant placeholder. - // The callback should be captured again from the new sendMessage call. - // Give it another tick for the effect to fire sendMessage - await act(async () => {}); - - // The retry effect should have fired sendMessage, creating new messages - expect(result.current.isStreaming).toBe(true); - }); + beforeEach(() => { + capturedOnEvent = null; + seq = 0; + }); + + it("stream error event stamps error on last assistant message, not banner", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + + act(() => emit("text.delta", { text: "Here is my response" })); + act(() => + emit("error", { + error: "json_parse", + message: "JSON Parse error: Unable to parse JSON string", + }), + ); + + const lastMsg = result.current.messages[result.current.messages.length - 1]; + expect(lastMsg?.role).toBe("assistant"); + expect(lastMsg?.error).toBe("JSON Parse error: Unable to parse JSON string"); + expect(result.current.error).toBeNull(); + }); + + it("simulateError is a no-op when there are no messages", () => { + const { result } = renderHook(() => 
useChatContext(), { wrapper }); + expect(result.current.messages).toHaveLength(0); + act(() => { + result.current.simulateError("Something broke"); + }); + expect(result.current.messages).toHaveLength(0); + expect(result.current.isStreaming).toBe(false); + expect(result.current.streamingState).toBeNull(); + }); + + it("simulateError stamps on existing assistant message", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + act(() => emit("text.delta", { text: "response text" })); + act(() => + emit("done", { + response: "response text", + conversationId: "c1", + toolCalls: [], + stopReason: "complete", + }), + ); + + const msgCountBefore = result.current.messages.length; + act(() => { + result.current.simulateError("Simulated crash"); + }); + expect(result.current.messages).toHaveLength(msgCountBefore); + const lastMsg = result.current.messages[result.current.messages.length - 1]; + expect(lastMsg?.role).toBe("assistant"); + expect(lastMsg?.error).toBe("Simulated crash"); + }); + + it("retryLastMessage removes failed pair and re-sends", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("try this"); + }); + act(() => emit("text.delta", { text: "partial" })); + act(() => emit("error", { error: "crash", message: "Engine crashed" })); + + expect(result.current.messages).toHaveLength(2); + expect(result.current.messages[1].error).toBe("Engine crashed"); + + await act(async () => { + result.current.retryLastMessage(); + }); + await act(async () => {}); + + expect(result.current.isStreaming).toBe(true); + }); }); diff --git a/web/test/streamingState.test.tsx b/web/test/streamingState.test.tsx index de8707ab..7b47029e 100644 --- a/web/test/streamingState.test.tsx +++ b/web/test/streamingState.test.tsx @@ -1,317 +1,203 @@ -import { describe, expect, it, mock, beforeEach } 
from "bun:test"; +import { act, renderHook } from "@testing-library/react"; +import { beforeEach, describe, expect, it, mock } from "bun:test"; import { realClient } from "./setup"; -import { renderHook, act } from "@testing-library/react"; import type { ReactNode } from "react"; import { MemoryRouter } from "react-router-dom"; import { ChatProvider, useChatContext } from "../src/context/ChatContext.tsx"; import type { StreamingState } from "../src/hooks/useChat.ts"; // --------------------------------------------------------------------------- -// Mock streamChat so we can control SSE events in tests +// Drive the streaming state machine through the server-authoritative path: +// sendMessage → startChatTurn (POST) → subscribe via connectConversationStream. +// We capture the stream's onEvent and push synthetic server events. // --------------------------------------------------------------------------- -type StreamCallback = (type: string, data: unknown) => void; +type StreamCb = (type: string, data: unknown, seq: number) => void; +let capturedOnEvent: StreamCb | null = null; -let capturedCallback: StreamCallback | null = null; -let resolveStream: (() => void) | null = null; +mock.module("../src/api/conversation-stream", () => ({ + connectConversationStream: (opts: { onEvent: StreamCb }) => { + capturedOnEvent = opts.onEvent; + return { close() {} }; + }, +})); // Spread the preload's real-module snapshot (see web/test/setup.ts) so this -// whole-module mock exposes every api/client export; only the two below are +// whole-module mock exposes every api/client export; only the three below are // overridden. Bun's mock.module registry is process-global, so an incomplete // stub leaking into another suite's module graph is what crashed bridge tests // with "Export named 'getActiveWorkspaceId' not found". 
mock.module("../src/api/client", () => ({ - ...realClient, - streamChat: (_req: unknown, cb: StreamCallback) => { - capturedCallback = cb; - return new Promise((resolve) => { - resolveStream = resolve; - }); - }, - getConversationHistory: mock(() => - Promise.resolve({ conversationId: "c1", messages: [] }), - ), + ...realClient, + startChatTurn: () => Promise.resolve({ conversationId: "c1" }), + startChatTurnMultipart: () => Promise.resolve({ conversationId: "c1" }), + cancelChatTurn: () => Promise.resolve(), })); -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - function wrapper({ children }: { children: ReactNode }) { - return ( - - {children} - - ); + return ( + + {children} + + ); } -function useStreamingState() { - const ctx = useChatContext(); - return ctx; +let seq = 0; +function emit(type: string, data: unknown): void { + seq += 1; + capturedOnEvent?.(type, data, seq); } -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - describe("streamingState state machine", () => { - beforeEach(() => { - capturedCallback = null; - resolveStream = null; - }); - - it("starts as null", () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - expect(result.current.streamingState).toBeNull(); - }); - - it("transitions to thinking when sendMessage is called", async () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - - // sendMessage returns a promise; don't await (stream is pending) - act(() => { - result.current.sendMessage("hello"); - }); - - // Wait a tick for state to settle - await act(async () => {}); - - expect(result.current.streamingState).toBe("thinking" as StreamingState); - }); - - it("transitions thinking → streaming on first text.delta", async () => { - const { 
result } = renderHook(() => useStreamingState(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - - expect(result.current.streamingState).toBe("thinking"); - - act(() => { - capturedCallback?.("text.delta", { text: "Hi" }); - }); - - expect(result.current.streamingState).toBe("streaming"); - }); - - it("transitions streaming → working on tool.start", async () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - - act(() => { - capturedCallback?.("text.delta", { text: "Let me check" }); - }); - expect(result.current.streamingState).toBe("streaming"); - - act(() => { - capturedCallback?.("tool.start", { id: "t1", name: "search" }); - }); - expect(result.current.streamingState).toBe("working"); - }); - - it("transitions working → analyzing on last tool.done", async () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - - act(() => { - capturedCallback?.("text.delta", { text: "x" }); - }); - act(() => { - capturedCallback?.("tool.start", { id: "t1", name: "search" }); - }); - expect(result.current.streamingState).toBe("working"); - - act(() => { - capturedCallback?.("tool.done", { id: "t1", name: "search", ok: true, ms: 100 }); - }); - // No in-flight tools remain → model is inferring on the result. 
- expect(result.current.streamingState).toBe("analyzing"); - }); - - it("holds working while parallel tools are still in flight, then analyzing", async () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - - act(() => { - capturedCallback?.("tool.start", { id: "a", name: "search" }); - capturedCallback?.("tool.start", { id: "b", name: "fetch" }); - }); - expect(result.current.streamingState).toBe("working"); - - // First of two completes — the other is still running, so stay `working`. - act(() => { - capturedCallback?.("tool.done", { id: "a", name: "search", ok: true, ms: 10 }); - }); - expect(result.current.streamingState).toBe("working"); - - // Last one lands → flip to `analyzing`. - act(() => { - capturedCallback?.("tool.done", { id: "b", name: "fetch", ok: false, ms: 725 }); - }); - expect(result.current.streamingState).toBe("analyzing"); - }); - - it("transitions analyzing → streaming on the next text.delta", async () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - - act(() => { - capturedCallback?.("tool.start", { id: "t1", name: "search" }); - capturedCallback?.("tool.done", { id: "t1", name: "search", ok: true, ms: 10 }); - }); - expect(result.current.streamingState).toBe("analyzing"); - - act(() => { - capturedCallback?.("text.delta", { text: "Based on that…" }); - }); - expect(result.current.streamingState).toBe("streaming"); - }); - - it("transitions analyzing → working when the model calls another tool", async () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - - act(() => { - capturedCallback?.("tool.start", { id: "t1", name: "search" }); - capturedCallback?.("tool.done", { id: "t1", name: 
"search", ok: true, ms: 10 }); - }); - expect(result.current.streamingState).toBe("analyzing"); - - act(() => { - capturedCallback?.("tool.start", { id: "t2", name: "fetch" }); - }); - expect(result.current.streamingState).toBe("working"); - }); - - it("transitions to null on done event", async () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - - act(() => { - capturedCallback?.("text.delta", { text: "Hi" }); - }); - expect(result.current.streamingState).toBe("streaming"); - - act(() => { - capturedCallback?.("done", { - conversationId: "c1", - response: "Hi", - }); - }); - expect(result.current.streamingState).toBeNull(); - }); - - it("transitions to null on error event", async () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - - expect(result.current.streamingState).toBe("thinking"); - - act(() => { - capturedCallback?.("error", { error: "fail", message: "fail" }); - }); - expect(result.current.streamingState).toBeNull(); - }); - - it("transitions to null in finally after stream resolves", async () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - - expect(result.current.streamingState).toBe("thinking"); - - // Resolve the stream promise (triggers finally block) - await act(async () => { - resolveStream?.(); - }); - - expect(result.current.streamingState).toBeNull(); - }); - - it("newConversation resets streamingState to null", async () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - - act(() => { - result.current.newConversation(); - }); - - expect(result.current.streamingState).toBeNull(); - }); - - it("full cycle: thinking → streaming → working → analyzing → streaming → 
null", async () => { - const { result } = renderHook(() => useStreamingState(), { wrapper }); - - act(() => { - result.current.sendMessage("hello"); - }); - await act(async () => {}); - expect(result.current.streamingState).toBe("thinking"); - - act(() => { - capturedCallback?.("text.delta", { text: "Let me " }); - }); - expect(result.current.streamingState).toBe("streaming"); - - act(() => { - capturedCallback?.("tool.start", { id: "t1", name: "lookup" }); - }); - expect(result.current.streamingState).toBe("working"); - - act(() => { - capturedCallback?.("tool.done", { id: "t1", name: "lookup", ok: true, ms: 50 }); - }); - expect(result.current.streamingState).toBe("analyzing"); - - act(() => { - capturedCallback?.("text.delta", { text: "here you go" }); - }); - expect(result.current.streamingState).toBe("streaming"); - - act(() => { - capturedCallback?.("done", { - conversationId: "c1", - response: "Let me here you go", - }); - }); - expect(result.current.streamingState).toBeNull(); - - // Resolve the stream so the finally block runs - await act(async () => { - resolveStream?.(); - }); - expect(result.current.streamingState).toBeNull(); - }); + beforeEach(() => { + capturedOnEvent = null; + seq = 0; + }); + + it("starts as null", () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + expect(result.current.streamingState).toBeNull(); + }); + + it("transitions to thinking when sendMessage is called", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + expect(result.current.streamingState).toBe("thinking" as StreamingState); + }); + + it("transitions thinking → streaming on first text.delta", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + expect(result.current.streamingState).toBe("thinking"); + act(() => 
emit("text.delta", { text: "Hi" })); + expect(result.current.streamingState).toBe("streaming"); + }); + + it("transitions streaming → working on tool.start", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + act(() => emit("text.delta", { text: "Let me check" })); + expect(result.current.streamingState).toBe("streaming"); + act(() => emit("tool.start", { id: "t1", name: "search" })); + expect(result.current.streamingState).toBe("working"); + }); + + it("transitions working → analyzing on last tool.done", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + act(() => emit("text.delta", { text: "x" })); + act(() => emit("tool.start", { id: "t1", name: "search" })); + expect(result.current.streamingState).toBe("working"); + act(() => emit("tool.done", { id: "t1", name: "search", ok: true, ms: 100 })); + expect(result.current.streamingState).toBe("analyzing"); + }); + + it("holds working while parallel tools are still in flight, then analyzing", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + act(() => { + emit("tool.start", { id: "a", name: "search" }); + emit("tool.start", { id: "b", name: "fetch" }); + }); + expect(result.current.streamingState).toBe("working"); + act(() => emit("tool.done", { id: "a", name: "search", ok: true, ms: 10 })); + expect(result.current.streamingState).toBe("working"); + act(() => emit("tool.done", { id: "b", name: "fetch", ok: false, ms: 725 })); + expect(result.current.streamingState).toBe("analyzing"); + }); + + it("transitions analyzing → streaming on the next text.delta", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await 
result.current.sendMessage("hello"); + }); + act(() => { + emit("tool.start", { id: "t1", name: "search" }); + emit("tool.done", { id: "t1", name: "search", ok: true, ms: 10 }); + }); + expect(result.current.streamingState).toBe("analyzing"); + act(() => emit("text.delta", { text: "Based on that…" })); + expect(result.current.streamingState).toBe("streaming"); + }); + + it("transitions analyzing → working when the model calls another tool", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + act(() => { + emit("tool.start", { id: "t1", name: "search" }); + emit("tool.done", { id: "t1", name: "search", ok: true, ms: 10 }); + }); + expect(result.current.streamingState).toBe("analyzing"); + act(() => emit("tool.start", { id: "t2", name: "fetch" })); + expect(result.current.streamingState).toBe("working"); + }); + + it("transitions to null on done event", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + act(() => emit("text.delta", { text: "Hi" })); + expect(result.current.streamingState).toBe("streaming"); + act(() => emit("done", { conversationId: "c1", response: "Hi" })); + expect(result.current.streamingState).toBeNull(); + }); + + it("transitions to null on error event", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + expect(result.current.streamingState).toBe("thinking"); + act(() => emit("error", { error: "fail", message: "fail" })); + expect(result.current.streamingState).toBeNull(); + }); + + it("transitions to null on cancelled event", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + 
expect(result.current.streamingState).toBe("thinking"); + act(() => emit("cancelled", {})); + expect(result.current.streamingState).toBeNull(); + }); + + it("newConversation resets streamingState to null", () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + act(() => { + result.current.newConversation(); + }); + expect(result.current.streamingState).toBeNull(); + }); + + it("full cycle: thinking → streaming → working → analyzing → streaming → null", async () => { + const { result } = renderHook(() => useChatContext(), { wrapper }); + await act(async () => { + await result.current.sendMessage("hello"); + }); + expect(result.current.streamingState).toBe("thinking"); + act(() => emit("text.delta", { text: "Let me " })); + expect(result.current.streamingState).toBe("streaming"); + act(() => emit("tool.start", { id: "t1", name: "lookup" })); + expect(result.current.streamingState).toBe("working"); + act(() => emit("tool.done", { id: "t1", name: "lookup", ok: true, ms: 50 })); + expect(result.current.streamingState).toBe("analyzing"); + act(() => emit("text.delta", { text: "here you go" })); + expect(result.current.streamingState).toBe("streaming"); + act(() => emit("done", { conversationId: "c1", response: "Let me here you go" })); + expect(result.current.streamingState).toBeNull(); + }); });