From b08c03be746874cdc8ff81e1dcbb0642331089af Mon Sep 17 00:00:00 2001 From: cte Date: Sat, 18 Oct 2025 13:29:14 -0700 Subject: [PATCH 1/4] Single channel extension bridge --- .../cloud/src/bridge/BridgeOrchestrator.ts | 89 ---- packages/cloud/src/bridge/ExtensionChannel.ts | 30 ++ packages/cloud/src/bridge/TaskChannel.ts | 241 ----------- .../src/bridge/__tests__/TaskChannel.test.ts | 407 ------------------ packages/cloud/src/bridge/index.ts | 1 - packages/types/src/task.ts | 19 +- src/core/task/Task.ts | 59 +-- src/core/webview/ClineProvider.ts | 171 ++++---- 8 files changed, 144 insertions(+), 873 deletions(-) delete mode 100644 packages/cloud/src/bridge/TaskChannel.ts delete mode 100644 packages/cloud/src/bridge/__tests__/TaskChannel.test.ts diff --git a/packages/cloud/src/bridge/BridgeOrchestrator.ts b/packages/cloud/src/bridge/BridgeOrchestrator.ts index 16ad0244f08a..30a110abe88b 100644 --- a/packages/cloud/src/bridge/BridgeOrchestrator.ts +++ b/packages/cloud/src/bridge/BridgeOrchestrator.ts @@ -3,10 +3,8 @@ import os from "os" import { type TaskProviderLike, - type TaskLike, type CloudUserInfo, type ExtensionBridgeCommand, - type TaskBridgeCommand, type StaticAppProperties, type GitProperties, ConnectionState, @@ -16,7 +14,6 @@ import { import { SocketTransport } from "./SocketTransport.js" import { ExtensionChannel } from "./ExtensionChannel.js" -import { TaskChannel } from "./TaskChannel.js" export interface BridgeOrchestratorOptions { userId: string @@ -35,8 +32,6 @@ export interface BridgeOrchestratorOptions { export class BridgeOrchestrator { private static instance: BridgeOrchestrator | null = null - private static pendingTask: TaskLike | null = null - // Core private readonly userId: string private readonly socketBridgeUrl: string @@ -50,7 +45,6 @@ export class BridgeOrchestrator { // Components private socketTransport: SocketTransport private extensionChannel: ExtensionChannel - private taskChannel: TaskChannel // Reconnection private readonly MAX_RECONNECT_ATTEMPTS = Infinity @@ -153,22 +147,6 @@ export class BridgeOrchestrator { } } - /** - * @TODO: What if subtasks also get spawned? We'd probably want deferred - * subscriptions for those too. - */ - public static async subscribeToTask(task: TaskLike): Promise { - const instance = BridgeOrchestrator.instance - - if (instance && instance.socketTransport.isConnected()) { - console.log(`[BridgeOrchestrator#subscribeToTask] Subscribing to task ${task.taskId}`) - await instance.subscribeToTask(task) - } else { - console.log(`[BridgeOrchestrator#subscribeToTask] Deferring subscription for task ${task.taskId}`) - BridgeOrchestrator.pendingTask = task - } - } - private constructor(options: BridgeOrchestratorOptions) { this.userId = options.userId this.socketBridgeUrl = options.socketBridgeUrl @@ -206,13 +184,6 @@ export class BridgeOrchestrator { provider: this.provider, isCloudAgent: this.isCloudAgent, }) - - this.taskChannel = new TaskChannel({ - instanceId: this.instanceId, - appProperties: this.appProperties, - gitProperties: this.gitProperties, - isCloudAgent: this.isCloudAgent, - }) } private setupSocketListeners() { @@ -235,14 +206,6 @@ export class BridgeOrchestrator { this.extensionChannel?.handleCommand(message) }) - - socket.on(TaskSocketEvents.RELAYED_COMMAND, (message: TaskBridgeCommand) => { - console.log( - `[BridgeOrchestrator] on(${TaskSocketEvents.RELAYED_COMMAND}) -> ${message.type} for ${message.taskId}`, - ) - - this.taskChannel.handleCommand(message) - }) } private async handleConnect() { @@ -254,27 +217,10 @@ export class BridgeOrchestrator { } await this.extensionChannel.onConnect(socket) - await this.taskChannel.onConnect(socket) - - if (BridgeOrchestrator.pendingTask) { - console.log( - `[BridgeOrchestrator#handleConnect] Subscribing to task ${BridgeOrchestrator.pendingTask.taskId}`, - ) - - try { - await this.subscribeToTask(BridgeOrchestrator.pendingTask) - BridgeOrchestrator.pendingTask = null - } catch (error) { - console.error( - `[BridgeOrchestrator#handleConnect] subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`, - ) - } - } } private handleDisconnect() { this.extensionChannel.onDisconnect() - this.taskChannel.onDisconnect() } private async handleReconnect() { @@ -291,39 +237,6 @@ export class BridgeOrchestrator { this.setupSocketListeners() await this.extensionChannel.onReconnect(socket) - await this.taskChannel.onReconnect(socket) - } - - // Task API - - public async subscribeToTask(task: TaskLike): Promise { - const socket = this.socketTransport.getSocket() - - if (!socket || !this.socketTransport.isConnected()) { - console.warn("[BridgeOrchestrator] Cannot subscribe to task: not connected. Will retry when connected.") - this.taskChannel.addPendingTask(task) - - if ( - this.connectionState === ConnectionState.DISCONNECTED || - this.connectionState === ConnectionState.FAILED - ) { - await this.connect() - } - - return - } - - await this.taskChannel.subscribeToTask(task, socket) - } - - public async unsubscribeFromTask(taskId: string): Promise { - const socket = this.socketTransport.getSocket() - - if (!socket) { - return - } - - await this.taskChannel.unsubscribeFromTask(taskId, socket) } // Shared API @@ -339,10 +252,8 @@ export class BridgeOrchestrator { public async disconnect(): Promise { await this.extensionChannel.cleanup(this.socketTransport.getSocket()) - await this.taskChannel.cleanup(this.socketTransport.getSocket()) await this.socketTransport.disconnect() BridgeOrchestrator.instance = null - BridgeOrchestrator.pendingTask = null } public async reconnect(): Promise { diff --git a/packages/cloud/src/bridge/ExtensionChannel.ts b/packages/cloud/src/bridge/ExtensionChannel.ts index 91371551df0f..3fa8b6589560 100644 --- a/packages/cloud/src/bridge/ExtensionChannel.ts +++ b/packages/cloud/src/bridge/ExtensionChannel.ts @@ -174,6 +174,36 @@ export class ExtensionChannel extends BaseChannel< } private setupListeners(): void { + // private readonly eventMapping: readonly TaskEventMapping[] = [ + // { + // from: RooCodeEventName.Message, + // to: TaskBridgeEventName.Message, + // createPayload: (task: TaskLike, data: { action: string; message: ClineMessage }) => ({ + // type: TaskBridgeEventName.Message, + // taskId: task.taskId, + // action: data.action, + // message: data.message, + // }), + // }, + // { + // from: RooCodeEventName.TaskModeSwitched, + // to: TaskBridgeEventName.TaskModeSwitched, + // createPayload: (task: TaskLike, mode: string) => ({ + // type: TaskBridgeEventName.TaskModeSwitched, + // taskId: task.taskId, + // mode, + // }), + // }, + // { + // from: RooCodeEventName.TaskInteractive, + // to: TaskBridgeEventName.TaskInteractive, + // createPayload: (task: TaskLike, _taskId: string) => ({ + // type: TaskBridgeEventName.TaskInteractive, + // taskId: task.taskId, + // }), + // }, + // ] as const + const eventMapping = [ { from: RooCodeEventName.TaskCreated, to: ExtensionBridgeEventName.TaskCreated }, { from: RooCodeEventName.TaskStarted, to: ExtensionBridgeEventName.TaskStarted }, diff --git a/packages/cloud/src/bridge/TaskChannel.ts b/packages/cloud/src/bridge/TaskChannel.ts deleted file mode 100644 index 433e740d4edb..000000000000 --- a/packages/cloud/src/bridge/TaskChannel.ts +++ /dev/null @@ -1,241 +0,0 @@ -import type { Socket } from "socket.io-client" - -import { - type ClineMessage, - type TaskEvents, - type TaskLike, - type TaskBridgeCommand, - type TaskBridgeEvent, - type JoinResponse, - type LeaveResponse, - RooCodeEventName, - TaskBridgeEventName, - TaskBridgeCommandName, - TaskSocketEvents, -} from "@roo-code/types" - -import { type BaseChannelOptions, BaseChannel } from "./BaseChannel.js" - -type TaskEventListener = { - [K in keyof TaskEvents]: (...args: TaskEvents[K]) => void | Promise -}[keyof TaskEvents] - -type TaskEventMapping = { - from: keyof TaskEvents - to: TaskBridgeEventName - createPayload: (task: TaskLike, ...args: any[]) => any // eslint-disable-line @typescript-eslint/no-explicit-any -} - -// eslint-disable-next-line @typescript-eslint/no-empty-object-type -interface TaskChannelOptions extends BaseChannelOptions {} - -/** - * Manages task-level communication channels. - * Handles task subscriptions, messaging, and task-specific commands. - */ -export class TaskChannel extends BaseChannel< - TaskBridgeCommand, - TaskSocketEvents, - TaskBridgeEvent | { taskId: string } -> { - private subscribedTasks: Map = new Map() - private pendingTasks: Map = new Map() - private taskListeners: Map> = new Map() - - private readonly eventMapping: readonly TaskEventMapping[] = [ - { - from: RooCodeEventName.Message, - to: TaskBridgeEventName.Message, - createPayload: (task: TaskLike, data: { action: string; message: ClineMessage }) => ({ - type: TaskBridgeEventName.Message, - taskId: task.taskId, - action: data.action, - message: data.message, - }), - }, - { - from: RooCodeEventName.TaskModeSwitched, - to: TaskBridgeEventName.TaskModeSwitched, - createPayload: (task: TaskLike, mode: string) => ({ - type: TaskBridgeEventName.TaskModeSwitched, - taskId: task.taskId, - mode, - }), - }, - { - from: RooCodeEventName.TaskInteractive, - to: TaskBridgeEventName.TaskInteractive, - createPayload: (task: TaskLike, _taskId: string) => ({ - type: TaskBridgeEventName.TaskInteractive, - taskId: task.taskId, - }), - }, - ] as const - - constructor(options: TaskChannelOptions) { - super(options) - } - - protected async handleCommandImplementation(command: TaskBridgeCommand): Promise { - const task = this.subscribedTasks.get(command.taskId) - - if (!task) { - console.error(`[TaskChannel] Unable to find task ${command.taskId}`) - return - } - - switch (command.type) { - case TaskBridgeCommandName.Message: - console.log( - `[TaskChannel] ${TaskBridgeCommandName.Message} ${command.taskId} -> submitUserMessage()`, - command, - ) - - await task.submitUserMessage( - command.payload.text, - command.payload.images, - command.payload.mode, - command.payload.providerProfile, - ) - - break - - case TaskBridgeCommandName.ApproveAsk: - console.log( - `[TaskChannel] ${TaskBridgeCommandName.ApproveAsk} ${command.taskId} -> approveAsk()`, - command, - ) - - task.approveAsk(command.payload) - break - - case TaskBridgeCommandName.DenyAsk: - console.log(`[TaskChannel] ${TaskBridgeCommandName.DenyAsk} ${command.taskId} -> denyAsk()`, command) - task.denyAsk(command.payload) - break - } - } - - protected async handleConnect(socket: Socket): Promise { - // Rejoin all subscribed tasks. - for (const taskId of this.subscribedTasks.keys()) { - await this.publish(TaskSocketEvents.JOIN, { taskId }) - } - - // Subscribe to any pending tasks. - for (const task of this.pendingTasks.values()) { - await this.subscribeToTask(task, socket) - } - - this.pendingTasks.clear() - } - - protected async handleReconnect(_socket: Socket): Promise { - // Rejoin all subscribed tasks. - for (const taskId of this.subscribedTasks.keys()) { - await this.publish(TaskSocketEvents.JOIN, { taskId }) - } - } - - protected async handleCleanup(socket: Socket): Promise { - const unsubscribePromises = [] - - for (const taskId of this.subscribedTasks.keys()) { - unsubscribePromises.push(this.unsubscribeFromTask(taskId, socket)) - } - - await Promise.allSettled(unsubscribePromises) - this.subscribedTasks.clear() - this.taskListeners.clear() - this.pendingTasks.clear() - } - - /** - * Add a task to the pending queue (will be subscribed when connected). - */ - public addPendingTask(task: TaskLike): void { - this.pendingTasks.set(task.taskId, task) - } - - public async subscribeToTask(task: TaskLike, _socket: Socket): Promise { - const taskId = task.taskId - - await this.publish(TaskSocketEvents.JOIN, { taskId }, (response: JoinResponse) => { - if (response.success) { - console.log(`[TaskChannel#subscribeToTask] subscribed to ${taskId}`) - this.subscribedTasks.set(taskId, task) - this.setupTaskListeners(task) - } else { - console.error(`[TaskChannel#subscribeToTask] failed to subscribe to ${taskId}: ${response.error}`) - } - }) - } - - public async unsubscribeFromTask(taskId: string, _socket: Socket): Promise { - const task = this.subscribedTasks.get(taskId) - - if (!task) { - return - } - - await this.publish(TaskSocketEvents.LEAVE, { taskId }, (response: LeaveResponse) => { - if (response.success) { - console.log(`[TaskChannel#unsubscribeFromTask] unsubscribed from ${taskId}`) - } else { - console.error(`[TaskChannel#unsubscribeFromTask] failed to unsubscribe from ${taskId}`) - } - - // If we failed to unsubscribe then something is probably wrong and - // we should still discard this task from `subscribedTasks`. - this.removeTaskListeners(task) - this.subscribedTasks.delete(taskId) - }) - } - - private setupTaskListeners(task: TaskLike): void { - if (this.taskListeners.has(task.taskId)) { - console.warn(`[TaskChannel] Listeners already exist for task, removing old listeners for ${task.taskId}`) - this.removeTaskListeners(task) - } - - const listeners = new Map() - - this.eventMapping.forEach(({ from, to, createPayload }) => { - const listener = (...args: unknown[]) => { - const payload = createPayload(task, ...args) - this.publish(TaskSocketEvents.EVENT, payload) - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - task.on(from, listener as any) - listeners.set(to, listener) - }) - - this.taskListeners.set(task.taskId, listeners) - } - - private removeTaskListeners(task: TaskLike): void { - const listeners = this.taskListeners.get(task.taskId) - - if (!listeners) { - return - } - - this.eventMapping.forEach(({ from, to }) => { - const listener = listeners.get(to) - if (listener) { - try { - task.off(from, listener as any) // eslint-disable-line @typescript-eslint/no-explicit-any - } catch (error) { - console.error( - `[TaskChannel] task.off(${from}) failed for task ${task.taskId}: ${ - error instanceof Error ? error.message : String(error) - }`, - ) - } - } - }) - - this.taskListeners.delete(task.taskId) - } -} diff --git a/packages/cloud/src/bridge/__tests__/TaskChannel.test.ts b/packages/cloud/src/bridge/__tests__/TaskChannel.test.ts deleted file mode 100644 index f4f15266077b..000000000000 --- a/packages/cloud/src/bridge/__tests__/TaskChannel.test.ts +++ /dev/null @@ -1,407 +0,0 @@ -/* eslint-disable @typescript-eslint/no-unsafe-function-type */ -/* eslint-disable @typescript-eslint/no-explicit-any */ - -import type { Socket } from "socket.io-client" - -import { - type TaskLike, - type ClineMessage, - type StaticAppProperties, - RooCodeEventName, - TaskBridgeEventName, - TaskBridgeCommandName, - TaskSocketEvents, - TaskStatus, -} from "@roo-code/types" - -import { TaskChannel } from "../TaskChannel.js" - -describe("TaskChannel", () => { - let mockSocket: Socket - let taskChannel: TaskChannel - let mockTask: TaskLike - const instanceId = "test-instance-123" - const taskId = "test-task-456" - - const appProperties: StaticAppProperties = { - appName: "roo-code", - appVersion: "1.0.0", - vscodeVersion: "1.0.0", - platform: "darwin", - editorName: "Roo Code", - hostname: "test-host", - } - - beforeEach(() => { - // Create mock socket - mockSocket = { - emit: vi.fn(), - on: vi.fn(), - off: vi.fn(), - disconnect: vi.fn(), - } as unknown as Socket - - // Create mock task with event emitter functionality - const listeners = new Map unknown>>() - mockTask = { - taskId, - taskStatus: TaskStatus.Running, - taskAsk: undefined, - metadata: {}, - on: vi.fn((event: string, listener: (...args: unknown[]) => unknown) => { - if (!listeners.has(event)) { - listeners.set(event, new Set()) - } - listeners.get(event)!.add(listener) - return mockTask - }), - off: vi.fn((event: string, listener: (...args: unknown[]) => unknown) => { - const eventListeners = listeners.get(event) - if (eventListeners) { - eventListeners.delete(listener) - if (eventListeners.size === 0) { - listeners.delete(event) - } - } - return mockTask - }), - approveAsk: vi.fn(), - denyAsk: vi.fn(), - submitUserMessage: vi.fn(), - abortTask: vi.fn(), - // Helper to trigger events in tests - _triggerEvent: (event: string, ...args: any[]) => { - const eventListeners = listeners.get(event) - if (eventListeners) { - eventListeners.forEach((listener) => listener(...args)) - } - }, - _getListenerCount: (event: string) => { - return listeners.get(event)?.size || 0 - }, - } as unknown as TaskLike & { - _triggerEvent: (event: string, ...args: any[]) => void - _getListenerCount: (event: string) => number - } - - // Create task channel instance - taskChannel = new TaskChannel({ - instanceId, - appProperties, - isCloudAgent: false, - }) - }) - - afterEach(() => { - vi.clearAllMocks() - }) - - describe("Event Mapping Refactoring", () => { - it("should use the unified event mapping approach", () => { - // Access the private eventMapping through type assertion - const channel = taskChannel as any - - // Verify eventMapping exists and has the correct structure - expect(channel.eventMapping).toBeDefined() - expect(Array.isArray(channel.eventMapping)).toBe(true) - expect(channel.eventMapping.length).toBe(3) - - // Verify each mapping has the required properties - channel.eventMapping.forEach((mapping: any) => { - expect(mapping).toHaveProperty("from") - expect(mapping).toHaveProperty("to") - expect(mapping).toHaveProperty("createPayload") - expect(typeof mapping.createPayload).toBe("function") - }) - - // Verify specific mappings - expect(channel.eventMapping[0].from).toBe(RooCodeEventName.Message) - expect(channel.eventMapping[0].to).toBe(TaskBridgeEventName.Message) - - expect(channel.eventMapping[1].from).toBe(RooCodeEventName.TaskModeSwitched) - expect(channel.eventMapping[1].to).toBe(TaskBridgeEventName.TaskModeSwitched) - - expect(channel.eventMapping[2].from).toBe(RooCodeEventName.TaskInteractive) - expect(channel.eventMapping[2].to).toBe(TaskBridgeEventName.TaskInteractive) - }) - - it("should setup listeners using the event mapping", async () => { - // Mock the publish method to simulate successful subscription - const channel = taskChannel as any - channel.publish = vi.fn((event: string, data: any, callback?: Function) => { - if (event === TaskSocketEvents.JOIN && callback) { - // Simulate successful join response - callback({ success: true }) - } - return true - }) - - // Connect and subscribe to task - await taskChannel.onConnect(mockSocket) - await channel.subscribeToTask(mockTask, mockSocket) - - // Wait for async operations - await new Promise((resolve) => setTimeout(resolve, 0)) - - // Verify listeners were registered for all mapped events - const task = mockTask as any - expect(task._getListenerCount(RooCodeEventName.Message)).toBe(1) - expect(task._getListenerCount(RooCodeEventName.TaskModeSwitched)).toBe(1) - expect(task._getListenerCount(RooCodeEventName.TaskInteractive)).toBe(1) - }) - - it("should correctly transform Message event payloads", async () => { - // Setup channel with task - const channel = taskChannel as any - let publishCalls: any[] = [] - - channel.publish = vi.fn((event: string, data: any, callback?: Function) => { - publishCalls.push({ event, data }) - - if (event === TaskSocketEvents.JOIN && callback) { - callback({ success: true }) - } - - return true - }) - - await taskChannel.onConnect(mockSocket) - await channel.subscribeToTask(mockTask, mockSocket) - await new Promise((resolve) => setTimeout(resolve, 0)) - - // Clear previous calls - publishCalls = [] - - // Trigger Message event - const messageData = { - action: "test-action", - message: { type: "say", text: "Hello" } as ClineMessage, - } - - ;(mockTask as any)._triggerEvent(RooCodeEventName.Message, messageData) - - // Verify the event was published with correct payload - expect(publishCalls.length).toBe(1) - expect(publishCalls[0]).toEqual({ - event: TaskSocketEvents.EVENT, - data: { - type: TaskBridgeEventName.Message, - taskId: taskId, - action: messageData.action, - message: messageData.message, - }, - }) - }) - - it("should correctly transform TaskModeSwitched event payloads", async () => { - // Setup channel with task - const channel = taskChannel as any - let publishCalls: any[] = [] - - channel.publish = vi.fn((event: string, data: any, callback?: Function) => { - publishCalls.push({ event, data }) - - if (event === TaskSocketEvents.JOIN && callback) { - callback({ success: true }) - } - - return true - }) - - await taskChannel.onConnect(mockSocket) - await channel.subscribeToTask(mockTask, mockSocket) - await new Promise((resolve) => setTimeout(resolve, 0)) - - // Clear previous calls - publishCalls = [] - - // Trigger TaskModeSwitched event - const mode = "architect" - ;(mockTask as any)._triggerEvent(RooCodeEventName.TaskModeSwitched, mode) - - // Verify the event was published with correct payload - expect(publishCalls.length).toBe(1) - expect(publishCalls[0]).toEqual({ - event: TaskSocketEvents.EVENT, - data: { - type: TaskBridgeEventName.TaskModeSwitched, - taskId: taskId, - mode: mode, - }, - }) - }) - - it("should correctly transform TaskInteractive event payloads", async () => { - // Setup channel with task - const channel = taskChannel as any - let publishCalls: any[] = [] - - channel.publish = vi.fn((event: string, data: any, callback?: Function) => { - publishCalls.push({ event, data }) - if (event === TaskSocketEvents.JOIN && callback) { - callback({ success: true }) - } - return true - }) - - await taskChannel.onConnect(mockSocket) - await channel.subscribeToTask(mockTask, mockSocket) - await new Promise((resolve) => setTimeout(resolve, 0)) - - // Clear previous calls - publishCalls = [] - - // Trigger TaskInteractive event - ;(mockTask as any)._triggerEvent(RooCodeEventName.TaskInteractive, taskId) - - // Verify the event was published with correct payload - expect(publishCalls.length).toBe(1) - expect(publishCalls[0]).toEqual({ - event: TaskSocketEvents.EVENT, - data: { - type: TaskBridgeEventName.TaskInteractive, - taskId: taskId, - }, - }) - }) - - it("should properly clean up listeners using event mapping", async () => { - // Setup channel with task - const channel = taskChannel as any - - channel.publish = vi.fn((event: string, data: any, callback?: Function) => { - if (event === TaskSocketEvents.JOIN && callback) { - callback({ success: true }) - } - if (event === TaskSocketEvents.LEAVE && callback) { - callback({ success: true }) - } - return true - }) - - await taskChannel.onConnect(mockSocket) - await channel.subscribeToTask(mockTask, mockSocket) - await new Promise((resolve) => setTimeout(resolve, 0)) - - // Verify listeners are registered - const task = mockTask as any - expect(task._getListenerCount(RooCodeEventName.Message)).toBe(1) - expect(task._getListenerCount(RooCodeEventName.TaskModeSwitched)).toBe(1) - expect(task._getListenerCount(RooCodeEventName.TaskInteractive)).toBe(1) - - // Clean up - await taskChannel.cleanup(mockSocket) - - // Verify all listeners were removed - expect(task._getListenerCount(RooCodeEventName.Message)).toBe(0) - expect(task._getListenerCount(RooCodeEventName.TaskModeSwitched)).toBe(0) - expect(task._getListenerCount(RooCodeEventName.TaskInteractive)).toBe(0) - }) - - it("should handle duplicate listener prevention", async () => { - // Setup channel with task - await taskChannel.onConnect(mockSocket) - - // Subscribe to the same task twice - const channel = taskChannel as any - channel.subscribedTasks.set(taskId, mockTask) - channel.setupTaskListeners(mockTask) - - // Try to setup listeners again (should remove old ones first) - const warnSpy = vi.spyOn(console, "warn") - channel.setupTaskListeners(mockTask) - - // Verify warning was logged - expect(warnSpy).toHaveBeenCalledWith( - `[TaskChannel] Listeners already exist for task, removing old listeners for ${taskId}`, - ) - - // Verify only one set of listeners exists - const task = mockTask as any - expect(task._getListenerCount(RooCodeEventName.Message)).toBe(1) - expect(task._getListenerCount(RooCodeEventName.TaskModeSwitched)).toBe(1) - expect(task._getListenerCount(RooCodeEventName.TaskInteractive)).toBe(1) - - warnSpy.mockRestore() - }) - }) - - describe("Command Handling", () => { - beforeEach(async () => { - // Setup channel with a subscribed task - await taskChannel.onConnect(mockSocket) - const channel = taskChannel as any - channel.subscribedTasks.set(taskId, mockTask) - }) - - it("should handle Message command", async () => { - const command = { - type: TaskBridgeCommandName.Message, - taskId, - timestamp: Date.now(), - payload: { - text: "Hello, world!", - images: ["image1.png"], - }, - } - - await taskChannel.handleCommand(command) - - expect(mockTask.submitUserMessage).toHaveBeenCalledWith( - command.payload.text, - command.payload.images, - undefined, - undefined, - ) - }) - - it("should handle ApproveAsk command", async () => { - const command = { - type: TaskBridgeCommandName.ApproveAsk, - taskId, - timestamp: Date.now(), - payload: { - text: "Approved", - }, - } - - await taskChannel.handleCommand(command) - - expect(mockTask.approveAsk).toHaveBeenCalledWith(command.payload) - }) - - it("should handle DenyAsk command", async () => { - const command = { - type: TaskBridgeCommandName.DenyAsk, - taskId, - timestamp: Date.now(), - payload: { - text: "Denied", - }, - } - - await taskChannel.handleCommand(command) - - expect(mockTask.denyAsk).toHaveBeenCalledWith(command.payload) - }) - - it("should log error for unknown task", async () => { - const errorSpy = vi.spyOn(console, "error") - - const command = { - type: TaskBridgeCommandName.Message, - taskId: "unknown-task", - timestamp: Date.now(), - payload: { - text: "Hello", - }, - } - - await taskChannel.handleCommand(command) - - expect(errorSpy).toHaveBeenCalledWith(`[TaskChannel] Unable to find task unknown-task`) - - errorSpy.mockRestore() - }) - }) -}) diff --git a/packages/cloud/src/bridge/index.ts b/packages/cloud/src/bridge/index.ts index 94873c09fdfa..dc213ed652b0 100644 --- a/packages/cloud/src/bridge/index.ts +++ b/packages/cloud/src/bridge/index.ts @@ -3,4 +3,3 @@ export { type SocketTransportOptions, SocketTransport } from "./SocketTransport. export { BaseChannel } from "./BaseChannel.js" export { ExtensionChannel } from "./ExtensionChannel.js" -export { TaskChannel } from "./TaskChannel.js" diff --git a/packages/types/src/task.ts b/packages/types/src/task.ts index d3db937c6615..ce9f9bb99562 100644 --- a/packages/types/src/task.ts +++ b/packages/types/src/task.ts @@ -58,7 +58,10 @@ export interface TaskProviderLike { } export type TaskProviderEvents = { + // Task Provider Lifecycle (1) [RooCodeEventName.TaskCreated]: [task: TaskLike] + + // Task Lifecycle (9) [RooCodeEventName.TaskStarted]: [taskId: string] [RooCodeEventName.TaskCompleted]: [taskId: string, tokenUsage: TokenUsage, toolUsage: ToolUsage] [RooCodeEventName.TaskAborted]: [taskId: string] @@ -69,14 +72,22 @@ export type TaskProviderEvents = { [RooCodeEventName.TaskResumable]: [taskId: string] [RooCodeEventName.TaskIdle]: [taskId: string] + // Subtask Lifecycle (3) [RooCodeEventName.TaskPaused]: [taskId: string] [RooCodeEventName.TaskUnpaused]: [taskId: string] [RooCodeEventName.TaskSpawned]: [taskId: string] + // Task Execution (4) + [RooCodeEventName.Message]: [taskId: string, action: "created" | "updated", message: ClineMessage] + [RooCodeEventName.TaskModeSwitched]: [taskId: string, mode: string] + [RooCodeEventName.TaskAskResponded]: [taskId: string] [RooCodeEventName.TaskUserMessage]: [taskId: string] + // Task Analytics (2) [RooCodeEventName.TaskTokenUsageUpdated]: [taskId: string, tokenUsage: TokenUsage] + [RooCodeEventName.TaskToolFailed]: [taskId: string, tool: ToolName, error: string] + // Configuration Changes (2) [RooCodeEventName.ModeChanged]: [mode: string] [RooCodeEventName.ProviderProfileChanged]: [config: { name: string; provider?: string }] } @@ -130,7 +141,7 @@ export interface TaskLike { } export type TaskEvents = { - // Task Lifecycle + // Task Lifecycle (9) [RooCodeEventName.TaskStarted]: [] [RooCodeEventName.TaskCompleted]: [taskId: string, tokenUsage: TokenUsage, toolUsage: ToolUsage] [RooCodeEventName.TaskAborted]: [] @@ -141,18 +152,18 @@ export type TaskEvents = { [RooCodeEventName.TaskResumable]: [taskId: string] [RooCodeEventName.TaskIdle]: [taskId: string] - // Subtask Lifecycle + // Subtask Lifecycle (3) [RooCodeEventName.TaskPaused]: [taskId: string] [RooCodeEventName.TaskUnpaused]: [taskId: string] [RooCodeEventName.TaskSpawned]: [taskId: string] - // Task Execution + // Task Execution (4) [RooCodeEventName.Message]: [{ action: "created" | "updated"; message: ClineMessage }] [RooCodeEventName.TaskModeSwitched]: [taskId: string, mode: string] [RooCodeEventName.TaskAskResponded]: [] [RooCodeEventName.TaskUserMessage]: [taskId: string] - // Task Analytics + // Task Analytics (2) [RooCodeEventName.TaskToolFailed]: [taskId: string, tool: ToolName, error: string] [RooCodeEventName.TaskTokenUsageUpdated]: [taskId: string, tokenUsage: TokenUsage] } diff --git a/src/core/task/Task.ts b/src/core/task/Task.ts index 8beaf1235e8c..8b525a74c3c6 100644 --- a/src/core/task/Task.ts +++ b/src/core/task/Task.ts @@ -37,7 +37,7 @@ import { QueuedMessage, } from "@roo-code/types" import { TelemetryService } from "@roo-code/telemetry" -import { CloudService, BridgeOrchestrator } from "@roo-code/cloud" +import { CloudService } from "@roo-code/cloud" // api import { ApiHandler, ApiHandlerCreateMessageMetadata, buildApiHandler } from "../../api" @@ -125,7 +125,6 @@ export interface TaskOptions extends CreateTaskOptions { apiConfiguration: ProviderSettings enableDiff?: boolean enableCheckpoints?: boolean - enableBridge?: boolean fuzzyMatchThreshold?: number consecutiveMistakeLimit?: number task?: string @@ -270,9 +269,6 @@ export class Task extends EventEmitter implements TaskLike { checkpointService?: RepoPerTaskCheckpointService checkpointServiceInitializing = false - // Task Bridge - enableBridge: boolean - // Message Queue Service public readonly messageQueueService: MessageQueueService private messageQueueStateChangedHandler: (() => void) | undefined @@ -303,7 +299,6 @@ export class Task extends EventEmitter implements TaskLike { apiConfiguration, enableDiff = false, enableCheckpoints = true, - enableBridge = false, fuzzyMatchThreshold = 1.0, consecutiveMistakeLimit = DEFAULT_CONSECUTIVE_MISTAKE_LIMIT, task, @@ -362,7 +357,6 @@ export class Task extends EventEmitter implements TaskLike { this.globalStoragePath = provider.context.globalStorageUri.fsPath this.diffViewProvider = new DiffViewProvider(this.cwd, this) this.enableCheckpoints = enableCheckpoints - this.enableBridge = enableBridge this.parentTask = parentTask this.taskNumber = taskNumber @@ -1188,16 +1182,6 @@ export class Task extends EventEmitter implements TaskLike { // Start / Resume / Abort / Dispose private async startTask(task?: string, images?: string[]): Promise { - if (this.enableBridge) { - try { - await BridgeOrchestrator.subscribeToTask(this) - } catch (error) { - console.error( - `[Task#startTask] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`, - ) - } - } - // `conversationHistory` (for API) and `clineMessages` (for webview) // need to be in sync. // If the extension process were killed, then on restart the @@ -1215,46 +1199,18 @@ export class Task extends EventEmitter implements TaskLike { await this.say("text", task, images) this.isInitialized = true - let imageBlocks: Anthropic.ImageBlockParam[] = formatResponse.imageBlocks(images) - - // Task starting - await this.initiateTaskLoop([ { type: "text", text: `\n${task}\n`, }, - ...imageBlocks, + ...formatResponse.imageBlocks(images), ]) } private async resumeTaskFromHistory() { - if (this.enableBridge) { - try { - await BridgeOrchestrator.subscribeToTask(this) - } catch (error) { - console.error( - `[Task#resumeTaskFromHistory] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}`, - ) - } - } - const modifiedClineMessages = await this.getSavedClineMessages() - // Check for any stored GPT-5 response IDs in the message history. - const gpt5Messages = modifiedClineMessages.filter( - (m): m is ClineMessage & ClineMessageWithMetadata => - m.type === "say" && - m.say === "text" && - !!(m as ClineMessageWithMetadata).metadata?.gpt5?.previous_response_id, - ) - - if (gpt5Messages.length > 0) { - const lastGpt5Message = gpt5Messages[gpt5Messages.length - 1] - // The lastGpt5Message contains the previous_response_id that can be - // used for continuity. - } - // Remove any resume messages that may have been added before. const lastRelevantMessageIndex = findLastIndex( modifiedClineMessages, @@ -1268,6 +1224,7 @@ export class Task extends EventEmitter implements TaskLike { // Remove any trailing reasoning-only UI messages that were not part of the persisted API conversation while (modifiedClineMessages.length > 0) { const last = modifiedClineMessages[modifiedClineMessages.length - 1] + if (last.type === "say" && last.say === "reasoning") { modifiedClineMessages.pop() } else { @@ -1552,16 +1509,6 @@ export class Task extends EventEmitter implements TaskLike { this.pauseInterval = undefined } - if (this.enableBridge) { - BridgeOrchestrator.getInstance() - ?.unsubscribeFromTask(this.taskId) - .catch((error) => - console.error( - `[Task#dispose] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}`, - ), - ) - } - // Release any terminals associated with this task. try { // Release any terminals associated with this task. diff --git a/src/core/webview/ClineProvider.ts b/src/core/webview/ClineProvider.ts index 91b868796689..d34b7d02fb99 100644 --- a/src/core/webview/ClineProvider.ts +++ b/src/core/webview/ClineProvider.ts @@ -12,6 +12,7 @@ import * as vscode from "vscode" import { type TaskProviderLike, type TaskProviderEvents, + type TaskEvents, type GlobalState, type ProviderName, type ProviderSettings, @@ -33,6 +34,7 @@ import { type CloudOrganizationMembership, type CreateTaskOptions, type TokenUsage, + type ToolName, RooCodeEventName, requestyDefaultModelId, openRouterDefaultModelId, @@ -190,34 +192,39 @@ export class ClineProvider this.marketplaceManager = new MarketplaceManager(this.context, this.customModesManager) - // Forward task events to the provider. + // Forward task events to the provider. // We do something fairly similar for the IPC-based API. - this.taskCreationCallback = (instance: Task) => { - this.emit(RooCodeEventName.TaskCreated, instance) + // We create named listener functions so we can remove them later. + this.taskCreationCallback = (task: Task) => { + // Task Provider Lifecycle (1) + this.emit(RooCodeEventName.TaskCreated, task) - // Create named listener functions so we can remove them later. - const onTaskStarted = () => this.emit(RooCodeEventName.TaskStarted, instance.taskId) + // Task Lifecycle (9) + const onTaskStarted = () => this.emit(RooCodeEventName.TaskStarted, task.taskId) const onTaskCompleted = (taskId: string, tokenUsage: any, toolUsage: any) => this.emit(RooCodeEventName.TaskCompleted, taskId, tokenUsage, toolUsage) const onTaskAborted = async () => { - this.emit(RooCodeEventName.TaskAborted, instance.taskId) + this.emit(RooCodeEventName.TaskAborted, task.taskId) try { // Only rehydrate on genuine streaming failures. // User-initiated cancels are handled by cancelTask(). - if (instance.abortReason === "streaming_failed") { - // Defensive safeguard: if another path already replaced this instance, skip + if (task.abortReason === "streaming_failed") { + // Defensive safeguard: if another path already replaced + // this instance, skip. const current = this.getCurrentTask() - if (current && current.instanceId !== instance.instanceId) { + + if (current && current.instanceId !== task.instanceId) { this.log( - `[onTaskAborted] Skipping rehydrate: current instance ${current.instanceId} != aborted ${instance.instanceId}`, + `[onTaskAborted] Skipping rehydrate: current instance ${current.instanceId} != aborted ${task.instanceId}`, ) + return } - const { historyItem } = await this.getTaskWithId(instance.taskId) - const rootTask = instance.rootTask - const parentTask = instance.parentTask + const { historyItem } = await this.getTaskWithId(task.taskId) + const rootTask = task.rootTask + const parentTask = task.parentTask await this.createTaskWithHistoryItem({ ...historyItem, rootTask, parentTask }) } } catch (error) { @@ -228,51 +235,96 @@ export class ClineProvider ) } } - const onTaskFocused = () => this.emit(RooCodeEventName.TaskFocused, instance.taskId) - const onTaskUnfocused = () => this.emit(RooCodeEventName.TaskUnfocused, instance.taskId) + const onTaskFocused = () => this.emit(RooCodeEventName.TaskFocused, task.taskId) + const onTaskUnfocused = () => this.emit(RooCodeEventName.TaskUnfocused, task.taskId) const onTaskActive = (taskId: string) => this.emit(RooCodeEventName.TaskActive, taskId) const onTaskInteractive = (taskId: string) => this.emit(RooCodeEventName.TaskInteractive, taskId) const onTaskResumable = (taskId: string) => this.emit(RooCodeEventName.TaskResumable, taskId) const onTaskIdle = (taskId: string) => this.emit(RooCodeEventName.TaskIdle, taskId) + + // Subtask Lifecycle (3) const onTaskPaused = (taskId: string) => this.emit(RooCodeEventName.TaskPaused, taskId) const onTaskUnpaused = (taskId: string) => this.emit(RooCodeEventName.TaskUnpaused, taskId) const onTaskSpawned = (taskId: string) => this.emit(RooCodeEventName.TaskSpawned, taskId) + + // Task Execution (4) + const onMessage = ({ action, message }: { action: "created" | "updated"; message: ClineMessage }) => + this.emit(RooCodeEventName.Message, task.taskId, action, message) + const onTaskModeSwitched = (taskId: string, mode: string) => + this.emit(RooCodeEventName.TaskModeSwitched, taskId, mode) + const onTaskAskResponded = () => this.emit(RooCodeEventName.TaskAskResponded, task.taskId) const onTaskUserMessage = (taskId: string) => this.emit(RooCodeEventName.TaskUserMessage, taskId) + + // Task Analytics (2) const onTaskTokenUsageUpdated = (taskId: string, tokenUsage: TokenUsage) => this.emit(RooCodeEventName.TaskTokenUsageUpdated, taskId, tokenUsage) + const onTaskToolFailed = (taskId: string, tool: ToolName, error: string) => + this.emit(RooCodeEventName.TaskToolFailed, taskId, tool, error) + + // Configuration Changes (2) + // RooCodeEventName.ModeChanged + // RooCodeEventName.ProviderProfileChanged + // These don't need to be relayed since they are natively emitted + // by the provider. // Attach the listeners. - instance.on(RooCodeEventName.TaskStarted, onTaskStarted) - instance.on(RooCodeEventName.TaskCompleted, onTaskCompleted) - instance.on(RooCodeEventName.TaskAborted, onTaskAborted) - instance.on(RooCodeEventName.TaskFocused, onTaskFocused) - instance.on(RooCodeEventName.TaskUnfocused, onTaskUnfocused) - instance.on(RooCodeEventName.TaskActive, onTaskActive) - instance.on(RooCodeEventName.TaskInteractive, onTaskInteractive) - instance.on(RooCodeEventName.TaskResumable, onTaskResumable) - instance.on(RooCodeEventName.TaskIdle, onTaskIdle) - instance.on(RooCodeEventName.TaskPaused, onTaskPaused) - instance.on(RooCodeEventName.TaskUnpaused, onTaskUnpaused) - instance.on(RooCodeEventName.TaskSpawned, onTaskSpawned) - instance.on(RooCodeEventName.TaskUserMessage, onTaskUserMessage) - instance.on(RooCodeEventName.TaskTokenUsageUpdated, onTaskTokenUsageUpdated) + + // Task Lifecycle (9) + task.on(RooCodeEventName.TaskStarted, onTaskStarted) + task.on(RooCodeEventName.TaskCompleted, onTaskCompleted) + task.on(RooCodeEventName.TaskAborted, onTaskAborted) + task.on(RooCodeEventName.TaskFocused, onTaskFocused) + task.on(RooCodeEventName.TaskUnfocused, onTaskUnfocused) + task.on(RooCodeEventName.TaskActive, onTaskActive) + task.on(RooCodeEventName.TaskInteractive, onTaskInteractive) + task.on(RooCodeEventName.TaskResumable, onTaskResumable) + task.on(RooCodeEventName.TaskIdle, onTaskIdle) + + // Subtask Lifecycle (3) + task.on(RooCodeEventName.TaskPaused, onTaskPaused) + task.on(RooCodeEventName.TaskUnpaused, onTaskUnpaused) + task.on(RooCodeEventName.TaskSpawned, onTaskSpawned) + + // Task Execution (4) + task.on(RooCodeEventName.Message, onMessage) + task.on(RooCodeEventName.TaskModeSwitched, onTaskModeSwitched) + task.on(RooCodeEventName.TaskAskResponded, onTaskAskResponded) + task.on(RooCodeEventName.TaskUserMessage, onTaskUserMessage) + + // Task Analytics (2) + task.on(RooCodeEventName.TaskTokenUsageUpdated, onTaskTokenUsageUpdated) + task.on(RooCodeEventName.TaskToolFailed, onTaskToolFailed) // Store the cleanup functions for later removal. - this.taskEventListeners.set(instance, [ - () => instance.off(RooCodeEventName.TaskStarted, onTaskStarted), - () => instance.off(RooCodeEventName.TaskCompleted, onTaskCompleted), - () => instance.off(RooCodeEventName.TaskAborted, onTaskAborted), - () => instance.off(RooCodeEventName.TaskFocused, onTaskFocused), - () => instance.off(RooCodeEventName.TaskUnfocused, onTaskUnfocused), - () => instance.off(RooCodeEventName.TaskActive, onTaskActive), - () => instance.off(RooCodeEventName.TaskInteractive, onTaskInteractive), - () => instance.off(RooCodeEventName.TaskResumable, onTaskResumable), - () => instance.off(RooCodeEventName.TaskIdle, onTaskIdle), - () => instance.off(RooCodeEventName.TaskUserMessage, onTaskUserMessage), - () => instance.off(RooCodeEventName.TaskPaused, onTaskPaused), - () => instance.off(RooCodeEventName.TaskUnpaused, onTaskUnpaused), - () => instance.off(RooCodeEventName.TaskSpawned, onTaskSpawned), - () => instance.off(RooCodeEventName.TaskTokenUsageUpdated, onTaskTokenUsageUpdated), + this.taskEventListeners.set(task, [ + // Task Provider Lifecycle (1) + () => task.off(RooCodeEventName.TaskStarted, onTaskStarted), + + // Task Lifecycle (9) + () => task.off(RooCodeEventName.TaskCompleted, onTaskCompleted), + () => task.off(RooCodeEventName.TaskAborted, onTaskAborted), + () => task.off(RooCodeEventName.TaskFocused, onTaskFocused), + () => task.off(RooCodeEventName.TaskUnfocused, onTaskUnfocused), + () => task.off(RooCodeEventName.TaskActive, onTaskActive), + () => task.off(RooCodeEventName.TaskInteractive, onTaskInteractive), + () => task.off(RooCodeEventName.TaskResumable, onTaskResumable), + () => task.off(RooCodeEventName.TaskIdle, onTaskIdle), + () => task.off(RooCodeEventName.TaskUserMessage, onTaskUserMessage), + + // Subtask Lifecycle (3) + () => task.off(RooCodeEventName.TaskPaused, onTaskPaused), + () => task.off(RooCodeEventName.TaskUnpaused, onTaskUnpaused), + () => task.off(RooCodeEventName.TaskSpawned, onTaskSpawned), + + // Task Execution (4) + () => task.off(RooCodeEventName.Message, onMessage), + () => task.off(RooCodeEventName.TaskModeSwitched, onTaskModeSwitched), + () => task.off(RooCodeEventName.TaskAskResponded, onTaskAskResponded), + () => task.off(RooCodeEventName.TaskUserMessage, onTaskUserMessage), + + // Task Analytics (2) + () => task.off(RooCodeEventName.TaskTokenUsageUpdated, onTaskTokenUsageUpdated), + () => task.off(RooCodeEventName.TaskToolFailed, onTaskToolFailed), ]) } @@ -915,7 +967,6 @@ export class ClineProvider taskNumber: historyItem.number, workspacePath: historyItem.workspace, onCreated: this.taskCreationCallback, - enableBridge: BridgeOrchestrator.isEnabled(cloudUserInfo, taskSyncEnabled), }) await this.addClineToStack(task) @@ -2345,35 +2396,6 @@ export class ClineProvider sessionId: vscode.env.sessionId, isCloudAgent: CloudService.instance.isCloudAgent, }) - - const bridge = BridgeOrchestrator.getInstance() - - if (bridge) { - const currentTask = this.getCurrentTask() - - if (currentTask && !currentTask.enableBridge) { - try { - currentTask.enableBridge = true - await BridgeOrchestrator.subscribeToTask(currentTask) - } catch (error) { - const message = `[ClineProvider#remoteControlEnabled] BridgeOrchestrator.subscribeToTask() failed: ${error instanceof Error ? error.message : String(error)}` - this.log(message) - console.error(message) - } - } - } else { - for (const task of this.clineStack) { - if (task.enableBridge) { - try { - await BridgeOrchestrator.getInstance()?.unsubscribeFromTask(task.taskId) - } catch (error) { - const message = `[ClineProvider#remoteControlEnabled] BridgeOrchestrator#unsubscribeFromTask() failed: ${error instanceof Error ? error.message : String(error)}` - this.log(message) - console.error(message) - } - } - } - } } /** @@ -2560,7 +2582,6 @@ export class ClineProvider parentTask, taskNumber: this.clineStack.length + 1, onCreated: this.taskCreationCallback, - enableBridge: BridgeOrchestrator.isEnabled(cloudUserInfo, remoteControlEnabled), initialTodos: options.initialTodos, ...options, }) From 47db4d3f66940e251b69c8c1600720393c0ac0ee Mon Sep 17 00:00:00 2001 From: cte Date: Mon, 20 Oct 2025 16:32:26 -0700 Subject: [PATCH 2/4] More progress --- packages/cloud/src/bridge/BaseChannel.ts | 142 ------------ .../cloud/src/bridge/BridgeOrchestrator.ts | 2 - packages/cloud/src/bridge/ExtensionChannel.ts | 173 +++++++++------ packages/cloud/src/bridge/index.ts | 1 - packages/types/src/cloud.ts | 206 ++++++------------ packages/types/src/events.ts | 12 +- 6 files changed, 172 insertions(+), 364 deletions(-) delete mode 100644 packages/cloud/src/bridge/BaseChannel.ts diff --git a/packages/cloud/src/bridge/BaseChannel.ts b/packages/cloud/src/bridge/BaseChannel.ts deleted file mode 100644 index 1b2615b24c35..000000000000 --- a/packages/cloud/src/bridge/BaseChannel.ts +++ /dev/null @@ -1,142 +0,0 @@ -import type { Socket } from "socket.io-client" -import * as vscode from "vscode" - -import type { StaticAppProperties, GitProperties } from "@roo-code/types" - -export interface BaseChannelOptions { - instanceId: string - appProperties: StaticAppProperties - gitProperties?: GitProperties - isCloudAgent: boolean -} - -/** - * Abstract base class for communication channels in the bridge system. - * Provides common functionality for bidirectional communication between - * the VSCode extension and web application. - * - * @template TCommand - Type of commands this channel can receive. - * @template TEvent - Type of events this channel can publish. - */ -export abstract class BaseChannel { - protected socket: Socket | null = null - protected readonly instanceId: string - protected readonly appProperties: StaticAppProperties - protected readonly gitProperties?: GitProperties - protected readonly isCloudAgent: boolean - - constructor(options: BaseChannelOptions) { - this.instanceId = options.instanceId - this.appProperties = options.appProperties - this.gitProperties = options.gitProperties - this.isCloudAgent = options.isCloudAgent - } - - /** - * Called when socket connects. - */ - public async onConnect(socket: Socket): Promise { - this.socket = socket - await this.handleConnect(socket) - } - - /** - * Called when socket disconnects. - */ - public onDisconnect(): void { - this.socket = null - this.handleDisconnect() - } - - /** - * Called when socket reconnects. - */ - public async onReconnect(socket: Socket): Promise { - this.socket = socket - await this.handleReconnect(socket) - } - - /** - * Cleanup resources. - */ - public async cleanup(socket: Socket | null): Promise { - if (socket) { - await this.handleCleanup(socket) - } - - this.socket = null - } - - /** - * Emit a socket event with error handling. - */ - protected publish( - eventName: TEventName, - data: TEventData, - callback?: (params: Params) => void, - ): boolean { - if (!this.socket) { - console.error(`[${this.constructor.name}#emit] socket not available for ${eventName}`) - return false - } - - try { - // console.log(`[${this.constructor.name}#emit] emit() -> ${eventName}`, data) - this.socket.emit(eventName, data, callback) - - return true - } catch (error) { - console.error( - `[${this.constructor.name}#emit] emit() failed -> ${eventName}: ${ - error instanceof Error ? error.message : String(error) - }`, - ) - - return false - } - } - - /** - * Handle incoming commands - template method that ensures common functionality - * is executed before subclass-specific logic. - * - * This method should be called by subclasses to handle commands. - * It will execute common functionality and then delegate to the abstract - * handleCommandImplementation method. - */ - public async handleCommand(command: TCommand): Promise { - // Common functionality: focus the sidebar. - await vscode.commands.executeCommand(`${this.appProperties.appName}.SidebarProvider.focus`) - - // Delegate to subclass-specific implementation. - await this.handleCommandImplementation(command) - } - - /** - * Handle command-specific logic - must be implemented by subclasses. - * This method is called after common functionality has been executed. - */ - protected abstract handleCommandImplementation(command: TCommand): Promise - - /** - * Handle connection-specific logic. - */ - protected abstract handleConnect(socket: Socket): Promise - - /** - * Handle disconnection-specific logic. - */ - protected handleDisconnect(): void { - // Default implementation - can be overridden. - } - - /** - * Handle reconnection-specific logic. - */ - protected abstract handleReconnect(socket: Socket): Promise - - /** - * Handle cleanup-specific logic. - */ - protected abstract handleCleanup(socket: Socket): Promise -} diff --git a/packages/cloud/src/bridge/BridgeOrchestrator.ts b/packages/cloud/src/bridge/BridgeOrchestrator.ts index 30a110abe88b..52ef7fa6be44 100644 --- a/packages/cloud/src/bridge/BridgeOrchestrator.ts +++ b/packages/cloud/src/bridge/BridgeOrchestrator.ts @@ -9,7 +9,6 @@ import { type GitProperties, ConnectionState, ExtensionSocketEvents, - TaskSocketEvents, } from "@roo-code/types" import { SocketTransport } from "./SocketTransport.js" @@ -196,7 +195,6 @@ export class BridgeOrchestrator { // Remove any existing listeners first to prevent duplicates. socket.off(ExtensionSocketEvents.RELAYED_COMMAND) - socket.off(TaskSocketEvents.RELAYED_COMMAND) socket.off("connected") socket.on(ExtensionSocketEvents.RELAYED_COMMAND, (message: ExtensionBridgeCommand) => { diff --git a/packages/cloud/src/bridge/ExtensionChannel.ts b/packages/cloud/src/bridge/ExtensionChannel.ts index 3fa8b6589560..1a46e3419901 100644 --- a/packages/cloud/src/bridge/ExtensionChannel.ts +++ b/packages/cloud/src/bridge/ExtensionChannel.ts @@ -1,11 +1,14 @@ +import * as vscode from "vscode" import type { Socket } from "socket.io-client" import { type TaskProviderLike, type TaskProviderEvents, type ExtensionInstance, - type ExtensionBridgeCommand, type ExtensionBridgeEvent, + type ExtensionBridgeCommand, + type StaticAppProperties, + type GitProperties, RooCodeEventName, TaskStatus, ExtensionBridgeCommandName, @@ -14,22 +17,22 @@ import { HEARTBEAT_INTERVAL_MS, } from "@roo-code/types" -import { type BaseChannelOptions, BaseChannel } from "./BaseChannel.js" - -interface ExtensionChannelOptions extends BaseChannelOptions { +interface ExtensionChannelOptions { + instanceId: string + appProperties: StaticAppProperties + gitProperties?: GitProperties + isCloudAgent: boolean userId: string provider: TaskProviderLike } -/** - * Manages the extension-level communication channel. - * Handles extension registration, heartbeat, and extension-specific commands. - */ -export class ExtensionChannel extends BaseChannel< - ExtensionBridgeCommand, - ExtensionSocketEvents, - ExtensionBridgeEvent | ExtensionInstance -> { +export class ExtensionChannel { + private socket: Socket | null = null + private readonly instanceId: string + private readonly appProperties: StaticAppProperties + private readonly gitProperties?: GitProperties + private readonly isCloudAgent: boolean + private userId: string private provider: TaskProviderLike private extensionInstance: ExtensionInstance @@ -37,12 +40,10 @@ export class ExtensionChannel extends BaseChannel< private eventListeners: Map void> = new Map() constructor(options: ExtensionChannelOptions) { - super({ - instanceId: options.instanceId, - appProperties: options.appProperties, - gitProperties: options.gitProperties, - isCloudAgent: options.isCloudAgent, - }) + this.instanceId = options.instanceId + this.appProperties = options.appProperties + this.gitProperties = options.gitProperties + this.isCloudAgent = options.isCloudAgent this.userId = options.userId this.provider = options.provider @@ -62,7 +63,26 @@ export class ExtensionChannel extends BaseChannel< this.setupListeners() } - protected async handleCommandImplementation(command: ExtensionBridgeCommand): Promise { + public async onConnect(socket: Socket): Promise { + this.socket = socket + await this.registerInstance(socket) + this.startHeartbeat(socket) + } + + public async onReconnect(socket: Socket): Promise { + this.socket = socket + await this.registerInstance(socket) + this.startHeartbeat(socket) + } + + public onDisconnect(): void { + this.socket = null + this.stopHeartbeat() + } + + public async handleCommand(command: ExtensionBridgeCommand): Promise { + await vscode.commands.executeCommand(`${this.appProperties.appName}.SidebarProvider.focus`) + if (command.instanceId !== this.instanceId) { console.log(`[ExtensionChannel] command -> instance id mismatch | ${this.instanceId}`, { messageInstanceId: command.instanceId, @@ -117,24 +137,40 @@ export class ExtensionChannel extends BaseChannel< } } - protected async handleConnect(socket: Socket): Promise { - await this.registerInstance(socket) - this.startHeartbeat(socket) - } + public async cleanup(socket: Socket | null): Promise { + this.stopHeartbeat() + this.cleanupListeners() - protected async handleReconnect(socket: Socket): Promise { - await this.registerInstance(socket) - this.startHeartbeat(socket) + if (socket) { + await this.unregisterInstance(socket) + this.socket = null + } } - protected override handleDisconnect(): void { - this.stopHeartbeat() - } + private publish( + eventName: ExtensionSocketEvents, + data: ExtensionBridgeEvent | ExtensionInstance, + callback?: (params: Params) => void, + ): boolean { + if (!this.socket) { + console.error(`[${this.constructor.name}#emit] socket not available for ${eventName}`) + return false + } - protected async handleCleanup(socket: Socket): Promise { - this.stopHeartbeat() - this.cleanupListeners() - await this.unregisterInstance(socket) + try { + // console.log(`[${this.constructor.name}#emit] emit() -> ${eventName}`, data) + this.socket.emit(eventName, data, callback) + + return true + } catch (error) { + console.error( + `[${this.constructor.name}#emit] emit() failed -> ${eventName}: ${ + error instanceof Error ? error.message : String(error) + }`, + ) + + return false + } } private async registerInstance(_socket: Socket): Promise { @@ -174,38 +210,11 @@ export class ExtensionChannel extends BaseChannel< } private setupListeners(): void { - // private readonly eventMapping: readonly TaskEventMapping[] = [ - // { - // from: RooCodeEventName.Message, - // to: TaskBridgeEventName.Message, - // createPayload: (task: TaskLike, data: { action: string; message: ClineMessage }) => ({ - // type: TaskBridgeEventName.Message, - // taskId: task.taskId, - // action: data.action, - // message: data.message, - // }), - // }, - // { - // from: RooCodeEventName.TaskModeSwitched, - // to: TaskBridgeEventName.TaskModeSwitched, - // createPayload: (task: TaskLike, mode: string) => ({ - // type: TaskBridgeEventName.TaskModeSwitched, - // taskId: task.taskId, - // mode, - // }), - // }, - // { - // from: RooCodeEventName.TaskInteractive, - // to: TaskBridgeEventName.TaskInteractive, - // createPayload: (task: TaskLike, _taskId: string) => ({ - // type: TaskBridgeEventName.TaskInteractive, - // taskId: task.taskId, - // }), - // }, - // ] as const - const eventMapping = [ + // Task Provider Lifecycle (1) { from: RooCodeEventName.TaskCreated, to: ExtensionBridgeEventName.TaskCreated }, + + // Task Lifecycle (9) { from: RooCodeEventName.TaskStarted, to: ExtensionBridgeEventName.TaskStarted }, { from: RooCodeEventName.TaskCompleted, to: ExtensionBridgeEventName.TaskCompleted }, { from: RooCodeEventName.TaskAborted, to: ExtensionBridgeEventName.TaskAborted }, @@ -215,21 +224,47 @@ export class ExtensionChannel extends BaseChannel< { from: RooCodeEventName.TaskInteractive, to: ExtensionBridgeEventName.TaskInteractive }, { from: RooCodeEventName.TaskResumable, to: ExtensionBridgeEventName.TaskResumable }, { from: RooCodeEventName.TaskIdle, to: ExtensionBridgeEventName.TaskIdle }, + + // Subtask Lifecycle (3) { from: RooCodeEventName.TaskPaused, to: ExtensionBridgeEventName.TaskPaused }, { from: RooCodeEventName.TaskUnpaused, to: ExtensionBridgeEventName.TaskUnpaused }, { from: RooCodeEventName.TaskSpawned, to: ExtensionBridgeEventName.TaskSpawned }, + + // Task Execution (4) + { from: RooCodeEventName.Message, to: ExtensionBridgeEventName.Message }, + { from: RooCodeEventName.TaskModeSwitched, to: ExtensionBridgeEventName.TaskModeSwitched }, + { from: RooCodeEventName.TaskAskResponded, to: ExtensionBridgeEventName.TaskAskResponded }, { from: RooCodeEventName.TaskUserMessage, to: ExtensionBridgeEventName.TaskUserMessage }, + + // Task Analytics (2) { from: RooCodeEventName.TaskTokenUsageUpdated, to: ExtensionBridgeEventName.TaskTokenUsageUpdated }, + { from: RooCodeEventName.TaskToolFailed, to: ExtensionBridgeEventName.TaskToolFailed }, + + // Configuration Changes (2) + // (Not propagated currently.) ] as const eventMapping.forEach(({ from, to }) => { // Create and store the listener function for cleanup. const listener = async (..._args: unknown[]) => { - this.publish(ExtensionSocketEvents.EVENT, { - type: to, - instance: await this.updateInstance(), - timestamp: Date.now(), - }) + if (from === RooCodeEventName.Message) { + const [taskId, action, message] = _args as TaskProviderEvents[RooCodeEventName.Message] + + this.publish(ExtensionSocketEvents.EVENT, { + type: to, + instance: await this.updateInstance(), + timestamp: Date.now(), + taskId, + action, + message, + }) + } else { + this.publish(ExtensionSocketEvents.EVENT, { + type: to, + instance: await this.updateInstance(), + timestamp: Date.now(), + }) + } } this.eventListeners.set(from, listener) diff --git a/packages/cloud/src/bridge/index.ts b/packages/cloud/src/bridge/index.ts index dc213ed652b0..352e2ee4a529 100644 --- a/packages/cloud/src/bridge/index.ts +++ b/packages/cloud/src/bridge/index.ts @@ -1,5 +1,4 @@ export { type BridgeOrchestratorOptions, BridgeOrchestrator } from "./BridgeOrchestrator.js" export { type SocketTransportOptions, SocketTransport } from "./SocketTransport.js" -export { BaseChannel } from "./BaseChannel.js" export { ExtensionChannel } from "./ExtensionChannel.js" diff --git a/packages/types/src/cloud.ts b/packages/types/src/cloud.ts index b412eb18891f..340debda2dee 100644 --- a/packages/types/src/cloud.ts +++ b/packages/types/src/cloud.ts @@ -421,7 +421,10 @@ export type ExtensionInstance = z.infer */ export enum ExtensionBridgeEventName { + // Task Provider Lifecycle (1) TaskCreated = RooCodeEventName.TaskCreated, + + // Task Lifecycle (9) TaskStarted = RooCodeEventName.TaskStarted, TaskCompleted = RooCodeEventName.TaskCompleted, TaskAborted = RooCodeEventName.TaskAborted, @@ -432,129 +435,115 @@ export enum ExtensionBridgeEventName { TaskResumable = RooCodeEventName.TaskResumable, TaskIdle = RooCodeEventName.TaskIdle, + // Subtask Lifecycle (3) TaskPaused = RooCodeEventName.TaskPaused, TaskUnpaused = RooCodeEventName.TaskUnpaused, TaskSpawned = RooCodeEventName.TaskSpawned, + // Task Execution (4) + Message = RooCodeEventName.Message, + TaskModeSwitched = RooCodeEventName.TaskModeSwitched, + TaskAskResponded = RooCodeEventName.TaskAskResponded, TaskUserMessage = RooCodeEventName.TaskUserMessage, + // Task Analytics (2) TaskTokenUsageUpdated = RooCodeEventName.TaskTokenUsageUpdated, - - ModeChanged = RooCodeEventName.ModeChanged, - ProviderProfileChanged = RooCodeEventName.ProviderProfileChanged, - - InstanceRegistered = "instance_registered", - InstanceUnregistered = "instance_unregistered", - HeartbeatUpdated = "heartbeat_updated", + TaskToolFailed = RooCodeEventName.TaskToolFailed, } +const extensionBridgeEventPayloadSchema = z.object({ + instance: extensionInstanceSchema, + timestamp: z.number(), +}) + export const extensionBridgeEventSchema = z.discriminatedUnion("type", [ + // Task Provider Lifecycle (1) z.object({ type: z.literal(ExtensionBridgeEventName.TaskCreated), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), + + // Task Lifecycle (9) z.object({ type: z.literal(ExtensionBridgeEventName.TaskStarted), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ type: z.literal(ExtensionBridgeEventName.TaskCompleted), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ type: z.literal(ExtensionBridgeEventName.TaskAborted), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ type: z.literal(ExtensionBridgeEventName.TaskFocused), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ type: z.literal(ExtensionBridgeEventName.TaskUnfocused), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ type: z.literal(ExtensionBridgeEventName.TaskActive), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ type: z.literal(ExtensionBridgeEventName.TaskInteractive), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ type: z.literal(ExtensionBridgeEventName.TaskResumable), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ type: z.literal(ExtensionBridgeEventName.TaskIdle), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), + // Subtask Lifecycle (3) z.object({ type: z.literal(ExtensionBridgeEventName.TaskPaused), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ type: z.literal(ExtensionBridgeEventName.TaskUnpaused), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ type: z.literal(ExtensionBridgeEventName.TaskSpawned), - instance: extensionInstanceSchema, - timestamp: z.number(), + ...extensionBridgeEventPayloadSchema.shape, }), + // Task Execution (4) z.object({ - type: z.literal(ExtensionBridgeEventName.TaskUserMessage), - instance: extensionInstanceSchema, - timestamp: z.number(), + type: z.literal(ExtensionBridgeEventName.Message), + ...extensionBridgeEventPayloadSchema.shape, + taskId: z.string(), + action: z.string(), + message: clineMessageSchema, }), - z.object({ - type: z.literal(ExtensionBridgeEventName.TaskTokenUsageUpdated), - instance: extensionInstanceSchema, - timestamp: z.number(), + type: z.literal(ExtensionBridgeEventName.TaskModeSwitched), + ...extensionBridgeEventPayloadSchema.shape, }), - z.object({ - type: z.literal(ExtensionBridgeEventName.ModeChanged), - instance: extensionInstanceSchema, - mode: z.string(), - timestamp: z.number(), + type: z.literal(ExtensionBridgeEventName.TaskAskResponded), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ - type: z.literal(ExtensionBridgeEventName.ProviderProfileChanged), - instance: extensionInstanceSchema, - providerProfile: z.object({ name: z.string(), provider: z.string().optional() }), - timestamp: z.number(), + type: z.literal(ExtensionBridgeEventName.TaskUserMessage), + ...extensionBridgeEventPayloadSchema.shape, }), + // Task Analytics (2) z.object({ - type: z.literal(ExtensionBridgeEventName.InstanceRegistered), - instance: extensionInstanceSchema, - timestamp: z.number(), - }), - z.object({ - type: z.literal(ExtensionBridgeEventName.InstanceUnregistered), - instance: extensionInstanceSchema, - timestamp: z.number(), + type: z.literal(ExtensionBridgeEventName.TaskTokenUsageUpdated), + ...extensionBridgeEventPayloadSchema.shape, }), z.object({ - type: z.literal(ExtensionBridgeEventName.HeartbeatUpdated), - instance: extensionInstanceSchema, - timestamp: z.number(), + type: z.literal(ExtensionBridgeEventName.TaskToolFailed), + ...extensionBridgeEventPayloadSchema.shape, }), ]) @@ -568,6 +557,9 @@ export enum ExtensionBridgeCommandName { StartTask = "start_task", StopTask = "stop_task", ResumeTask = "resume_task", + Message = "message", + ApproveAsk = "approve_ask", + DenyAsk = "deny_ask", } export const extensionBridgeCommandSchema = z.discriminatedUnion("type", [ @@ -594,55 +586,11 @@ export const extensionBridgeCommandSchema = z.discriminatedUnion("type", [ payload: z.object({ taskId: z.string() }), timestamp: z.number(), }), -]) - -export type ExtensionBridgeCommand = z.infer - -/** - * TaskBridgeEvent - */ - -export enum TaskBridgeEventName { - Message = RooCodeEventName.Message, - TaskModeSwitched = RooCodeEventName.TaskModeSwitched, - TaskInteractive = RooCodeEventName.TaskInteractive, -} - -export const taskBridgeEventSchema = z.discriminatedUnion("type", [ - z.object({ - type: z.literal(TaskBridgeEventName.Message), - taskId: z.string(), - action: z.string(), - message: clineMessageSchema, - }), - z.object({ - type: z.literal(TaskBridgeEventName.TaskModeSwitched), - taskId: z.string(), - mode: z.string(), - }), - z.object({ - type: z.literal(TaskBridgeEventName.TaskInteractive), - taskId: z.string(), - }), -]) - -export type TaskBridgeEvent = z.infer - -/** - * TaskBridgeCommand - */ - -export enum TaskBridgeCommandName { - Message = "message", - ApproveAsk = "approve_ask", - DenyAsk = "deny_ask", -} - -export const taskBridgeCommandSchema = z.discriminatedUnion("type", [ z.object({ - type: z.literal(TaskBridgeCommandName.Message), - taskId: z.string(), + type: z.literal(ExtensionBridgeCommandName.Message), + instanceId: z.string(), payload: z.object({ + taskId: z.string(), text: z.string(), images: z.array(z.string()).optional(), mode: z.string().optional(), @@ -651,18 +599,20 @@ export const taskBridgeCommandSchema = z.discriminatedUnion("type", [ timestamp: z.number(), }), z.object({ - type: z.literal(TaskBridgeCommandName.ApproveAsk), - taskId: z.string(), + type: z.literal(ExtensionBridgeCommandName.ApproveAsk), + instanceId: z.string(), payload: z.object({ + taskId: z.string(), text: z.string().optional(), images: z.array(z.string()).optional(), }), timestamp: z.number(), }), z.object({ - type: z.literal(TaskBridgeCommandName.DenyAsk), - taskId: z.string(), + type: z.literal(ExtensionBridgeCommandName.DenyAsk), + instanceId: z.string(), payload: z.object({ + taskId: z.string(), text: z.string().optional(), images: z.array(z.string()).optional(), }), @@ -670,7 +620,7 @@ export const taskBridgeCommandSchema = z.discriminatedUnion("type", [ }), ]) -export type TaskBridgeCommand = z.infer +export type ExtensionBridgeCommand = z.infer /** * ExtensionSocketEvents @@ -691,38 +641,6 @@ export enum ExtensionSocketEvents { RELAYED_COMMAND = "extension:relayed_command", // relay from server } -/** - * TaskSocketEvents - */ - -export enum TaskSocketEvents { - JOIN = "task:join", - LEAVE = "task:leave", - - EVENT = "task:event", // event from extension task - RELAYED_EVENT = "task:relayed_event", // relay from server - - COMMAND = "task:command", // command from user - RELAYED_COMMAND = "task:relayed_command", // relay from server -} - -/** - * `emit()` Response Types - */ - -export type JoinResponse = { - success: boolean - error?: string - taskId?: string - timestamp?: string -} - -export type LeaveResponse = { - success: boolean - taskId?: string - timestamp?: string -} - /** * UsageStats */ diff --git a/packages/types/src/events.ts b/packages/types/src/events.ts index b33320c7be55..8dcf12e0ee8d 100644 --- a/packages/types/src/events.ts +++ b/packages/types/src/events.ts @@ -8,7 +8,7 @@ import { toolNamesSchema, toolUsageSchema } from "./tool.js" */ export enum RooCodeEventName { - // Task Provider Lifecycle + // Task Provider Lifecycle (1) TaskCreated = "taskCreated", // Task Lifecycle @@ -22,26 +22,26 @@ export enum RooCodeEventName { TaskResumable = "taskResumable", TaskIdle = "taskIdle", - // Subtask Lifecycle + // Subtask Lifecycle (3) TaskPaused = "taskPaused", TaskUnpaused = "taskUnpaused", TaskSpawned = "taskSpawned", - // Task Execution + // Task Execution (4) Message = "message", TaskModeSwitched = "taskModeSwitched", TaskAskResponded = "taskAskResponded", TaskUserMessage = "taskUserMessage", - // Task Analytics + // Task Analytics (2) TaskTokenUsageUpdated = "taskTokenUsageUpdated", TaskToolFailed = "taskToolFailed", - // Configuration Changes + // Configuration Changes (2) ModeChanged = "modeChanged", ProviderProfileChanged = "providerProfileChanged", - // Evals + // Evals (2) EvalPass = "evalPass", EvalFail = "evalFail", } From c878f3a9579a612d2041b04ef2e70a74479df541 Mon Sep 17 00:00:00 2001 From: cte Date: Mon, 20 Oct 2025 16:55:52 -0700 Subject: [PATCH 3/4] More progress --- .../cloud/src/bridge/BridgeOrchestrator.ts | 40 +++++-------------- packages/cloud/src/bridge/index.ts | 4 -- packages/cloud/src/index.ts | 2 +- src/extension.ts | 6 +-- 4 files changed, 13 insertions(+), 39 deletions(-) delete mode 100644 packages/cloud/src/bridge/index.ts diff --git a/packages/cloud/src/bridge/BridgeOrchestrator.ts b/packages/cloud/src/bridge/BridgeOrchestrator.ts index 52ef7fa6be44..b097072a7af9 100644 --- a/packages/cloud/src/bridge/BridgeOrchestrator.ts +++ b/packages/cloud/src/bridge/BridgeOrchestrator.ts @@ -23,11 +23,6 @@ export interface BridgeOrchestratorOptions { isCloudAgent: boolean } -/** - * Central orchestrator for the extension bridge system. - * Coordinates communication between the VSCode extension and web application - * through WebSocket connections and manages extension/task channels. - */ export class BridgeOrchestrator { private static instance: BridgeOrchestrator | null = null @@ -85,7 +80,7 @@ export class BridgeOrchestrator { } } - public static async connect(options: BridgeOrchestratorOptions) { + private static async connect(options: BridgeOrchestratorOptions) { const instance = BridgeOrchestrator.instance if (!instance) { @@ -103,13 +98,10 @@ export class BridgeOrchestrator { ) } } else { - if ( - instance.connectionState === ConnectionState.FAILED || - instance.connectionState === ConnectionState.DISCONNECTED - ) { - console.log( - `[BridgeOrchestrator#connectOrDisconnect] Re-connecting... (state: ${instance.connectionState})`, - ) + const connectionState = instance.socketTransport.getConnectionState() + + if (connectionState === ConnectionState.FAILED || connectionState === ConnectionState.DISCONNECTED) { + console.log(`[BridgeOrchestrator#connectOrDisconnect] Re-connecting... (state: ${connectionState})`) instance.reconnect().catch((error) => { console.error( @@ -118,7 +110,7 @@ export class BridgeOrchestrator { }) } else { console.log( - `[BridgeOrchestrator#connectOrDisconnect] Already connected or connecting (state: ${instance.connectionState})`, + `[BridgeOrchestrator#connectOrDisconnect] Already connected or connecting (state: ${connectionState})`, ) } } @@ -128,11 +120,10 @@ export class BridgeOrchestrator { const instance = BridgeOrchestrator.instance if (instance) { - try { - console.log( - `[BridgeOrchestrator#connectOrDisconnect] Disconnecting... (state: ${instance.connectionState})`, - ) + const connectionState = instance.socketTransport.getConnectionState() + try { + console.log(`[BridgeOrchestrator#connectOrDisconnect] Disconnecting... (state: ${connectionState})`) await instance.disconnect() } catch (error) { console.error( @@ -237,28 +228,19 @@ export class BridgeOrchestrator { await this.extensionChannel.onReconnect(socket) } - // Shared API - - public get connectionState(): ConnectionState { - return this.socketTransport.getConnectionState() - } - private async connect(): Promise { await this.socketTransport.connect() this.setupSocketListeners() } - public async disconnect(): Promise { + private async disconnect(): Promise { await this.extensionChannel.cleanup(this.socketTransport.getSocket()) await this.socketTransport.disconnect() BridgeOrchestrator.instance = null } - public async reconnect(): Promise { + private async reconnect(): Promise { await this.socketTransport.reconnect() - - // After a manual reconnect, we have a new socket instance - // so we need to set up listeners again. this.setupSocketListeners() } } diff --git a/packages/cloud/src/bridge/index.ts b/packages/cloud/src/bridge/index.ts deleted file mode 100644 index 352e2ee4a529..000000000000 --- a/packages/cloud/src/bridge/index.ts +++ /dev/null @@ -1,4 +0,0 @@ -export { type BridgeOrchestratorOptions, BridgeOrchestrator } from "./BridgeOrchestrator.js" -export { type SocketTransportOptions, SocketTransport } from "./SocketTransport.js" - -export { ExtensionChannel } from "./ExtensionChannel.js" diff --git a/packages/cloud/src/index.ts b/packages/cloud/src/index.ts index 65b92ebc13c1..7eb5be50c21d 100644 --- a/packages/cloud/src/index.ts +++ b/packages/cloud/src/index.ts @@ -2,7 +2,7 @@ export * from "./config.js" export { CloudService } from "./CloudService.js" -export { BridgeOrchestrator } from "./bridge/index.js" +export { BridgeOrchestrator } from "./bridge/BridgeOrchestrator.js" export { RetryQueue } from "./retry-queue/index.js" export type { QueuedRequest, QueueStats, RetryQueueConfig, RetryQueueEvents } from "./retry-queue/index.js" diff --git a/src/extension.ts b/src/extension.ts index 5db0996ad657..c75f75591161 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -353,11 +353,7 @@ export async function deactivate() { } } - const bridge = BridgeOrchestrator.getInstance() - - if (bridge) { - await bridge.disconnect() - } + await BridgeOrchestrator.disconnect() await McpServerManager.cleanup(extensionContext) TelemetryService.instance.shutdown() From 86977d27b5157389b936599044ac52eaf7c45d65 Mon Sep 17 00:00:00 2001 From: cte Date: Mon, 20 Oct 2025 17:08:31 -0700 Subject: [PATCH 4/4] More progress --- .../cloud/src/bridge/BridgeOrchestrator.ts | 72 +++++-------------- packages/cloud/src/bridge/ExtensionChannel.ts | 2 +- packages/cloud/src/bridge/SocketTransport.ts | 18 ++--- 3 files changed, 28 insertions(+), 64 deletions(-) diff --git a/packages/cloud/src/bridge/BridgeOrchestrator.ts b/packages/cloud/src/bridge/BridgeOrchestrator.ts index b097072a7af9..d0d48cdee434 100644 --- a/packages/cloud/src/bridge/BridgeOrchestrator.ts +++ b/packages/cloud/src/bridge/BridgeOrchestrator.ts @@ -1,6 +1,8 @@ import crypto from "crypto" import os from "os" +import type { Socket } from "socket.io-client" + import { type TaskProviderLike, type CloudUserInfo, @@ -91,7 +93,7 @@ export class BridgeOrchestrator { await options.provider.getTelemetryProperties() BridgeOrchestrator.instance = new BridgeOrchestrator(options) - await BridgeOrchestrator.instance.connect() + await BridgeOrchestrator.instance.socketTransport.connect() } catch (error) { console.error( `[BridgeOrchestrator#connectOrDisconnect] connect() failed: ${error instanceof Error ? error.message : String(error)}`, @@ -103,7 +105,7 @@ export class BridgeOrchestrator { if (connectionState === ConnectionState.FAILED || connectionState === ConnectionState.DISCONNECTED) { console.log(`[BridgeOrchestrator#connectOrDisconnect] Re-connecting... (state: ${connectionState})`) - instance.reconnect().catch((error) => { + instance.socketTransport.reconnect().catch((error) => { console.error( `[BridgeOrchestrator#connectOrDisconnect] reconnect() failed: ${error instanceof Error ? error.message : String(error)}`, ) @@ -124,7 +126,7 @@ export class BridgeOrchestrator { try { console.log(`[BridgeOrchestrator#connectOrDisconnect] Disconnecting... (state: ${connectionState})`) - await instance.disconnect() + await instance.socketTransport.disconnect() } catch (error) { console.error( `[BridgeOrchestrator#connectOrDisconnect] disconnect() failed: ${error instanceof Error ? error.message : String(error)}`, @@ -161,9 +163,18 @@ export class BridgeOrchestrator { reconnectionDelay: this.RECONNECT_DELAY, reconnectionDelayMax: this.RECONNECT_DELAY_MAX, }, - onConnect: () => this.handleConnect(), - onDisconnect: () => this.handleDisconnect(), - onReconnect: () => this.handleReconnect(), + onConnect: async (socket: Socket) => { + this.setupSocketListeners() + await this.extensionChannel.onConnect(socket) + }, + onDisconnect: async () => { + await this.extensionChannel.onDisconnect() + await this.extensionChannel.cleanup(this.socketTransport.getSocket()) + }, + onReconnect: async (socket: Socket) => { + this.setupSocketListeners() + await this.extensionChannel.onReconnect(socket) + }, }) this.extensionChannel = new ExtensionChannel({ @@ -193,54 +204,7 @@ export class BridgeOrchestrator { `[BridgeOrchestrator] on(${ExtensionSocketEvents.RELAYED_COMMAND}) -> ${message.type} for ${message.instanceId}`, ) - this.extensionChannel?.handleCommand(message) + this.extensionChannel.handleCommand(message) }) } - - private async handleConnect() { - const socket = this.socketTransport.getSocket() - - if (!socket) { - console.error("[BridgeOrchestrator#handleConnect] Socket not available") - return - } - - await this.extensionChannel.onConnect(socket) - } - - private handleDisconnect() { - this.extensionChannel.onDisconnect() - } - - private async handleReconnect() { - const socket = this.socketTransport.getSocket() - - if (!socket) { - console.error("[BridgeOrchestrator] Socket not available after reconnect") - return - } - - // Re-setup socket listeners to ensure they're properly configured - // after automatic reconnection (Socket.IO's built-in reconnection) - // The socket.off() calls in setupSocketListeners prevent duplicates - this.setupSocketListeners() - - await this.extensionChannel.onReconnect(socket) - } - - private async connect(): Promise { - await this.socketTransport.connect() - this.setupSocketListeners() - } - - private async disconnect(): Promise { - await this.extensionChannel.cleanup(this.socketTransport.getSocket()) - await this.socketTransport.disconnect() - BridgeOrchestrator.instance = null - } - - private async reconnect(): Promise { - await this.socketTransport.reconnect() - this.setupSocketListeners() - } } diff --git a/packages/cloud/src/bridge/ExtensionChannel.ts b/packages/cloud/src/bridge/ExtensionChannel.ts index 1a46e3419901..26134d92c4f9 100644 --- a/packages/cloud/src/bridge/ExtensionChannel.ts +++ b/packages/cloud/src/bridge/ExtensionChannel.ts @@ -75,7 +75,7 @@ export class ExtensionChannel { this.startHeartbeat(socket) } - public onDisconnect(): void { + public async onDisconnect(): Promise { this.socket = null this.stopHeartbeat() } diff --git a/packages/cloud/src/bridge/SocketTransport.ts b/packages/cloud/src/bridge/SocketTransport.ts index 2df3cf95eba6..bb024562442c 100644 --- a/packages/cloud/src/bridge/SocketTransport.ts +++ b/packages/cloud/src/bridge/SocketTransport.ts @@ -5,9 +5,9 @@ import { ConnectionState, type RetryConfig } from "@roo-code/types" export interface SocketTransportOptions { url: string socketOptions: Partial - onConnect?: () => void | Promise - onDisconnect?: (reason: string) => void - onReconnect?: () => void | Promise + onConnect?: (socket: Socket) => Promise + onDisconnect?: (reason: string) => Promise + onReconnect?: (socket: Socket) => Promise logger?: { log: (message: string, ...args: unknown[]) => void error: (message: string, ...args: unknown[]) => void @@ -126,12 +126,12 @@ export class SocketTransport { this.connectionState = ConnectionState.CONNECTED if (this.isPreviouslyConnected) { - if (this.options.onReconnect) { - await this.options.onReconnect() + if (this.options.onReconnect && this.socket) { + await this.options.onReconnect(this.socket) } } else { - if (this.options.onConnect) { - await this.options.onConnect() + if (this.options.onConnect && this.socket) { + await this.options.onConnect(this.socket) } } @@ -195,8 +195,8 @@ export class SocketTransport { console.log(`[SocketTransport#_connect] on(reconnect) - ${attempt}`) this.connectionState = ConnectionState.CONNECTED - if (this.options.onReconnect) { - this.options.onReconnect() + if (this.options.onReconnect && this.socket) { + this.options.onReconnect(this.socket) } })