diff --git a/prompts/en/memory_persistence.md.j2 b/prompts/en/memory_persistence.md.j2 index 3a60a0bd5..2b2932313 100644 --- a/prompts/en/memory_persistence.md.j2 +++ b/prompts/en/memory_persistence.md.j2 @@ -32,13 +32,17 @@ This is an automatic process triggered periodically during conversation. You are - `"decision"` for commitments or choices made - `"user_correction"` when the user corrects a prior assumption, instruction, or framing - `"decision_revised"` when a prior choice changes after feedback or new information + - `"deadline_set"` when the conversation establishes a concrete due date, deadline, or scheduled milestone + - `"blocked_on"` when progress is waiting on an approval, dependency, missing input, or external action + - `"constraint"` when the user or system states a hard requirement, limitation, or non-negotiable boundary + - `"outcome"` when a task, branch, or delegated step reaches a clear terminal result worth retaining in temporal memory - `"error"` for failures or problems - `"system"` for other notable events - `summary`: one-line description of what happened - Normalize relative time references to absolute dates/times with timezone (for example `2026-03-31T14:20:00-04:00`) so downstream memory checks are stable across sessions. - - `importance`: 0.0-1.0 score (`decision`, `user_correction`, and `decision_revised` are typically 0.6-0.8) + - `importance`: 0.0-1.0 score (`decision`, `user_correction`, `decision_revised`, `blocked_on`, and `outcome` are typically 0.6-0.8) - Events feed the agent's temporal working memory — they help the agent remember *what happened today*, not just facts. 5. **Finish with the terminal tool.** You must call `memory_persistence_complete` before finishing: diff --git a/src/agent/channel.rs b/src/agent/channel.rs index ad9932f81..1d2af93f8 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -82,6 +82,63 @@ fn should_flush_coalesce_buffer_for_event(event: &ProcessEvent) -> bool { ) } +fn classify_conversational_event_summary( + summary: &str, + default_event_type: crate::memory::WorkingMemoryEventType, +) -> (crate::memory::WorkingMemoryEventType, String) { + let trimmed = summary.trim(); + if trimmed.is_empty() { + return (default_event_type, String::new()); + } + + if let Some((prefix, rest)) = trimmed.split_once(':') { + let rest_trimmed = rest.trim(); + let prefix = prefix.trim(); + if prefix.eq_ignore_ascii_case("outcome") { + return ( + crate::memory::WorkingMemoryEventType::Outcome, + rest_trimmed.to_string(), + ); + } + if prefix.eq_ignore_ascii_case("blocked_on") { + return ( + crate::memory::WorkingMemoryEventType::BlockedOn, + rest_trimmed.to_string(), + ); + } + if prefix.eq_ignore_ascii_case("constraint") { + return ( + crate::memory::WorkingMemoryEventType::Constraint, + rest_trimmed.to_string(), + ); + } + } + + (default_event_type, trimmed.to_string()) +} + +fn format_conversational_event_summary( + event_type: crate::memory::WorkingMemoryEventType, + source: &str, + event_summary: &str, +) -> String { + let label = match event_type { + crate::memory::WorkingMemoryEventType::Outcome => "outcome", + crate::memory::WorkingMemoryEventType::BlockedOn => "blocked on", + crate::memory::WorkingMemoryEventType::Constraint => "constraint", + crate::memory::WorkingMemoryEventType::Error => "failed", + crate::memory::WorkingMemoryEventType::BranchCompleted + | crate::memory::WorkingMemoryEventType::WorkerCompleted => "completed", + _ => "concluded", + }; + + if event_summary.is_empty() { + format!("{source} {label}") + } else { + format!("{source} {label}: {event_summary}") + } +} + /// Shared state that channel tools need to act on the channel. /// /// Wrapped in Arc and passed to tools (branch, spawn_worker, route, cancel) @@ -2954,11 +3011,19 @@ impl Channel { } else { conclusion.clone() }; + let (event_type, event_summary) = classify_conversational_event_summary( + &summary, + crate::memory::WorkingMemoryEventType::BranchCompleted, + ); self.deps .working_memory .emit( - crate::memory::WorkingMemoryEventType::BranchCompleted, - format!("Branch concluded: {summary}"), + event_type, + format_conversational_event_summary( + event_type, + "Branch", + &event_summary, + ), ) .channel(self.id.to_string()) .importance(0.7) @@ -3027,20 +3092,18 @@ impl Channel { } else { result.clone() }; - let event_type = if *success { + let default_event_type = if *success { crate::memory::WorkingMemoryEventType::WorkerCompleted } else { crate::memory::WorkingMemoryEventType::Error }; + let (event_type, event_summary) = + classify_conversational_event_summary(&worker_summary, default_event_type); self.deps .working_memory .emit( event_type, - if *success { - format!("Worker completed: {worker_summary}") - } else { - format!("Worker failed: {worker_summary}") - }, + format_conversational_event_summary(event_type, "Worker", &event_summary), ) .channel(self.id.to_string()) .importance(if *success { 0.6 } else { 0.8 }) @@ -3617,11 +3680,12 @@ fn is_dm_conversation_id(conv_id: &str) -> bool { #[cfg(test)] mod tests { use super::{ - QuietModeFallbackState, compute_listen_mode_invocation, is_dm_conversation_id, + QuietModeFallbackState, classify_conversational_event_summary, + compute_listen_mode_invocation, format_conversational_event_summary, is_dm_conversation_id, recv_channel_event, should_process_event_for_channel, should_send_discord_quiet_mode_ping_ack, should_send_quiet_mode_fallback, }; - use crate::memory::MemoryType; + use crate::memory::{MemoryType, WorkingMemoryEventType}; use crate::{AgentId, ChannelId, InboundMessage, MessageContent, ProcessEvent, ProcessId}; use std::collections::HashMap; use std::sync::Arc; @@ -3790,6 +3854,77 @@ mod tests { assert!(!should_process_event_for_channel(&event, &channel_id)); } + #[test] + fn conversational_event_summary_extracts_outcome_prefix() { + let (event_type, summary) = classify_conversational_event_summary( + "outcome: implemented the migration safety check", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::Outcome); + assert_eq!(summary, "implemented the migration safety check"); + } + + #[test] + fn conversational_event_summary_extracts_blocked_on_prefix() { + let (event_type, summary) = classify_conversational_event_summary( + "blocked_on: waiting for review from infra", + WorkingMemoryEventType::Error, + ); + assert_eq!(event_type, WorkingMemoryEventType::BlockedOn); + assert_eq!(summary, "waiting for review from infra"); + } + + #[test] + fn conversational_event_summary_falls_back_to_default_type() { + let (event_type, summary) = classify_conversational_event_summary( + "completed with no blockers", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::WorkerCompleted); + assert_eq!(summary, "completed with no blockers"); + } + + #[test] + fn conversational_event_summary_extracts_constraint_prefix_case_insensitively() { + let (event_type, summary) = classify_conversational_event_summary( + "CoNsTrAiNt: must keep migrations immutable", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::Constraint); + assert_eq!(summary, "must keep migrations immutable"); + } + + #[test] + fn conversational_event_summary_is_case_insensitive_across_prefixes() { + let (event_type, summary) = classify_conversational_event_summary( + "OUTCOME: implemented the follow-up", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::Outcome); + assert_eq!(summary, "implemented the follow-up"); + + let (event_type, summary) = classify_conversational_event_summary( + "Blocked_On: waiting on reviewer signoff", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::BlockedOn); + assert_eq!(summary, "waiting on reviewer signoff"); + } + + #[test] + fn conversational_event_summary_treats_empty_prefixed_content_as_empty_summary() { + let (event_type, summary) = classify_conversational_event_summary( + "outcome: ", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::Outcome); + assert!(summary.is_empty()); + assert_eq!( + format_conversational_event_summary(event_type, "Worker", &summary), + "Worker outcome" + ); + } + #[test] fn quiet_mode_invocation_uses_discord_mention_and_reply_metadata() { let message = inbound_message( diff --git a/src/memory/working.rs b/src/memory/working.rs index 7d1bff0d2..9308915ca 100644 --- a/src/memory/working.rs +++ b/src/memory/working.rs @@ -43,6 +43,14 @@ pub enum WorkingMemoryEventType { UserCorrection, /// A prior decision was revised. DecisionRevised, + /// A concrete deadline or due date was set. + DeadlineSet, + /// Progress is currently blocked on an external dependency or prerequisite. + BlockedOn, + /// An explicit constraint was stated. + Constraint, + /// A task or branch reached a terminal result. + Outcome, /// An error or failure occurred. Error, /// A task was created or updated. @@ -68,6 +76,10 @@ impl WorkingMemoryEventType { Self::Decision => "decision", Self::UserCorrection => "user_correction", Self::DecisionRevised => "decision_revised", + Self::DeadlineSet => "deadline_set", + Self::BlockedOn => "blocked_on", + Self::Constraint => "constraint", + Self::Outcome => "outcome", Self::Error => "error", Self::TaskUpdate => "task_update", Self::AgentMessage => "agent_message", @@ -87,6 +99,10 @@ impl WorkingMemoryEventType { "decision" => Some(Self::Decision), "user_correction" => Some(Self::UserCorrection), "decision_revised" => Some(Self::DecisionRevised), + "deadline_set" => Some(Self::DeadlineSet), + "blocked_on" => Some(Self::BlockedOn), + "constraint" => Some(Self::Constraint), + "outcome" => Some(Self::Outcome), "error" => Some(Self::Error), "task_update" => Some(Self::TaskUpdate), "agent_message" => Some(Self::AgentMessage), @@ -765,6 +781,10 @@ fn format_event_line(event: &WorkingMemoryEvent, current_channel_id: &str) -> St WorkingMemoryEventType::Decision => "Decision", WorkingMemoryEventType::UserCorrection => "User correction", WorkingMemoryEventType::DecisionRevised => "Decision revised", + WorkingMemoryEventType::DeadlineSet => "Deadline set", + WorkingMemoryEventType::BlockedOn => "Blocked on", + WorkingMemoryEventType::Constraint => "Constraint", + WorkingMemoryEventType::Outcome => "Outcome", WorkingMemoryEventType::Error => "Error", WorkingMemoryEventType::TaskUpdate => "Task update", WorkingMemoryEventType::AgentMessage => "Agent message", @@ -1060,7 +1080,7 @@ mod tests { let store = setup_test_store().await; let today = store.today(); - for event_type in [ + let inserted = [ WorkingMemoryEventType::BranchCompleted, WorkingMemoryEventType::WorkerSpawned, WorkingMemoryEventType::WorkerCompleted, @@ -1069,13 +1089,19 @@ mod tests { WorkingMemoryEventType::Decision, WorkingMemoryEventType::UserCorrection, WorkingMemoryEventType::DecisionRevised, + WorkingMemoryEventType::DeadlineSet, + WorkingMemoryEventType::BlockedOn, + WorkingMemoryEventType::Constraint, + WorkingMemoryEventType::Outcome, WorkingMemoryEventType::Error, WorkingMemoryEventType::TaskUpdate, WorkingMemoryEventType::AgentMessage, WorkingMemoryEventType::System, WorkingMemoryEventType::MemoryPromoted, WorkingMemoryEventType::MemoryDemoted, - ] { + ]; + + for event_type in inserted { let event = WorkingMemoryEvent { id: Uuid::new_v4().to_string(), event_type, @@ -1091,7 +1117,7 @@ mod tests { } let events = store.get_events_for_day(&today).await.unwrap(); - assert_eq!(events.len(), 14); + assert_eq!(events.len(), inserted.len()); // Verify all types survived the roundtrip. let types: Vec = events.iter().map(|e| e.event_type).collect(); diff --git a/src/tasks/store.rs b/src/tasks/store.rs index db3b0d117..199413c3c 100644 --- a/src/tasks/store.rs +++ b/src/tasks/store.rs @@ -9,6 +9,8 @@ use anyhow::Context as _; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; +#[cfg(test)] +use sqlx::sqlite::SqlitePoolOptions; use sqlx::{Row as _, SqlitePool}; #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)] @@ -183,6 +185,11 @@ impl TaskStore { Self { pool } } + #[cfg(test)] + pub(crate) fn pool(&self) -> &SqlitePool { + &self.pool + } + /// Maximum number of retries when a concurrent create races on the /// `task_number` UNIQUE constraint. const MAX_CREATE_RETRIES: usize = 3; @@ -670,61 +677,65 @@ fn read_optional_timestamp(row: &sqlx::sqlite::SqliteRow, column: &str) -> Optio } #[cfg(test)] -mod tests { - use super::*; - use sqlx::sqlite::SqlitePoolOptions; - - async fn setup_store() -> TaskStore { - let pool = SqlitePoolOptions::new() - .max_connections(1) - .connect("sqlite::memory:") - .await - .expect("in-memory sqlite should connect"); - - sqlx::query( - r#" - CREATE TABLE tasks ( - id TEXT PRIMARY KEY, - task_number INTEGER NOT NULL UNIQUE, - title TEXT NOT NULL, - description TEXT, - status TEXT NOT NULL DEFAULT 'backlog', - priority TEXT NOT NULL DEFAULT 'medium', - owner_agent_id TEXT NOT NULL, - assigned_agent_id TEXT NOT NULL, - subtasks TEXT, - metadata TEXT, - source_memory_id TEXT, - worker_id TEXT, - created_by TEXT NOT NULL, - approved_at TEXT, - approved_by TEXT, - created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - completed_at TEXT - ) - "#, - ) - .execute(&pool) +pub(crate) async fn setup_test_store() -> TaskStore { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") .await - .expect("tasks schema should be created"); - - sqlx::query( - "CREATE TABLE task_number_seq ( - id INTEGER PRIMARY KEY CHECK (id = 1), - next_number INTEGER NOT NULL DEFAULT 1 - )", + .expect("in-memory sqlite should connect"); + + sqlx::query( + r#" + CREATE TABLE tasks ( + id TEXT PRIMARY KEY, + task_number INTEGER NOT NULL UNIQUE, + title TEXT NOT NULL, + description TEXT, + status TEXT NOT NULL DEFAULT 'backlog', + priority TEXT NOT NULL DEFAULT 'medium', + owner_agent_id TEXT NOT NULL, + assigned_agent_id TEXT NOT NULL, + subtasks TEXT, + metadata TEXT, + source_memory_id TEXT, + worker_id TEXT, + created_by TEXT NOT NULL, + approved_at TEXT, + approved_by TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + completed_at TEXT ) + "#, + ) + .execute(&pool) + .await + .expect("tasks schema should be created"); + + sqlx::query( + "CREATE TABLE task_number_seq ( + id INTEGER PRIMARY KEY CHECK (id = 1), + next_number INTEGER NOT NULL DEFAULT 1 + )", + ) + .execute(&pool) + .await + .expect("task_number_seq should be created"); + + sqlx::query("INSERT INTO task_number_seq (id, next_number) VALUES (1, 1)") .execute(&pool) .await - .expect("task_number_seq should be created"); + .expect("sequence seed should be inserted"); - sqlx::query("INSERT INTO task_number_seq (id, next_number) VALUES (1, 1)") - .execute(&pool) - .await - .expect("sequence seed should be inserted"); + TaskStore::new(pool) +} - TaskStore::new(pool) +#[cfg(test)] +mod tests { + use super::*; + + async fn setup_store() -> TaskStore { + setup_test_store().await } fn self_assigned_input(title: &str, status: TaskStatus) -> CreateTaskInput { diff --git a/src/tools/memory_persistence_complete.rs b/src/tools/memory_persistence_complete.rs index e7e9c8b8f..f54c1beb9 100644 --- a/src/tools/memory_persistence_complete.rs +++ b/src/tools/memory_persistence_complete.rs @@ -109,7 +109,8 @@ pub struct MemoryPersistenceCompleteArgs { /// A single event extracted by the persistence branch for the working memory log. #[derive(Debug, Clone, Deserialize, JsonSchema)] pub struct WorkingMemoryEventInput { - /// Event type: "decision", "user_correction", "decision_revised", "error", or "system". + /// Event type: "decision", "user_correction", "decision_revised", "deadline_set", + /// "blocked_on", "constraint", "outcome", "error", or "system". pub event_type: String, /// One-line summary of the event. pub summary: String, @@ -170,6 +171,10 @@ impl Tool for MemoryPersistenceCompleteTool { "decision", "user_correction", "decision_revised", + "deadline_set", + "blocked_on", + "constraint", + "outcome", "error", "system" ], diff --git a/src/tools/send_agent_message.rs b/src/tools/send_agent_message.rs index dcc9e79e2..82b0267ed 100644 --- a/src/tools/send_agent_message.rs +++ b/src/tools/send_agent_message.rs @@ -285,7 +285,7 @@ impl Tool for SendAgentMessageTool { if let Some(working_memory) = &self.working_memory { working_memory .emit( - crate::memory::WorkingMemoryEventType::AgentMessage, + crate::memory::WorkingMemoryEventType::Outcome, format!("Delegated task #{task_number} to {target_display}"), ) .importance(0.7) @@ -325,3 +325,102 @@ fn extract_task_title(message: &str) -> String { format!("{}...", first_line[..boundary].trim()) } } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::links::{AgentLink, LinkDirection, LinkKind}; + use crate::memory::working::WorkingMemoryEvent; + use crate::memory::{WorkingMemoryEventType, WorkingMemoryStore}; + use crate::tasks::store::setup_test_store; + use arc_swap::ArcSwap; + use chrono_tz::Tz; + use sqlx::sqlite::SqlitePoolOptions; + use std::collections::HashMap; + use std::time::Duration; + + async fn wait_for_single_event(store: &WorkingMemoryStore) -> WorkingMemoryEvent { + tokio::time::timeout(Duration::from_secs(2), async { + loop { + let events = store + .get_recent_events(10, 0.0) + .await + .expect("working memory query"); + if let Some(event) = events.into_iter().next() { + break event; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("timed out waiting for working memory event") + } + + #[tokio::test] + async fn send_agent_message_emits_outcome_event() { + let task_store = setup_test_store().await; + let pool = task_store.pool().clone(); + sqlx::query( + "CREATE TABLE conversation_messages ( + id TEXT PRIMARY KEY, + channel_id TEXT NOT NULL, + role TEXT NOT NULL, + sender_name TEXT, + sender_id TEXT, + content TEXT NOT NULL, + metadata TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) + )", + ) + .execute(&pool) + .await + .expect("conversation messages schema should be created"); + let task_store = Arc::new(task_store); + let conversation_logger = ConversationLogger::new(pool.clone()); + let working_memory_pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .expect("sqlite connect"); + sqlx::migrate!("./migrations") + .run(&working_memory_pool) + .await + .expect("working memory migrations"); + let working_memory = WorkingMemoryStore::new(working_memory_pool, Tz::UTC); + + let links = Arc::new(ArcSwap::from_pointee(vec![AgentLink { + from_agent_id: "planner".to_string(), + to_agent_id: "executor".to_string(), + direction: LinkDirection::TwoWay, + kind: LinkKind::Peer, + }])); + let agent_names = Arc::new(HashMap::from([ + ("planner".to_string(), "Planner".to_string()), + ("executor".to_string(), "Executor".to_string()), + ])); + + let tool = SendAgentMessageTool::new( + crate::AgentId::from("planner"), + links, + agent_names, + task_store, + conversation_logger, + ) + .with_working_memory(working_memory.clone()); + + let output = tool + .call(SendAgentMessageArgs { + target: "executor".to_string(), + message: "Implement the working-memory renderer. Include tests.".to_string(), + }) + .await + .expect("send agent message should succeed"); + + assert!(output.success); + + let event = wait_for_single_event(&working_memory).await; + assert_eq!(event.event_type, WorkingMemoryEventType::Outcome); + assert_eq!(event.summary, "Delegated task #1 to Executor"); + } +} diff --git a/src/tools/spawn_worker.rs b/src/tools/spawn_worker.rs index 180831c0a..61f979416 100644 --- a/src/tools/spawn_worker.rs +++ b/src/tools/spawn_worker.rs @@ -29,6 +29,21 @@ impl SpawnWorkerTool { } } +fn summarize_duplicate_task(task: &str) -> String { + let trimmed = task.trim(); + if trimmed.is_empty() { + return "unspecified task".to_string(); + } + + const MAX_CHARS: usize = 80; + if trimmed.len() <= MAX_CHARS { + trimmed.to_string() + } else { + let boundary = trimmed.floor_char_boundary(MAX_CHARS); + format!("{}...", &trimmed[..boundary]) + } +} + /// Error type for spawn worker tool. #[derive(Debug, thiserror::Error)] #[error("Worker spawn failed: {0}")] @@ -188,6 +203,20 @@ impl Tool for SpawnWorkerTool { { let status = self.state.status_block.read().await; if let Some(existing_id) = status.find_duplicate_worker_task(&args.task) { + self.state + .deps + .working_memory + .emit( + crate::memory::WorkingMemoryEventType::BlockedOn, + format!( + "Worker spawn blocked on active worker {existing_id} for duplicate task: {}", + summarize_duplicate_task(&args.task) + ), + ) + .channel(self.state.channel_id.to_string()) + .importance(0.6) + .record(); + return Ok(SpawnWorkerOutput { worker_id: existing_id, spawned: false, diff --git a/src/tools/task_create.rs b/src/tools/task_create.rs index dabc7ead6..476d9ee7f 100644 --- a/src/tools/task_create.rs +++ b/src/tools/task_create.rs @@ -142,12 +142,25 @@ impl Tool for TaskCreateTool { .map_err(|error| TaskCreateError(format!("{error}")))?; if let Some(working_memory) = &self.working_memory { - working_memory - .emit( + let (event_type, summary, importance) = if task.status == TaskStatus::Done { + ( + crate::memory::WorkingMemoryEventType::Outcome, + format!("Task #{} completed: {}", task.task_number, task.title), + 0.7, + ) + } else { + ( crate::memory::WorkingMemoryEventType::TaskUpdate, - format!("Task created #{}: {}", task.task_number, task.title), + format!( + "Task created #{}: {} (status: {})", + task.task_number, task.title, task.status + ), + 0.5, ) - .importance(0.5) + }; + working_memory + .emit(event_type, summary) + .importance(importance) .record(); } @@ -159,3 +172,68 @@ impl Tool for TaskCreateTool { }) } } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::memory::working::WorkingMemoryEvent; + use crate::memory::{WorkingMemoryEventType, WorkingMemoryStore}; + use crate::tasks::store::setup_test_store; + use chrono_tz::Tz; + use sqlx::sqlite::SqlitePoolOptions; + use std::time::Duration; + + async fn wait_for_single_event(store: &WorkingMemoryStore) -> WorkingMemoryEvent { + tokio::time::timeout(Duration::from_secs(2), async { + loop { + let events = store + .get_recent_events(10, 0.0) + .await + .expect("working memory query"); + if let Some(event) = events.into_iter().next() { + break event; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("timed out waiting for working memory event") + } + + #[tokio::test] + async fn task_create_emits_outcome_for_done_tasks() { + let task_store = Arc::new(setup_test_store().await); + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .expect("sqlite connect"); + sqlx::migrate!("./migrations") + .run(&pool) + .await + .expect("migrations"); + let working_memory = WorkingMemoryStore::new(pool, Tz::UTC); + + let tool = TaskCreateTool::new(task_store, "agent-test", "branch") + .with_working_memory(working_memory.clone()); + + let output = tool + .call(TaskCreateArgs { + title: "Ship observation MVP".to_string(), + description: Some("land the first packet".to_string()), + priority: "medium".to_string(), + subtasks: Vec::new(), + metadata: None, + status: Some("done".to_string()), + }) + .await + .expect("task create should succeed"); + + assert_eq!(output.status, "done"); + + let event = wait_for_single_event(&working_memory).await; + assert_eq!(event.event_type, WorkingMemoryEventType::Outcome); + assert_eq!(event.summary, "Task #1 completed: Ship observation MVP"); + } +} diff --git a/src/tools/task_update.rs b/src/tools/task_update.rs index 555df299f..2c1a0a6ef 100644 --- a/src/tools/task_update.rs +++ b/src/tools/task_update.rs @@ -155,6 +155,13 @@ impl Tool for TaskUpdateTool { async fn call(&self, args: Self::Args) -> Result { let task_number = i64::from(args.task_number); + let previous_status = self + .task_store + .get_by_number(task_number) + .await + .map_err(|error| TaskUpdateError(format!("{error}")))? + .ok_or_else(|| TaskUpdateError(format!("task #{} not found", task_number)))? + .status; if let TaskUpdateScope::Worker(ref worker_id) = self.scope { let current = self @@ -236,15 +243,27 @@ impl Tool for TaskUpdateTool { .ok_or_else(|| TaskUpdateError(format!("task #{} not found", task_number)))?; if let Some(working_memory) = &self.working_memory { - working_memory - .emit( + let transitioned_to_done = + previous_status != TaskStatus::Done && updated.status == TaskStatus::Done; + let (event_type, summary, importance) = if transitioned_to_done { + ( + crate::memory::WorkingMemoryEventType::Outcome, + format!("Task #{} completed", updated.task_number), + 0.7, + ) + } else { + ( crate::memory::WorkingMemoryEventType::TaskUpdate, format!( "Task #{} updated to {}", updated.task_number, updated.status ), + 0.4, ) - .importance(0.4) + }; + working_memory + .emit(event_type, summary) + .importance(importance) .record(); } @@ -256,3 +275,145 @@ impl Tool for TaskUpdateTool { }) } } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::memory::working::WorkingMemoryEvent; + use crate::memory::{WorkingMemoryEventType, WorkingMemoryStore}; + use crate::tasks::store::setup_test_store; + use chrono_tz::Tz; + use sqlx::sqlite::SqlitePoolOptions; + use std::time::Duration; + + async fn setup_working_memory() -> Arc { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .expect("sqlite connect"); + sqlx::migrate!("./migrations") + .run(&pool) + .await + .expect("migrations"); + WorkingMemoryStore::new(pool, Tz::UTC) + } + + async fn wait_for_single_event(store: &WorkingMemoryStore) -> WorkingMemoryEvent { + tokio::time::timeout(Duration::from_secs(2), async { + loop { + let events = store + .get_recent_events(10, 0.0) + .await + .expect("working memory query"); + if let Some(event) = events.into_iter().next() { + break event; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("timed out waiting for working memory event") + } + + #[tokio::test] + async fn task_update_emits_outcome_for_done_status() { + let task_store = Arc::new(setup_test_store().await); + let working_memory = setup_working_memory().await; + + let created = task_store + .create(crate::tasks::CreateTaskInput { + owner_agent_id: "agent-test".to_string(), + assigned_agent_id: "agent-test".to_string(), + title: "Review PR 2".to_string(), + description: None, + status: TaskStatus::InProgress, + priority: TaskPriority::Medium, + subtasks: Vec::new(), + metadata: serde_json::json!({}), + source_memory_id: None, + created_by: "branch".to_string(), + }) + .await + .expect("task should be created"); + + let tool = TaskUpdateTool::for_branch(task_store, AgentId::from("agent-test")) + .with_working_memory(working_memory.clone()); + + let output = tool + .call(TaskUpdateArgs { + task_number: created.task_number as i32, + title: None, + description: None, + status: Some("done".to_string()), + priority: None, + subtasks: None, + metadata: None, + complete_subtask: None, + worker_id: None, + approved_by: None, + }) + .await + .expect("task update should succeed"); + + assert_eq!(output.status, "done"); + + let event = wait_for_single_event(&working_memory).await; + assert_eq!(event.event_type, WorkingMemoryEventType::Outcome); + assert_eq!( + event.summary, + format!("Task #{} completed", created.task_number) + ); + } + + #[tokio::test] + async fn task_update_keeps_task_update_event_when_task_was_already_done() { + let task_store = Arc::new(setup_test_store().await); + let working_memory = setup_working_memory().await; + + let created = task_store + .create(crate::tasks::CreateTaskInput { + owner_agent_id: "agent-test".to_string(), + assigned_agent_id: "agent-test".to_string(), + title: "Review merged changes".to_string(), + description: None, + status: TaskStatus::Done, + priority: TaskPriority::Medium, + subtasks: Vec::new(), + metadata: serde_json::json!({}), + source_memory_id: None, + created_by: "branch".to_string(), + }) + .await + .expect("task should be created"); + + let tool = TaskUpdateTool::for_branch(task_store, AgentId::from("agent-test")) + .with_working_memory(working_memory.clone()); + + let output = tool + .call(TaskUpdateArgs { + task_number: created.task_number as i32, + title: Some("Review merged changes carefully".to_string()), + description: None, + status: None, + priority: None, + subtasks: None, + metadata: None, + complete_subtask: None, + worker_id: None, + approved_by: None, + }) + .await + .expect("task update should succeed"); + + assert_eq!(output.status, "done"); + + let event = wait_for_single_event(&working_memory).await; + assert_eq!(event.event_type, WorkingMemoryEventType::TaskUpdate); + assert_eq!( + event.summary, + format!("Task #{} updated to done", created.task_number) + ); + } +}