From af5dee45b49354c71f5db8d1661bc5ba2e6340dd Mon Sep 17 00:00:00 2001 From: dorianzheng Date: Fri, 10 Apr 2026 16:05:54 +0800 Subject: [PATCH] add task scheduling and management APIs --- README.md | 17 ++ docs/SPEC.md | 146 +++++++------ package.json | 1 + scripts/test-task-sdk-e2e.ts | 119 +++++++++++ src/agent-impl.ts | 280 +++++++++++++++++++++---- src/agentlite-impl.test.ts | 12 +- src/agentlite-impl.ts | 42 +++- src/api/agent.ts | 21 ++ src/api/sdk.ts | 11 + src/api/task.ts | 87 ++++++++ src/db.test.ts | 51 +++++ src/db.ts | 13 ++ src/ipc.ts | 94 ++++----- src/sdk-types.test.ts | 18 ++ src/task-api.test.ts | 387 +++++++++++++++++++++++++++++++++++ src/task-lifecycle.test.ts | 215 +++++++++++++++++++ src/task-utils.ts | 42 ++++ 17 files changed, 1402 insertions(+), 154 deletions(-) create mode 100644 scripts/test-task-sdk-e2e.ts create mode 100644 src/api/task.ts create mode 100644 src/task-api.test.ts create mode 100644 src/task-lifecycle.test.ts create mode 100644 src/task-utils.ts diff --git a/README.md b/README.md index 2548574e9c0..05478121966 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,22 @@ agent.addChannel( telegram({ token: process.env.TELEGRAM_BOT_TOKEN! }), ); await agent.start(); + +await agent.registerGroup('self-chat', { + name: 'Main', + folder: 'main', + trigger: 'always', + isMain: true, +}); + +const task = await agent.scheduleTask({ + jid: 'self-chat', + prompt: 'Send the weekly status summary.', + scheduleType: 'cron', + scheduleValue: '0 9 * * 1', +}); + +console.log(task.id); ``` ## Quick Start @@ -75,6 +91,7 @@ Key files: - `src/api/sdk.ts` - Public API: `createAgentLite()`, `AgentLite` interface - `src/api/agent.ts` - Public API: `Agent` interface +- `src/api/task.ts` - Public task types for SDK scheduling and task management - `src/api/channel-driver.ts` - Public API: `ChannelDriver` interface - `src/agentlite-impl.ts` - AgentLite implementation (hidden from consumers) - `src/agent-impl.ts` - Agent implementation: message loop, channels, groups diff --git a/docs/SPEC.md b/docs/SPEC.md index 815a7db3c9c..9dab517eb63 100644 --- a/docs/SPEC.md +++ b/docs/SPEC.md @@ -73,14 +73,14 @@ A personal Claude assistant with multi-channel support, persistent memory per co ### Technology Stack -| Component | Technology | Purpose | -|-----------|------------|---------| -| Channel System | Channel registry (`src/channels/registry.ts`) | Channels self-register at startup | -| Message Storage | SQLite (better-sqlite3) | Store messages for polling | -| Container Runtime | Containers (Linux VMs) | Isolated environments for agent execution | -| Agent | @anthropic-ai/claude-agent-sdk (0.2.29) | Run Claude with tools and MCP servers | -| Browser Automation | agent-browser + Chromium | Web interaction and screenshots | -| Runtime | Node.js 20+ | Host process for routing and scheduling | +| Component | Technology | Purpose | +| ------------------ | --------------------------------------------- | ----------------------------------------- | +| Channel System | Channel registry (`src/channels/registry.ts`) | Channels self-register at startup | +| Message Storage | SQLite (better-sqlite3) | Store messages for polling | +| Container Runtime | Containers (Linux VMs) | Isolated environments for agent execution | +| Agent | @anthropic-ai/claude-agent-sdk (0.2.29) | Run Claude with tools and MCP servers | +| Browser Automation | agent-browser + Chromium | Web interaction and screenshots | +| Runtime | Node.js 20+ | Host process for routing and scheduling | --- @@ -183,7 +183,9 @@ Channels self-register using a barrel-import pattern: // src/channels/whatsapp.ts import { registerChannel, ChannelOpts } from './registry.js'; - export class WhatsAppChannel implements Channel { /* ... */ } + export class WhatsAppChannel implements Channel { + /* ... */ + } registerChannel('whatsapp', (opts: ChannelOpts) => { // Return null if credentials are missing @@ -215,13 +217,13 @@ Channels self-register using a barrel-import pattern: ### Key Files -| File | Purpose | -|------|---------| -| `src/channels/registry.ts` | Channel factory registry | -| `src/channels/index.ts` | Barrel imports that trigger channel self-registration | -| `src/types.ts` | `Channel` interface, `ChannelOpts`, message types | -| `src/orchestrator.ts` | Orchestrator — instantiates channels, runs message loop | -| `src/router.ts` | Finds the owning channel for a JID, formats messages | +| File | Purpose | +| -------------------------- | ------------------------------------------------------- | +| `src/channels/registry.ts` | Channel factory registry | +| `src/channels/index.ts` | Barrel imports that trigger channel self-registration | +| `src/types.ts` | `Channel` interface, `ChannelOpts`, message types | +| `src/orchestrator.ts` | Orchestrator — instantiates channels, runs message loop | +| `src/router.ts` | Finds the owning channel for a JID, formats messages | ### Adding a New Channel @@ -342,11 +344,18 @@ export const GROUPS_DIR = path.resolve(PROJECT_ROOT, 'groups'); export const DATA_DIR = path.resolve(PROJECT_ROOT, 'data'); // Container configuration -export const BOX_IMAGE = process.env.BOX_IMAGE || 'ghcr.io/boxlite-ai/agentlite-agent:latest'; -export const CONTAINER_TIMEOUT = parseInt(process.env.CONTAINER_TIMEOUT || '1800000', 10); // 30min default +export const BOX_IMAGE = + process.env.BOX_IMAGE || 'ghcr.io/boxlite-ai/agentlite-agent:latest'; +export const CONTAINER_TIMEOUT = parseInt( + process.env.CONTAINER_TIMEOUT || '1800000', + 10, +); // 30min default export const IPC_POLL_INTERVAL = 1000; export const IDLE_TIMEOUT = parseInt(process.env.IDLE_TIMEOUT || '1800000', 10); // 30min — keep container alive after last result -export const MAX_CONCURRENT_CONTAINERS = Math.max(1, parseInt(process.env.MAX_CONCURRENT_CONTAINERS || '5', 10) || 5); +export const MAX_CONCURRENT_CONTAINERS = Math.max( + 1, + parseInt(process.env.MAX_CONCURRENT_CONTAINERS || '5', 10) || 5, +); export const TRIGGER_PATTERN = new RegExp(`^@${ASSISTANT_NAME}\\b`, 'i'); ``` @@ -358,16 +367,16 @@ export const TRIGGER_PATTERN = new RegExp(`^@${ASSISTANT_NAME}\\b`, 'i'); Groups can have additional directories mounted via `containerConfig` in the SQLite `registered_groups` table (stored as JSON in the `container_config` column). Example registration: ```typescript -setRegisteredGroup("1234567890@g.us", { - name: "Dev Team", - folder: "whatsapp_dev-team", - trigger: "@Andy", +setRegisteredGroup('1234567890@g.us', { + name: 'Dev Team', + folder: 'whatsapp_dev-team', + trigger: '@Andy', added_at: new Date().toISOString(), containerConfig: { additionalMounts: [ { - hostPath: "~/projects/webapp", - containerPath: "webapp", + hostPath: '~/projects/webapp', + containerPath: 'webapp', readonly: false, }, ], @@ -387,12 +396,15 @@ Additional mounts appear at `/workspace/extra/{containerPath}` inside the contai Configure authentication in a `.env` file in the project root. Two options: **Option 1: Claude Subscription (OAuth token)** + ```bash CLAUDE_CODE_OAUTH_TOKEN=sk-ant-oat01-... ``` + The token can be extracted from `~/.claude/.credentials.json` if you're logged in to Claude Code. **Option 2: Pay-per-use API Key** + ```bash ANTHROPIC_API_KEY=sk-ant-api03-... ``` @@ -408,12 +420,14 @@ ASSISTANT_NAME=Bot npm start ``` Or edit the default in `src/config.ts`. This changes: + - The trigger pattern (messages must start with `@YourName`) - The response prefix (`YourName:` added automatically) ### Placeholder Values in launchd Files with `{{PLACEHOLDER}}` values need to be configured: + - `{{PROJECT_ROOT}}` - Absolute path to your agentlite installation - `{{NODE_PATH}}` - Path to node binary (detected via `which node`) - `{{HOME}}` - User's home directory @@ -426,11 +440,11 @@ AgentLite uses a hierarchical memory system based on CLAUDE.md files. ### Memory Hierarchy -| Level | Location | Read By | Written By | Purpose | -|-------|----------|---------|------------|---------| -| **Global** | `groups/CLAUDE.md` | All groups | Main only | Preferences, facts, context shared across all conversations | -| **Group** | `groups/{name}/CLAUDE.md` | That group | That group | Group-specific context, conversation memory | -| **Files** | `groups/{name}/*.md` | That group | That group | Notes, research, documents created during conversation | +| Level | Location | Read By | Written By | Purpose | +| ---------- | ------------------------- | ---------- | ---------- | ----------------------------------------------------------- | +| **Global** | `groups/CLAUDE.md` | All groups | Main only | Preferences, facts, context shared across all conversations | +| **Group** | `groups/{name}/CLAUDE.md` | That group | That group | Group-specific context, conversation memory | +| **Files** | `groups/{name}/*.md` | That group | That group | Notes, research, documents created during conversation | ### How Memory Works @@ -515,6 +529,7 @@ Sessions enable conversation continuity - Claude remembers what you talked about ### Trigger Word Matching Messages must start with the trigger pattern (default: `@Andy`): + - `@Andy what's the weather?` → ✅ Triggers Claude - `@andy help me` → ✅ Triggers (case insensitive) - `Hey @Andy` → ❌ Ignored (trigger not at start) @@ -538,18 +553,18 @@ This allows the agent to understand the conversation context even if it wasn't m ### Commands Available in Any Group -| Command | Example | Effect | -|---------|---------|--------| +| Command | Example | Effect | +| ---------------------- | --------------------------- | -------------- | | `@Assistant [message]` | `@Andy what's the weather?` | Talk to Claude | ### Commands Available in Main Channel Only -| Command | Example | Effect | -|---------|---------|--------| -| `@Assistant add group "Name"` | `@Andy add group "Family Chat"` | Register a new group | -| `@Assistant remove group "Name"` | `@Andy remove group "Work Team"` | Unregister a group | -| `@Assistant list groups` | `@Andy list groups` | Show registered groups | -| `@Assistant remember [fact]` | `@Andy remember I prefer dark mode` | Add to global memory | +| Command | Example | Effect | +| -------------------------------- | ----------------------------------- | ---------------------- | +| `@Assistant add group "Name"` | `@Andy add group "Family Chat"` | Register a new group | +| `@Assistant remove group "Name"` | `@Andy remove group "Work Team"` | Unregister a group | +| `@Assistant list groups` | `@Andy list groups` | Show registered groups | +| `@Assistant remember [fact]` | `@Andy remember I prefer dark mode` | Add to global memory | --- @@ -566,11 +581,11 @@ AgentLite has a built-in scheduler that runs tasks as full agents in their group ### Schedule Types -| Type | Value Format | Example | -|------|--------------|---------| -| `cron` | Cron expression | `0 9 * * 1` (Mondays at 9am) | -| `interval` | Milliseconds | `3600000` (every hour) | -| `once` | ISO timestamp | `2024-12-25T09:00:00Z` | +| Type | Value Format | Example | +| ---------- | --------------- | ---------------------------- | +| `cron` | Cron expression | `0 9 * * 1` (Mondays at 9am) | +| `interval` | Milliseconds | `3600000` (every hour) | +| `once` | ISO timestamp | `2024-12-25T09:00:00Z` | ### Creating a Task @@ -603,15 +618,24 @@ Claude: [calls mcp__agentlite__schedule_task] ### Managing Tasks From any group: + - `@Andy list my scheduled tasks` - View tasks for this group - `@Andy pause task [id]` - Pause a task - `@Andy resume task [id]` - Resume a paused task - `@Andy cancel task [id]` - Delete a task From main channel: + - `@Andy list all tasks` - View tasks from all groups - `@Andy schedule task for "Family Chat": [prompt]` - Schedule for another group +From the public Node SDK: + +- `agent.scheduleTask(...)` - Create a scheduled task for a registered group +- `agent.listTasks(...)` - List tasks, optionally filtered by JID or status +- `agent.getTask(taskId)` - Read task details plus run history +- `agent.updateTask(...)`, `pauseTask(...)`, `resumeTask(...)`, `cancelTask(...)` - Manage existing tasks + --- ## MCP Servers @@ -625,13 +649,14 @@ The `agentlite` MCP server is created dynamically per agent call with the curren |------|---------| | `schedule_task` | Schedule a recurring or one-time task | | `list_tasks` | Show tasks (group's tasks, or all if main) | -| `get_task` | Get task details and run history | | `update_task` | Modify task prompt or schedule | | `pause_task` | Pause a task | | `resume_task` | Resume a paused task | | `cancel_task` | Delete a task | | `send_message` | Send a message to the group via its channel | +Note: the built-in MCP surface does not currently expose `get_task`; detailed task reads are available through the public SDK `agent.getTask(taskId)` API. + --- ## Deployment @@ -641,6 +666,7 @@ AgentLite runs as a single macOS launchd service. ### Startup Sequence When AgentLite starts, it: + 1. **Ensures container runtime is running** - Automatically starts it if needed; kills orphaned AgentLite containers from previous runs 2. Initializes the SQLite database (migrates from JSON files if they exist) 3. Loads state from SQLite (registered groups, sessions, router state) @@ -655,6 +681,7 @@ When AgentLite starts, it: ### Service: com.agentlite **launchd/com.agentlite.plist:** + ```xml @@ -716,6 +743,7 @@ tail -f logs/agentlite.log ### Container Isolation All agents run inside containers (lightweight Linux VMs), providing: + - **Filesystem isolation**: Agents can only access mounted directories - **Safe Bash access**: Commands run inside the container, not on your Mac - **Network isolation**: Can be configured per-container if needed @@ -727,6 +755,7 @@ All agents run inside containers (lightweight Linux VMs), providing: WhatsApp messages could contain malicious instructions attempting to manipulate Claude's behavior. **Mitigations:** + - Container isolation limits blast radius - Only registered groups are processed - Trigger word required (reduces accidental processing) @@ -735,6 +764,7 @@ WhatsApp messages could contain malicious instructions attempting to manipulate - Claude's built-in safety training **Recommendations:** + - Only register trusted groups - Review additional directory mounts carefully - Review scheduled tasks periodically @@ -742,14 +772,15 @@ WhatsApp messages could contain malicious instructions attempting to manipulate ### Credential Storage -| Credential | Storage Location | Notes | -|------------|------------------|-------| -| Claude CLI Auth | data/sessions/{group}/.claude/ | Per-group isolation, mounted to /home/node/.claude/ | -| WhatsApp Session | store/auth/ | Auto-created, persists ~20 days | +| Credential | Storage Location | Notes | +| ---------------- | ------------------------------ | --------------------------------------------------- | +| Claude CLI Auth | data/sessions/{group}/.claude/ | Per-group isolation, mounted to /home/node/.claude/ | +| WhatsApp Session | store/auth/ | Auto-created, persists ~20 days | ### File Permissions The groups/ folder contains personal memory and should be protected: + ```bash chmod 700 groups/ ``` @@ -760,15 +791,15 @@ chmod 700 groups/ ### Common Issues -| Issue | Cause | Solution | -|-------|-------|----------| -| No response to messages | Service not running | Check `launchctl list | grep agentlite` | -| "Claude Code process exited with code 1" | Container runtime failed to start | Check logs; AgentLite auto-starts container runtime but may fail | -| "Claude Code process exited with code 1" | Session mount path wrong | Ensure mount is to `/home/node/.claude/` not `/root/.claude/` | -| Session not continuing | Session ID not saved | Check SQLite: `sqlite3 store/messages.db "SELECT * FROM sessions"` | -| Session not continuing | Mount path mismatch | Container user is `node` with HOME=/home/node; sessions must be at `/home/node/.claude/` | -| "QR code expired" | WhatsApp session expired | Delete store/auth/ and restart | -| "No groups registered" | Haven't added groups | Use `@Andy add group "Name"` in main | +| Issue | Cause | Solution | +| ---------------------------------------- | --------------------------------- | ---------------------------------------------------------------------------------------- | +| No response to messages | Service not running | Check `launchctl list \| grep agentlite` | +| "Claude Code process exited with code 1" | Container runtime failed to start | Check logs; AgentLite auto-starts container runtime but may fail | +| "Claude Code process exited with code 1" | Session mount path wrong | Ensure mount is to `/home/node/.claude/` not `/root/.claude/` | +| Session not continuing | Session ID not saved | Check SQLite: `sqlite3 store/messages.db "SELECT * FROM sessions"` | +| Session not continuing | Mount path mismatch | Container user is `node` with HOME=/home/node; sessions must be at `/home/node/.claude/` | +| "QR code expired" | WhatsApp session expired | Delete store/auth/ and restart | +| "No groups registered" | Haven't added groups | Use `@Andy add group "Name"` in main | ### Log Location @@ -778,6 +809,7 @@ chmod 700 groups/ ### Debug Mode Run manually for verbose output: + ```bash npm run dev # or diff --git a/package.json b/package.json index 6827e020374..516c183b46e 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "prepare": "husky", "setup": "tsx setup/index.ts", "auth": "tsx src/whatsapp-auth.ts", + "test:e2e:tasks": "tsx scripts/test-task-sdk-e2e.ts", "lint": "eslint src/", "lint:fix": "eslint src/ --fix", "test": "vitest run", diff --git a/scripts/test-task-sdk-e2e.ts b/scripts/test-task-sdk-e2e.ts new file mode 100644 index 00000000000..dc828517247 --- /dev/null +++ b/scripts/test-task-sdk-e2e.ts @@ -0,0 +1,119 @@ +#!/usr/bin/env npx tsx + +import fs from 'fs'; +import os from 'os'; +import path from 'path'; + +import { createAgentLite } from '../src/api/sdk.js'; + +const workdir = fs.mkdtempSync(path.join(os.tmpdir(), 'agentlite-task-e2e-')); +const anthropicToken = + process.env.ANTHROPIC_AUTH_TOKEN ?? process.env.ANTHROPIC_API_KEY; +const anthropicBaseUrl = process.env.ANTHROPIC_BASE_URL; + +async function main(): Promise { + console.log('=== AgentLite Task SDK E2E ==='); + console.log(`Workdir: ${workdir}`); + console.log( + `Image: ${process.env.BOX_IMAGE || 'ghcr.io/boxlite-ai/agentlite-agent:latest'}`, + ); + if (!anthropicToken) { + throw new Error( + 'Missing ANTHROPIC_AUTH_TOKEN or ANTHROPIC_API_KEY for container credentials', + ); + } + + let platform = await createAgentLite({ workdir, timezone: 'UTC' }); + let agent = platform.getOrCreateAgent('main', { + name: 'Andy', + credentials: async () => ({ + ANTHROPIC_AUTH_TOKEN: anthropicToken, + ANTHROPIC_API_KEY: anthropicToken, + ...(anthropicBaseUrl ? { ANTHROPIC_BASE_URL: anthropicBaseUrl } : {}), + }), + }); + + try { + await agent.start(); + + await agent.registerGroup('main@test', { + name: 'Main', + folder: 'main', + trigger: 'always', + isMain: true, + }); + await agent.registerGroup('team@test', { + name: 'Task E2E', + folder: 'task_e2e', + trigger: '@Andy', + }); + + const task = await agent.scheduleTask({ + jid: 'team@test', + prompt: 'Reply with exactly: task sdk e2e ok', + scheduleType: 'once', + scheduleValue: '2024-01-01T00:00:00Z', + }); + + console.log(`Scheduled task ${task.id}`); + console.log( + 'Restarting through a fresh AgentLite instance so the scheduler immediately rechecks due tasks...', + ); + + await platform.stop(); + platform = await createAgentLite({ workdir, timezone: 'UTC' }); + agent = platform.getOrCreateAgent('main', { + name: 'Andy', + credentials: async () => ({ + ANTHROPIC_AUTH_TOKEN: anthropicToken, + ANTHROPIC_API_KEY: anthropicToken, + ...(anthropicBaseUrl ? { ANTHROPIC_BASE_URL: anthropicBaseUrl } : {}), + }), + }); + await agent.start(); + + const timeoutMs = 120_000; + const pollIntervalMs = 1_000; + const startedAt = Date.now(); + + while (Date.now() - startedAt < timeoutMs) { + const current = agent.getTask(task.id); + if (current?.lastRun && current.runs.length > 0) { + console.log('Task completed.'); + console.log(`Status: ${current.status}`); + console.log(`Last run: ${current.lastRun}`); + console.log(`Last result: ${current.lastResult}`); + console.log(`Run count: ${current.runs.length}`); + + if (current.status !== 'completed') { + throw new Error(`Expected completed status, got ${current.status}`); + } + if (current.runs.length < 1) { + throw new Error('Expected at least one run log entry'); + } + if (!current.lastRun) { + throw new Error('Expected lastRun to be set'); + } + + console.log('PASS: Task SDK E2E completed successfully'); + return; + } + + await new Promise((resolve) => setTimeout(resolve, pollIntervalMs)); + } + + throw new Error(`Timed out waiting for task ${task.id} to complete`); + } finally { + await platform.stop(); + try { + fs.rmSync(workdir, { recursive: true, force: true }); + } catch { + /* ignore */ + } + } +} + +main().catch((err) => { + console.error('FAIL:', err instanceof Error ? err.message : String(err)); + process.exit(1); +}); diff --git a/src/agent-impl.ts b/src/agent-impl.ts index dfb8084e763..cac661a4bab 100644 --- a/src/agent-impl.ts +++ b/src/agent-impl.ts @@ -23,7 +23,14 @@ import type { RegisteredGroup as PublicRegisteredGroup, } from './api/group.js'; import type { - ChannelDriver, + ListTasksOptions, + ScheduleTaskOptions, + Task, + TaskDetails, + TaskRun, + UpdateTaskOptions, +} from './api/task.js'; +import type { ChannelDriverFactory, ChannelDriverConfig, } from './api/channel-driver.js'; @@ -31,6 +38,8 @@ import type { Channel, NewMessage, RegisteredGroup as InternalRegisteredGroup, + ScheduledTask, + TaskRunLog, } from './types.js'; import { logger } from './logger.js'; @@ -47,6 +56,7 @@ import { GroupQueue } from './group-queue.js'; import { resolveGroupFolderPath } from './group-folder.js'; import { startIpcWatcher } from './ipc.js'; import { findChannel, formatMessages, formatOutbound } from './router.js'; +import { computeTaskNextRun, createTaskId } from './task-utils.js'; import { restoreRemoteControl, startRemoteControl, @@ -87,6 +97,33 @@ function cloneRegisteredGroup( }; } +function toPublicTask(task: ScheduledTask): Task { + return { + id: task.id, + jid: task.chat_jid, + groupFolder: task.group_folder, + prompt: task.prompt, + scheduleType: task.schedule_type, + scheduleValue: task.schedule_value, + contextMode: task.context_mode, + nextRun: task.next_run, + lastRun: task.last_run, + lastResult: task.last_result, + status: task.status, + createdAt: task.created_at, + }; +} + +function toPublicTaskRun(log: TaskRunLog): TaskRun { + return { + runAt: log.run_at, + durationMs: log.duration_ms, + status: log.status, + result: log.result, + error: log.error, + }; +} + // ─── Implementation (used by sdk.ts, not by consumers) ───────────── export class AgentImpl @@ -215,6 +252,152 @@ export class AgentImpl ); } + /** Schedule a task for a registered group. Only after start(). */ + async scheduleTask(options: ScheduleTaskOptions): Promise { + this.requireStartedForTaskApi('scheduleTask'); + this.requireTaskAdminAccess(); + + const group = this._registeredGroups[options.jid]; + if (!group) { + throw new Error( + `Cannot schedule task: group "${options.jid}" is not registered`, + ); + } + + const now = new Date().toISOString(); + const taskId = createTaskId(); + const contextMode = options.contextMode === 'group' ? 'group' : 'isolated'; + const nextRun = computeTaskNextRun( + options.scheduleType, + options.scheduleValue, + this.runtimeConfig.timezone, + ); + + this.db.createTask({ + id: taskId, + group_folder: group.folder, + chat_jid: options.jid, + prompt: options.prompt, + schedule_type: options.scheduleType, + schedule_value: options.scheduleValue, + context_mode: contextMode, + next_run: nextRun, + status: 'active', + created_at: now, + }); + + this.refreshTaskSnapshots(); + + return this.getTaskSnapshotOrThrow(taskId); + } + + /** List scheduled tasks. Only after start(). */ + listTasks(options?: ListTasksOptions): Task[] { + this.requireStartedForTaskApi('listTasks'); + + return this.db + .getAllTasks() + .filter((task) => { + if (options?.jid && task.chat_jid !== options.jid) return false; + if (options?.status && task.status !== options.status) return false; + return true; + }) + .map((task) => toPublicTask(task)); + } + + /** Get one scheduled task including run history. Only after start(). */ + getTask(taskId: string): TaskDetails | undefined { + this.requireStartedForTaskApi('getTask'); + + const task = this.db.getTaskById(taskId); + if (!task) return undefined; + + return { + ...toPublicTask(task), + runs: this.db.getTaskRunLogs(taskId).map((log) => toPublicTaskRun(log)), + }; + } + + /** Update a scheduled task. Only after start(). */ + async updateTask(taskId: string, updates: UpdateTaskOptions): Promise { + this.requireStartedForTaskApi('updateTask'); + this.requireTaskAdminAccess(); + + const task = this.requireExistingTask(taskId); + this.requireTaskUpdatable(task, 'update'); + + const dbUpdates: Parameters[1] = {}; + if (updates.prompt !== undefined) dbUpdates.prompt = updates.prompt; + if (updates.scheduleType !== undefined) + dbUpdates.schedule_type = updates.scheduleType; + if (updates.scheduleValue !== undefined) + dbUpdates.schedule_value = updates.scheduleValue; + + if ( + updates.scheduleType !== undefined || + updates.scheduleValue !== undefined + ) { + const scheduleType = updates.scheduleType ?? task.schedule_type; + const scheduleValue = updates.scheduleValue ?? task.schedule_value; + dbUpdates.next_run = computeTaskNextRun( + scheduleType, + scheduleValue, + this.runtimeConfig.timezone, + ); + } + + this.db.updateTask(taskId, dbUpdates); + this.refreshTaskSnapshots(); + + return this.getTaskSnapshotOrThrow(taskId); + } + + /** Pause an active scheduled task. Only after start(). */ + async pauseTask(taskId: string): Promise { + this.requireStartedForTaskApi('pauseTask'); + this.requireTaskAdminAccess(); + + const task = this.requireExistingTask(taskId); + if (task.status !== 'active') { + throw new Error( + `Cannot pause task "${taskId}" because it is ${task.status}`, + ); + } + + this.db.updateTask(taskId, { status: 'paused' }); + this.refreshTaskSnapshots(); + + return this.getTaskSnapshotOrThrow(taskId); + } + + /** Resume a paused scheduled task. Only after start(). */ + async resumeTask(taskId: string): Promise { + this.requireStartedForTaskApi('resumeTask'); + this.requireTaskAdminAccess(); + + const task = this.requireExistingTask(taskId); + if (task.status !== 'paused') { + throw new Error( + `Cannot resume task "${taskId}" because it is ${task.status}`, + ); + } + + this.db.updateTask(taskId, { status: 'active' }); + this.refreshTaskSnapshots(); + + return this.getTaskSnapshotOrThrow(taskId); + } + + /** Cancel and delete a scheduled task. Only after start(). */ + async cancelTask(taskId: string): Promise { + this.requireStartedForTaskApi('cancelTask'); + this.requireTaskAdminAccess(); + + this.requireExistingTask(taskId); + this.db.deleteTask(taskId); + this.refreshTaskSnapshots(); + } + /** * Start the agent — initialize DB, connect channels, start message loop. */ @@ -696,21 +879,7 @@ export class AgentImpl const isMain = group.isMain === true; const sessionId = this.sessions[group.folder]; - const tasks = this.db.getAllTasks(); - writeTasksSnapshot( - group.folder, - isMain, - tasks.map((t) => ({ - id: t.id, - groupFolder: t.group_folder, - prompt: t.prompt, - schedule_type: t.schedule_type, - schedule_value: t.schedule_value, - status: t.status, - next_run: t.next_run, - })), - this.config.dataDir, - ); + this.refreshTaskSnapshots(); const availableGroups = this.getAvailableGroups(); writeGroupsSnapshot( @@ -905,6 +1074,64 @@ export class AgentImpl })); } + private requireStartedForTaskApi(methodName: string): void { + if (!this._started) { + throw new Error(`Call start() before ${methodName}()`); + } + } + + private requireTaskAdminAccess(): void { + const hasMainGroup = Object.values(this._registeredGroups).some( + (group) => group.isMain === true, + ); + if (!hasMainGroup) { + throw new Error('Task admin requires at least one registered main group'); + } + } + + private requireExistingTask(taskId: string): ScheduledTask { + const task = this.db.getTaskById(taskId); + if (!task) { + throw new Error(`Task "${taskId}" not found`); + } + return task; + } + + private requireTaskUpdatable(task: ScheduledTask, operation: 'update'): void { + if (task.status === 'completed') { + throw new Error(`Cannot ${operation} completed task "${task.id}"`); + } + } + + private getTaskSnapshotOrThrow(taskId: string): Task { + const task = this.db.getTaskById(taskId); + if (!task) { + throw new Error(`Task "${taskId}" not found`); + } + return toPublicTask(task); + } + + private refreshTaskSnapshots(): void { + const taskRows = this.db.getAllTasks().map((task) => ({ + id: task.id, + groupFolder: task.group_folder, + prompt: task.prompt, + schedule_type: task.schedule_type, + schedule_value: task.schedule_value, + status: task.status, + next_run: task.next_run, + })); + + for (const group of Object.values(this._registeredGroups)) { + writeTasksSnapshot( + group.folder, + group.isMain === true, + taskRows, + this.config.dataDir, + ); + } + } + /** @internal — test helper for setting registered groups directly. */ _setRegisteredGroups(groups: Record): void { this._registeredGroups = groups; @@ -958,26 +1185,7 @@ export class AgentImpl getAvailableGroups: () => this.getAvailableGroups(), writeGroupsSnapshot: (gf, im, ag, rj) => writeGroupsSnapshot(gf, im, ag, rj, this.config.dataDir), - onTasksChanged: () => { - const tasks = this.db.getAllTasks(); - const taskRows = tasks.map((t) => ({ - id: t.id, - groupFolder: t.group_folder, - prompt: t.prompt, - schedule_type: t.schedule_type, - schedule_value: t.schedule_value, - status: t.status, - next_run: t.next_run, - })); - for (const group of Object.values(this._registeredGroups)) { - writeTasksSnapshot( - group.folder, - group.isMain === true, - taskRows, - this.config.dataDir, - ); - } - }, + onTasksChanged: () => this.refreshTaskSnapshots(), }); this.queue.setProcessMessagesFn((chatJid) => diff --git a/src/agentlite-impl.test.ts b/src/agentlite-impl.test.ts index 2ab2c7fff5f..f415e4e2f83 100644 --- a/src/agentlite-impl.test.ts +++ b/src/agentlite-impl.test.ts @@ -137,10 +137,16 @@ describe('AgentLite platform registry', () => { ).toThrow('workdir'); }); - it('keeps createAgent strict for restored names and removes registry rows on delete', async () => { + it('keeps createAgent strict for restored names, deletes the workdir, and removes registry rows on delete', async () => { const firstPlatform = await createAgentLiteImpl({ workdir: tmpDir }); platforms.push(firstPlatform); - firstPlatform.createAgent('alice', { name: 'Alice' }); + const agentWorkdir = path.join(tmpDir, 'custom-agents', 'alice'); + firstPlatform.createAgent('alice', { + name: 'Alice', + workdir: agentWorkdir, + }); + fs.mkdirSync(agentWorkdir, { recursive: true }); + fs.writeFileSync(path.join(agentWorkdir, 'sentinel.txt'), 'alive', 'utf8'); await firstPlatform.stop(); const secondPlatform = await createAgentLiteImpl({ workdir: tmpDir }); @@ -152,6 +158,8 @@ describe('AgentLite platform registry', () => { await secondPlatform.deleteAgent('alice'); expect(secondPlatform.agents.has('alice')).toBe(false); + expect(fs.existsSync(agentWorkdir)).toBe(false); + expect(fs.existsSync(getAgentRegistryDbPath(tmpDir))).toBe(true); const registry = initAgentRegistryDb(tmpDir); try { diff --git a/src/agentlite-impl.ts b/src/agentlite-impl.ts index 1a0b4acc46c..2a8c32d939b 100644 --- a/src/agentlite-impl.ts +++ b/src/agentlite-impl.ts @@ -7,6 +7,7 @@ */ import path from 'path'; +import fs from 'fs'; import { fileURLToPath } from 'url'; import { @@ -16,10 +17,12 @@ import { type SerializableAgentSettings, } from './agent-config.js'; import { + getAgentRegistryDbPath, initAgentRegistryDb, type AgentRegistryDb, type AgentRegistryRecord, } from './agent-registry-db.js'; +import { cleanupOrphans } from './box-runtime.js'; import { buildRuntimeConfig } from './runtime-config.js'; import type { AgentLiteOptions, AgentOptions } from './api/options.js'; import type { Agent } from './api/agent.js'; @@ -146,10 +149,29 @@ class AgentLiteImpl implements AgentLite { async deleteAgent(name: string): Promise { const agent = this._agents.get(name); + const record = this.registry.getAgent(name); + + if (!agent && !record) { + return; + } + if (agent) { await agent.stop(); - this._agents.delete(name); } + + const agentId = agent?.id ?? record?.agentId; + if (agentId) { + await cleanupOrphans(agentId); + } + + const workDir = agent + ? (agent as RuntimeOptionsAwareAgent).config.workDir + : record?.workDir; + if (workDir) { + this.deleteAgentWorkDir(workDir); + } + + this._agents.delete(name); this.registry.deleteAgent(name); } @@ -224,4 +246,22 @@ class AgentLiteImpl implements AgentLite { ); } } + + private deleteAgentWorkDir(workDir: string): void { + const target = path.resolve(workDir); + const registryDbPath = getAgentRegistryDbPath(this._runtimeConfig.workdir); + const registryPathFromTarget = path.relative(target, registryDbPath); + const wouldDeleteSharedRegistry = + registryPathFromTarget === '' || + (!registryPathFromTarget.startsWith('..') && + !path.isAbsolute(registryPathFromTarget)); + + if (wouldDeleteSharedRegistry) { + throw new Error( + `Refusing to delete agent workdir "${target}" because it contains the shared registry`, + ); + } + + fs.rmSync(target, { recursive: true, force: true }); + } } diff --git a/src/api/agent.ts b/src/api/agent.ts index 9f7531e39cb..4dbdb712df5 100644 --- a/src/api/agent.ts +++ b/src/api/agent.ts @@ -9,6 +9,13 @@ import type { RegisterGroupOptions, RegisteredGroup, } from './group.js'; +import type { + ListTasksOptions, + ScheduleTaskOptions, + Task, + TaskDetails, + UpdateTaskOptions, +} from './task.js'; /** Per-project agent runtime. Manages channels and per-chat VMs. */ export interface Agent { @@ -27,6 +34,20 @@ export interface Agent { getRegisteredGroups(): RegisteredGroup[]; /** Get a snapshot of discovered groups. Only after start(). */ getAvailableGroups(): AvailableGroup[]; + /** Schedule a task for a registered group. Only after start(). */ + scheduleTask(options: ScheduleTaskOptions): Promise; + /** List scheduled tasks. Only after start(). */ + listTasks(options?: ListTasksOptions): Task[]; + /** Get one scheduled task including run history. Only after start(). */ + getTask(taskId: string): TaskDetails | undefined; + /** Update a scheduled task. Only after start(). */ + updateTask(taskId: string, updates: UpdateTaskOptions): Promise; + /** Pause an active scheduled task. Only after start(). */ + pauseTask(taskId: string): Promise; + /** Resume a paused scheduled task. Only after start(). */ + resumeTask(taskId: string): Promise; + /** Cancel and delete a scheduled task. Only after start(). */ + cancelTask(taskId: string): Promise; /** Start the agent — connects channels, begins processing messages. */ start(): Promise; /** Stop the agent — disconnects channels, stops processing. */ diff --git a/src/api/sdk.ts b/src/api/sdk.ts index 01e29319e71..f2fb730afc4 100644 --- a/src/api/sdk.ts +++ b/src/api/sdk.ts @@ -27,6 +27,17 @@ export type { ChannelDriverConfig, } from './channel-driver.js'; export type { MountAllowlist, AllowedRoot } from './mount.js'; +export type { + Task, + TaskContextMode, + TaskDetails, + TaskRun, + TaskScheduleType, + TaskStatus, + ScheduleTaskOptions, + ListTasksOptions, + UpdateTaskOptions, +} from './task.js'; export type { AgentEvents, MessageInEvent, diff --git a/src/api/task.ts b/src/api/task.ts new file mode 100644 index 00000000000..8d20d7bf7ae --- /dev/null +++ b/src/api/task.ts @@ -0,0 +1,87 @@ +/** + * Public task types for the Agent SDK. + */ + +export type TaskScheduleType = 'cron' | 'interval' | 'once'; +export type TaskContextMode = 'group' | 'isolated'; +export type TaskStatus = 'active' | 'paused' | 'completed'; + +/** A scheduled task returned by the Agent SDK. */ +export interface Task { + /** Stable task identifier. */ + id: string; + /** Target group/chat identifier. */ + jid: string; + /** Folder name for the target group's data. */ + groupFolder: string; + /** Prompt executed when the task runs. */ + prompt: string; + /** Schedule kind. */ + scheduleType: TaskScheduleType; + /** Schedule value for the selected schedule type. */ + scheduleValue: string; + /** Whether the task runs in group context or a fresh session. */ + contextMode: TaskContextMode; + /** Next scheduled run time, or null after a one-time task completes. */ + nextRun: string | null; + /** Most recent run time, if any. */ + lastRun: string | null; + /** Summary from the most recent run, if any. */ + lastResult: string | null; + /** Current task status. */ + status: TaskStatus; + /** Creation time. */ + createdAt: string; +} + +/** A historical task run returned by getTask(). */ +export interface TaskRun { + /** When the run started. */ + runAt: string; + /** Run duration in milliseconds. */ + durationMs: number; + /** Run outcome. */ + status: 'success' | 'error'; + /** Result summary, if any. */ + result: string | null; + /** Error message, if any. */ + error: string | null; +} + +/** Task details including run history. */ +export interface TaskDetails extends Task { + /** Historical runs, newest first. */ + runs: TaskRun[]; +} + +/** Parameters for scheduling a new task. */ +export interface ScheduleTaskOptions { + /** Target group/chat identifier. */ + jid: string; + /** Prompt executed when the task runs. */ + prompt: string; + /** Schedule kind. */ + scheduleType: TaskScheduleType; + /** Schedule value for the selected schedule type. */ + scheduleValue: string; + /** Context mode. Defaults to isolated. */ + contextMode?: TaskContextMode; +} + +/** Parameters for filtering listTasks(). */ +export interface ListTasksOptions { + /** Limit results to one target group/chat identifier. */ + jid?: string; + /** Limit results to one task status. */ + status?: TaskStatus; +} + +/** Mutable fields for updating an existing task. */ +export interface UpdateTaskOptions { + /** New prompt for the task. */ + prompt?: string; + /** New schedule kind. */ + scheduleType?: TaskScheduleType; + /** New schedule value. */ + scheduleValue?: string; +} diff --git a/src/db.test.ts b/src/db.test.ts index f2eca89901b..b33a1d4adec 100644 --- a/src/db.test.ts +++ b/src/db.test.ts @@ -366,6 +366,57 @@ describe('task CRUD', () => { expect(db.getTaskById('task-2')!.status).toBe('paused'); }); + it('returns task run logs newest first', () => { + db.createTask({ + id: 'task-logs', + group_folder: 'main', + chat_jid: 'group@g.us', + prompt: 'log me', + schedule_type: 'once', + schedule_value: '2024-06-01T00:00:00.000Z', + context_mode: 'isolated', + next_run: null, + status: 'active', + created_at: '2024-01-01T00:00:00.000Z', + }); + + db.logTaskRun({ + task_id: 'task-logs', + run_at: '2024-01-01T08:00:00.000Z', + duration_ms: 1000, + status: 'success', + result: 'older', + error: null, + }); + db.logTaskRun({ + task_id: 'task-logs', + run_at: '2024-01-01T09:00:00.000Z', + duration_ms: 2000, + status: 'error', + result: null, + error: 'latest', + }); + + expect(db.getTaskRunLogs('task-logs')).toEqual([ + { + task_id: 'task-logs', + run_at: '2024-01-01T09:00:00.000Z', + duration_ms: 2000, + status: 'error', + result: null, + error: 'latest', + }, + { + task_id: 'task-logs', + run_at: '2024-01-01T08:00:00.000Z', + duration_ms: 1000, + status: 'success', + result: 'older', + error: null, + }, + ]); + }); + it('deletes a task and its run logs', () => { db.createTask({ id: 'task-3', diff --git a/src/db.ts b/src/db.ts index 650016e1b7b..4a82b274ae1 100644 --- a/src/db.ts +++ b/src/db.ts @@ -415,6 +415,19 @@ export class AgentDb { .all() as ScheduledTask[]; } + getTaskRunLogs(taskId: string): TaskRunLog[] { + return this.db + .prepare( + ` + SELECT task_id, run_at, duration_ms, status, result, error + FROM task_run_logs + WHERE task_id = ? + ORDER BY run_at DESC + `, + ) + .all(taskId) as TaskRunLog[]; + } + updateTask( id: string, updates: Partial< diff --git a/src/ipc.ts b/src/ipc.ts index dd2aa7afafa..7782bdb92f8 100644 --- a/src/ipc.ts +++ b/src/ipc.ts @@ -1,12 +1,11 @@ import fs from 'fs'; import path from 'path'; -import { CronExpressionParser } from 'cron-parser'; - import { AvailableGroup } from './container-runner.js'; import type { AgentDb } from './db.js'; import { isValidGroupFolder } from './group-folder.js'; import { logger } from './logger.js'; +import { computeTaskNextRun, createTaskId } from './task-utils.js'; import { RegisteredGroup } from './types.js'; export interface IpcDeps { @@ -214,45 +213,26 @@ export async function processTaskIpc( const scheduleType = data.schedule_type as 'cron' | 'interval' | 'once'; - let nextRun: string | null = null; - if (scheduleType === 'cron') { - try { - const interval = CronExpressionParser.parse(data.schedule_value, { - tz: deps.timezone, - }); - nextRun = interval.next().toISOString(); - } catch { - logger.warn( - { scheduleValue: data.schedule_value }, - 'Invalid cron expression', - ); - break; - } - } else if (scheduleType === 'interval') { - const ms = parseInt(data.schedule_value, 10); - if (isNaN(ms) || ms <= 0) { - logger.warn( - { scheduleValue: data.schedule_value }, - 'Invalid interval', - ); - break; - } - nextRun = new Date(Date.now() + ms).toISOString(); - } else if (scheduleType === 'once') { - const date = new Date(data.schedule_value); - if (isNaN(date.getTime())) { - logger.warn( - { scheduleValue: data.schedule_value }, - 'Invalid timestamp', - ); - break; - } - nextRun = date.toISOString(); + let nextRun: string; + try { + nextRun = computeTaskNextRun( + scheduleType, + data.schedule_value, + deps.timezone, + ); + } catch (err) { + logger.warn( + { + scheduleType, + scheduleValue: data.schedule_value, + err: err instanceof Error ? err.message : String(err), + }, + 'Invalid task schedule', + ); + break; } - const taskId = - data.taskId || - `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + const taskId = data.taskId || createTaskId(); const contextMode = data.context_mode === 'group' || data.context_mode === 'isolated' ? data.context_mode @@ -368,25 +348,23 @@ export async function processTaskIpc( ...task, ...updates, }; - if (updatedTask.schedule_type === 'cron') { - try { - const interval = CronExpressionParser.parse( - updatedTask.schedule_value, - { tz: deps.timezone }, - ); - updates.next_run = interval.next().toISOString(); - } catch { - logger.warn( - { taskId: data.taskId, value: updatedTask.schedule_value }, - 'Invalid cron in task update', - ); - break; - } - } else if (updatedTask.schedule_type === 'interval') { - const ms = parseInt(updatedTask.schedule_value, 10); - if (!isNaN(ms) && ms > 0) { - updates.next_run = new Date(Date.now() + ms).toISOString(); - } + try { + updates.next_run = computeTaskNextRun( + updatedTask.schedule_type, + updatedTask.schedule_value, + deps.timezone, + ); + } catch (err) { + logger.warn( + { + taskId: data.taskId, + scheduleType: updatedTask.schedule_type, + value: updatedTask.schedule_value, + err: err instanceof Error ? err.message : String(err), + }, + 'Invalid task schedule update', + ); + break; } } diff --git a/src/sdk-types.test.ts b/src/sdk-types.test.ts index a1fe8285416..e64f756d297 100644 --- a/src/sdk-types.test.ts +++ b/src/sdk-types.test.ts @@ -5,6 +5,8 @@ import type { AgentLite, AvailableGroup, RegisteredGroup, + Task, + TaskDetails, } from './api/sdk.js'; describe('public SDK type exports', () => { @@ -21,4 +23,20 @@ describe('public SDK type exports', () => { expectTypeOf().toEqualTypeOf(); }); + + it('exposes task APIs with public task types', () => { + type ScheduleTaskResult = Awaited>; + type ListTasksResult = ReturnType; + type GetTaskResult = ReturnType; + type UpdateTaskResult = Awaited>; + type PauseTaskResult = Awaited>; + type ResumeTaskResult = Awaited>; + + expectTypeOf().toEqualTypeOf(); + expectTypeOf().toEqualTypeOf(); + expectTypeOf().toEqualTypeOf(); + expectTypeOf().toEqualTypeOf(); + expectTypeOf().toEqualTypeOf(); + expectTypeOf().toEqualTypeOf(); + }); }); diff --git a/src/task-api.test.ts b/src/task-api.test.ts new file mode 100644 index 00000000000..843b13e6254 --- /dev/null +++ b/src/task-api.test.ts @@ -0,0 +1,387 @@ +import fs from 'fs'; +import os from 'os'; +import path from 'path'; + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; + +import { AgentImpl } from './agent-impl.js'; +import { + buildAgentConfig, + resolveSerializableAgentSettings, +} from './agent-config.js'; +import { _initTestDatabase, AgentDb } from './db.js'; +import { buildRuntimeConfig } from './runtime-config.js'; +import type { RegisteredGroup, ScheduledTask } from './types.js'; + +const runtimeConfig = buildRuntimeConfig( + { timezone: 'UTC' }, + '/tmp/agentlite-test-pkg', +); + +const MAIN_GROUP: RegisteredGroup = { + name: 'Main', + folder: 'main', + trigger: 'always', + added_at: '2024-01-01T00:00:00.000Z', + isMain: true, +}; + +const TEAM_GROUP: RegisteredGroup = { + name: 'Team', + folder: 'team', + trigger: '@Andy', + added_at: '2024-01-01T00:00:00.000Z', +}; + +let tmpDir: string; +let agent: AgentImpl; +let db: AgentDb; + +function createAgent(name: string): AgentImpl { + const config = buildAgentConfig({ + agentId: `${name}00000000`.slice(0, 8), + ...resolveSerializableAgentSettings( + name, + { workdir: path.join(tmpDir, 'agents', name) }, + tmpDir, + ), + }); + return new AgentImpl(config, runtimeConfig); +} + +function createStartedAgent( + groups: Record = { + 'main@g.us': MAIN_GROUP, + 'team@g.us': TEAM_GROUP, + }, +): AgentImpl { + const instance = createAgent('test'); + instance._setDbForTests(db); + instance._setRegisteredGroups(groups); + (instance as unknown as { _started: boolean })._started = true; + return instance; +} + +function seedTask( + task: Partial & Pick, +): void { + const group = task.chat_jid === 'main@g.us' ? MAIN_GROUP : TEAM_GROUP; + db.createTask({ + id: task.id, + group_folder: task.group_folder ?? group.folder, + chat_jid: task.chat_jid, + prompt: task.prompt ?? 'test prompt', + schedule_type: task.schedule_type ?? 'once', + schedule_value: task.schedule_value ?? '2026-01-01T00:00:00Z', + context_mode: task.context_mode ?? 'isolated', + next_run: task.next_run ?? '2026-01-01T00:00:00.000Z', + status: task.status ?? 'active', + created_at: task.created_at ?? '2024-01-01T00:00:00.000Z', + }); +} + +beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'agentlite-task-api-')); + db = _initTestDatabase(); + agent = createStartedAgent(); +}); + +afterEach(() => { + try { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } catch { + /* ignore */ + } +}); + +describe('Agent task APIs', () => { + it('schedules a task with camelCase fields and persists it', async () => { + const task = await agent.scheduleTask({ + jid: 'team@g.us', + prompt: 'Send the daily summary', + scheduleType: 'once', + scheduleValue: '2026-01-01T09:00:00Z', + }); + + expect(task).toMatchObject({ + jid: 'team@g.us', + groupFolder: 'team', + prompt: 'Send the daily summary', + scheduleType: 'once', + scheduleValue: '2026-01-01T09:00:00Z', + contextMode: 'isolated', + status: 'active', + }); + expect(task.nextRun).toBe('2026-01-01T09:00:00.000Z'); + expect(task.createdAt).toBeTruthy(); + + const stored = db.getTaskById(task.id); + expect(stored).toBeDefined(); + expect(stored?.chat_jid).toBe('team@g.us'); + expect(stored?.group_folder).toBe('team'); + expect(stored?.context_mode).toBe('isolated'); + }); + + it('lists tasks and filters by jid or status', () => { + seedTask({ + id: 'task-1', + chat_jid: 'main@g.us', + created_at: '2024-01-03T00:00:00.000Z', + }); + seedTask({ + id: 'task-2', + chat_jid: 'team@g.us', + status: 'paused', + created_at: '2024-01-02T00:00:00.000Z', + }); + seedTask({ + id: 'task-3', + chat_jid: 'team@g.us', + status: 'completed', + created_at: '2024-01-01T00:00:00.000Z', + next_run: null, + }); + + expect(agent.listTasks()).toHaveLength(3); + expect( + agent.listTasks({ jid: 'team@g.us' }).map((task) => task.id), + ).toEqual(['task-2', 'task-3']); + expect( + agent.listTasks({ status: 'completed' }).map((task) => task.id), + ).toEqual(['task-3']); + }); + + it('returns task details with newest-first run history', () => { + seedTask({ id: 'task-1', chat_jid: 'team@g.us' }); + db.logTaskRun({ + task_id: 'task-1', + run_at: '2024-01-01T08:00:00.000Z', + duration_ms: 1500, + status: 'success', + result: 'older run', + error: null, + }); + db.logTaskRun({ + task_id: 'task-1', + run_at: '2024-01-01T09:00:00.000Z', + duration_ms: 900, + status: 'error', + result: null, + error: 'latest failure', + }); + + const task = agent.getTask('task-1'); + + expect(task?.runs).toEqual([ + { + runAt: '2024-01-01T09:00:00.000Z', + durationMs: 900, + status: 'error', + result: null, + error: 'latest failure', + }, + { + runAt: '2024-01-01T08:00:00.000Z', + durationMs: 1500, + status: 'success', + result: 'older run', + error: null, + }, + ]); + }); + + it('updates prompt without changing schedule or status', async () => { + seedTask({ + id: 'task-1', + chat_jid: 'team@g.us', + schedule_type: 'cron', + schedule_value: '0 9 * * 1', + next_run: '2026-01-05T09:00:00.000Z', + status: 'active', + }); + + const updated = await agent.updateTask('task-1', { + prompt: 'updated prompt', + }); + + expect(updated).toMatchObject({ + id: 'task-1', + jid: 'team@g.us', + groupFolder: 'team', + prompt: 'updated prompt', + scheduleType: 'cron', + scheduleValue: '0 9 * * 1', + status: 'active', + nextRun: '2026-01-05T09:00:00.000Z', + }); + expect('chat_jid' in (updated as unknown as Record)).toBe( + false, + ); + }); + + it('updates schedule and recomputes nextRun while preserving status', async () => { + seedTask({ + id: 'task-1', + chat_jid: 'team@g.us', + schedule_type: 'cron', + schedule_value: '0 9 * * 1', + next_run: '2026-01-05T09:00:00.000Z', + status: 'paused', + }); + + const updated = await agent.updateTask('task-1', { + scheduleType: 'once', + scheduleValue: '2026-02-02T10:30:00Z', + }); + + expect(updated).toMatchObject({ + id: 'task-1', + scheduleType: 'once', + scheduleValue: '2026-02-02T10:30:00Z', + nextRun: '2026-02-02T10:30:00.000Z', + status: 'paused', + }); + }); + + it('rejects mutation APIs before start()', async () => { + const unstarted = createAgent('unstarted'); + unstarted._setDbForTests(db); + + await expect( + unstarted.scheduleTask({ + jid: 'team@g.us', + prompt: 'test', + scheduleType: 'once', + scheduleValue: '2026-01-01T00:00:00Z', + }), + ).rejects.toThrow('Call start() before scheduleTask()'); + await expect( + unstarted.updateTask('task-1', { prompt: 'updated' }), + ).rejects.toThrow('Call start() before updateTask()'); + await expect(unstarted.pauseTask('task-1')).rejects.toThrow( + 'Call start() before pauseTask()', + ); + await expect(unstarted.resumeTask('task-1')).rejects.toThrow( + 'Call start() before resumeTask()', + ); + await expect(unstarted.cancelTask('task-1')).rejects.toThrow( + 'Call start() before cancelTask()', + ); + }); + + it('rejects read APIs before start()', () => { + const unstarted = createAgent('unstarted'); + unstarted._setDbForTests(db); + + expect(() => unstarted.listTasks()).toThrow( + 'Call start() before listTasks()', + ); + expect(() => unstarted.getTask('task-1')).toThrow( + 'Call start() before getTask()', + ); + }); + + it('rejects mutation APIs when no main group is registered', async () => { + agent = createStartedAgent({ 'team@g.us': TEAM_GROUP }); + seedTask({ id: 'task-1', chat_jid: 'team@g.us' }); + + await expect( + agent.scheduleTask({ + jid: 'team@g.us', + prompt: 'test', + scheduleType: 'once', + scheduleValue: '2026-01-01T00:00:00Z', + }), + ).rejects.toThrow('Task admin requires at least one registered main group'); + await expect(agent.cancelTask('task-1')).rejects.toThrow( + 'Task admin requires at least one registered main group', + ); + }); + + it('rejects scheduleTask for unknown groups and invalid schedules', async () => { + await expect( + agent.scheduleTask({ + jid: 'missing@g.us', + prompt: 'test', + scheduleType: 'once', + scheduleValue: '2026-01-01T00:00:00Z', + }), + ).rejects.toThrow('group "missing@g.us" is not registered'); + + await expect( + agent.scheduleTask({ + jid: 'team@g.us', + prompt: 'test', + scheduleType: 'interval', + scheduleValue: '0', + }), + ).rejects.toThrow('Invalid interval: 0'); + }); + + it('returns undefined for missing tasks and throws on missing mutation targets', async () => { + expect(agent.getTask('missing-task')).toBeUndefined(); + + const missingMutations = [ + () => agent.updateTask('missing-task', { prompt: 'updated' }), + () => agent.pauseTask('missing-task'), + () => agent.resumeTask('missing-task'), + () => agent.cancelTask('missing-task'), + ]; + + for (const mutate of missingMutations) { + await expect(mutate()).rejects.toThrow('Task "missing-task" not found'); + } + }); + + it('pauses and resumes tasks while enforcing valid transitions', async () => { + seedTask({ id: 'task-1', chat_jid: 'team@g.us', status: 'active' }); + + const paused = await agent.pauseTask('task-1'); + expect(paused.status).toBe('paused'); + await expect(agent.pauseTask('task-1')).rejects.toThrow( + 'Cannot pause task "task-1" because it is paused', + ); + + const resumed = await agent.resumeTask('task-1'); + expect(resumed.status).toBe('active'); + await expect(agent.resumeTask('task-1')).rejects.toThrow( + 'Cannot resume task "task-1" because it is active', + ); + }); + + it('blocks updates and state changes for completed tasks', async () => { + seedTask({ + id: 'task-1', + chat_jid: 'team@g.us', + status: 'completed', + next_run: null, + }); + + await expect( + agent.updateTask('task-1', { prompt: 'updated' }), + ).rejects.toThrow('Cannot update completed task "task-1"'); + await expect(agent.pauseTask('task-1')).rejects.toThrow( + 'Cannot pause task "task-1" because it is completed', + ); + await expect(agent.resumeTask('task-1')).rejects.toThrow( + 'Cannot resume task "task-1" because it is completed', + ); + }); + + it('cancels tasks and removes them from storage', async () => { + seedTask({ id: 'task-1', chat_jid: 'team@g.us' }); + db.logTaskRun({ + task_id: 'task-1', + run_at: '2024-01-01T09:00:00.000Z', + duration_ms: 100, + status: 'success', + result: 'done', + error: null, + }); + + await agent.cancelTask('task-1'); + + expect(db.getTaskById('task-1')).toBeUndefined(); + expect(db.getTaskRunLogs('task-1')).toEqual([]); + }); +}); diff --git a/src/task-lifecycle.test.ts b/src/task-lifecycle.test.ts new file mode 100644 index 00000000000..ece0a112534 --- /dev/null +++ b/src/task-lifecycle.test.ts @@ -0,0 +1,215 @@ +import fs from 'fs'; +import os from 'os'; +import path from 'path'; + +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +vi.mock('./container-runner.js', async () => { + const actual = await vi.importActual( + './container-runner.js', + ); + return { + ...actual, + runContainerAgent: vi.fn(), + }; +}); + +import { AgentImpl } from './agent-impl.js'; +import { + buildAgentConfig, + resolveSerializableAgentSettings, +} from './agent-config.js'; +import { _initTestDatabase, AgentDb } from './db.js'; +import { resolveGroupIpcPath } from './group-folder.js'; +import { buildRuntimeConfig } from './runtime-config.js'; +import { startSchedulerLoop } from './task-scheduler.js'; +import { runContainerAgent } from './container-runner.js'; +import type { RuntimeConfig } from './runtime-config.js'; +import type { RegisteredGroup } from './types.js'; + +const runtimeConfig: RuntimeConfig = buildRuntimeConfig( + { timezone: 'UTC' }, + '/tmp/agentlite-test-pkg', +); + +const MAIN_GROUP: RegisteredGroup = { + name: 'Main', + folder: 'main', + trigger: 'always', + added_at: '2024-01-01T00:00:00.000Z', + isMain: true, +}; + +const TEAM_GROUP: RegisteredGroup = { + name: 'Team', + folder: 'team', + trigger: '@Andy', + added_at: '2024-01-01T00:00:00.000Z', +}; + +let tmpDir: string; +let db: AgentDb; + +function createAgent(name: string): AgentImpl { + const config = buildAgentConfig({ + agentId: `${name}00000000`.slice(0, 8), + ...resolveSerializableAgentSettings( + name, + { workdir: path.join(tmpDir, 'agents', name) }, + tmpDir, + ), + }); + return new AgentImpl(config, runtimeConfig); +} + +describe('task lifecycle integration', () => { + beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'agentlite-task-life-')); + db = _initTestDatabase(); + vi.mocked(runContainerAgent).mockReset(); + }); + + afterEach(() => { + try { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } catch { + /* ignore */ + } + }); + + it('executes a scheduled task end-to-end through the scheduler and persists results', async () => { + const agent = createAgent('test'); + agent._setDbForTests(db); + agent._setRegisteredGroups({ + 'main@g.us': MAIN_GROUP, + 'team@g.us': TEAM_GROUP, + }); + (agent as unknown as { _started: boolean })._started = true; + ( + agent as unknown as { + sessions: Record; + } + ).sessions = { team: 'session-team-123' }; + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _runtimeConfig, _onProcess, onOutput) => { + await onOutput?.({ + status: 'success', + result: 'Lifecycle integration result', + }); + return { + status: 'success', + result: 'Lifecycle integration result', + }; + }, + ); + + const task = await agent.scheduleTask({ + jid: 'team@g.us', + prompt: 'Reply with Lifecycle integration result', + scheduleType: 'once', + scheduleValue: '2024-01-01T00:00:00Z', + contextMode: 'group', + }); + + const snapshotPath = path.join( + resolveGroupIpcPath('team', agent.config.dataDir), + 'current_tasks.json', + ); + const scheduledSnapshot = JSON.parse( + fs.readFileSync(snapshotPath, 'utf-8'), + ); + expect(scheduledSnapshot).toEqual([ + expect.objectContaining({ + id: task.id, + groupFolder: 'team', + schedule_type: 'once', + status: 'active', + }), + ]); + + const sendMessage = vi.fn(async () => {}); + const enqueueTask = vi.fn( + async (_groupJid: string, _taskId: string, fn: () => Promise) => { + await fn(); + }, + ); + const queue = { + enqueueTask, + notifyIdle: vi.fn(), + closeStdin: vi.fn(), + }; + + const schedulerHandle = startSchedulerLoop({ + db, + assistantName: 'Andy', + schedulerPollInterval: 60000, + timezone: 'UTC', + workDir: agent.config.workDir, + groupsDir: agent.config.groupsDir, + dataDir: agent.config.dataDir, + runtimeConfig, + registeredGroups: () => ({ + 'main@g.us': MAIN_GROUP, + 'team@g.us': TEAM_GROUP, + }), + getSessions: () => + ( + agent as unknown as { + sessions: Record; + } + ).sessions, + queue: queue as any, + onProcess: () => {}, + sendMessage, + }); + + try { + await vi.waitFor(() => { + expect(runContainerAgent).toHaveBeenCalledTimes(1); + }); + await vi.waitFor(() => { + expect(agent.getTask(task.id)?.status).toBe('completed'); + }); + + expect(enqueueTask).toHaveBeenCalledWith( + 'team@g.us', + task.id, + expect.any(Function), + ); + expect(runContainerAgent).toHaveBeenCalledWith( + expect.objectContaining({ folder: 'team' }), + expect.objectContaining({ + prompt: 'Reply with Lifecycle integration result', + sessionId: 'session-team-123', + groupFolder: 'team', + chatJid: 'team@g.us', + isScheduledTask: true, + }), + runtimeConfig, + expect.any(Function), + expect.any(Function), + ); + + const persisted = agent.getTask(task.id); + expect(persisted).toMatchObject({ + id: task.id, + status: 'completed', + lastResult: 'Lifecycle integration result', + }); + expect(persisted?.lastRun).toBeTruthy(); + expect(persisted?.runs).toHaveLength(1); + expect(persisted?.runs[0]).toMatchObject({ + status: 'success', + result: 'Lifecycle integration result', + error: null, + }); + expect(sendMessage).toHaveBeenCalledWith( + 'team@g.us', + 'Lifecycle integration result', + ); + } finally { + schedulerHandle.stop(); + } + }); +}); diff --git a/src/task-utils.ts b/src/task-utils.ts new file mode 100644 index 00000000000..81a574c00f2 --- /dev/null +++ b/src/task-utils.ts @@ -0,0 +1,42 @@ +import { CronExpressionParser } from 'cron-parser'; + +import type { ScheduledTask } from './types.js'; + +export function createTaskId(): string { + return `task-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; +} + +export function computeTaskNextRun( + scheduleType: ScheduledTask['schedule_type'], + scheduleValue: string, + timezone: string, +): string { + if (scheduleType === 'cron') { + const interval = CronExpressionParser.parse(scheduleValue, { + tz: timezone, + }); + const next = interval.next(); + if (!next) { + throw new Error(`Invalid cron expression: ${scheduleValue}`); + } + const nextRun = next.toISOString(); + if (!nextRun) { + throw new Error(`Invalid cron expression: ${scheduleValue}`); + } + return nextRun; + } + + if (scheduleType === 'interval') { + const ms = parseInt(scheduleValue, 10); + if (isNaN(ms) || ms <= 0) { + throw new Error(`Invalid interval: ${scheduleValue}`); + } + return new Date(Date.now() + ms).toISOString(); + } + + const date = new Date(scheduleValue); + if (isNaN(date.getTime())) { + throw new Error(`Invalid timestamp: ${scheduleValue}`); + } + return date.toISOString(); +}