Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ea229ea
feat(runtime): server-authoritative resumable chat turns (RunBus)
Ovaculos May 25, 2026
83d88af
fix(conversations): rework auto-title prompt to stop response echo (#…
Ovaculos May 25, 2026
549cbd6
feat(web): per-conversation chat viewer over the server turn stream
Ovaculos May 25, 2026
fd7fcea
feat(conversations): live streaming dots + flicker-free list
Ovaculos May 25, 2026
00f4244
refactor(web): remove old streaming-chat client orphaned by the rewrite
Ovaculos May 25, 2026
a86bafd
test(conversations): fix auto-title detection after #253 prompt change
Ovaculos May 25, 2026
9ddfd2e
fix(adapters): log sinks must not throw into the event-emit path
Ovaculos May 25, 2026
46d775b
fix(runtime): deliver `cancelled` frame to live viewers on Stop
Ovaculos May 25, 2026
f2c0c33
fix(api): map ConversationCorruptedError to 422 in handleChatStart
Ovaculos May 25, 2026
66210e1
fix(runtime): serialize startTurn create to prevent a double-create race
Ovaculos May 25, 2026
75e01e3
fix(web): clear stuck streaming on reconnect after the turn ended
Ovaculos May 25, 2026
2ce8d73
fix(conversations): complete a partial disk snapshot on resume (no fl…
Ovaculos May 25, 2026
f4e4a62
docs(conversations): note legacy seq-less broadcast vs RunBus viewer
Ovaculos May 25, 2026
1c2d75f
fix(adapters): warn once per failure episode in best-effort log sinks
Ovaculos May 26, 2026
014492b
docs+chore: surface RunBus single-process limit when replicas > 1
Ovaculos May 26, 2026
96d5e65
fix(runtime): cap per-run RunBus event buffer to prevent unbounded gr…
Ovaculos May 27, 2026
5818db5
fix(tests): hard-error on missing workDir under bun test, fix 8 leakers
Ovaculos May 27, 2026
0556d83
perf(conversations): patch list row title in-place instead of refetching
Ovaculos May 27, 2026
d37e54e
chore: drop stale streamChat refs + scope code-style check to non-bun…
Ovaculos May 27, 2026
296dd62
Merge remote-tracking branch 'upstream/main' into feat/conversations-…
Ovaculos May 27, 2026
9f56a1d
Merge remote-tracking branch 'upstream/main' into feat/conversations-…
Ovaculos May 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ Routing requests to the process owning a session's transport is the **load balan
3. `sessionStore.type: "redis"`. Each tenant gets its own Redis instance in its own namespace (see `infra/CLAUDE.md` per-tenant Redis pattern). Default `nb:mcp:session:` keyPrefix is correct under that model.
4. `platform.strategy.type: RollingUpdate`. Only after (1).

**Known limitation under `replicas > 1`: RunBus is single-process.** Chat turn replay/resume (the SSE-stream-backed viewer attaches to a per-conversation event log) lives in memory on the pod that started the turn. A viewer that lands on a different pod sees `isActive:false` even though a turn is in flight on another pod, and live frames do not fan out across pods. Sticky routing on `Mcp-Session-Id` (prereq #2) mitigates this for the active tab; a pod restart or any cross-pod viewer (another tab or device) still loses mid-turn resume. The clustered Redis-backed RunBus is deferred work, tracked in `src/runtime/run-bus.ts` — `serve` warns at boot when `sessionStore.type === "redis"` so the gap is visible.

**TTL units: seconds at the surface, ms internally.** Operator-facing: `MCP_SESSION_TTL_SECONDS` env (highest priority) > `sessionStore.ttlSeconds` config > 8h default. Conversion to ms happens in `Runtime.getSessionStoreTtlMs()` only — registry constructors and the host's idle sweep both take ms from there. Don't add mixed-unit code elsewhere.

## MCP App Bridge Rules
Expand Down
7 changes: 7 additions & 0 deletions scripts/check-code-style.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ function checkNoInlineTypeImports(): CheckResult {
// this skip the check passes in CI but fails on a developer's machine.
if (file.split(/[\\/]/).includes("node_modules")) continue;
const rel = relative(ROOT, file);
// Skip bundle subtrees (their UIs have their own conventions, per the
// doc comment) and vendored deps. `bun run build:bundles` installs
// node_modules under each bundle's UI, so an unfiltered walk picks
// up thousands of vendored `.d.ts` violations that have nothing to
// do with our source.
if (rel.includes("/node_modules/")) continue;
if (rel.startsWith("src/bundles/")) continue;
const content = readFileSync(file, "utf-8");
const source = ts.createSourceFile(file, content, ts.ScriptTarget.Latest, true);

Expand Down
22 changes: 21 additions & 1 deletion src/adapters/structured-log-sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ export class StructuredLogSink implements EventSink {
private conversationId: string | undefined;
private userId: string | undefined;
private workspaceId: string | undefined;
/** True after a write failure surfaced a console.warn; reset on the next
* successful write so a recurring failure (after intermittent recovery)
* warns again. Avoids spamming during a sustained outage. */
private writeWarned = false;

constructor(config: StructuredLogConfig) {
this.dir = config.dir;
Expand Down Expand Up @@ -92,7 +96,23 @@ export class StructuredLogSink implements EventSink {
/**
 * Append one record as a single JSON line to today's log file,
 * `nimblebrain-YYYY-MM-DD.jsonl` under `this.dir` (the date is the UTC date
 * taken from `Date.prototype.toISOString`).
 *
 * Best-effort: this method never throws. The record is serialized and written
 * exactly once, inside the guarded section, so neither a serialization error
 * nor a filesystem error can propagate into the event-emit path.
 *
 * @param record - The log record to serialize with `JSON.stringify`.
 */
private writeLine(record: Record<string, unknown>): void {
  const today = new Date().toISOString().slice(0, 10); // YYYY-MM-DD
  const filename = `nimblebrain-${today}.jsonl`;
  try {
    appendFileSync(join(this.dir, filename), `${JSON.stringify(record)}\n`);
    // A successful write ends the current failure episode and re-arms the warning.
    this.writeWarned = false;
  } catch (err) {
    // Best-effort logging: a write failure (disk full, perms, or a detached
    // turn emitting after the workdir was torn down) must never throw into
    // the event-emit path and crash the caller. Surface the first failure of
    // an episode so operators see disk/perms incidents; suppress until a
    // subsequent success re-arms.
    if (!this.writeWarned) {
      this.writeWarned = true;
      console.warn(
        `[structured-log-sink] write to ${this.dir} failed (further failures suppressed until recovery):`,
        err instanceof Error ? err.message : err,
      );
    }
  }
}

/** Remove log files older than the retention threshold. */
Expand Down
20 changes: 19 additions & 1 deletion src/adapters/workspace-log-sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ export interface WorkspaceLogConfig {
*/
export class WorkspaceLogSink implements EventSink {
private dir: string;
/** First-failure-of-an-episode flag — see structured-log-sink. */
private writeWarned = false;

constructor(config: WorkspaceLogConfig) {
this.dir = join(config.dir, "workspace");
Expand All @@ -65,7 +67,23 @@ export class WorkspaceLogSink implements EventSink {

const today = new Date().toISOString().slice(0, 10); // YYYY-MM-DD
const filename = `${today}.jsonl`;
appendFileSync(join(this.dir, filename), `${JSON.stringify(record)}\n`);
try {
appendFileSync(join(this.dir, filename), `${JSON.stringify(record)}\n`);
this.writeWarned = false;
} catch (err) {
// Best-effort logging: a write failure (disk full, perms, or a detached
// turn emitting after the workdir was torn down) must never throw into
// the event-emit path and crash the caller. Surface the first failure of
// an episode so operators see disk/perms incidents; suppress until a
// subsequent success re-arms.
if (!this.writeWarned) {
this.writeWarned = true;
console.warn(
`[workspace-log-sink] write to ${this.dir} failed (further failures suppressed until recovery):`,
err instanceof Error ? err.message : err,
);
}
}
}

/** No-op — kept for API compatibility. Writes are synchronous. */
Expand Down
58 changes: 56 additions & 2 deletions src/api/conversation-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
* Separate from SseEventManager which handles workspace-level events.
*/

import type { BufferedRunEvent } from "../runtime/run-bus.ts";

/** A subscriber watching a specific conversation's events. */
interface ConversationSubscriber {
id: string;
Expand All @@ -19,6 +21,13 @@ interface ConversationSubscriber {

const encoder = new TextEncoder();

/**
 * Encode one Server-Sent Events frame as UTF-8 bytes.
 *
 * The frame is an `event:` line, an optional `id:` line, and a `data:` line
 * holding the JSON-serialized payload, terminated by a blank line. The `id:`
 * line is emitted only when `seq` is neither `null` nor `undefined`, so a
 * reconnecting viewer can resume from its last-seen sequence number.
 *
 * @param eventType - Value written on the `event:` line.
 * @param data - Payload, serialized with `JSON.stringify`.
 * @param seq - Optional sequence number written on the `id:` line.
 * @returns The encoded frame.
 */
function frame(eventType: string, data: unknown, seq?: number): Uint8Array {
  const lines = [`event: ${eventType}`];
  if (seq != null) {
    lines.push(`id: ${seq}`);
  }
  lines.push(`data: ${JSON.stringify(data)}`);
  return encoder.encode(`${lines.join("\n")}\n\n`);
}

export class ConversationEventManager {
private subscribers = new Map<string, ConversationSubscriber>();
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
Expand Down Expand Up @@ -73,16 +82,33 @@ export class ConversationEventManager {
addSubscriber(
conversationId: string,
userId: string,
replay?: BufferedRunEvent[],
meta?: { isActive: boolean; activeSeq: number },
): { stream: ReadableStream<Uint8Array>; subscriberId: string } {
const id = crypto.randomUUID();
let sub: ConversationSubscriber;

const stream = new ReadableStream<Uint8Array>({
start: (controller) => {
sub = { id, userId, conversationId, controller, closed: false };
// The subscribed frame tells the client whether a turn is in flight
// (so it can trim a stale in-flight turn from disk history before the
// RunBus replay rebuilds it) and its current seq.
controller.enqueue(
frame("subscribed", {
subscriberId: id,
isActive: meta?.isActive ?? false,
activeSeq: meta?.activeSeq ?? 0,
}),
);
// Replay the in-flight turn (if any) BEFORE registering for live
// fan-out. start() runs synchronously and we add to the subscribers
// map only after replaying, so no live event can interleave ahead of
// the replay — viewers never see out-of-order deltas.
if (replay) {
for (const e of replay) controller.enqueue(frame(e.type, e.data, e.seq));
}
this.subscribers.set(id, sub);
const subscribedMsg = `event: subscribed\ndata: ${JSON.stringify({ subscriberId: id })}\n\n`;
controller.enqueue(encoder.encode(subscribedMsg));
},
cancel: () => {
this.removeSubscriber(id);
Expand All @@ -92,6 +118,29 @@ export class ConversationEventManager {
return { stream, subscriberId: id };
}

/**
 * Deliver one live run event to every open subscriber of a conversation.
 *
 * The event's sequence number travels with the frame so viewers can
 * de-duplicate against replay and resume after a reconnect. The pass also
 * prunes stale entries: any subscriber already flagged `closed` is removed
 * from the map (whichever conversation it was watching), and a subscriber
 * whose enqueue throws is handed to `closeSub` and removed.
 *
 * @param conversationId - Conversation whose subscribers receive the event.
 * @param event - Buffered run event supplying `type`, `data` and `seq`.
 */
publishEvent(conversationId: string, event: BufferedRunEvent): void {
  // Encode once; the same bytes are enqueued for every matching subscriber.
  const payload = frame(event.type, event.data, event.seq);
  for (const [subscriberId, subscriber] of this.subscribers) {
    if (subscriber.closed) {
      this.subscribers.delete(subscriberId);
    } else if (subscriber.conversationId === conversationId) {
      try {
        subscriber.controller.enqueue(payload);
      } catch (err) {
        console.warn("[conversation-events] SSE write failed:", err);
        this.closeSub(subscriber);
        this.subscribers.delete(subscriberId);
      }
    }
  }
}

/** Remove a specific subscriber. */
removeSubscriber(subscriberId: string): void {
const sub = this.subscribers.get(subscriberId);
Expand Down Expand Up @@ -121,6 +170,11 @@ export class ConversationEventManager {
* explicit policy gates; until then, this is the only exclusion
* shape needed.
*
* Seq-less: unlike {@link publishEvent} (the RunBus path), these frames carry
* no `id:` sequence. A seq-tracking `conversation-stream` viewer applies them
* live but can't replay/resume them. Only `/v1/chat` + `/v1/chat/stream` use
* this; the web shell is RunBus-only.
*
* @param conversationId - Target conversation
* @param eventType - SSE event type (e.g. "text.delta", "user.message")
* @param data - Event data payload
Expand Down
5 changes: 5 additions & 0 deletions src/api/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ const SSE_ROUTES: Partial<Record<EngineEventType, SseRoute>> = {
// existing "broadcast to all clients in this process" behavior to avoid
// silently breaking iframe refresh. Revisit when payload grows wsId.
"data.changed": { scope: "global" },
// Live conversation-title update (auto-title generation completes after the
// turn). Workspace-scoped via the conversation's workspaceId breadcrumb so
// it doesn't leak across tenants. The shell routes it to the matching
// conversation slice by `conversationId`.
"conversation.title": { scope: "workspace", wsIdField: "wsId" },
// Org-level config (model preferences, feature flags). Affects every
// workspace; broadcast to all.
"config.changed": { scope: "global" },
Expand Down
83 changes: 83 additions & 0 deletions src/api/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { EngineEvent, EventSink } from "../engine/types.ts";
import { ingestFiles, isAllowedMime, type UploadedFile } from "../files/ingest.ts";
import type { FileEntry } from "../files/types.ts";
import type { IdentityProvider, UserIdentity } from "../identity/provider.ts";
import { DEV_IDENTITY } from "../identity/providers/dev.ts";
import {
ConversationAccessDeniedError,
ConversationCorruptedError,
Expand Down Expand Up @@ -154,6 +155,88 @@ function runInProgressResponse(conversationId: string): Response {
);
}

/**
 * POST /v1/chat/start — begin a detached, server-authoritative chat turn and
 * answer immediately with the conversation id.
 *
 * The turn is not tied to this request's lifecycle (closing the tab does NOT
 * cancel it). Clients follow progress via GET /v1/conversations/:id/events,
 * which replays the in-flight turn and then tails live events.
 *
 * @param request - Incoming HTTP request carrying the chat body.
 * @param runtime - Runtime used to start the turn.
 * @param features - Resolved feature flags, forwarded to `parseChatBody`.
 * @param identity - Caller identity, if any, forwarded to `parseChatBody`.
 * @param workspaceId - Focused workspace id, if any, forwarded to `parseChatBody`.
 * @returns `{ conversationId }` on success; otherwise the parser's own error
 *   response, the run-in-progress response, a 403, or the
 *   corrupted-conversation response.
 * @throws Re-throws any `startTurn` error that is not one of the mapped types.
 */
export async function handleChatStart(
  request: Request,
  runtime: Runtime,
  features: ResolvedFeatures,
  identity?: UserIdentity,
  workspaceId?: string,
): Promise<Response> {
  const chatRequest = await parseChatBody(request, runtime, features, identity, workspaceId);
  // The parser hands back a ready-made Response when it rejects the body.
  if (chatRequest instanceof Response) {
    return chatRequest;
  }

  try {
    const started = await runtime.startTurn(chatRequest);
    return Response.json({ conversationId: started.conversationId });
  } catch (err) {
    if (err instanceof RunInProgressError) {
      return runInProgressResponse(chatRequest.conversationId ?? "");
    }
    if (err instanceof ConversationAccessDeniedError) {
      const details = { conversationId: chatRequest.conversationId };
      return apiError(
        403,
        "conversation_access_denied",
        "You do not have access to this conversation.",
        details,
      );
    }
    // startTurn → store.load can throw on a pre-migration (ownerless)
    // conversation; answer with the corrupted-conversation response rather
    // than letting it escape as a raw 500.
    if (err instanceof ConversationCorruptedError) {
      return conversationCorruptedResponse(err);
    }
    throw err;
  }
}

/**
 * POST /v1/conversations/:id/cancel — the explicit Stop button, the only
 * request that aborts generation (client disconnect does not).
 *
 * Checks run in this order: caller resolution (401), conversation lookup
 * (422 when corrupted, 404 when missing), ownership (403), then cancel.
 *
 * @param conversationId - Conversation whose in-flight turn should be cancelled.
 * @param runtime - Runtime used for the lookup and the cancel.
 * @param identity - Authenticated caller, if any.
 * @returns `{ cancelled }` with the value returned by `runtime.cancelTurn`, or
 *   an error response as listed above.
 * @throws Re-throws any lookup error other than `ConversationCorruptedError`.
 */
export async function handleChatCancel(
  conversationId: string,
  runtime: Runtime,
  identity?: UserIdentity,
): Promise<Response> {
  // The dev identity is only a fallback when no identity provider is configured;
  // with a provider present and no caller id, the request is unauthenticated.
  const fallbackCallerId = () => (runtime.getIdentityProvider() ? null : DEV_IDENTITY.id);
  const callerId = identity?.id ?? fallbackCallerId();
  if (!callerId) {
    return apiError(401, "authentication_required", "Authentication required.");
  }

  // Turn a corrupted-conversation rejection into a value so it can be mapped
  // to a 422 below; every other failure keeps propagating.
  const lookup = await runtime.findConversation(conversationId).catch((err) => {
    if (!(err instanceof ConversationCorruptedError)) throw err;
    return err;
  });

  if (lookup instanceof ConversationCorruptedError) {
    const details = {
      conversationId: lookup.conversationId,
      reason: lookup.reason,
    };
    return apiError(422, "conversation_corrupted", lookup.message, details);
  }
  if (!lookup) {
    return apiError(404, "not_found", "Conversation not found");
  }
  if (lookup.ownerId !== callerId) {
    return apiError(
      403,
      "conversation_access_denied",
      "You do not have access to this conversation.",
      { conversationId },
    );
  }

  return Response.json({ cancelled: runtime.cancelTurn(conversationId) });
}

function conversationAccessDeniedResponse(conversationId: string): Response {
return apiError(
403,
Expand Down
62 changes: 37 additions & 25 deletions src/api/routes/chat.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Hono } from "hono";
import { handleChat, handleChatStream } from "../handlers.ts";
import { handleChat, handleChatCancel, handleChatStart, handleChatStream } from "../handlers.ts";
import { requireAuth } from "../middleware/auth.ts";
import { bodyLimit } from "../middleware/body-limit.ts";
import { errorLog } from "../middleware/error-log.ts";
Expand All @@ -23,28 +23,40 @@ export function chatRoutes(ctx: AppContext) {
// value flows through `handleChat` into `ChatRequest.workspaceId` as the
// *focused* workspace, scoping the prompt briefing (installed apps +
// house rules). See `handlers.ts::parseChatBody`.
return new Hono<AppEnv>()
.use("*", requireAuth(ctx.authOptions))
.use("*", optionalWorkspace(ctx.workspaceStore))
.use("*", errorLog(ctx))
.post("/v1/chat", chatBodyLimit, rl, (c) =>
handleChat(
c.req.raw,
ctx.runtime,
ctx.features,
c.var.identity,
c.var.workspaceId,
ctx.conversationEventManager,
),
)
.post("/v1/chat/stream", chatBodyLimit, rl, (c) =>
handleChatStream(
c.req.raw,
ctx.runtime,
ctx.features,
c.var.identity,
c.var.workspaceId,
ctx.conversationEventManager,
),
);
return (
new Hono<AppEnv>()
.use("*", requireAuth(ctx.authOptions))
.use("*", optionalWorkspace(ctx.workspaceStore))
.use("*", errorLog(ctx))
.post("/v1/chat", chatBodyLimit, rl, (c) =>
handleChat(
c.req.raw,
ctx.runtime,
ctx.features,
c.var.identity,
c.var.workspaceId,
ctx.conversationEventManager,
),
)
.post("/v1/chat/stream", chatBodyLimit, rl, (c) =>
handleChatStream(
c.req.raw,
ctx.runtime,
ctx.features,
c.var.identity,
c.var.workspaceId,
ctx.conversationEventManager,
),
)
// Server-authoritative entry point: starts a detached turn and returns
// the conversation id immediately. The client then watches via
// GET /v1/conversations/:id/events. Generation survives client disconnect.
.post("/v1/chat/start", chatBodyLimit, rl, (c) =>
handleChatStart(c.req.raw, ctx.runtime, ctx.features, c.var.identity, c.var.workspaceId),
)
// Explicit Stop — the only way to abort an in-flight turn.
.post("/v1/conversations/:id/cancel", (c) =>
handleChatCancel(c.req.param("id"), ctx.runtime, c.var.identity),
)
);
}
Loading