diff --git a/crates/forge_app/src/compact.rs b/crates/forge_app/src/compact.rs index 2c6e4a69eb..30ed4b375e 100644 --- a/crates/forge_app/src/compact.rs +++ b/crates/forge_app/src/compact.rs @@ -36,6 +36,199 @@ impl Compactor { } impl Compactor { + pub fn compact_range(&self, context: &Context) -> anyhow::Result> { + let Some(breakpoint) = self.find_last_breakpoint(context, &self.compact) else { + tracing::debug!("No compaction needed"); + return Ok(None); + }; + + if breakpoint >= context.messages.len() { + return Err(anyhow::anyhow!("BUG(compaction): breakpoint out of bounds")); + } + + let summary = self.render_summary_frame(&context.messages[0..=breakpoint])?; + + info!( + breakpoint = breakpoint, + "Created context compaction summary" + ); + + let mut compacted_context = + Context::default().add_message(ContextMessage::user(summary, None)); + + // Find the first message after breakpoint that isn't an orphaned tool result + // Tool results are orphaned if their corresponding tool call is before the + // breakpoint + let mut remaining_start = breakpoint + 1; + + // Skip any tool results that appear immediately after the breakpoint + // These are orphaned because their tool calls were compacted away + while remaining_start < context.messages.len() { + if context.messages[remaining_start].has_tool_result() { + tracing::debug!( + msg_idx = remaining_start, + "Skipping orphaned tool result after compaction breakpoint" + ); + remaining_start += 1; + } else { + break; + } + } + + // Add the remaining messages after skipping orphaned tool results + for entry in context.messages.iter().skip(remaining_start) { + compacted_context = compacted_context.add_message(entry.message.clone()); + } + + // Validate no orphaned tool results remain in the compacted context + // Tool results without corresponding tool calls should not exist + self.validate_tool_pairs(&compacted_context)?; + + tracing::info!( + original_messages = context.messages.len(), + compacted_messages = compacted_context.messages.len(), + "Context compacted" + ); + + Ok(Some(compacted_context)) + } + + /// Validates that all tool results have corresponding tool calls in the + /// context. + fn validate_tool_pairs(&self, context: &Context) -> anyhow::Result<()> { + let mut tool_call_ids = std::collections::HashSet::new(); + + for msg in &context.messages { + match &**msg { + ContextMessage::Text(text) => { + // Collect all tool call IDs from this message + if let Some(tool_calls) = &text.tool_calls { + for tool_call in tool_calls { + if let Some(call_id) = &tool_call.call_id { + tool_call_ids.insert(call_id.clone()); + } + } + } + } + ContextMessage::Tool(result) => { + // Check if this tool result has a corresponding tool call + if let Some(call_id) = &result.call_id + && !tool_call_ids.contains(call_id) + { + return Err(anyhow::anyhow!( + "Orphaned tool result: tool_result references call_id {:?} \ + but no corresponding tool call found in compacted context", + call_id + )); + } + } + _ => {} + } + } + + Ok(()) + } +} + +impl Compactor { + /// Finds the last breakpoint in the context where compaction should occur. + /// + /// Iterates through messages tracking token counts, turn counts, and + /// message counts without cloning. Returns the index of the last + /// message before the most recent compaction threshold breach. + /// + /// # Arguments + /// + /// * `context` - The context to analyze for breakpoints + /// * `compact_config` - The compaction configuration containing thresholds + /// + /// # Returns + /// + /// The index of the last breakpoint, or None if no compaction is needed + fn find_last_breakpoint(&self, context: &Context, compact_config: &Compact) -> Option { + if context.messages.is_empty() { + return None; + } + + let mut last_bp: Option = None; + + // Track counts without cloning + let mut token_count = 0; + let mut turn_count = 0; + let mut message_count = 0; + + for (i, entry) in context.messages.iter().enumerate() { + // Update counts + token_count += entry.token_count_approx(); + message_count += 1; + + let is_user = entry.has_role(forge_domain::Role::User); + if is_user { + turn_count += 1; + } + + // Check if we should compact based on current accumulated state + let should_compact = { + let token_check = compact_config + .token_threshold + .map(|threshold| token_count >= threshold) + .unwrap_or(false); + + let turn_check = compact_config + .turn_threshold + .map(|threshold| turn_count >= threshold) + .unwrap_or(false); + + let message_check = compact_config + .message_threshold + .map(|threshold| message_count >= threshold) + .unwrap_or(false); + + let turn_end_check = compact_config + .on_turn_end + .map(|enabled| enabled && is_user) + .unwrap_or(false); + + token_check || turn_check || message_check || turn_end_check + }; + + if should_compact { + last_bp = Some(i); + // Reset counts for next accumulation window + token_count = 0; + turn_count = 0; + message_count = 0; + } + } + + last_bp + } + + fn render_summary_frame( + &self, + messages: &[forge_domain::MessageEntry], + ) -> anyhow::Result { + // Filter out droppable messages from compaction + let compaction_sequence: Vec<_> = messages + .iter() + .filter(|msg| !msg.is_droppable()) + .cloned() + .collect(); + + // Create a temporary context for the sequence to generate summary + let sequence_context = Context::default().messages(compaction_sequence); + + // Generate context summary with tool call information + let context_summary = ContextSummary::from(&sequence_context); + + // Apply transformers to reduce redundant operations and clean up + let context_summary = self.transform(context_summary); + + TemplateEngine::default().render( + "forge-partial-summary-frame.md", + &serde_json::json!({"messages": context_summary.messages}), + ) + } /// Apply compaction to the context if requested. pub fn compact(&self, context: Context, max: bool) -> anyhow::Result { let eviction = CompactionStrategy::evict(self.compact.eviction_window); @@ -62,6 +255,15 @@ impl Compactor { ) -> anyhow::Result { let (start, end) = sequence; + // Generate summary for the sequence + let summary = self.render_summary_frame(&context.messages[start..=end])?; + + info!( + sequence_start = start, + sequence_end = end, + "Created context compaction summary" + ); + // The sequence from the original message that needs to be compacted // Filter out droppable messages (e.g., attachments) from compaction let compaction_sequence = context.messages[start..=end] @@ -70,27 +272,6 @@ impl Compactor { .cloned() .collect::>(); - // Create a temporary context for the sequence to generate summary - let sequence_context = Context::default().messages(compaction_sequence.clone()); - - // Generate context summary with tool call information - let context_summary = ContextSummary::from(&sequence_context); - - // Apply transformers to reduce redundant operations and clean up - let context_summary = self.transform(context_summary); - - info!( - sequence_start = sequence.0, - sequence_end = sequence.1, - sequence_length = compaction_sequence.len(), - "Created context compaction summary" - ); - - let summary = TemplateEngine::default().render( - "forge-partial-summary-frame.md", - &serde_json::json!({"messages": context_summary.messages}), - )?; - // Extended thinking reasoning chain preservation // // Extended thinking requires the first assistant message to have @@ -150,7 +331,7 @@ impl Compactor { mod tests { use std::path::PathBuf; - use forge_domain::MessageEntry; + use forge_domain::{MessageEntry, MessagePattern}; use pretty_assertions::assert_eq; use super::*; @@ -161,6 +342,163 @@ mod tests { env.cwd(std::path::PathBuf::from("/test/working/dir")) } + /// Helper to create context from SAURT pattern + /// s = system, a = assistant, u = user, r = tool result, t = tool call + fn ctx(pattern: &str) -> Context { + MessagePattern::new(pattern).build() + } + + #[test] + fn test_find_last_breakpoint_no_messages() { + let environment = test_environment(); + let compactor = Compactor::new(Compact::new(), environment); + let compact_config = Compact::new().token_threshold(1000usize); + + let fixture = Context::default(); + + let actual = compactor.find_last_breakpoint(&fixture, &compact_config); + let expected = None; + + assert_eq!(actual, expected); + } + + #[test] + fn test_find_last_breakpoint_no_threshold_exceeded() { + let environment = test_environment(); + let compactor = Compactor::new(Compact::new(), environment); + let compact_config = Compact::new().token_threshold(100000usize); // Very high threshold + + let fixture = ctx("uaua"); // user, assistant, user, assistant + + let actual = compactor.find_last_breakpoint(&fixture, &compact_config); + let expected = None; + + assert_eq!(actual, expected); + } + + #[test] + fn test_find_last_breakpoint_single_breakpoint() { + let environment = test_environment(); + let compactor = Compactor::new(Compact::new(), environment); + let compact_config = Compact::new().message_threshold(2usize); + + let fixture = ctx("uaua"); // user, assistant, user, assistant + + let actual = compactor.find_last_breakpoint(&fixture, &compact_config); + let expected = Some(3); // Threshold of 2 reached at index 1, continues to add until hitting again at index 3 + + assert_eq!(actual, expected); + } + + #[test] + fn test_find_last_breakpoint_multiple_breakpoints() { + let environment = test_environment(); + let compactor = Compactor::new(Compact::new(), environment); + let compact_config = Compact::new().message_threshold(2usize); + + let fixture = ctx("uauaua"); // user, assistant, user, assistant, user, assistant + + let actual = compactor.find_last_breakpoint(&fixture, &compact_config); + let expected = Some(5); // Last breakpoint at index 5 + + assert_eq!(actual, expected); + } + + #[test] + fn test_find_last_breakpoint_with_token_threshold() { + let environment = test_environment(); + let compactor = Compactor::new(Compact::new(), environment); + let compact_config = Compact::new().token_threshold(50usize); // Lower threshold to ensure trigger + + let mut fixture = Context::default(); + for i in 0..10 { + fixture = fixture + .add_message(ContextMessage::user( + format!("Message {} with substantial content to increase token count. This message contains enough text to make sure we hit the compaction threshold quickly.", i), + None, + )) + .add_message(ContextMessage::assistant( + format!("Response {} with substantial content to increase token count. This response also contains enough text to ensure we accumulate sufficient tokens.", i), + None, + None, + )); + } + + let actual = compactor.find_last_breakpoint(&fixture, &compact_config); + + // Should find at least one breakpoint + assert!( + actual.is_some(), + "Expected to find a breakpoint with token threshold" + ); + } + + #[test] + fn test_find_last_breakpoint_with_turn_threshold() { + let environment = test_environment(); + let compactor = Compactor::new(Compact::new(), environment); + let compact_config = Compact::new().turn_threshold(1usize); // Trigger after 1 user message + + let fixture = ctx("uauaua"); // user, assistant, user, assistant, user, assistant + + let actual = compactor.find_last_breakpoint(&fixture, &compact_config); + let expected = Some(4); // With 1 turn threshold, breaks after each user message (indices 0, 2, 4) + + assert_eq!(actual, expected); + } + + #[test] + fn test_compact_range_skips_orphaned_tool_results() { + use forge_domain::{ToolCallFull, ToolCallId, ToolName}; + use serde_json::json; + + let environment = test_environment(); + let compactor = Compactor::new(Compact::new().message_threshold(2usize), environment); + + // Create context: User -> Assistant with tool call -> Tool result -> User -> + // Assistant When we compact at message threshold 2, the tool call gets + // removed but tool result remains + let tool_call = ToolCallFull { + name: ToolName::new("read"), + call_id: Some(ToolCallId::new("call_123")), + arguments: json!({"path": "/test/path"}).into(), + }; + + let tool_result = forge_domain::ToolResult::new(ToolName::new("read")) + .call_id(ToolCallId::new("call_123")) + .success(json!({"content": "File content"}).to_string()); + + let fixture = Context::default() + .add_message(ContextMessage::user("User 1", None)) + .add_message(ContextMessage::assistant( + "Response 1", + None, + Some(vec![tool_call]), + )) + .add_message(ContextMessage::tool_result(tool_result)) + .add_message(ContextMessage::user("User 2", None)) + .add_message(ContextMessage::assistant("Response 2", None, None)); + + // Compact should skip the orphaned tool result + let result = compactor.compact_range(&fixture); + + assert!( + result.is_ok(), + "Compaction should succeed and handle orphaned tool results: {:?}", + result + ); + + let compacted = result.unwrap().expect("Compaction should return Some"); + + // Verify no tool results exist in the compacted context + // They should have been skipped as orphaned + for msg in &compacted.messages { + if let ContextMessage::Tool(_) = &**msg { + panic!("Compacted context should not contain orphaned tool results"); + } + } + } + #[test] fn test_compress_single_sequence_preserves_only_last_reasoning() { use forge_domain::ReasoningFull; diff --git a/crates/forge_app/src/orch.rs b/crates/forge_app/src/orch.rs index c2a4b36088..c3cf9f7430 100644 --- a/crates/forge_app/src/orch.rs +++ b/crates/forge_app/src/orch.rs @@ -12,8 +12,8 @@ use tracing::{debug, info, warn}; use crate::TemplateEngine; use crate::agent::AgentService; -use crate::compact::Compactor; use crate::title_generator::TitleGenerator; +use crate::transformers::CompactionTransformer; #[derive(Clone, Setters)] #[setters(into)] @@ -150,12 +150,17 @@ impl Orchestrator { reasoning_supported: bool, ) -> anyhow::Result { let tool_supported = self.is_tool_supported()?; + let mut transformers = DefaultTransformation::default() .pipe(SortTools::new()) .pipe(TransformToolCalls::new().when(|_| !tool_supported)) .pipe(ImageHandling::new()) .pipe(DropReasoningDetails.when(|_| !reasoning_supported)) - .pipe(ReasoningNormalizer.when(|_| reasoning_supported)); + .pipe(ReasoningNormalizer.when(|_| reasoning_supported)) + .pipe_some(self.agent.compact.clone().map(|compact| { + CompactionTransformer::new(compact.clone(), self.environment.clone()) + })); + let response = self .services .chat_agent( @@ -167,21 +172,6 @@ impl Orchestrator { response.into_full(!tool_supported).await } - /// Checks if compaction is needed and performs it if necessary - fn check_and_compact(&self, context: &Context) -> anyhow::Result> { - let agent = &self.agent; - // Estimate token count for compaction decision - let token_count = context.token_count(); - if agent.compact.should_compact(context, *token_count) { - info!(agent_id = %agent.id, "Compaction needed"); - Compactor::new(agent.compact.clone(), self.environment.clone()) - .compact(context.clone(), false) - .map(Some) - } else { - debug!(agent_id = %agent.id, "Compaction not needed"); - Ok(None) - } - } // Create a helper method with the core functionality pub async fn run(&mut self) -> anyhow::Result<()> { @@ -249,17 +239,6 @@ impl Orchestrator { }), ).await?; - // FIXME: Add a unit test in orch spec, to guarantee that compaction is - // triggered after receiving the response Trigger compaction after - // making a request NOTE: Ideally compaction should be implemented - // as a transformer - if let Some(c_context) = self.check_and_compact(&context)? { - info!(agent_id = %agent.id, "Using compacted context from execution"); - context = c_context; - } else { - debug!(agent_id = %agent.id, "No compaction was needed"); - } - info!( conversation_id = %self.conversation.id, conversation_length = context.messages.len(), diff --git a/crates/forge_app/src/transformers/compaction.rs b/crates/forge_app/src/transformers/compaction.rs index 2359288f97..9e923affe5 100644 --- a/crates/forge_app/src/transformers/compaction.rs +++ b/crates/forge_app/src/transformers/compaction.rs @@ -1,48 +1,107 @@ -use std::path::PathBuf; - -use forge_domain::{ContextSummary, Role, Transformer}; - -use crate::transformers::dedupe_role::DedupeRole; -use crate::transformers::drop_role::DropRole; -use crate::transformers::strip_working_dir::StripWorkingDir; -use crate::transformers::trim_context_summary::TrimContextSummary; - -/// Composes all compaction transformers into a single transformation pipeline. -/// -/// This transformer applies a series of transformations to reduce context size -/// and improve context quality: -/// -/// 1. Drops all System role messages -/// 2. Deduplicates consecutive User role messages -/// 3. Trims context by keeping only the last operation per file path -/// 4. Deduplicates consecutive Assistant content blocks -/// 5. Strips working directory prefix from file paths -/// -/// The transformations are applied in sequence using the pipe combinator. -pub struct SummaryTransformer { - working_dir: PathBuf, +use forge_domain::{Context, Environment, Transformer}; + +use crate::compact::Compactor; + +/// Transformer that compacts context when necessary before sending to LLM +pub struct CompactionTransformer { + compactor: Compactor, } -impl SummaryTransformer { - /// Creates a new Compaction transformer with the specified working - /// directory. +impl CompactionTransformer { + /// Creates a new CompactionTransformer /// /// # Arguments /// - /// * `working_dir` - The working directory path to strip from file paths - pub fn new(working_dir: impl Into) -> Self { - Self { working_dir: working_dir.into() } + /// * `compact` - The compaction configuration + /// * `env` - The environment for the compactor + pub fn new(compact: forge_domain::Compact, env: Environment) -> Self { + Self { compactor: Compactor::new(compact, env) } + } +} + +impl Transformer for CompactionTransformer { + type Value = Context; + + fn transform(&mut self, context: Self::Value) -> Self::Value { + match self.compactor.compact_range(&context) { + Ok(Some(compacted_context)) => { + tracing::debug!("Compaction completed"); + compacted_context + } + Ok(None) => { + tracing::debug!("No compaction needed"); + context + } + Err(e) => { + tracing::error!( + error = ?e, + "Compaction failed, using original context" + ); + context + } + } } } -impl Transformer for SummaryTransformer { - type Value = ContextSummary; +#[cfg(test)] +mod tests { + use fake::{Fake, Faker}; + use forge_domain::{Compact, Environment, MessagePattern}; + use pretty_assertions::assert_eq; + + use super::*; + + fn test_environment() -> Environment { + let env: Environment = Faker.fake(); + env.cwd(std::path::PathBuf::from("/test/working/dir")) + } + + fn test_compact() -> Compact { + Compact::new() + .message_threshold(10usize) // Trigger compaction after 10 messages + .eviction_window(0.5) + .retention_window(2usize) + } + + /// Helper to create context from SAURT pattern + /// s = system, a = assistant, u = user, r = tool result, t = tool call + fn ctx(pattern: &str) -> Context { + MessagePattern::new(pattern).build() + } + + #[test] + fn test_no_compaction_for_small_context() { + let compact = test_compact(); + let environment = test_environment(); + + let fixture = ctx("ua"); // user, assistant + + let mut transformer = CompactionTransformer::new(compact, environment); + let actual = transformer.transform(fixture.clone()); + + assert_eq!(actual.messages.len(), fixture.messages.len()); + } + + #[test] + fn test_compaction_with_threshold_exceeded() { + let compact = test_compact(); + let environment = test_environment(); + + // Create a pattern with many messages to exceed threshold + // Using the SAURT notation: 50 user-assistant pairs + let pattern = "ua".repeat(50); + let fixture = ctx(&pattern); + + let mut transformer = CompactionTransformer::new(compact, environment); + let actual = transformer.transform(fixture.clone()); - fn transform(&mut self, context_summary: Self::Value) -> Self::Value { - DropRole::new(Role::System) - .pipe(DedupeRole::new(Role::User)) - .pipe(TrimContextSummary) - .pipe(StripWorkingDir::new(self.working_dir.clone())) - .transform(context_summary) + // Real compactor should reduce the message count when compaction occurs + // The exact count depends on the compaction logic, but it should be less + assert!( + actual.messages.len() < fixture.messages.len(), + "Expected compaction to reduce message count from {} to less, but got {}", + fixture.messages.len(), + actual.messages.len() + ); } } diff --git a/crates/forge_app/src/transformers/mod.rs b/crates/forge_app/src/transformers/mod.rs index ef434b62dd..2ab9e5d2aa 100644 --- a/crates/forge_app/src/transformers/mod.rs +++ b/crates/forge_app/src/transformers/mod.rs @@ -2,6 +2,8 @@ mod compaction; mod dedupe_role; mod drop_role; mod strip_working_dir; +mod summary; mod trim_context_summary; -pub use compaction::SummaryTransformer; +pub use compaction::CompactionTransformer; +pub use summary::SummaryTransformer; diff --git a/crates/forge_app/src/transformers/summary.rs b/crates/forge_app/src/transformers/summary.rs new file mode 100644 index 0000000000..2359288f97 --- /dev/null +++ b/crates/forge_app/src/transformers/summary.rs @@ -0,0 +1,48 @@ +use std::path::PathBuf; + +use forge_domain::{ContextSummary, Role, Transformer}; + +use crate::transformers::dedupe_role::DedupeRole; +use crate::transformers::drop_role::DropRole; +use crate::transformers::strip_working_dir::StripWorkingDir; +use crate::transformers::trim_context_summary::TrimContextSummary; + +/// Composes all compaction transformers into a single transformation pipeline. +/// +/// This transformer applies a series of transformations to reduce context size +/// and improve context quality: +/// +/// 1. Drops all System role messages +/// 2. Deduplicates consecutive User role messages +/// 3. Trims context by keeping only the last operation per file path +/// 4. Deduplicates consecutive Assistant content blocks +/// 5. Strips working directory prefix from file paths +/// +/// The transformations are applied in sequence using the pipe combinator. +pub struct SummaryTransformer { + working_dir: PathBuf, +} + +impl SummaryTransformer { + /// Creates a new Compaction transformer with the specified working + /// directory. + /// + /// # Arguments + /// + /// * `working_dir` - The working directory path to strip from file paths + pub fn new(working_dir: impl Into) -> Self { + Self { working_dir: working_dir.into() } + } +} + +impl Transformer for SummaryTransformer { + type Value = ContextSummary; + + fn transform(&mut self, context_summary: Self::Value) -> Self::Value { + DropRole::new(Role::System) + .pipe(DedupeRole::new(Role::User)) + .pipe(TrimContextSummary) + .pipe(StripWorkingDir::new(self.working_dir.clone())) + .transform(context_summary) + } +} diff --git a/crates/forge_domain/src/transformer/mod.rs b/crates/forge_domain/src/transformer/mod.rs index 0916334673..f28cf2ee29 100644 --- a/crates/forge_domain/src/transformer/mod.rs +++ b/crates/forge_domain/src/transformer/mod.rs @@ -9,6 +9,10 @@ pub trait Transformer: Sized { Pipe(self, other) } + fn pipe_some(self, other: Option) -> PipeSome { + PipeSome(self, other) + } + fn when bool>(self, cond: F) -> Cond where Self: Sized, @@ -72,6 +76,23 @@ where } } +pub struct PipeSome(A, Option); + +impl Transformer for PipeSome +where + A: Transformer, + B: Transformer, +{ + type Value = V; + + fn transform(&mut self, value: Self::Value) -> Self::Value { + match self.1 { + Some(ref mut other) => other.transform(self.0.transform(value)), + None => self.0.transform(value), + } + } +} + // Re-export specific transformers mod drop_reasoning_details; mod image_handling;