diff --git a/README.md b/README.md index 9326f65d..7085a57e 100644 --- a/README.md +++ b/README.md @@ -161,7 +161,7 @@ ai-devkit agent send "run the tests and report back" --id --wait npm test 2>&1 | ai-devkit agent send --id --stdin # Pipe a session through Telegram — operate your agent from your phone -ai-devkit channel start telegram +ai-devkit channel start telegram --agent --daemon ``` Useful for long-running tasks, scheduled work, or checking on an agent from your phone at lunch. diff --git a/docs/ai/design/feature-channel-daemon-mode.md b/docs/ai/design/feature-channel-daemon-mode.md new file mode 100644 index 00000000..b183f0d8 --- /dev/null +++ b/docs/ai/design/feature-channel-daemon-mode.md @@ -0,0 +1,92 @@ +--- +phase: design +title: "Channel Daemon Mode: System Design" +description: CLI daemon lifecycle design for ai-devkit channel bridges +--- + +# System Design: Channel Daemon Mode + +## Architecture Overview + +`channel start --daemon` is implemented as a CLI process wrapper around an internal bridge runner. The parent CLI validates arguments, checks bridge state, spawns a detached child process, records metadata in the existing bridge registry, and exits. The child runs `channel-daemon.js`, which calls `runChannelBridge()` directly instead of re-entering Commander. Daemon stdout and stderr are appended to a per-channel log file and the parent prints that path after startup. + +```mermaid +graph TD + USER[Developer CLI] --> START[channel start --daemon] + START --> STATE[ChannelService bridge registry] + START -->|no live daemon| SPAWN[Detached child process] + SPAWN --> CHILD[channel-daemon.js] + SPAWN --> LOG[(~/.ai-devkit/channel-logs/name.log)] + CHILD --> BRIDGE[runChannelBridge] + BRIDGE --> TELEGRAM[Telegram adapter] + BRIDGE --> AGENT[Agent manager] + STATE --> FILE[(~/.ai-devkit/channel-bridges.json)] + USER --> STOP[channel stop] + STOP --> STATE + STOP -->|SIGTERM| CHILD + CHILD -->|cleanup| STATE +``` + +## Data Models + +### ChannelBridgeProcess +```typescript +interface ChannelBridgeProcess { + channelName: string; + channelType: string; + agentName: string; + agentPid: number; + bridgePid: number; + startedAt: string; + logPath?: string; +} +``` + +State is stored in the existing `~/.ai-devkit/channel-bridges.json` registry with restrictive permissions when possible. Daemon parent processes initially record `agentPid: 0`; `runChannelBridge()` overwrites the entry with the resolved agent PID after it finishes startup. Daemon logs are stored separately at `~/.ai-devkit/channel-logs/.log`. + +## API Design + +### CLI +```text +ai-devkit channel start [name] --agent --daemon +ai-devkit channel stop +``` + +### Internal Service +```typescript +class ChannelService { + getLiveBridges(): Promise; + startDaemonBridge(input: StartDaemonBridgeInput): Promise; + stopBridge(channelName?: string): Promise; + unregisterBridge(channelName: string): Promise; +} +``` + +The service owns process spawning, PID liveness checks, stale state cleanup, and stop signaling. + +## Component Breakdown + +- `packages/cli/src/services/channel/channel.service.ts`: bridge registry, daemon spawn, and stop lifecycle. +- `packages/cli/src/services/channel/channel-runner.ts`: reusable foreground bridge runtime. +- `packages/cli/src/channel-daemon.ts`: internal daemon child entrypoint. +- `packages/cli/src/commands/channel.ts`: adds the `--daemon` start option and `stop` subcommand. +- Existing channel bridge code remains the foreground execution path. + +## Design Decisions + +### Recommended Approach: Detached Child Process +Spawn a detached Node process that runs a dedicated internal daemon entrypoint. The daemon entrypoint calls the same `runChannelBridge()` function used by foreground start. This keeps bridge logic in one place, avoids hidden public CLI flags, and avoids OS-specific service management. + +### Alternatives Considered +- **External process manager**: PM2/systemd/launchd would provide mature daemon features, but would add setup burden and platform-specific instructions. +- **Always background by default**: Simpler lifecycle for long-running bridges, but it would break existing users who expect foreground logs and Ctrl-C behavior. +- **Multiple daemon registry**: More flexible, but unnecessary for the current single-user Telegram bridge and increases stop/status complexity. + +## Non-Functional Requirements + +- Daemon start should return quickly after spawning and recording state. +- Daemon start and status should print the log path so users can inspect background failures. +- Stop should prefer graceful shutdown with `SIGTERM`. +- Stale state should not block future daemon starts. +- Foreground behavior must remain backward compatible. +- Bridge metadata must not include bot tokens or other secrets. diff --git a/docs/ai/implementation/feature-channel-daemon-mode.md b/docs/ai/implementation/feature-channel-daemon-mode.md new file mode 100644 index 00000000..9bd3d925 --- /dev/null +++ b/docs/ai/implementation/feature-channel-daemon-mode.md @@ -0,0 +1,60 @@ +--- +phase: implementation +title: "Channel Daemon Mode: Implementation Guide" +description: Implementation notes for channel daemon start and stop +--- + +# Implementation Guide: Channel Daemon Mode + +## Development Setup + +- Work in `feature-channel-daemon-mode`. +- Use `npm ci` from the repository root. +- Run targeted CLI tests during development. + +## Code Structure + +- `packages/cli/src/commands/channel.ts`: command registration and user-facing messages. +- `packages/cli/src/services/channel/channel-runner.ts`: shared foreground bridge runtime. +- `packages/cli/src/services/channel/channel.service.ts`: bridge registry, daemon spawn, and stop lifecycle. +- `packages/cli/src/channel-daemon.ts`: internal detached child entrypoint. + +## Implementation Notes + +### Core Features +- `channel start --daemon` validates the same channel and agent arguments as foreground start. +- The parent process spawns a detached `channel-daemon.js` child process. +- The parent process redirects daemon stdout and stderr to `~/.ai-devkit/channel-logs/.log` and prints that path. +- `channel status` prints the log path for running daemon bridges when it is present in bridge state. +- Foreground start and daemon child execution both call `runChannelBridge()`. +- Dev mode launches `src/channel-daemon.ts` through `ts-node`; built mode launches `dist/channel-daemon.js`. +- `channel stop [name]` reads the existing bridge registry, checks PID liveness, sends `SIGTERM`, and clears state. + +### Patterns & Best Practices +- Keep daemon metadata free of secrets. +- Prefer dependency injection for `spawn`, `process.kill`, and filesystem paths in tests. +- Treat stale state as recoverable and remove it before starting a new daemon. + +## Integration Points + +- Reuse the existing channel resolution and bridge startup logic. +- Persist daemon bridge state in the existing `~/.ai-devkit/channel-bridges.json` registry. +- Persist daemon log location in bridge state as `logPath`. +- Keep channel connector unchanged. + +## Error Handling + +- No daemon state: print a clear no-op message. +- Stale state: remove it and continue for start, report it for stop. +- Permission or signal failure: show a clear error and keep enough state for manual inspection. + +## Performance Considerations + +- Daemon start should not block on long-lived bridge loops. +- PID checks should be synchronous and cheap. + +## Security Notes + +- Do not write bot tokens, chat IDs, or message content to daemon state. +- Treat daemon logs as local operational logs; they capture child process stdout and stderr for debugging. +- Use restrictive file permissions for state files where supported. diff --git a/docs/ai/planning/feature-channel-daemon-mode.md b/docs/ai/planning/feature-channel-daemon-mode.md new file mode 100644 index 00000000..a923d0f0 --- /dev/null +++ b/docs/ai/planning/feature-channel-daemon-mode.md @@ -0,0 +1,62 @@ +--- +phase: planning +title: "Channel Daemon Mode: Planning" +description: Task breakdown for adding daemon start and channel stop +--- + +# Planning: Channel Daemon Mode + +## Milestones + +- [x] Milestone 1: Document and review daemon requirements/design +- [x] Milestone 2: Implement daemon lifecycle service +- [x] Milestone 3: Wire CLI options and stop command +- [x] Milestone 4: Add focused tests and verification + +## Task Breakdown + +### Phase 1: Foundation +- [x] Task 1.1: Identify current channel command structure and bridge execution path. +- [x] Task 1.2: Add daemon state model and state file helpers. +- [x] Task 1.3: Add PID liveness checks with stale state cleanup. + +### Phase 2: Core Features +- [x] Task 2.1: Add `--daemon` option to `channel start`. +- [x] Task 2.2: Spawn detached child process for daemon start. +- [x] Task 2.3: Add an internal daemon entrypoint to avoid recursive CLI spawning. +- [x] Task 2.4: Add `channel stop` command that terminates the recorded daemon. + +### Phase 3: Integration & Polish +- [x] Task 3.1: Prevent duplicate daemon starts while a recorded daemon is alive. +- [x] Task 3.2: Improve user-facing messages for started, stopped, absent, and stale daemon states. +- [x] Task 3.3: Add unit tests for daemon start/stop behavior. +- [x] Task 3.4: Run CLI/package tests and feature lint. + +## Dependencies + +- Existing `packages/cli/src/commands/channel.ts` bridge implementation. +- Existing channel service tests and config path conventions. +- Node.js `child_process.spawn` and process signaling. + +## Timeline & Estimates + +- Foundation: Small +- Core CLI lifecycle: Medium +- Tests and verification: Small to medium + +## Risks & Mitigation + +| Risk | Impact | Mitigation | +|------|--------|------------| +| Detached child cannot be reliably confirmed as healthy immediately | Start may report success before bridge fully connects | Record process spawn success in v1; keep foreground mode for debugging | +| Stale PID points to an unrelated process after PID reuse | Stop could signal the wrong process | Store command metadata and keep stale checks conservative; future work can add heartbeat | +| Duplicate Telegram polling daemons conflict | Bot updates may be consumed unpredictably | Enforce one live daemon state in v1 | + +## Resources Needed + +- Existing Jest test setup. +- Existing CLI command and channel service patterns. + +## Progress Summary + +Implemented daemon lifecycle support in the existing channel service and command layer. `channel start --daemon` now spawns a detached `channel-daemon.js` child, records the child PID in the existing bridge registry, and prevents duplicate live bridges. Foreground start and daemon child execution share `runChannelBridge()`. `channel stop [name]` terminates a recorded live bridge with `SIGTERM` and cleans up registry state. Feature lint, CLI lint, CLI build, dependency builds, and the full CLI Jest suite pass. diff --git a/docs/ai/requirements/feature-channel-daemon-mode.md b/docs/ai/requirements/feature-channel-daemon-mode.md new file mode 100644 index 00000000..9ef025a8 --- /dev/null +++ b/docs/ai/requirements/feature-channel-daemon-mode.md @@ -0,0 +1,76 @@ +--- +phase: requirements +title: "Channel Daemon Mode: Requirements" +description: Add background start and stop lifecycle support for ai-devkit channel bridges +--- + +# Requirements: Channel Daemon Mode + +## Problem Statement + +`ai-devkit channel start` currently runs the channel bridge as a foreground long-lived process. That works for local debugging, but it ties the bridge lifecycle to an active terminal and makes it awkward to keep a Telegram-to-agent bridge running while doing other work. + +Developers need a daemon mode so they can start the bridge in the background, close the invoking shell, and later stop the bridge with a dedicated command. + +## Goals & Objectives + +### Primary Goals +- Add daemon mode to `ai-devkit channel start`. +- Add `ai-devkit channel stop` to terminate a running daemon bridge. +- Persist enough bridge metadata to make stop/status behavior deterministic. +- Preserve the existing foreground `channel start` behavior by default. + +### Secondary Goals +- Keep daemon lifecycle logic inside the CLI package. +- Reuse the existing channel bridge implementation instead of duplicating bridge behavior. +- Make stale daemon state easy to detect and recover from. + +### Non-Goals +- Running multiple channel bridge daemons concurrently. +- Implementing OS-specific launch agents, services, or auto-start on boot. +- Adding a new channel connector package API for daemon management. +- Managing or stopping the target AI agent process. + +## User Stories & Use Cases + +### US-1: Start in the Background +As a developer, I want to run `ai-devkit channel start --agent --daemon` so that the channel bridge keeps running after the command returns. + +### US-2: Stop the Background Bridge +As a developer, I want to run `ai-devkit channel stop` so that the running channel bridge shuts down cleanly. + +### US-3: See Useful Feedback +As a developer, I want the start command to print the daemon PID and basic context so that I can tell what is running. + +### US-4: Avoid Duplicate Daemons +As a developer, I want daemon start to fail when an existing bridge daemon is already running so that two bridge processes do not compete for the same Telegram bot polling stream. + +### US-5: Recover from Stale State +As a developer, I want stale daemon metadata to be ignored or removed when the recorded process is no longer alive so that a crashed daemon does not block future starts. + +## Success Criteria + +- `ai-devkit channel start --agent ` still runs the bridge in the foreground. +- `ai-devkit channel start --agent --daemon` spawns a detached bridge process and exits successfully after recording daemon state. +- `ai-devkit channel stop` sends a graceful termination signal to the recorded daemon process and removes daemon state once stopped. +- Stop reports a clear message when no daemon is running. +- Start refuses to launch a second daemon when the recorded PID is alive. +- Unit tests cover daemon start, duplicate prevention, stale state cleanup, and stop behavior. + +## Constraints & Assumptions + +### Constraints +- Follow existing TypeScript, Commander, Jest, and CLI service patterns. +- Use Node.js standard process primitives for cross-platform compatibility. +- Persist bridge state under the existing `~/.ai-devkit` area. +- Do not require additional runtime dependencies. + +### Assumptions +- One daemon bridge at a time is sufficient for the current channel UX. +- The daemon can re-enter the existing `channel start` command internally with a private child-process flag. +- The existing `channel status` command can be extended later if it does not already surface daemon state. + +## Questions & Open Items + +- Should `channel stop` accept a channel name in the future if multiple concurrent daemons are added? +- Should daemon logs be written to a user-visible file in `~/.ai-devkit`, or should v1 redirect to ignored stdio only? diff --git a/docs/ai/testing/feature-channel-daemon-mode.md b/docs/ai/testing/feature-channel-daemon-mode.md new file mode 100644 index 00000000..675de3f5 --- /dev/null +++ b/docs/ai/testing/feature-channel-daemon-mode.md @@ -0,0 +1,69 @@ +--- +phase: testing +title: "Channel Daemon Mode: Testing Strategy" +description: Test coverage plan for channel daemon start and stop +--- + +# Testing Strategy: Channel Daemon Mode + +## Test Coverage Goals + +- Cover all new daemon lifecycle branches in unit tests. +- Keep foreground `channel start` behavior covered by existing tests. +- Avoid tests that require a real Telegram bot or real long-running child process. + +## Unit Tests + +### ChannelService +- [x] Starts a daemon by spawning a detached child process and writing state. +- [x] Redirects daemon stdout and stderr to the per-channel log file. +- [x] Refuses to start when recorded daemon PID is alive. +- [x] Removes stale state when recorded PID is not alive. +- [x] Stops a running bridge with `SIGTERM` and clears state. +- [x] Requires a channel name when multiple live bridges are running. + +### Channel Command +- [x] `channel start --daemon` delegates to daemon service. +- [x] `channel start` without `--daemon` uses existing foreground behavior. +- [x] Daemon start launches `channel-daemon.js` instead of using a hidden CLI child option. +- [x] Dev-mode daemon start launches `channel-daemon.ts` through `ts-node`. +- [x] `channel stop` delegates to daemon stop and prints useful output. +- [x] `channel start --daemon` prints the daemon log path. +- [x] `channel status` shows the daemon log path for a running bridge. + +## Integration Tests + +- [x] CLI command parser accepts `channel start --agent --daemon`. +- [x] CLI command parser accepts `channel stop`. + +## End-to-End Tests + +- Manual later: connect a Telegram channel, start daemon bridge, send a message, stop daemon, confirm no further messages are processed. + +## Test Data + +- Temporary daemon state directory. +- Mock child process spawn result. +- Mock PID liveness and kill behavior. + +## Test Reporting & Coverage + +- Run targeted CLI tests after implementation. +- Run `npx ai-devkit@latest lint --feature channel-daemon-mode`. + +Targeted evidence: +- `npx jest src/__tests__/commands/channel.test.ts src/__tests__/services/channel/channel.service.test.ts --runInBand`: 35 passed. +- `npm run test --workspace packages/cli -- --runInBand`: 35 suites passed, 559 tests passed. +- `npm run build`: 4 projects built successfully. + +## Manual Testing + +- Optional manual Telegram smoke test requires a real bot token and a running agent. + +## Performance Testing + +- Not required for v1; daemon lifecycle operations are local process and file operations. + +## Bug Tracking + +- Regressions should be covered by focused Jest tests before broader CLI test execution. diff --git a/packages/cli/src/__tests__/commands/channel.test.ts b/packages/cli/src/__tests__/commands/channel.test.ts index fcb7ea30..0ea5f846 100644 --- a/packages/cli/src/__tests__/commands/channel.test.ts +++ b/packages/cli/src/__tests__/commands/channel.test.ts @@ -48,6 +48,8 @@ const mockChannelService = { getLiveBridgeByChannel: jest.fn<(channelName: string) => Promise>(), registerBridge: jest.fn<(entry: unknown) => Promise>(), unregisterBridge: jest.fn<(channelName: string) => Promise>(), + startDaemonBridge: jest.fn<(entry: unknown) => Promise>(), + stopBridge: jest.fn<(channelName?: string) => Promise>(), }; jest.mock('@ai-devkit/channel-connector', () => ({ @@ -69,7 +71,7 @@ jest.mock('@ai-devkit/agent-manager', () => ({ TtyWriter: { send: jest.fn(), }, -})); +}), { virtual: true }); jest.mock('inquirer', () => ({ __esModule: true, @@ -107,8 +109,11 @@ jest.mock('../../services/channel/channel.service', () => ({ // eslint-disable-next-line @typescript-eslint/no-var-requires const { registerChannelCommand, - startOutputPolling, } = require('../../commands/channel'); +// eslint-disable-next-line @typescript-eslint/no-var-requires +const { + startOutputPolling, +} = require('../../services/channel/channel-runner'); const POLL_INTERVAL_MS = 2000; @@ -164,10 +169,31 @@ describe('startOutputPolling', () => { mockChannelService.getLiveBridgeByChannel.mockClear(); mockChannelService.registerBridge.mockClear(); mockChannelService.unregisterBridge.mockClear(); + mockChannelService.startDaemonBridge.mockClear(); + mockChannelService.stopBridge.mockClear(); mockChannelService.resolveConnectChannelName.mockImplementation((name?: string) => name ?? 'telegram'); mockChannelService.resolveStartChannelName.mockImplementation((config: any, name?: string) => name ?? Object.keys(config.channels)[0]); mockChannelService.getLiveBridges.mockResolvedValue([]); mockChannelService.getLiveBridgeByChannel.mockResolvedValue(undefined); + mockChannelService.startDaemonBridge.mockResolvedValue({ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + agentPid: 0, + bridgePid: 9876, + startedAt: '2026-05-24T00:00:00.000Z', + }); + mockChannelService.stopBridge.mockResolvedValue({ + stopped: true, + bridge: { + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + agentPid: 4321, + bridgePid: 9876, + startedAt: '2026-05-24T00:00:00.000Z', + }, + }); mockGetMe.mockResolvedValue({ username: 'test_bot' }); jest.clearAllMocks(); }); @@ -383,10 +409,32 @@ describe('channel command', () => { mockChannelService.getLiveBridgeByChannel.mockClear(); mockChannelService.registerBridge.mockClear(); mockChannelService.unregisterBridge.mockClear(); + mockChannelService.startDaemonBridge.mockClear(); + mockChannelService.stopBridge.mockClear(); mockChannelService.resolveConnectChannelName.mockImplementation((name?: string) => name ?? 'telegram'); mockChannelService.resolveStartChannelName.mockImplementation((config: any, name?: string) => name ?? Object.keys(config.channels)[0]); mockChannelService.getLiveBridges.mockResolvedValue([]); mockChannelService.getLiveBridgeByChannel.mockResolvedValue(undefined); + mockChannelService.startDaemonBridge.mockResolvedValue({ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + agentPid: 0, + bridgePid: 9876, + logPath: '/tmp/channel-logs/personal.log', + startedAt: '2026-05-24T00:00:00.000Z', + }); + mockChannelService.stopBridge.mockResolvedValue({ + stopped: true, + bridge: { + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + agentPid: 4321, + bridgePid: 9876, + startedAt: '2026-05-24T00:00:00.000Z', + }, + }); mockGetMe.mockReset(); mockGetMe.mockResolvedValue({ username: 'test_bot' }); mockSpinner.start.mockReset(); @@ -521,6 +569,7 @@ describe('channel command', () => { personal: personalEntry, }, }); + mockConfigStore.getChannel.mockResolvedValue(personalEntry); const agent = makeAgent({ name: 'codex-main', type: 'codex', pid: 4321 }); mockAgentManager.listAgents.mockResolvedValue([agent]); mockAgentManager.resolveAgent.mockReturnValue(agent); @@ -554,4 +603,74 @@ describe('channel command', () => { jest.clearAllTimers(); jest.useRealTimers(); }); + + it('starts a daemon bridge without resolving the agent in the parent process', async () => { + mockConfigStore.getConfig.mockResolvedValue({ + channels: { + personal: personalEntry, + }, + }); + + const program = new Command(); + registerChannelCommand(program); + await program.parseAsync(['node', 'test', 'channel', 'start', 'personal', '--agent', 'codex-main', '--daemon']); + + expect(mockChannelService.startDaemonBridge).toHaveBeenCalledWith(expect.objectContaining({ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + command: process.execPath, + args: expect.arrayContaining(['--channel', 'personal', '--agent', 'codex-main']), + cwd: process.cwd(), + })); + const daemonInput = mockChannelService.startDaemonBridge.mock.calls[0][0] as { args: string[] }; + expect(daemonInput.args[0]).toEqual(expect.stringContaining('ts-node')); + expect(daemonInput.args[1]).toEqual(expect.stringContaining('channel-daemon.ts')); + expect(mockAgentManager.listAgents).not.toHaveBeenCalled(); + expect(ui.success).toHaveBeenCalledWith('Channel bridge daemon started for "personal" (PID: 9876).'); + expect(ui.info).toHaveBeenCalledWith('Logs: /tmp/channel-logs/personal.log'); + }); + + it('shows the daemon log path in channel status', async () => { + mockConfigStore.getConfig.mockResolvedValue({ + channels: { + personal: personalEntry, + }, + }); + mockChannelService.getLiveBridges.mockResolvedValue([{ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + agentPid: 4321, + bridgePid: 9876, + logPath: '/tmp/channel-logs/personal.log', + startedAt: '2026-05-24T00:00:00.000Z', + }]); + + const program = new Command(); + registerChannelCommand(program); + await program.parseAsync(['node', 'test', 'channel', 'status', 'personal']); + + expect(ui.text).toHaveBeenCalledWith(' Logs: /tmp/channel-logs/personal.log'); + }); + + it('stops a running channel bridge', async () => { + const program = new Command(); + registerChannelCommand(program); + await program.parseAsync(['node', 'test', 'channel', 'stop', 'personal']); + + expect(mockChannelService.stopBridge).toHaveBeenCalledWith('personal'); + expect(ui.success).toHaveBeenCalledWith('Channel bridge stopped: personal (PID: 9876).'); + }); + + it('reports when no channel bridge is running during stop', async () => { + mockChannelService.stopBridge.mockResolvedValue({ stopped: false }); + + const program = new Command(); + registerChannelCommand(program); + await program.parseAsync(['node', 'test', 'channel', 'stop']); + + expect(mockChannelService.stopBridge).toHaveBeenCalledWith(undefined); + expect(ui.info).toHaveBeenCalledWith('No running channel bridge found.'); + }); }); diff --git a/packages/cli/src/__tests__/services/channel/channel.service.test.ts b/packages/cli/src/__tests__/services/channel/channel.service.test.ts index 321733a1..2fce6b8b 100644 --- a/packages/cli/src/__tests__/services/channel/channel.service.test.ts +++ b/packages/cli/src/__tests__/services/channel/channel.service.test.ts @@ -7,6 +7,8 @@ describe('ChannelService', () => { let tmpDir: string; let registryPath: string; let alivePids: Set; + let spawned: Array<{ command: string; args: string[]; options: { cwd: string; detached: true; stdio: ['ignore', number, number] }; unref: jest.Mock }>; + let killed: Array<{ pid: number; signal: NodeJS.Signals }>; let service: ChannelService; const personalEntry = { @@ -23,7 +25,20 @@ describe('ChannelService', () => { tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'channel-service-test-')); registryPath = path.join(tmpDir, 'channel-bridges.json'); alivePids = new Set(); - service = new ChannelService(registryPath, pid => alivePids.has(pid)); + spawned = []; + killed = []; + service = new ChannelService( + registryPath, + pid => alivePids.has(pid), + (command: string, args: string[], options: { cwd: string; detached: true; stdio: ['ignore', number, number] }) => { + const child = { pid: 300, unref: jest.fn() }; + spawned.push({ command, args, options, unref: child.unref }); + return child; + }, + (pid: number, signal: NodeJS.Signals) => { + killed.push({ pid, signal }); + }, + ); }); afterEach(() => { @@ -165,4 +180,135 @@ describe('ChannelService', () => { expect(await service.getLiveBridgeByChannel('personal')).toBeUndefined(); }); + + it('starts a daemon bridge by spawning a detached child and recording its pid', async () => { + const bridge = await service.startDaemonBridge({ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + command: 'node', + args: ['dist/channel-daemon.js', '--channel', 'personal', '--agent', 'codex-main'], + cwd: '/tmp/project', + }); + alivePids.add(300); + + expect(spawned).toEqual([expect.objectContaining({ + command: 'node', + args: ['dist/channel-daemon.js', '--channel', 'personal', '--agent', 'codex-main'], + options: { + cwd: '/tmp/project', + detached: true, + stdio: ['ignore', expect.any(Number), expect.any(Number)], + }, + })]); + expect(bridge).toEqual(expect.objectContaining({ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + agentPid: 0, + bridgePid: 300, + logPath: path.join(tmpDir, 'channel-logs', 'personal.log'), + })); + expect(spawned[0].options.stdio[1]).toBe(spawned[0].options.stdio[2]); + expect(fs.readFileSync(path.join(tmpDir, 'channel-logs', 'personal.log'), 'utf-8')).toContain( + '[command] node dist/channel-daemon.js --channel personal --agent codex-main', + ); + expect(await service.getLiveBridgeByChannel('personal')).toEqual(expect.objectContaining({ + channelName: 'personal', + bridgePid: 300, + logPath: path.join(tmpDir, 'channel-logs', 'personal.log'), + })); + }); + + it('refuses to start a daemon when the channel already has a live bridge', async () => { + alivePids.add(200); + await service.registerBridge({ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + agentPid: 100, + bridgePid: 200, + startedAt: '2026-05-23T00:00:00.000Z', + }); + + await expect(service.startDaemonBridge({ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + command: 'node', + args: ['dist/cli.js'], + cwd: '/tmp/project', + })).rejects.toThrow('Channel "personal" bridge is already running (PID: 200).'); + expect(spawned).toEqual([]); + }); + + it('prunes stale state before starting a daemon bridge', async () => { + await service.registerBridge({ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + agentPid: 100, + bridgePid: 200, + startedAt: '2026-05-23T00:00:00.000Z', + }); + + await service.startDaemonBridge({ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + command: 'node', + args: ['dist/cli.js'], + cwd: '/tmp/project', + }); + + alivePids.add(300); + expect(await service.getLiveBridgeByChannel('personal')).toEqual(expect.objectContaining({ + bridgePid: 300, + })); + }); + + it('stops the only live bridge when no channel name is provided', async () => { + alivePids.add(200); + await service.registerBridge({ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + agentPid: 100, + bridgePid: 200, + startedAt: '2026-05-23T00:00:00.000Z', + }); + + const result = await service.stopBridge(); + + expect(result).toEqual({ + stopped: true, + bridge: expect.objectContaining({ channelName: 'personal', bridgePid: 200 }), + }); + expect(killed).toEqual([{ pid: 200, signal: 'SIGTERM' }]); + expect(await service.getLiveBridgeByChannel('personal')).toBeUndefined(); + }); + + it('requires a channel name when multiple live bridges exist', async () => { + alivePids.add(200); + alivePids.add(201); + await service.registerBridge({ + channelName: 'personal', + channelType: 'telegram', + agentName: 'codex-main', + agentPid: 100, + bridgePid: 200, + startedAt: '2026-05-23T00:00:00.000Z', + }); + await service.registerBridge({ + channelName: 'work', + channelType: 'telegram', + agentName: 'claude-review', + agentPid: 101, + bridgePid: 201, + startedAt: '2026-05-23T00:01:00.000Z', + }); + + await expect(service.stopBridge()).rejects.toThrow('Multiple channel bridges are running. Specify one: personal, work'); + expect(killed).toEqual([]); + }); }); diff --git a/packages/cli/src/channel-daemon.ts b/packages/cli/src/channel-daemon.ts new file mode 100644 index 00000000..48765420 --- /dev/null +++ b/packages/cli/src/channel-daemon.ts @@ -0,0 +1,48 @@ +#!/usr/bin/env node + +import { enableDebug } from './util/debug'; +import { getErrorMessage } from './util/text'; +import { ui } from './util/terminal-ui'; +import { runChannelBridge } from './services/channel/channel-runner'; + +interface DaemonArgs { + channelName?: string; + agentName?: string; + debug?: boolean; +} + +function readOption(args: string[], name: string): string | undefined { + const index = args.indexOf(name); + if (index === -1) return undefined; + return args[index + 1]; +} + +function parseArgs(args: string[]): DaemonArgs { + return { + channelName: readOption(args, '--channel'), + agentName: readOption(args, '--agent'), + debug: args.includes('--debug'), + }; +} + +async function main(): Promise { + const args = parseArgs(process.argv.slice(2)); + + if (args.debug) { + enableDebug(); + } + + if (!args.channelName || !args.agentName) { + throw new Error('Channel daemon requires --channel and --agent .'); + } + + await runChannelBridge({ + channelName: args.channelName, + agentName: args.agentName, + }); +} + +main().catch((error: unknown) => { + ui.error(getErrorMessage(error)); + process.exit(1); +}); diff --git a/packages/cli/src/commands/channel.ts b/packages/cli/src/commands/channel.ts index 9fc62582..3203a252 100644 --- a/packages/cli/src/commands/channel.ts +++ b/packages/cli/src/commands/channel.ts @@ -1,21 +1,10 @@ +import * as path from 'path'; +import { createRequire } from 'module'; import { Command } from 'commander'; import chalk from 'chalk'; import inquirer from 'inquirer'; -import { - AgentManager, - ClaudeCodeAdapter, - CodexAdapter, - GeminiCliAdapter, - TerminalFocusManager, - TtyWriter, - type AgentAdapter, - type AgentInfo, - type TerminalLocation, -} from '@ai-devkit/agent-manager'; import { Telegraf } from 'telegraf'; import { - ChannelManager, - TelegramAdapter, TELEGRAM_CHANNEL_TYPE, ConfigStore, type ChannelEntry, @@ -23,185 +12,28 @@ import { } from '@ai-devkit/channel-connector'; import { ui } from '../util/terminal-ui'; import { withErrorHandler } from '../util/errors'; -import { getErrorMessage } from '../util/text'; import { createLogger, enableDebug } from '../util/debug'; import { ChannelService } from '../services/channel/channel.service'; +import { runChannelBridge } from '../services/channel/channel-runner'; const debug = createLogger('channel'); -const AGENT_POLL_INTERVAL_MS = 2000; - -function createAgentManager(): AgentManager { - const manager = new AgentManager(); - manager.registerAdapter(new ClaudeCodeAdapter()); - manager.registerAdapter(new CodexAdapter()); - manager.registerAdapter(new GeminiCliAdapter()); - return manager; -} - -async function resolveTargetAgent(agentManager: AgentManager, agentName: string): Promise { - const agents = await agentManager.listAgents(); - - if (agents.length === 0) { - ui.error('No running agents detected.'); - return null; - } - - const resolved = agentManager.resolveAgent(agentName, agents); - if (!resolved) { - ui.error(`No agent found matching "${agentName}".`); - ui.info('Available agents:'); - agents.forEach(a => ui.text(` - ${a.name}`)); - return null; - } - - if (Array.isArray(resolved)) { - const { selectedAgent } = await inquirer.prompt([{ - type: 'list', - name: 'selectedAgent', - message: 'Multiple agents match. Select one:', - choices: resolved.map(a => ({ - name: `${a.name} (PID: ${a.pid})`, - value: a, - })), - }]); - return selectedAgent; - } - - return resolved as AgentInfo; -} - -function setupInputHandler( - telegram: TelegramAdapter, - terminalLocation: TerminalLocation, - chatIdRef: { value: string | null }, - onAuthorize?: (chatId: string) => Promise, -): void { - telegram.onMessage(async (msg) => { - debug(`Received message from chat ID: ${msg.chatId}, text length: ${msg.text?.length ?? 0}`); - - if (!chatIdRef.value) { - chatIdRef.value = msg.chatId; - await onAuthorize?.(msg.chatId); - ui.info(`Authorized Telegram user (chat ID: ${msg.chatId})`); - } - - if (msg.chatId !== chatIdRef.value) { - debug(`Rejected message from unauthorized chat ID: ${msg.chatId}`); - await telegram.sendMessage(msg.chatId, 'Unauthorized. Only the first user is allowed.'); - return; - } - - try { - await TtyWriter.send(terminalLocation, msg.text); - debug(`Sent message to agent terminal (length: ${msg.text?.length ?? 0})`); - } catch (error: unknown) { - const message = getErrorMessage(error); - ui.error(`Failed to send to agent: ${message}`); - await telegram.sendMessage(msg.chatId, `Failed to send to agent: ${message}`); - } - }); -} - -export function startOutputPolling( - telegram: TelegramAdapter, - agentAdapter: AgentAdapter, - agent: AgentInfo, - chatIdRef: { value: string | null }, -): NodeJS.Timeout { - let lastMessageCount = 0; - - debug(`startOutputPolling: sessionFilePath=${agent.sessionFilePath ?? 'null'}`); - - if (agent.sessionFilePath) { - try { - const existing = agentAdapter.getConversation(agent.sessionFilePath); - lastMessageCount = existing.length; - debug(`Initial conversation length: ${lastMessageCount}`); - } catch (error: unknown) { - debug(`Initial getConversation threw: ${getErrorMessage(error)}`); - } +const nodeRequire = createRequire(__filename); + +function resolveDaemonLaunch(): { command: string; args: string[] } { + if (path.extname(__filename) === '.ts') { + return { + command: process.execPath, + args: [ + nodeRequire.resolve('ts-node/dist/bin.js'), + path.resolve(__dirname, '..', 'channel-daemon.ts'), + ], + }; } - let tickCount = 0; - let lastReportedLength = lastMessageCount; - - return setInterval(async () => { - tickCount += 1; - - if (!chatIdRef.value) { - if (tickCount % 15 === 1) { - debug(`poll skip: no authorized chat yet (tick ${tickCount})`); - } - return; - } - if (!agent.sessionFilePath) { - if (tickCount % 15 === 1) { - debug(`poll skip: agent has no sessionFilePath (tick ${tickCount})`); - } - return; - } - - let newMessages; - try { - const conversation = agentAdapter.getConversation(agent.sessionFilePath); - newMessages = conversation.slice(lastMessageCount); - if (conversation.length !== lastReportedLength) { - debug(`Conversation length changed: ${lastReportedLength} -> ${conversation.length} (lastMessageCount=${lastMessageCount}, new=${newMessages.length})`); - lastReportedLength = conversation.length; - } - lastMessageCount = conversation.length; - } catch (error: unknown) { - debug(`getConversation threw: ${getErrorMessage(error)}`); - return; - } - - if (newMessages.length > 0) { - debug(`Polled ${newMessages.length} new message(s) from agent conversation`); - } - - for (const msg of newMessages) { - const contentType = typeof msg.content; - const contentLen = msg.content ? String(msg.content).length : 0; - debug(`message: role=${msg.role}, contentType=${contentType}, length=${contentLen}`); - - if (msg.role === 'user' || !msg.content) { - debug(`skipping message (role=${msg.role}, hasContent=${Boolean(msg.content)})`); - continue; - } - - try { - await telegram.sendMessage(chatIdRef.value, msg.content); - debug(`Sent agent response to Telegram (role: ${msg.role}, length: ${contentLen})`); - } catch (error: unknown) { - const message = getErrorMessage(error); - ui.error(`Failed to send agent response to Telegram: ${message}`); - debug(`sendMessage failed: ${message}`); - } - } - }, AGENT_POLL_INTERVAL_MS); -} - -function setupGracefulShutdown( - manager: ChannelManager, - pollInterval: NodeJS.Timeout, - channelService: ChannelService, - channelName: string, -): void { - const shutdown = async () => { - debug('Shutdown signal received'); - ui.info('\nShutting down...'); - clearInterval(pollInterval); - debug('Output polling stopped'); - await manager.stopAll(); - debug('ChannelManager stopped'); - await channelService.unregisterBridge(channelName); - debug(`Removed channel bridge entry: ${channelName}`); - ui.success('Channel bridge stopped.'); - process.exit(0); + return { + command: process.execPath, + args: [path.resolve(__dirname, '..', 'channel-daemon.js')], }; - - process.on('SIGINT', shutdown); - process.on('SIGTERM', shutdown); } export function registerChannelCommand(program: Command): void { @@ -341,6 +173,7 @@ export function registerChannelCommand(program: Command): void { .command('start [name]') .description('Start the channel bridge to a running agent') .requiredOption('--agent ', 'Name of the agent to bridge') + .option('--daemon', 'Start the channel bridge in the background') .option('--debug', 'Enable debug logging') .action(withErrorHandler('start channel bridge', async (name: string | undefined, options) => { if (options.debug) { @@ -363,91 +196,61 @@ export function registerChannelCommand(program: Command): void { } return; } - if (runningBridge) { - ui.error(`Channel "${channelName}" bridge is already running (PID: ${runningBridge.bridgePid}).`); - return; - } - const telegramConfig = channelEntry.config as TelegramConfig; - debug(`Telegram channel "${channelName}" found: bot=@${telegramConfig.botUsername}`); - - debug(`Resolving agent: "${options.agent}"`); - const agentManager = createAgentManager(); - const agent = await resolveTargetAgent(agentManager, options.agent); - if (!agent) return; + if (options.daemon) { + const daemonLaunch = resolveDaemonLaunch(); + const daemonArgs = [ + ...daemonLaunch.args, + '--channel', + channelName, + '--agent', + options.agent, + ]; + if (options.debug) { + daemonArgs.push('--debug'); + } - debug(`Agent resolved: name=${agent.name}, type=${agent.type}, pid=${agent.pid}`); - debug(`Agent session file: ${agent.sessionFilePath ?? 'none'}`); + const bridge = await channelService.startDaemonBridge({ + channelName, + channelType: TELEGRAM_CHANNEL_TYPE, + agentName: options.agent, + command: daemonLaunch.command, + args: daemonArgs, + cwd: process.cwd(), + }); - const agentAdapter = agentManager.getAdapter(agent.type); - if (!agentAdapter) { - ui.error(`Unsupported agent type: ${agent.type}`); + ui.success(`Channel bridge daemon started for "${channelName}" (PID: ${bridge.bridgePid}).`); + if (bridge.logPath) { + ui.info(`Logs: ${bridge.logPath}`); + } + ui.info(`Run "ai-devkit channel stop ${channelName}" to stop it.`); return; } - debug(`Agent adapter loaded for type: ${agent.type}`); - - debug(`Looking up terminal for PID: ${agent.pid}`); - const focusManager = new TerminalFocusManager(); - const terminalLocation = await focusManager.findTerminal(agent.pid); - - if (!terminalLocation) { - ui.error(`Cannot find terminal for agent "${agent.name}" (PID: ${agent.pid}).`); + if (runningBridge) { + ui.error(`Channel "${channelName}" bridge is already running (PID: ${runningBridge.bridgePid}).`); return; } - debug(`Terminal found: ${JSON.stringify(terminalLocation)}`); - - const telegram = new TelegramAdapter({ botToken: telegramConfig.botToken }); - const chatIdRef = { - value: telegramConfig.authorizedChatId !== undefined - ? String(telegramConfig.authorizedChatId) - : null, - }; - - setupInputHandler(telegram, terminalLocation, chatIdRef, async (chatId) => { - const latest = await configStore.getChannel(channelName); - if (!latest) return; - const latestTelegramConfig = latest.config as TelegramConfig; - await configStore.saveChannel(channelName, { - ...latest, - config: { - ...latestTelegramConfig, - authorizedChatId: Number(chatId), - }, - }); - }); - debug(`Starting output polling (interval: ${AGENT_POLL_INTERVAL_MS}ms)`); - const pollInterval = startOutputPolling(telegram, agentAdapter, agent, chatIdRef); - - const manager = new ChannelManager(); - manager.registerAdapter(telegram); - setupGracefulShutdown(manager, pollInterval, channelService, channelName); - - ui.success(`Bridge started: ${channelName} (@${telegramConfig.botUsername}) <-> Agent "${agent.name}" (PID: ${agent.pid})`); - ui.info('Send a message to your Telegram bot to start chatting.'); - ui.info('Press Ctrl+C to stop.\n'); - - await channelService.registerBridge({ + await runChannelBridge({ channelName, - channelType: TELEGRAM_CHANNEL_TYPE, - agentName: agent.name, - agentPid: agent.pid, - bridgePid: process.pid, - startedAt: new Date().toISOString(), + agentName: options.agent, + configStore, + channelService, }); - debug(`Registered channel bridge entry: ${channelName}`); + })); - try { - debug('Calling manager.startAll()'); - await manager.startAll(); - debug('ChannelManager started successfully'); - } catch (error) { - await channelService.unregisterBridge(channelName); - throw error; + channelCommand + .command('stop [name]') + .description('Stop a running channel bridge') + .action(withErrorHandler('stop channel bridge', async (name: string | undefined) => { + const result = await channelService.stopBridge(name); + if (!result.stopped || !result.bridge) { + ui.info('No running channel bridge found.'); + return; } - await new Promise(() => {}); + ui.success(`Channel bridge stopped: ${result.bridge.channelName} (PID: ${result.bridge.bridgePid}).`); })); channelCommand @@ -475,6 +278,9 @@ export function registerChannelCommand(program: Command): void { ui.text(` Bot: @${telegramConfig.botUsername || 'unknown'}`); ui.text(` Authorized: ${telegramConfig.authorizedChatId ? 'yes' : 'no'}`); ui.text(` Bridge: ${bridge ? chalk.green(`running (PID: ${bridge.bridgePid}, agent: ${bridge.agentName})`) : chalk.dim('stopped')}`); + if (bridge?.logPath) { + ui.text(` Logs: ${bridge.logPath}`); + } ui.text(` Configured: ${entry.createdAt || 'unknown'}`); ui.breakline(); } diff --git a/packages/cli/src/services/channel/channel-runner.ts b/packages/cli/src/services/channel/channel-runner.ts new file mode 100644 index 00000000..29303d51 --- /dev/null +++ b/packages/cli/src/services/channel/channel-runner.ts @@ -0,0 +1,306 @@ +import inquirer from 'inquirer'; +import { + AgentManager, + ClaudeCodeAdapter, + CodexAdapter, + GeminiCliAdapter, + TerminalFocusManager, + TtyWriter, + type AgentAdapter, + type AgentInfo, + type TerminalLocation, +} from '@ai-devkit/agent-manager'; +import { + ChannelManager, + ConfigStore, + TelegramAdapter, + TELEGRAM_CHANNEL_TYPE, + type TelegramConfig, +} from '@ai-devkit/channel-connector'; +import { ui } from '../../util/terminal-ui'; +import { getErrorMessage } from '../../util/text'; +import { createLogger } from '../../util/debug'; +import { ChannelService } from './channel.service'; + +const debug = createLogger('channel'); +const AGENT_POLL_INTERVAL_MS = 2000; + +export interface RunChannelBridgeInput { + channelName: string; + agentName: string; + configStore?: ConfigStore; + channelService?: ChannelService; +} + +function createAgentManager(): AgentManager { + const manager = new AgentManager(); + manager.registerAdapter(new ClaudeCodeAdapter()); + manager.registerAdapter(new CodexAdapter()); + manager.registerAdapter(new GeminiCliAdapter()); + return manager; +} + +async function resolveTargetAgent(agentManager: AgentManager, agentName: string): Promise { + const agents = await agentManager.listAgents(); + + if (agents.length === 0) { + ui.error('No running agents detected.'); + return null; + } + + const resolved = agentManager.resolveAgent(agentName, agents); + if (!resolved) { + ui.error(`No agent found matching "${agentName}".`); + ui.info('Available agents:'); + agents.forEach(a => ui.text(` - ${a.name}`)); + return null; + } + + if (Array.isArray(resolved)) { + const { selectedAgent } = await inquirer.prompt([{ + type: 'list', + name: 'selectedAgent', + message: 'Multiple agents match. Select one:', + choices: resolved.map(a => ({ + name: `${a.name} (PID: ${a.pid})`, + value: a, + })), + }]); + return selectedAgent; + } + + return resolved as AgentInfo; +} + +function setupInputHandler( + telegram: TelegramAdapter, + terminalLocation: TerminalLocation, + chatIdRef: { value: string | null }, + onAuthorize?: (chatId: string) => Promise, +): void { + telegram.onMessage(async (msg) => { + debug(`Received message from chat ID: ${msg.chatId}, text length: ${msg.text?.length ?? 0}`); + + if (!chatIdRef.value) { + chatIdRef.value = msg.chatId; + await onAuthorize?.(msg.chatId); + ui.info(`Authorized Telegram user (chat ID: ${msg.chatId})`); + } + + if (msg.chatId !== chatIdRef.value) { + debug(`Rejected message from unauthorized chat ID: ${msg.chatId}`); + await telegram.sendMessage(msg.chatId, 'Unauthorized. Only the first user is allowed.'); + return; + } + + try { + await TtyWriter.send(terminalLocation, msg.text); + debug(`Sent message to agent terminal (length: ${msg.text?.length ?? 0})`); + } catch (error: unknown) { + const message = getErrorMessage(error); + ui.error(`Failed to send to agent: ${message}`); + await telegram.sendMessage(msg.chatId, `Failed to send to agent: ${message}`); + } + }); +} + +export function startOutputPolling( + telegram: TelegramAdapter, + agentAdapter: AgentAdapter, + agent: AgentInfo, + chatIdRef: { value: string | null }, +): NodeJS.Timeout { + let lastMessageCount = 0; + + debug(`startOutputPolling: sessionFilePath=${agent.sessionFilePath ?? 'null'}`); + + if (agent.sessionFilePath) { + try { + const existing = agentAdapter.getConversation(agent.sessionFilePath); + lastMessageCount = existing.length; + debug(`Initial conversation length: ${lastMessageCount}`); + } catch (error: unknown) { + debug(`Initial getConversation threw: ${getErrorMessage(error)}`); + } + } + + let tickCount = 0; + let lastReportedLength = lastMessageCount; + + return setInterval(async () => { + tickCount += 1; + + if (!chatIdRef.value) { + if (tickCount % 15 === 1) { + debug(`poll skip: no authorized chat yet (tick ${tickCount})`); + } + return; + } + if (!agent.sessionFilePath) { + if (tickCount % 15 === 1) { + debug(`poll skip: agent has no sessionFilePath (tick ${tickCount})`); + } + return; + } + + let newMessages; + try { + const conversation = agentAdapter.getConversation(agent.sessionFilePath); + newMessages = conversation.slice(lastMessageCount); + if (conversation.length !== lastReportedLength) { + debug(`Conversation length changed: ${lastReportedLength} -> ${conversation.length} (lastMessageCount=${lastMessageCount}, new=${newMessages.length})`); + lastReportedLength = conversation.length; + } + lastMessageCount = conversation.length; + } catch (error: unknown) { + debug(`getConversation threw: ${getErrorMessage(error)}`); + return; + } + + if (newMessages.length > 0) { + debug(`Polled ${newMessages.length} new message(s) from agent conversation`); + } + + for (const msg of newMessages) { + const contentType = typeof msg.content; + const contentLen = msg.content ? String(msg.content).length : 0; + debug(`message: role=${msg.role}, contentType=${contentType}, length=${contentLen}`); + + if (msg.role === 'user' || !msg.content) { + debug(`skipping message (role=${msg.role}, hasContent=${Boolean(msg.content)})`); + continue; + } + + try { + await telegram.sendMessage(chatIdRef.value, msg.content); + debug(`Sent agent response to Telegram (role: ${msg.role}, length: ${contentLen})`); + } catch (error: unknown) { + const message = getErrorMessage(error); + ui.error(`Failed to send agent response to Telegram: ${message}`); + debug(`sendMessage failed: ${message}`); + } + } + }, AGENT_POLL_INTERVAL_MS); +} + +function setupGracefulShutdown( + manager: ChannelManager, + pollInterval: NodeJS.Timeout, + channelService: ChannelService, + channelName: string, +): void { + const shutdown = async () => { + debug('Shutdown signal received'); + ui.info('\nShutting down...'); + clearInterval(pollInterval); + debug('Output polling stopped'); + await manager.stopAll(); + debug('ChannelManager stopped'); + await channelService.unregisterBridge(channelName); + debug(`Removed channel bridge entry: ${channelName}`); + ui.success('Channel bridge stopped.'); + process.exit(0); + }; + + process.on('SIGINT', shutdown); + process.on('SIGTERM', shutdown); +} + +export async function runChannelBridge(input: RunChannelBridgeInput): Promise { + const configStore = input.configStore ?? new ConfigStore(); + const channelService = input.channelService ?? new ChannelService(); + + debug(`Starting channel bridge: channel=${input.channelName}, agent=${input.agentName}`); + const channelEntry = await configStore.getChannel(input.channelName); + const runningBridge = await channelService.getLiveBridgeByChannel(input.channelName); + + if (!channelEntry) { + ui.error(`No channel configured with name "${input.channelName}".`); + return; + } + if (runningBridge && runningBridge.bridgePid !== process.pid) { + ui.error(`Channel "${input.channelName}" bridge is already running (PID: ${runningBridge.bridgePid}).`); + return; + } + + const telegramConfig = channelEntry.config as TelegramConfig; + debug(`Telegram channel "${input.channelName}" found: bot=@${telegramConfig.botUsername}`); + + debug(`Resolving agent: "${input.agentName}"`); + const agentManager = createAgentManager(); + const agent = await resolveTargetAgent(agentManager, input.agentName); + if (!agent) return; + + debug(`Agent resolved: name=${agent.name}, type=${agent.type}, pid=${agent.pid}`); + debug(`Agent session file: ${agent.sessionFilePath ?? 'none'}`); + + const agentAdapter = agentManager.getAdapter(agent.type); + if (!agentAdapter) { + ui.error(`Unsupported agent type: ${agent.type}`); + return; + } + + debug(`Agent adapter loaded for type: ${agent.type}`); + + debug(`Looking up terminal for PID: ${agent.pid}`); + const focusManager = new TerminalFocusManager(); + const terminalLocation = await focusManager.findTerminal(agent.pid); + + if (!terminalLocation) { + ui.error(`Cannot find terminal for agent "${agent.name}" (PID: ${agent.pid}).`); + return; + } + + debug(`Terminal found: ${JSON.stringify(terminalLocation)}`); + + const telegram = new TelegramAdapter({ botToken: telegramConfig.botToken }); + const chatIdRef = { + value: telegramConfig.authorizedChatId !== undefined + ? String(telegramConfig.authorizedChatId) + : null, + }; + + setupInputHandler(telegram, terminalLocation, chatIdRef, async (chatId) => { + const latest = await configStore.getChannel(input.channelName); + if (!latest) return; + const latestTelegramConfig = latest.config as TelegramConfig; + await configStore.saveChannel(input.channelName, { + ...latest, + config: { + ...latestTelegramConfig, + authorizedChatId: Number(chatId), + }, + }); + }); + debug(`Starting output polling (interval: ${AGENT_POLL_INTERVAL_MS}ms)`); + const pollInterval = startOutputPolling(telegram, agentAdapter, agent, chatIdRef); + + const manager = new ChannelManager(); + manager.registerAdapter(telegram); + setupGracefulShutdown(manager, pollInterval, channelService, input.channelName); + + ui.success(`Bridge started: ${input.channelName} (@${telegramConfig.botUsername}) <-> Agent "${agent.name}" (PID: ${agent.pid})`); + ui.info('Send a message to your Telegram bot to start chatting.'); + ui.info('Press Ctrl+C to stop.\n'); + + await channelService.registerBridge({ + channelName: input.channelName, + channelType: TELEGRAM_CHANNEL_TYPE, + agentName: agent.name, + agentPid: agent.pid, + bridgePid: process.pid, + startedAt: new Date().toISOString(), + }); + debug(`Registered channel bridge entry: ${input.channelName}`); + + try { + debug('Calling manager.startAll()'); + await manager.startAll(); + debug('ChannelManager started successfully'); + } catch (error) { + await channelService.unregisterBridge(input.channelName); + throw error; + } + + await new Promise(() => {}); +} diff --git a/packages/cli/src/services/channel/channel.service.ts b/packages/cli/src/services/channel/channel.service.ts index fcbff1db..6da4cf5d 100644 --- a/packages/cli/src/services/channel/channel.service.ts +++ b/packages/cli/src/services/channel/channel.service.ts @@ -1,14 +1,11 @@ import * as fs from 'fs'; import * as os from 'os'; import * as path from 'path'; -import { - TELEGRAM_CHANNEL_TYPE, - type ChannelConfig, - type TelegramConfig, -} from '@ai-devkit/channel-connector'; +import { spawn } from 'child_process'; +import type { ChannelConfig, TelegramConfig } from '@ai-devkit/channel-connector'; const DEFAULT_REGISTRY_PATH = path.join(os.homedir(), '.ai-devkit', 'channel-bridges.json'); -const DEFAULT_TELEGRAM_CHANNEL_NAME = TELEGRAM_CHANNEL_TYPE; +const DEFAULT_TELEGRAM_CHANNEL_NAME = 'telegram'; const CHANNEL_NAME_PATTERN = /^[a-z0-9](?:[a-z0-9-]*[a-z0-9])?$/; export interface ChannelBridgeProcess { @@ -18,6 +15,7 @@ export interface ChannelBridgeProcess { agentPid: number; bridgePid: number; startedAt: string; + logPath?: string; } interface ChannelBridgeFile { @@ -25,6 +23,26 @@ interface ChannelBridgeFile { } type PidChecker = (pid: number) => boolean; +type DetachedSpawner = ( + command: string, + args: string[], + options: { cwd: string; detached: true; stdio: ['ignore', number, number] }, +) => { pid?: number; unref: () => void }; +type ProcessKiller = (pid: number, signal: NodeJS.Signals) => void; + +export interface StartDaemonBridgeInput { + channelName: string; + channelType: string; + agentName: string; + command: string; + args: string[]; + cwd: string; +} + +export interface StopBridgeResult { + stopped: boolean; + bridge?: ChannelBridgeProcess; +} function defaultPidChecker(pid: number): boolean { try { @@ -39,6 +57,10 @@ export class ChannelService { constructor( private readonly registryPath = DEFAULT_REGISTRY_PATH, private readonly isPidAlive: PidChecker = defaultPidChecker, + private readonly spawnDetached: DetachedSpawner = (command, args, options) => spawn(command, args, options), + private readonly killProcess: ProcessKiller = (pid, signal) => { + process.kill(pid, signal); + }, ) {} resolveConnectChannelName(name: string | undefined): string { @@ -51,7 +73,7 @@ export class ChannelService { assertUniqueTelegramToken(config: ChannelConfig, targetName: string, botToken: string): void { for (const [name, entry] of Object.entries(config.channels)) { - if (name === targetName || entry.type !== TELEGRAM_CHANNEL_TYPE) continue; + if (name === targetName || entry.type !== DEFAULT_TELEGRAM_CHANNEL_NAME) continue; const telegramConfig = entry.config as TelegramConfig; if (telegramConfig.botToken === botToken) { throw new Error(`Telegram bot token is already configured for channel "${name}".`); @@ -63,7 +85,7 @@ export class ChannelService { if (name !== undefined) return this.resolveConnectChannelName(name); const telegramChannels = Object.entries(config.channels) - .filter(([, entry]) => entry.type === TELEGRAM_CHANNEL_TYPE) + .filter(([, entry]) => entry.type === DEFAULT_TELEGRAM_CHANNEL_NAME) .map(([channelName]) => channelName); if (telegramChannels.length === 1) return telegramChannels[0]; @@ -94,6 +116,70 @@ export class ChannelService { await this.writeBridgeRegistry(registry); } + async startDaemonBridge(input: StartDaemonBridgeInput): Promise { + const runningBridge = await this.getLiveBridgeByChannel(input.channelName); + if (runningBridge) { + throw new Error(`Channel "${input.channelName}" bridge is already running (PID: ${runningBridge.bridgePid}).`); + } + + const logPath = this.getBridgeLogPath(input.channelName); + const logFd = this.openBridgeLog(input, logPath); + let child: { pid?: number; unref: () => void }; + try { + child = this.spawnDetached(input.command, input.args, { + cwd: input.cwd, + detached: true, + stdio: ['ignore', logFd, logFd], + }); + } finally { + fs.closeSync(logFd); + } + + if (!child.pid) { + throw new Error('Failed to start channel bridge daemon: child process did not report a PID.'); + } + + child.unref(); + + const bridge: ChannelBridgeProcess = { + channelName: input.channelName, + channelType: input.channelType, + agentName: input.agentName, + agentPid: 0, + bridgePid: child.pid, + startedAt: new Date().toISOString(), + logPath, + }; + + await this.registerBridge(bridge); + return bridge; + } + + async stopBridge(channelName?: string): Promise { + const liveBridges = await this.getLiveBridges(); + + if (liveBridges.length === 0) { + return { stopped: false }; + } + + let bridge: ChannelBridgeProcess | undefined; + if (channelName) { + bridge = liveBridges.find(candidate => candidate.channelName === this.resolveConnectChannelName(channelName)); + if (!bridge) { + return { stopped: false }; + } + } else if (liveBridges.length === 1) { + bridge = liveBridges[0]; + } else { + throw new Error(`Multiple channel bridges are running. Specify one: ${liveBridges.map(candidate => candidate.channelName).join(', ')}`); + } + + this.killProcess(bridge.bridgePid, 'SIGTERM'); + await this.unregisterBridge(bridge.channelName); + + return { stopped: true, bridge }; + } + async unregisterBridge(channelName: string): Promise { const registry = await this.readBridgeRegistry(); delete registry.bridges[channelName]; @@ -114,4 +200,16 @@ export class ChannelService { fs.mkdirSync(dir, { recursive: true }); fs.writeFileSync(this.registryPath, JSON.stringify(registry, null, 2), { mode: 0o600 }); } + + private getBridgeLogPath(channelName: string): string { + return path.join(path.dirname(this.registryPath), 'channel-logs', `${channelName}.log`); + } + + private openBridgeLog(input: StartDaemonBridgeInput, logPath: string): number { + fs.mkdirSync(path.dirname(logPath), { recursive: true }); + const logFd = fs.openSync(logPath, 'a', 0o600); + fs.writeSync(logFd, `[${new Date().toISOString()}] Starting channel daemon: ${input.channelName} -> ${input.agentName}\n`); + fs.writeSync(logFd, `[command] ${input.command} ${input.args.join(' ')}\n`); + return logFd; + } }