diff --git a/src/bundles/lifecycle.ts b/src/bundles/lifecycle.ts
index f5aad0db..f98d4cef 100644
--- a/src/bundles/lifecycle.ts
+++ b/src/bundles/lifecycle.ts
@@ -48,6 +48,14 @@ import type {
 export class BundleLifecycleManager {
   private instances = new Map();
   private placementRegistry: PlacementRegistry | null = null;
+  /**
+   * Per-(serverName, wsId) lock chain serializing `respawnBundle` against
+   * concurrent callers (and against itself). Each pending respawn appends
+   * its promise to the chain; the next caller awaits the previous one
+   * before starting its own stop/start. Keys are dropped from the map
+   * when the last queued op resolves, so this doesn't grow unbounded.
+   */
+  private respawnLocks = new Map<string, Promise<void>>();
   /**
    * Getter for a workspace-scoped automations domain context. Set by
    * Runtime after the automations platform source is constructed. Used
@@ -65,6 +73,7 @@ export class BundleLifecycleManager {
     private configPath: string | undefined,
     private allowInsecureRemotes = false,
     private mpakHome: string = join(homedir(), ".mpak"),
+    private workDir: string = join(homedir(), ".nimblebrain"),
   ) {}
 
   /** Set the PlacementRegistry (called by Runtime after construction). */
@@ -567,6 +576,122 @@ export class BundleLifecycleManager {
     this.transition(instance, "stopped");
   }
 
+  /**
+   * Stop the current bundle subprocess and re-spawn it under the same
+   * server name. The new subprocess inherits the platform process env
+   * AND re-reads workspace `user_config` from disk via the named-bundle
+   * path in `startBundleSource` → `mpak.prepareServer`. This is the
+   * canonical "user changed a credential, make the bundle use it" path,
+   * called from the credential-writing tool handlers
+   * (`set_user_config`, `configureBundle`).
+   *
+   * Concurrency: serialized per-(serverName, wsId) via `respawnLocks`.
+   * Concurrent respawns queue and execute in order; a respawn-in-flight
+   * blocks subsequent respawns for the same key until it completes
+   * (success or error). This prevents the half-state window where the
+   * old source is removed but the new one hasn't started.
+   *
+   * Tool calls dispatched during a respawn are NOT serialized by this
+   * lock — they target the registry, not the lifecycle map — but
+   * `registry.removeSource` waits for in-flight calls on the old source
+   * to drain before returning, and the new source is only registered
+   * after `startBundleSource` returns ready. So a tool call landing
+   * mid-respawn either completes against the old source (before
+   * removeSource finishes) or fails with "no source named X" (during
+   * the brief gap before the new source is registered). The caller
+   * (engine) treats both as transient and the LLM retries.
+   *
+   * Protected bundles refuse to respawn — that flag is the contract
+   * with platform-critical apps that must not be torn down mid-call.
+   *
+   * Scope: named bundles only today. URL bundles read OAuth credentials
+   * lazily via `WorkspaceOAuthProvider` and don't need respawn on
+   * credential change. Path bundles (dev-mode local installs) aren't
+   * exposed via the credential-writing tool surface.
+   */
+  async respawnBundle(
+    serverName: string,
+    wsId: string,
+    registry: ToolRegistry,
+  ): Promise<{ ok: true } | { ok: false; error: string }> {
+    const key = `${serverName}|${wsId}`;
+    const prev = this.respawnLocks.get(key);
+    let resolveNext!: () => void;
+    const next = new Promise<void>((resolve) => {
+      resolveNext = () => resolve(undefined);
+    });
+    this.respawnLocks.set(key, next);
+    try {
+      if (prev) await prev;
+      return await this.runRespawn(serverName, wsId, registry);
+    } finally {
+      resolveNext();
+      // Only drop the chain head if no one queued behind us. If the
+      // chain advanced, leave it intact for the next waiter.
+      if (this.respawnLocks.get(key) === next) {
+        this.respawnLocks.delete(key);
+      }
+    }
+  }
+
+  /**
+   * Perform one stop/start cycle for an installed bundle. Called from
+   * `respawnBundle`, which provides the per-key serialization.
+   * Missing and protected instances are rejected before anything is
+   * touched; a failure while removing or starting the source marks the
+   * instance `dead` and is returned as `{ ok: false, error }`.
+   */
+  private async runRespawn(
+    serverName: string,
+    wsId: string,
+    registry: ToolRegistry,
+  ): Promise<{ ok: true } | { ok: false; error: string }> {
+    const instance = this.instances.get(`${serverName}|${wsId}`);
+    if (!instance) {
+      return {
+        ok: false,
+        error: `Bundle "${serverName}" is not installed in workspace "${wsId}". Install it first via manage_app install.`,
+      };
+    }
+    if (instance.protected) {
+      return { ok: false, error: `Cannot respawn "${serverName}": bundle is protected` };
+    }
+
+    // Named bundles re-spawn against `{ name: bundleName }`. The named-
+    // bundle path in `startBundleSource` re-reads user_config from the
+    // workspace credential store via `resolveUserConfig` — which is the
+    // whole point of this method.
+    this.transition(instance, "starting");
+    try {
+      if (registry.hasSource(serverName)) {
+        await registry.removeSource(serverName);
+      }
+      await startBundleSource(
+        { name: instance.bundleName },
+        registry,
+        this.eventSink,
+        this.configPath ? dirname(this.configPath) : undefined,
+        {
+          wsId,
+          workDir: this.workDir,
+          allowInsecureRemotes: this.allowInsecureRemotes,
+        },
+      );
+      this.transition(instance, "running");
+      return { ok: true };
+    } catch (err) {
+      // The old source is already removed and the new one failed to
+      // start. The instance is in `dead` state until the operator
+      // fixes the underlying cause (typically a bad credential value)
+      // and the next save retries the respawn.
+      this.transition(instance, "dead");
+      return {
+        ok: false,
+        error: err instanceof Error ? err.message : String(err),
+      };
+    }
+  }
+
   // ---- State transitions -------------------------------------------------
 
   /**
diff --git a/src/bundles/types.ts b/src/bundles/types.ts
index dc804ae9..5305132b 100644
--- a/src/bundles/types.ts
+++ b/src/bundles/types.ts
@@ -334,7 +334,12 @@ export interface BundleInstance {
   briefing: BriefingBlock | null;
   /** HTTP proxy declaration from _meta["ai.nimblebrain/http-proxy"]. */
   httpProxy: HttpProxyConfig | null;
-  /** Whether the bundle is protected from uninstall. */
+  /**
+   * Whether the bundle is protected from tear-down operations: uninstall
+   * via `nb__manage_app` and respawn via `lifecycle.respawnBundle`. Both
+   * paths gate on this flag so platform-critical apps can't be torn down
+   * mid-call (e.g. an in-flight tool would lose its subprocess).
+   */
   protected: boolean;
   /** Whether this is an Upjack app or plain MCP server. */
   type: "upjack" | "plain";
diff --git a/src/config/nimblebrain-config.schema.json b/src/config/nimblebrain-config.schema.json
index 61612c4c..fc656e09 100644
--- a/src/config/nimblebrain-config.schema.json
+++ b/src/config/nimblebrain-config.schema.json
@@ -106,7 +106,7 @@
         },
         "protected": {
           "type": "boolean",
-          "description": "Prevents uninstall via nb__manage_app."
+          "description": "Prevents tear-down operations: uninstall via nb__manage_app and respawn after credential change."
         },
         "trustScore": { "type": ["number", "null"], "description": "MTF trust score (0-100)."
 },
         "transport": {
diff --git a/src/runtime/runtime.ts b/src/runtime/runtime.ts
index 5814140d..cc478f67 100644
--- a/src/runtime/runtime.ts
+++ b/src/runtime/runtime.ts
@@ -316,12 +316,14 @@ export class Runtime {
 
     // Create placement registry and lifecycle manager
     const placementRegistry = new PlacementRegistry();
-    const mpakHome = join(resolve(resolveWorkDir(config)), "apps");
+    const resolvedWorkDir = resolve(workDir);
+    const mpakHome = join(resolvedWorkDir, "apps");
     const lifecycle = new BundleLifecycleManager(
       events,
       config.configPath,
       config.allowInsecureRemotes,
       mpakHome,
+      resolvedWorkDir,
     );
     lifecycle.setPlacementRegistry(placementRegistry);
 
diff --git a/src/tools/connector-tools.ts b/src/tools/connector-tools.ts
index 7b03b130..08a5bf21 100644
--- a/src/tools/connector-tools.ts
+++ b/src/tools/connector-tools.ts
@@ -2306,8 +2306,11 @@ async function handleSetUserConfig(
   // necessary but not sufficient; without a respawn the running
   // subprocess keeps using whatever it was launched with. Mirror the
   // chat agent's `configureBundle` pattern so both the chat path and
-  // the UI path produce identical post-write state.
-  const respawn = await respawnBundleAfterCredentialChange(ctx, wsId, bundleName, serverName);
+  // the UI path produce identical post-write state. `respawnBundle`
+  // owns the per-(serverName, wsId) mutex and protected-bundle gating.
+  const registry = ctx.runtime.getRegistryForWorkspace(wsId);
+  const respawnResult = await ctx.runtime.getLifecycle().respawnBundle(serverName, wsId, registry);
+  const respawn = respawnResult.ok ? { ok: true } : { ok: false, error: respawnResult.error };
   const populated = await probeUserConfigPopulated(wsId, bundleName, workDir, schema);
 
   return {
@@ -2365,49 +2368,6 @@ async function handleClearUserConfig(
   };
 }
 
-/**
- * Tear down + restart a stdio bundle's McpSource so a fresh subprocess
- * picks up the just-written credentials from the workspace credential
- * store. Called after `set_user_config` and `clear_user_config`.
- *
- * Why not just leave the bundle running? Mode 1 bundles read
- * `user_config` once, at spawn, via `${user_config.foo}` placeholders
- * resolved into env vars. The subprocess has no way to re-read after
- * launch. Without this respawn the user updates a key in the UI,
- * sees "✓ configured," then watches the next tool call fail with the
- * old key — the bug the user hit before this fix.
- *
- * Best-effort by design: a respawn failure (e.g., required field still
- * missing after a partial save) shouldn't roll back the credential
- * write. The caller's structured response carries `{ respawn: { ok,
- * error? } }` so the UI can surface the failure separately.
- */
-async function respawnBundleAfterCredentialChange(
-  ctx: ManageConnectorsContext,
-  wsId: string,
-  bundleName: string,
-  serverName: string,
-): Promise<{ ok: boolean; error?: string }> {
-  try {
-    const registry = ctx.runtime.getRegistryForWorkspace(wsId);
-    if (registry.hasSource(serverName)) {
-      await registry.removeSource(serverName);
-    }
-    // Pass `name` (the scoped manifest name) so startBundleSource hits
-    // the named-bundle path that resolves user_config from the
-    // workspace credential store. configDir is undefined — same as
-    // configureBundle's call site; named-bundle path doesn't need it.
-    await startBundleSource({ name: bundleName }, registry, ctx.runtime.getEventSink(), undefined, {
-      wsId,
-      workDir: ctx.runtime.getWorkDir(),
-      allowInsecureRemotes: ctx.runtime.getAllowInsecureRemotes(),
-    });
-    return { ok: true };
-  } catch (err) {
-    return { ok: false, error: err instanceof Error ? err.message : String(err) };
-  }
-}
-
 /**
  * Workspace admin gate. Returns true if the identity is a workspace
  * admin — explicitly via `members[].role === "admin"`, or implicitly
diff --git a/src/tools/system-tools.ts b/src/tools/system-tools.ts
index 2c5f8db1..800b933b 100644
--- a/src/tools/system-tools.ts
+++ b/src/tools/system-tools.ts
@@ -2,7 +2,6 @@ import { NoopEventSink } from "../adapters/noop-events.ts";
 import type { BundleLifecycleManager } from "../bundles/lifecycle.ts";
 import { getMpak } from "../bundles/mpak.ts";
 import { deriveServerName } from "../bundles/paths.ts";
-import { startBundleSource } from "../bundles/startup.ts";
 import type { BundleManifest } from "../bundles/types.ts";
 import {
   installBundleInWorkspace,
@@ -219,7 +218,7 @@ export async function createSystemTools(
           return await configureBundle(
             name,
             getRegistry(),
-            manageBundleCtx.eventSink,
+            lifecycle,
             wsId,
             manageBundleCtx.workDir,
             gate,
@@ -712,14 +711,12 @@ function formatUptime(ms: number): string {
 async function configureBundle(
   name: string,
   registry: ToolRegistry,
-  // Required — passed to the restarted McpSource so its task-augmented tool
-  // progress events reach SSE broadcasts (Synapse useDataSync). Without it,
-  // re-configuring a bundle's credentials silently breaks live updates for
-  // that bundle until the next full platform restart.
-  eventSink: EventSink,
+  lifecycle: BundleLifecycleManager,
   // Workspace id + work directory — required because credentials are stored
   // per-workspace (`{workDir}/workspaces/{wsId}/credentials/{bundle}.json`),
   // not globally in `~/.mpak/config.json`. Threaded from the manage_app handler.
+  // The respawn itself reads eventSink and workDir off the lifecycle manager;
+  // workDir here is used only by `resolveUserConfig` for the credential write.
   wsId: string,
   workDir: string,
   confirmGate?: ConfirmationGate,
@@ -771,26 +768,21 @@ async function configureBundle(
     forcePrompt: true,
   });
 
-  // Restart the bundle via the shared primitive — same construction path
-  // as boot-time / agent install. `startBundleSource` reads the values we
-  // just persisted above from the workspace credential store. If this
-  // function diverges from that primitive the rest of the app silently
-  // breaks (sink plumbing, PYTHONPATH, data-dir layout, user_config
-  // resolution). Delegate instead: pass `wsId`+`workDir` and let
-  // `startBundleSource` derive the workspace-scoped data dir itself —
-  // never compute it here, or it drifts from the install-time layout
-  // and Upjack entity state disappears across restarts.
+  // Re-spawn through `lifecycle.respawnBundle` so this path shares the
+  // per-(serverName, wsId) mutex and protected-bundle gating used by the
+  // UI's `set_user_config` path. The respawn re-reads the workspace
+  // credential store, so the values persisted above reach the new
+  // subprocess; a failed restart is reported below without undoing the save.
   const serverName = deriveServerName(name);
-  if (registry.hasSource(serverName)) {
-    await registry.removeSource(serverName);
+  const respawn = await lifecycle.respawnBundle(serverName, wsId, registry);
+  if (!respawn.ok) {
+    return {
+      content: textContent(`Configured ${name} but restart failed: ${respawn.error}`),
+      isError: true,
+    };
   }
-  const result = await startBundleSource({ name }, registry, eventSink, undefined, {
-    wsId,
-    workDir,
-  });
-
   const tools = await registry.availableTools();
-  const count = tools.filter((t) => t.name.startsWith(`${result.sourceName}__`)).length;
+  const count = tools.filter((t) => t.name.startsWith(`${serverName}__`)).length;
   return {
     content: textContent(`Configured and restarted ${name}. ${count} tools available.`),
     isError: false,
diff --git a/test/unit/lifecycle-respawn.test.ts b/test/unit/lifecycle-respawn.test.ts
new file mode 100644
index 00000000..0923c9f9
--- /dev/null
+++ b/test/unit/lifecycle-respawn.test.ts
@@ -0,0 +1,190 @@
+/**
+ * Unit tests for `BundleLifecycleManager.respawnBundle`. Pins:
+ *
+ *   1. Missing instance → returns `{ ok: false }` with a clear error.
+ *   2. Protected bundles refuse to respawn (the contract for platform-
+ *      critical apps that must not be torn down mid-call).
+ *   3. The per-(serverName, wsId) mutex serializes concurrent respawns —
+ *      a second call waits for the first to complete before starting,
+ *      eliminating the half-state window between `removeSource` and the
+ *      new spawn.
+ *   4. Errors from `startBundleSource` land the instance in `dead`
+ *      state and bubble back through the structured result.
+ *
+ * The actual `startBundleSource` integration is covered by
+ * `test/integration/configure-credentials.test.ts` — this file focuses
+ * on the new mutex + error-path semantics that don't need a live
+ * subprocess to exercise.
+ */
+
+import { afterEach, beforeEach, describe, expect, test } from "bun:test";
+import { mkdtempSync, rmSync } from "node:fs";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+import { NoopEventSink } from "../../src/adapters/noop-events.ts";
+import { BundleLifecycleManager } from "../../src/bundles/lifecycle.ts";
+import type { BundleInstance } from "../../src/bundles/types.ts";
+import { ToolRegistry } from "../../src/tools/registry.ts";
+import type { Tool, ToolSource } from "../../src/tools/types.ts";
+
+let workDir: string;
+let lifecycle: BundleLifecycleManager;
+let registry: ToolRegistry;
+const wsId = "ws_test";
+const serverName = "test-bundle";
+const bundleName = "@test/test-bundle";
+
+function makeStubSource(name: string): ToolSource {
+  return {
+    name,
+    async start(): Promise<void> {},
+    async stop(): Promise<void> {},
+    async tools(): Promise<Tool[]> {
+      return [];
+    },
+    async execute(): Promise<never> {
+      throw new Error("not implemented");
+    },
+  };
+}
+
+/**
+ * Seed a BundleInstance directly into the lifecycle's instances map
+ * via reflection. Avoids invoking `installNamed`, which would require
+ * a real mpak cache + manifest. We're testing respawn-time behavior,
+ * not install-time wiring.
+ */
+function seedInstance(opts: { protected?: boolean } = {}): BundleInstance {
+  const instance: BundleInstance = {
+    serverName,
+    bundleName,
+    version: "1.0.0",
+    state: "running",
+    trustScore: null,
+    ui: null,
+    briefing: null,
+    httpProxy: null,
+    protected: opts.protected ?? false,
+    type: "plain",
+    wsId,
+  };
+  // `instances` is private; reach in for the seed. This mirrors how
+  // tests on the existing health-monitor + state-transition code
+  // reach in to verify state changes.
+  (lifecycle as unknown as { instances: Map<string, BundleInstance> }).instances.set(
+    `${serverName}|${wsId}`,
+    instance,
+  );
+  return instance;
+}
+
+beforeEach(() => {
+  workDir = mkdtempSync(join(tmpdir(), "lifecycle-respawn-"));
+  lifecycle = new BundleLifecycleManager(new NoopEventSink(), undefined, false, join(workDir, "apps"), workDir);
+  registry = new ToolRegistry();
+});
+
+afterEach(() => {
+  rmSync(workDir, { recursive: true, force: true });
+});
+
+describe("lifecycle.respawnBundle — guards", () => {
+  test("returns error when no instance exists for (serverName, wsId)", async () => {
+    const result = await lifecycle.respawnBundle("missing", wsId, registry);
+    expect(result.ok).toBe(false);
+    if (!result.ok) {
+      expect(result.error).toMatch(/Bundle "missing" is not installed/);
+    }
+  });
+
+  test("refuses to respawn a protected bundle", async () => {
+    seedInstance({ protected: true });
+    registry.addSource(makeStubSource(serverName));
+
+    const result = await lifecycle.respawnBundle(serverName, wsId, registry);
+    expect(result.ok).toBe(false);
+    if (!result.ok) {
+      expect(result.error).toMatch(/protected/);
+    }
+    // Protected bundle's source stays in the registry untouched.
+    expect(registry.hasSource(serverName)).toBe(true);
+  });
+});
+
+describe("lifecycle.respawnBundle — failure path", () => {
+  test("startBundleSource failure transitions instance to 'dead' and returns error", async () => {
+    const instance = seedInstance();
+    registry.addSource(makeStubSource(serverName));
+
+    // No bundle in the mpak cache for "@test/test-bundle" → startBundleSource
+    // throws when prepareServer can't find it. We're verifying the error
+    // is captured and the instance is marked dead, not the specific
+    // error string mpak emits.
+    const result = await lifecycle.respawnBundle(serverName, wsId, registry);
+    expect(result.ok).toBe(false);
+    expect(instance.state).toBe("dead");
+  });
+});
+
+describe("lifecycle.respawnBundle — concurrency", () => {
+  test("max-1-in-flight: concurrent respawns for the same key never overlap inside removeSource", async () => {
+    seedInstance();
+
+    // Wrap the registry's removeSource to count in-flight invocations.
+    // The mutex's job is to ensure the stop/start critical section of
+    // one respawn completes before another can start. If `removeSource`
+    // ever sees two simultaneous callers, the mutex is broken.
+    let inFlight = 0;
+    let maxInFlight = 0;
+    const slowSource: ToolSource = {
+      name: serverName,
+      async start(): Promise<void> {},
+      async stop(): Promise<void> {
+        // Short delay so a broken mutex would have time to race.
+        await new Promise((r) => setTimeout(r, 30));
+      },
+      async tools(): Promise<Tool[]> {
+        return [];
+      },
+      async execute(): Promise<never> {
+        throw new Error("not implemented");
+      },
+    };
+    registry.addSource(slowSource);
+    const originalRemove = registry.removeSource.bind(registry);
+    registry.removeSource = async (name: string) => {
+      inFlight++;
+      maxInFlight = Math.max(maxInFlight, inFlight);
+      try {
+        return await originalRemove(name);
+      } finally {
+        inFlight--;
+        // After the first call removes the source, the second call
+        // will find no source to remove — but it'll still try, so
+        // re-add the stub for the next iteration.
+        if (!registry.hasSource(serverName)) {
+          registry.addSource({
+            ...slowSource,
+            name: serverName,
+          });
+        }
+      }
+    };
+
+    const [r1, r2] = await Promise.all([
+      lifecycle.respawnBundle(serverName, wsId, registry),
+      lifecycle.respawnBundle(serverName, wsId, registry),
+    ]);
+
+    expect(maxInFlight).toBe(1); // mutex held: never two removeSource simultaneously
+    expect(r1.ok).toBe(false); // both fail at startBundleSource (no real bundle)
+    expect(r2.ok).toBe(false);
+  });
+
+  test("respawn map drops the key after the last queued op resolves", async () => {
+    seedInstance();
+    await lifecycle.respawnBundle(serverName, wsId, registry);
+    const locks = (lifecycle as unknown as { respawnLocks: Map<string, Promise<void>> }).respawnLocks;
+    expect(locks.has(`${serverName}|${wsId}`)).toBe(false);
+  });
+});