Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/orchestrator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export type {
ToolListAggregator,
ToolListAggregatorOptions,
WorkspaceToolLister,
WorkspaceToolListing,
} from "./tool-list-aggregator.ts";
export { createToolListAggregator } from "./tool-list-aggregator.ts";

Expand Down
6 changes: 5 additions & 1 deletion src/orchestrator/tool-list-aggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ import {
type WorkspaceToolLister,
} from "./tool-list-cache.ts";

export type { NamespacedToolDescriptor, WorkspaceToolLister } from "./tool-list-cache.ts";
export type {
NamespacedToolDescriptor,
WorkspaceToolLister,
WorkspaceToolListing,
} from "./tool-list-cache.ts";

/**
* Lists the kernel identity sources' tools (conversations, …), source-qualified
Expand Down
237 changes: 183 additions & 54 deletions src/orchestrator/tool-list-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,29 @@ export interface NamespacedToolDescriptor {
}

/**
* Per-workspace tool lister. Caller supplies one of these (typically
* `(wsId) => runtime.getRegistryForWorkspace(wsId).availableTools()` in
* production). The lister is treated as the source of truth and is the
* only function the cache invokes for a given workspace until the
* watcher fires.
* One workspace's listing: the bare-named tools plus whether enumeration
* was COMPLETE. `complete: false` means at least one source was skipped
* because it wasn't ready yet (cold start, subprocess restart, pending
* auth), so the list is partial — the cache must not memoize it.
*/
export interface WorkspaceToolListing {
tools: readonly Tool[];
complete: boolean;
}

/**
* Per-workspace tool lister. Caller supplies one of these (typically a
* wrapper over `runtime.getRegistryForWorkspace(wsId)` in production). The
* lister is treated as the source of truth and is the only function the
* cache invokes for a given workspace until the watcher fires.
*
* Returns `Tool[]` (bare names) — the cache namespaces every entry via
* `namespacedToolName` before handing it to a consumer.
* Returns a {@link WorkspaceToolListing}: bare-named `Tool[]` (the cache
* namespaces every entry via `namespacedToolName` before handing it to a
* consumer) plus a `complete` flag. The cache refuses to memoize a listing
* whose `complete` is false, so a partial cold-start snapshot can never go
* sticky and starve discovery until an unrelated invalidation fires.
*/
export type WorkspaceToolLister = (wsId: string) => Promise<readonly Tool[]>;
export type WorkspaceToolLister = (wsId: string) => Promise<WorkspaceToolListing>;

