From e11b21c42f35991f4056a1bab7628b820ad5dde4 Mon Sep 17 00:00:00 2001 From: Stranmor Date: Thu, 5 Feb 2026 11:49:59 +0300 Subject: [PATCH] fix(memory): add TTL, LRU eviction, and ring buffers to prevent memory leaks - ACPSessionManager: add 1h TTL + 50 session limit with periodic cleanup - Instance cache: add 30min idle TTL + 10 instance LRU limit - LSP clients: add 20 client limit with oldest-first eviction - PTY buffers: replace string concatenation with RingBuffer class Addresses issues #10913, #9140, #5363 --- packages/opencode/src/acp/session.ts | 78 ++++++++++++++++++++--- packages/opencode/src/lsp/index.ts | 10 +++ packages/opencode/src/project/instance.ts | 60 ++++++++++++++--- packages/opencode/src/pty/index.ts | 58 ++++++++++++++--- 4 files changed, 179 insertions(+), 27 deletions(-) diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 18aa4231301..33a6dcc384c 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -5,16 +5,76 @@ import type { OpencodeClient } from "@opencode-ai/sdk/v2" const log = Log.create({ service: "acp-session-manager" }) +/** Session TTL in milliseconds (1 hour) */ +const SESSION_TTL_MS = 60 * 60 * 1000 +/** Cleanup interval in milliseconds (5 minutes) */ +const CLEANUP_INTERVAL_MS = 5 * 60 * 1000 +/** Maximum number of sessions to keep */ +const MAX_SESSIONS = 50 + +interface SessionEntry { + state: ACPSessionState + lastAccess: number +} + export class ACPSessionManager { - private sessions = new Map() + private sessions = new Map() + private cleanupTimer: ReturnType | undefined private sdk: OpencodeClient constructor(sdk: OpencodeClient) { this.sdk = sdk + this.startCleanupTimer() + } + + private startCleanupTimer() { + if (this.cleanupTimer) return + this.cleanupTimer = setInterval(() => this.cleanup(), CLEANUP_INTERVAL_MS) + this.cleanupTimer.unref() + } + + private cleanup() { + const now = Date.now() + const toDelete: string[] = [] + + for (const [id, entry] of this.sessions) { + if (now - entry.lastAccess > SESSION_TTL_MS) { + toDelete.push(id) + } + } + + if (toDelete.length > 0) { + log.info("cleaning_up_stale_sessions", { count: toDelete.length }) + for (const id of toDelete) { + this.sessions.delete(id) + } + } + + if (this.sessions.size > MAX_SESSIONS) { + const sorted = [...this.sessions.entries()].sort((a, b) => a[1].lastAccess - b[1].lastAccess) + const excess = sorted.slice(0, this.sessions.size - MAX_SESSIONS) + log.info("evicting_excess_sessions", { count: excess.length }) + for (const [id] of excess) { + this.sessions.delete(id) + } + } + } + + dispose() { + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer) + this.cleanupTimer = undefined + } + this.sessions.clear() } tryGet(sessionId: string): ACPSessionState | undefined { - return this.sessions.get(sessionId) + const entry = this.sessions.get(sessionId) + if (entry) { + entry.lastAccess = Date.now() + return entry.state + } + return undefined } async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise { @@ -40,7 +100,7 @@ export class ACPSessionManager { } log.info("creating_session", { state }) - this.sessions.set(sessionId, state) + this.sessions.set(sessionId, { state, lastAccess: Date.now() }) return state } @@ -71,17 +131,18 @@ export class ACPSessionManager { } log.info("loading_session", { state }) - this.sessions.set(sessionId, state) + this.sessions.set(sessionId, { state, lastAccess: Date.now() }) return state } get(sessionId: string): ACPSessionState { - const session = this.sessions.get(sessionId) - if (!session) { + const entry = this.sessions.get(sessionId) + if (!entry) { log.error("session not found", { sessionId }) throw RequestError.invalidParams(JSON.stringify({ error: `Session not found: ${sessionId}` })) } - return session + entry.lastAccess = Date.now() + return entry.state } getModel(sessionId: string) { @@ -92,7 +153,6 @@ export class ACPSessionManager { setModel(sessionId: string, model: ACPSessionState["model"]) { const session = this.get(sessionId) session.model = model - this.sessions.set(sessionId, session) return session } @@ -104,14 +164,12 @@ export class ACPSessionManager { setVariant(sessionId: string, variant?: string) { const session = this.get(sessionId) session.variant = variant - this.sessions.set(sessionId, session) return session } setMode(sessionId: string, modeId: string) { const session = this.get(sessionId) session.modeId = modeId - this.sessions.set(sessionId, session) return session } } diff --git a/packages/opencode/src/lsp/index.ts b/packages/opencode/src/lsp/index.ts index 0fd3b69dfcd..4e01f73b956 100644 --- a/packages/opencode/src/lsp/index.ts +++ b/packages/opencode/src/lsp/index.ts @@ -14,6 +14,8 @@ import { Flag } from "@/flag/flag" export namespace LSP { const log = Log.create({ service: "lsp" }) + const MAX_LSP_CLIENTS = 20 + export const Event = { Updated: BusEvent.define("lsp.updated", z.object({})), } @@ -217,6 +219,14 @@ export namespace LSP { return existing } + if (s.clients.length >= MAX_LSP_CLIENTS) { + const oldest = s.clients.shift() + if (oldest) { + log.info("evicting_oldest_lsp_client", { serverID: oldest.serverID, root: oldest.root }) + oldest.shutdown().catch(() => {}) + } + } + s.clients.push(client) return client } diff --git a/packages/opencode/src/project/instance.ts b/packages/opencode/src/project/instance.ts index 98031f18d3f..0bf66f9f239 100644 --- a/packages/opencode/src/project/instance.ts +++ b/packages/opencode/src/project/instance.ts @@ -6,24 +6,64 @@ import { iife } from "@/util/iife" import { GlobalBus } from "@/bus/global" import { Filesystem } from "@/util/filesystem" +const MAX_CACHED_INSTANCES = 10 +const INSTANCE_IDLE_TTL_MS = 30 * 60 * 1000 + interface Context { directory: string worktree: string project: Project.Info } + +interface CacheEntry { + context: Promise + lastAccess: number +} + const context = Context.create("instance") -const cache = new Map>() +const cache = new Map() const disposal = { all: undefined as Promise | undefined, } +function evictIdleInstances() { + const now = Date.now() + const toEvict: string[] = [] + + for (const [key, entry] of cache) { + if (now - entry.lastAccess > INSTANCE_IDLE_TTL_MS) { + toEvict.push(key) + } + } + + for (const key of toEvict) { + const entry = cache.get(key) + if (!entry) continue + cache.delete(key) + entry.context + .then((ctx) => context.provide(ctx, () => State.dispose(key))) + .catch(() => {}) + } + + if (cache.size > MAX_CACHED_INSTANCES) { + const sorted = [...cache.entries()].sort((a, b) => a[1].lastAccess - b[1].lastAccess) + const excess = sorted.slice(0, cache.size - MAX_CACHED_INSTANCES) + for (const [key, entry] of excess) { + cache.delete(key) + entry.context + .then((ctx) => context.provide(ctx, () => State.dispose(key))) + .catch(() => {}) + } + } +} + export const Instance = { async provide(input: { directory: string; init?: () => Promise; fn: () => R }): Promise { let existing = cache.get(input.directory) if (!existing) { Log.Default.info("creating instance", { directory: input.directory }) - existing = iife(async () => { + const contextPromise = iife(async () => { const { project, sandbox } = await Project.fromDirectory(input.directory) const ctx = { directory: input.directory, @@ -35,9 +75,13 @@ export const Instance = { }) return ctx }) + existing = { context: contextPromise, lastAccess: Date.now() } cache.set(input.directory, existing) + evictIdleInstances() + } else { + existing.lastAccess = Date.now() } - const ctx = await existing + const ctx = await existing.context return context.provide(ctx, async () => { return input.fn() }) @@ -86,20 +130,20 @@ export const Instance = { disposal.all = iife(async () => { Log.Default.info("disposing all instances") const entries = [...cache.entries()] - for (const [key, value] of entries) { - if (cache.get(key) !== value) continue + for (const [key, entry] of entries) { + if (cache.get(key)?.context !== entry.context) continue - const ctx = await value.catch((error) => { + const ctx = await entry.context.catch((error) => { Log.Default.warn("instance dispose failed", { key, error }) return undefined }) if (!ctx) { - if (cache.get(key) === value) cache.delete(key) + if (cache.get(key)?.context === entry.context) cache.delete(key) continue } - if (cache.get(key) !== value) continue + if (cache.get(key)?.context !== entry.context) continue await context.provide(ctx, async () => { await Instance.dispose() diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index a27ee9a7441..74c683824a2 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -16,6 +16,49 @@ export namespace Pty { const BUFFER_LIMIT = 1024 * 1024 * 2 const BUFFER_CHUNK = 64 * 1024 + class RingBuffer { + private chunks: string[] = [] + private totalSize = 0 + private readonly maxSize: number + + constructor(maxSize: number) { + this.maxSize = maxSize + } + + append(data: string) { + this.chunks.push(data) + this.totalSize += data.length + this.trim() + } + + private trim() { + while (this.totalSize > this.maxSize && this.chunks.length > 1) { + const removed = this.chunks.shift()! + this.totalSize -= removed.length + } + if (this.totalSize > this.maxSize && this.chunks.length === 1) { + this.chunks[0] = this.chunks[0].slice(-this.maxSize) + this.totalSize = this.chunks[0].length + } + } + + flush(): string { + const result = this.chunks.join("") + this.chunks = [] + this.totalSize = 0 + return result.slice(-this.maxSize) + } + + restore(data: string) { + this.chunks = [data] + this.totalSize = data.length + } + + get size() { + return this.totalSize + } + } + const pty = lazy(async () => { const { spawn } = await import("bun-pty") return spawn @@ -67,7 +110,7 @@ export namespace Pty { interface ActiveSession { info: Info process: IPty - buffer: string + buffer: RingBuffer subscribers: Set } @@ -138,7 +181,7 @@ export namespace Pty { const session: ActiveSession = { info, process: ptyProcess, - buffer: "", + buffer: new RingBuffer(BUFFER_LIMIT), subscribers: new Set(), } state().set(id, session) @@ -153,9 +196,7 @@ export namespace Pty { ws.send(data) } if (open) return - session.buffer += data - if (session.buffer.length <= BUFFER_LIMIT) return - session.buffer = session.buffer.slice(-BUFFER_LIMIT) + session.buffer.append(data) }) ptyProcess.onExit(({ exitCode }) => { log.info("session exited", { id, exitCode }) @@ -223,16 +264,15 @@ export namespace Pty { } log.info("client connected to session", { id }) session.subscribers.add(ws) - if (session.buffer) { - const buffer = session.buffer.length <= BUFFER_LIMIT ? session.buffer : session.buffer.slice(-BUFFER_LIMIT) - session.buffer = "" + if (session.buffer.size > 0) { + const buffer = session.buffer.flush() try { for (let i = 0; i < buffer.length; i += BUFFER_CHUNK) { ws.send(buffer.slice(i, i + BUFFER_CHUNK)) } } catch { session.subscribers.delete(ws) - session.buffer = buffer + session.buffer.restore(buffer) ws.close() return }