Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 68 additions & 10 deletions packages/opencode/src/acp/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,76 @@ import type { OpencodeClient } from "@opencode-ai/sdk/v2"

const log = Log.create({ service: "acp-session-manager" })

/** Session TTL in milliseconds (1 hour) */
const SESSION_TTL_MS = 60 * 60 * 1000
/** Cleanup interval in milliseconds (5 minutes) */
const CLEANUP_INTERVAL_MS = 5 * 60 * 1000
/** Maximum number of sessions to keep */
const MAX_SESSIONS = 50

interface SessionEntry {
state: ACPSessionState
lastAccess: number
}

export class ACPSessionManager {
private sessions = new Map<string, ACPSessionState>()
private sessions = new Map<string, SessionEntry>()
private cleanupTimer: ReturnType<typeof setInterval> | undefined
private sdk: OpencodeClient

constructor(sdk: OpencodeClient) {
this.sdk = sdk
this.startCleanupTimer()
}

private startCleanupTimer() {
if (this.cleanupTimer) return
this.cleanupTimer = setInterval(() => this.cleanup(), CLEANUP_INTERVAL_MS)
this.cleanupTimer.unref()
}

private cleanup() {
const now = Date.now()
const toDelete: string[] = []

for (const [id, entry] of this.sessions) {
if (now - entry.lastAccess > SESSION_TTL_MS) {
toDelete.push(id)
}
}

if (toDelete.length > 0) {
log.info("cleaning_up_stale_sessions", { count: toDelete.length })
for (const id of toDelete) {
this.sessions.delete(id)
}
}

if (this.sessions.size > MAX_SESSIONS) {
const sorted = [...this.sessions.entries()].sort((a, b) => a[1].lastAccess - b[1].lastAccess)
const excess = sorted.slice(0, this.sessions.size - MAX_SESSIONS)
log.info("evicting_excess_sessions", { count: excess.length })
for (const [id] of excess) {
this.sessions.delete(id)
}
}
}

dispose() {
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer)
this.cleanupTimer = undefined
}
this.sessions.clear()
}

tryGet(sessionId: string): ACPSessionState | undefined {
return this.sessions.get(sessionId)
const entry = this.sessions.get(sessionId)
if (entry) {
entry.lastAccess = Date.now()
return entry.state
}
return undefined
}

async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise<ACPSessionState> {
Expand All @@ -40,7 +100,7 @@ export class ACPSessionManager {
}
log.info("creating_session", { state })

this.sessions.set(sessionId, state)
this.sessions.set(sessionId, { state, lastAccess: Date.now() })
return state
}

Expand Down Expand Up @@ -71,17 +131,18 @@ export class ACPSessionManager {
}
log.info("loading_session", { state })

this.sessions.set(sessionId, state)
this.sessions.set(sessionId, { state, lastAccess: Date.now() })
return state
}