export interface ToolListCacheOptions {
/** Override the 100ms default. Lower for tests, higher in production. */
Expand All @@ -123,7 +136,26 @@ export interface ToolListCacheOptions {
*/
interface WorkspaceWatchEntry {
watcher: FSWatcher;
/**
* Memoized listing — only ever a COMPLETE one. `null` means "next ask
* re-lists." A partial (cold-start) listing is never stored here, so a
* present `toolsPromise` is complete by construction.
*/
toolsPromise: Promise<readonly Tool[]> | null;
/**
* Shared in-flight listing so concurrent first-askers don't each hit the
* lister. Carries the `complete` flag (unlike `toolsPromise`, which holds
* complete listings only) so the awaiter can decide whether to memoize.
* Cleared when it settles.
*/
listingInFlight: Promise<WorkspaceToolListing> | null;
/**
* Bumped on every invalidation. A listing captures it before calling the
* lister and refuses to memoize if it changed while in flight — so an
* invalidation that lands DURING a compute can't be overwritten by the
* now-stale result that compute is about to produce.
*/
generation: number;
pendingDebounce: ReturnType<typeof setTimeout> | null;
/**
* Identities currently caching a union that includes this workspace.
Expand All @@ -142,9 +174,44 @@ export class ToolListCache {
/** Per-workspace cache + watcher. */
private readonly workspaces = new Map<string, WorkspaceWatchEntry>();

/** Per-identity union cache — the public answer the aggregator hands out. */
/**
* Per-identity union cache — the public answer the aggregator hands out.
* Only ever holds a union built entirely from COMPLETE workspace listings.
*/
private readonly identityUnions = new Map<string, Promise<readonly NamespacedToolDescriptor[]>>();

/**
* Shared in-flight union computations, keyed by identity, so concurrent
* first-askers share one fan-out. Carries the union's `complete` flag so
* the awaiter only memoizes a union with no partial contributing
* workspace. Cleared when it settles.
*/
private readonly unionInFlight = new Map<
string,
Promise<{ union: readonly NamespacedToolDescriptor[]; complete: boolean }>
>();

/**
* Per-identity generation counter — the union-level analog of
* `WorkspaceWatchEntry.generation`. Bumped whenever an invalidation touches
* the identity (a workspace it subscribes to changed, or its membership
* changed). A union compute captures it up front and refuses to memoize if
* it changed mid-flight, so an invalidation during the compute can't be
* clobbered by the stale union about to resolve.
*
* Deliberately the one structure here that does NOT reap (unlike orphaned
* watchers or `identityUnions`): it's monotonic and only cleared in
* `dispose`. Growth is bounded by distinct identities over the process
* lifetime (a tenant's user count) at one small int each — not a leak worth
* chasing. Reaping it would be actively unsafe unless guarded: a plain
* delete resets the entry to 0, and if that 0 happens to equal an in-flight
* compute's captured epoch the guard reads "unchanged" and memoizes a result
* the invalidation should have rejected. A correctness-sensitive conditional
* to reclaim kilobytes is a bad trade, so we don't. Monotonic; never deleted
* outside `dispose` (resetting could let a stale in-flight read as current).
*/
private readonly identityEpochs = new Map<string, number>();

private disposed = false;

constructor(workDir: string, lister: WorkspaceToolLister, options: ToolListCacheOptions = {}) {
Expand All @@ -156,39 +223,47 @@ export class ToolListCache {
// ── Per-workspace ─────────────────────────────────────────────────

/**
* Return the cached `Tool[]` for `wsId`, populating on first ask.
* Return the listing for `wsId`, populating on first ask.
*
* Wraps the lister call in a memoized promise so concurrent callers
* during the first listing all share one in-flight request — the same
* pattern `index-cache` relies on for cold-start fan-in.
* Concurrent first-askers share one in-flight request (`listingInFlight`)
* — the cold-start fan-in pattern `index-cache` relies on.
*
* If the lister rejects, the rejection is propagated to the awaiter
* and the cached slot is cleared so a subsequent call retries. This
* mirrors the registry's per-source error containment in
* `ToolRegistry.availableTools` (one stuck source shouldn't poison
* the cache forever).
* Memoization is gated on completeness: a COMPLETE listing is cached in
* `toolsPromise` (served directly on the next ask); a PARTIAL listing (a
* source skipped because it wasn't ready) is returned to this caller but
* left uncached, so the next ask re-lists once the source is up. This is
* the core of the stale-empty-union fix — an incomplete snapshot can never
* become sticky and starve discovery until some unrelated invalidation
* happens to fire.
*
* If the lister rejects, the rejection propagates and nothing is cached, so
* a subsequent call retries — mirroring `ToolRegistry.availableTools`'
* per-source containment (one stuck source shouldn't poison the cache).
*/
async getWorkspaceTools(wsId: string): Promise<readonly Tool[]> {
async getWorkspaceListing(wsId: string): Promise<WorkspaceToolListing> {
this.assertOpen();
const entry = this.ensureWatchEntry(wsId);
if (entry.toolsPromise === null) {
// Holder pattern: declare a mutable holder so the catch can
// self-identify against the cached slot without a forward
// reference to the variable that captures it. A naked IIFE
// referencing its own outer-let binding trips
// `used before assignment`.
const holder: { p: Promise<readonly Tool[]> | null } = { p: null };
holder.p = this.lister(wsId).catch((err: unknown) => {
// Drop the cached slot so the next call retries instead of
// serving a permanently-poisoned rejection — same posture as
// `ToolRegistry.availableTools` (one-source-down doesn't
// poison the cache forever).
if (entry.toolsPromise === holder.p) entry.toolsPromise = null;
throw err;
});
entry.toolsPromise = holder.p;
// Memoized hit — present `toolsPromise` is complete by construction.
if (entry.toolsPromise !== null) {
return { tools: await entry.toolsPromise, complete: true };
}
// Share an in-flight listing across concurrent first-askers.
if (entry.listingInFlight !== null) return entry.listingInFlight;
// Capture the generation before listing. If an invalidation lands while
// the lister runs it bumps `generation`, and we refuse to memoize the
// now-stale result below — the next ask re-lists against current state.
const gen = entry.generation;
const inFlight = this.lister(wsId);
entry.listingInFlight = inFlight;
try {
const listing = await inFlight;
if (listing.complete && entry.generation === gen && entry.toolsPromise === null) {
entry.toolsPromise = Promise.resolve(listing.tools);
}
return listing;
} finally {
if (entry.listingInFlight === inFlight) entry.listingInFlight = null;
}
return entry.toolsPromise;
}

// ── Per-identity union ────────────────────────────────────────────
Expand All @@ -201,7 +276,7 @@ export class ToolListCache {
* the workspace loop entirely.
*
* Watcher attachment for each workspace happens inside
* `getWorkspaceTools` → `ensureWatchEntry`, so this call site
* `getWorkspaceListing` → `ensureWatchEntry`, so this call site
* doesn't have to know FS layout. Membership tracking
* (`subscribedIdentities`) is updated here because the workspace
* watcher needs to know which identity unions to drop when its
Expand All @@ -214,9 +289,25 @@ export class ToolListCache {
): Promise<readonly NamespacedToolDescriptor[]> {
this.assertOpen();
const existing = this.identityUnions.get(identityId);
// A memoized union was built entirely from COMPLETE listings AND no
// invalidation touched this identity while it was built (the epoch guard
// below) — so it's both complete and fresh, safe to serve directly.
if (existing) return existing;
// Share an in-flight fan-out across concurrent first-askers.
const inFlight = this.unionInFlight.get(identityId);
if (inFlight) return inFlight.then((r) => r.union);

const p = (async (): Promise<readonly NamespacedToolDescriptor[]> => {
// Capture the identity's generation before computing. An invalidation
// landing mid-compute (a subscribed workspace changed, or membership
// changed) bumps it, and we refuse to memoize below — leaving the union
// uncached so the next ask rebuilds AND re-subscribes to the workspace
// whose watcher set this identity was just cleared from.
const epoch = this.identityEpochs.get(identityId) ?? 0;

const compute = (async (): Promise<{
union: readonly NamespacedToolDescriptor[];
complete: boolean;
}> => {
// Record interest BEFORE listing so an FS event during listing
// invalidates correctly. Order matters: the watcher needs the
// identity in its set the moment any one workspace's listing
Expand All @@ -233,11 +324,16 @@ export class ToolListCache {
// lister already contains per-SOURCE failures one level down; this
// catches the rarer whole-WORKSPACE listing failure.
const settled = await Promise.allSettled(
wsIds.map(async (wsId) => ({ wsId, tools: await this.getWorkspaceTools(wsId) })),
wsIds.map(async (wsId) => ({ wsId, listing: await this.getWorkspaceListing(wsId) })),
);
const out: NamespacedToolDescriptor[] = [];
// The union is complete only if every workspace listing succeeded AND
// was itself complete. A rejected whole-workspace listing or a partial
// (cold-start) one makes the union partial — see below.
let complete = true;
for (const result of settled) {
if (result.status === "rejected") {
complete = false;
log.debug(
"mcp",
`[tool-list-cache] dropping a workspace from the union for identity "${identityId}": ${
Expand All @@ -246,8 +342,9 @@ export class ToolListCache {
);
continue;
}
const { wsId, tools } = result.value;
for (const t of tools) {
const { wsId, listing } = result.value;
if (!listing.complete) complete = false;
for (const t of listing.tools) {
out.push({
name: namespace(wsId, t.name),
wsId,
Expand All @@ -259,19 +356,29 @@ export class ToolListCache {
});
}
}
return out;
return { union: out, complete };
})();
this.identityUnions.set(identityId, p);
// If the union failed, drop it so the next call retries — same
// posture as `getWorkspaceTools`. The clearer pattern is to await
// here, but doing so would serialize unrelated identities; instead
// we hang an error-handler off the cached promise.
p.catch(() => {
if (this.identityUnions.get(identityId) === p) {
this.identityUnions.delete(identityId);

this.unionInFlight.set(identityId, compute);
try {
const { union, complete } = await compute;
// Memoize ONLY when (a) no contributing workspace was partial, and (b)
// no invalidation touched this identity while we were computing
// (epoch unchanged). (a) keeps a cold-start partial out of the cache;
// (b) keeps a result that an invalidation already superseded out — and,
// critically, leaves it uncached so the next ask re-subscribes to any
// workspace this identity was unsubscribed from mid-compute. Without (b)
// we'd memoize a stale union with a broken watcher subscription — the
// very "stale until an unrelated invalidation" failure this PR kills.
if (complete && (this.identityEpochs.get(identityId) ?? 0) === epoch) {
this.identityUnions.set(identityId, Promise.resolve(union));
}
});
return p;
return union;
} finally {
if (this.unionInFlight.get(identityId) === compute) {
this.unionInFlight.delete(identityId);
}
}
}

/**
Expand All @@ -290,6 +397,9 @@ export class ToolListCache {
*/
invalidateIdentity(identityId: string): void {
this.identityUnions.delete(identityId);
// Bump the identity epoch so an in-flight union compute for this identity
// refuses to memoize a result built before this membership change.
this.bumpIdentityEpoch(identityId);
const orphaned: string[] = [];
for (const [wsId, entry] of this.workspaces) {
entry.subscribedIdentities.delete(identityId);
Expand All @@ -313,7 +423,7 @@ export class ToolListCache {
/**
* Close every watcher, clear every debounce timer, drop every
* cache entry. Idempotent. After `dispose()` the cache is closed —
* further `getWorkspaceTools` / `getUnionForIdentity` calls throw.
* further `getWorkspaceListing` / `getUnionForIdentity` calls throw.
*
* The `index-cache` analog is `stopWatching()`. We close more here
* (the per-identity union map is cleared too) because the cache
Expand All @@ -332,6 +442,14 @@ export class ToolListCache {
}
this.workspaces.clear();
this.identityUnions.clear();
this.unionInFlight.clear();
this.identityEpochs.clear();
}

/** Bump an identity's generation so any in-flight union compute for it
* refuses to memoize a now-superseded result. */
private bumpIdentityEpoch(identityId: string): void {
this.identityEpochs.set(identityId, (this.identityEpochs.get(identityId) ?? 0) + 1);
}

// ── Test / inspection helpers ─────────────────────────────────────
Expand Down Expand Up @@ -386,6 +504,8 @@ export class ToolListCache {
const entry: WorkspaceWatchEntry = {
watcher,
toolsPromise: null,
listingInFlight: null,
generation: 0,
pendingDebounce: null,
subscribedIdentities: new Set(),
};
Expand Down Expand Up @@ -424,10 +544,19 @@ export class ToolListCache {
invalidateWorkspace(wsId: string): void {
const entry = this.workspaces.get(wsId);
if (!entry) return;
// Bump generation and clear any in-flight listing so a listing already
// running against the old state can't memoize, and a caller arriving
// after this point starts a fresh listing rather than sharing the stale
// in-flight one.
entry.generation += 1;
entry.toolsPromise = null;
// Drop every identity union that read from this workspace.
entry.listingInFlight = null;
// Drop every identity union that read from this workspace, and bump each
// identity's epoch so an in-flight union compute for it refuses to memoize
// a result built before this change.
for (const identityId of entry.subscribedIdentities) {
this.identityUnions.delete(identityId);
this.bumpIdentityEpoch(identityId);
}
entry.subscribedIdentities.clear();
}
Expand Down
Loading