get(sessionId: string): ACPSessionState {
const session = this.sessions.get(sessionId)
if (!session) {
const entry = this.sessions.get(sessionId)
if (!entry) {
log.error("session not found", { sessionId })
throw RequestError.invalidParams(JSON.stringify({ error: `Session not found: ${sessionId}` }))
}
return session
entry.lastAccess = Date.now()
return entry.state
}

getModel(sessionId: string) {
Expand All @@ -92,7 +153,6 @@ export class ACPSessionManager {
setModel(sessionId: string, model: ACPSessionState["model"]) {
const session = this.get(sessionId)
session.model = model
this.sessions.set(sessionId, session)
return session
}

Expand All @@ -104,14 +164,12 @@ export class ACPSessionManager {
setVariant(sessionId: string, variant?: string) {
const session = this.get(sessionId)
session.variant = variant
this.sessions.set(sessionId, session)
return session
}

setMode(sessionId: string, modeId: string) {
const session = this.get(sessionId)
session.modeId = modeId
this.sessions.set(sessionId, session)
return session
}
}
10 changes: 10 additions & 0 deletions packages/opencode/src/lsp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import { Flag } from "@/flag/flag"
export namespace LSP {
const log = Log.create({ service: "lsp" })

const MAX_LSP_CLIENTS = 20

export const Event = {
Updated: BusEvent.define("lsp.updated", z.object({})),
}
Expand Down Expand Up @@ -217,6 +219,14 @@ export namespace LSP {
return existing
}

if (s.clients.length >= MAX_LSP_CLIENTS) {
const oldest = s.clients.shift()
if (oldest) {
log.info("evicting_oldest_lsp_client", { serverID: oldest.serverID, root: oldest.root })
oldest.shutdown().catch(() => {})
}
}

s.clients.push(client)
return client
}
Expand Down
60 changes: 52 additions & 8 deletions packages/opencode/src/project/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,64 @@ import { iife } from "@/util/iife"
import { GlobalBus } from "@/bus/global"
import { Filesystem } from "@/util/filesystem"

const MAX_CACHED_INSTANCES = 10
const INSTANCE_IDLE_TTL_MS = 30 * 60 * 1000

interface Context {
directory: string
worktree: string
project: Project.Info
}

interface CacheEntry {
context: Promise<Context>
lastAccess: number
}

const context = Context.create<Context>("instance")
const cache = new Map<string, Promise<Context>>()
const cache = new Map<string, CacheEntry>()

const disposal = {
all: undefined as Promise<void> | undefined,
}

function evictIdleInstances() {
const now = Date.now()
const toEvict: string[] = []

for (const [key, entry] of cache) {
if (now - entry.lastAccess > INSTANCE_IDLE_TTL_MS) {
toEvict.push(key)
}
}

for (const key of toEvict) {
const entry = cache.get(key)
if (!entry) continue
cache.delete(key)
entry.context
.then((ctx) => context.provide(ctx, () => State.dispose(key)))
.catch(() => {})
}

if (cache.size > MAX_CACHED_INSTANCES) {
const sorted = [...cache.entries()].sort((a, b) => a[1].lastAccess - b[1].lastAccess)
const excess = sorted.slice(0, cache.size - MAX_CACHED_INSTANCES)
for (const [key, entry] of excess) {
cache.delete(key)
entry.context
.then((ctx) => context.provide(ctx, () => State.dispose(key)))
.catch(() => {})
}
}
}

export const Instance = {
async provide<R>(input: { directory: string; init?: () => Promise<any>; fn: () => R }): Promise<R> {
let existing = cache.get(input.directory)
if (!existing) {
Log.Default.info("creating instance", { directory: input.directory })
existing = iife(async () => {
const contextPromise = iife(async () => {
const { project, sandbox } = await Project.fromDirectory(input.directory)
const ctx = {
directory: input.directory,
Expand All @@ -35,9 +75,13 @@ export const Instance = {
})
return ctx
})
existing = { context: contextPromise, lastAccess: Date.now() }
cache.set(input.directory, existing)
evictIdleInstances()
} else {
existing.lastAccess = Date.now()
}
const ctx = await existing
const ctx = await existing.context
return context.provide(ctx, async () => {
return input.fn()
})
Expand Down Expand Up @@ -86,20 +130,20 @@ export const Instance = {
disposal.all = iife(async () => {
Log.Default.info("disposing all instances")
const entries = [...cache.entries()]
for (const [key, value] of entries) {
if (cache.get(key) !== value) continue
for (const [key, entry] of entries) {
if (cache.get(key)?.context !== entry.context) continue

const ctx = await value.catch((error) => {
const ctx = await entry.context.catch((error) => {
Log.Default.warn("instance dispose failed", { key, error })
return undefined
})

if (!ctx) {
if (cache.get(key) === value) cache.delete(key)
if (cache.get(key)?.context === entry.context) cache.delete(key)
continue
}

if (cache.get(key) !== value) continue
if (cache.get(key)?.context !== entry.context) continue

await context.provide(ctx, async () => {
await Instance.dispose()
Expand Down
58 changes: 49 additions & 9 deletions packages/opencode/src/pty/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,49 @@ export namespace Pty {
const BUFFER_LIMIT = 1024 * 1024 * 2
const BUFFER_CHUNK = 64 * 1024

class RingBuffer {
private chunks: string[] = []
private totalSize = 0
private readonly maxSize: number

constructor(maxSize: number) {
this.maxSize = maxSize
}

append(data: string) {
this.chunks.push(data)
this.totalSize += data.length
this.trim()
}

private trim() {
while (this.totalSize > this.maxSize && this.chunks.length > 1) {
const removed = this.chunks.shift()!
this.totalSize -= removed.length
}
if (this.totalSize > this.maxSize && this.chunks.length === 1) {
this.chunks[0] = this.chunks[0].slice(-this.maxSize)
this.totalSize = this.chunks[0].length
}
}

flush(): string {
const result = this.chunks.join("")
this.chunks = []
this.totalSize = 0
return result.slice(-this.maxSize)
}

restore(data: string) {
this.chunks = [data]
this.totalSize = data.length
}

get size() {
return this.totalSize
}
}

const pty = lazy(async () => {
const { spawn } = await import("bun-pty")
return spawn
Expand Down Expand Up @@ -67,7 +110,7 @@ export namespace Pty {
interface ActiveSession {
info: Info
process: IPty
buffer: string
buffer: RingBuffer
subscribers: Set<WSContext>
}

Expand Down Expand Up @@ -138,7 +181,7 @@ export namespace Pty {
const session: ActiveSession = {
info,
process: ptyProcess,
buffer: "",
buffer: new RingBuffer(BUFFER_LIMIT),
subscribers: new Set(),
}
state().set(id, session)
Expand All @@ -153,9 +196,7 @@ export namespace Pty {
ws.send(data)
}
if (open) return
session.buffer += data
if (session.buffer.length <= BUFFER_LIMIT) return
session.buffer = session.buffer.slice(-BUFFER_LIMIT)
session.buffer.append(data)
})
ptyProcess.onExit(({ exitCode }) => {
log.info("session exited", { id, exitCode })
Expand Down Expand Up @@ -223,16 +264,15 @@ export namespace Pty {
}
log.info("client connected to session", { id })
session.subscribers.add(ws)
if (session.buffer) {
const buffer = session.buffer.length <= BUFFER_LIMIT ? session.buffer : session.buffer.slice(-BUFFER_LIMIT)
session.buffer = ""
if (session.buffer.size > 0) {
const buffer = session.buffer.flush()
try {
for (let i = 0; i < buffer.length; i += BUFFER_CHUNK) {
ws.send(buffer.slice(i, i + BUFFER_CHUNK))
}
} catch {
session.subscribers.delete(ws)
session.buffer = buffer
session.buffer.restore(buffer)
ws.close()
return
}
Expand Down
Loading