diff --git a/crates/cli/src/tui.rs b/crates/cli/src/tui.rs index 6115e1c..1fb56fb 100644 --- a/crates/cli/src/tui.rs +++ b/crates/cli/src/tui.rs @@ -712,28 +712,17 @@ fn print_help() { /// event, returning the sandbox id. Returns `None` if no sandbox has been /// created yet (e.g. nothing has been chatted with). async fn latest_sandbox_id(conversation: &dyn HarnessConversation) -> Result> { - let result = conversation - .exoharness_handle() - .get_events(Some(EventQuery { - cursor: None, - direction: Some(EventQueryDirection::Desc), - limit: Some(1), - session_id: None, - turn_id: None, - types: Some(vec![EventKind::SANDBOX_CREATED]), - })) - .await?; - let Some(event) = result.events.into_iter().next() else { - return Ok(None); - }; - match event.data { - EventData::SandboxCreated { sandbox_id, .. } => Ok(Some(sandbox_id)), - other => anyhow::bail!( - "type-filtered query for {} returned unexpected variant {}", - EventKind::SANDBOX_CREATED.as_str(), - other.kind().as_str(), - ), - } + executor::first_matching_event( + conversation.exoharness_handle().as_ref(), + EventKind::SANDBOX_CREATED, + EventQueryDirection::Desc, + 1, + |data| match data { + EventData::SandboxCreated { sandbox_id, .. } => Some(sandbox_id), + _ => None, + }, + ) + .await } /// All snapshots taken in the conversation, oldest-first. Each tuple is diff --git a/crates/cli/tests/integration_chat.rs b/crates/cli/tests/integration_chat.rs index 3c08aae..06bc062 100644 --- a/crates/cli/tests/integration_chat.rs +++ b/crates/cli/tests/integration_chat.rs @@ -243,25 +243,172 @@ async fn conversation_send_round_trips_through_real_sandbox_and_mocked_openai() ); if provider == SandboxProvider::Docker { - let leftover_containers = Command::new("docker") - .args([ - "ps", - "-aq", - "--filter", - "label=exo.sandbox.owner-pid", - "--filter", - "status=exited", - ]) - .output() - .expect("docker ps"); - let stdout = String::from_utf8_lossy(&leftover_containers.stdout); - let stale = stdout - .lines() - .filter(|l| !l.trim().is_empty()) - .collect::>(); - assert!( - stale.is_empty(), - "expected zero leftover Exited exo containers after binary exit; found: {stale:?}" + // Drop stops (not rm's) the container so the next process can resume it. + let leftover = list_exo_containers_for_conversation(&conv_dir); + assert_eq!( + leftover.len(), + 1, + "expected exactly one stopped exo container after binary exit (resume target); found: {leftover:?}" ); + + // Test cleanup: don't leak the resume target onto the docker host. + for id in &leftover { + let _ = Command::new("docker").args(["rm", "-f", id]).output(); + } + } +} + +/// Docker container IDs labelled for the conversation at `conv_dir`. +fn list_exo_containers_for_conversation(conv_dir: &std::path::Path) -> Vec { + let conv_id = conv_dir + .file_name() + .and_then(|s| s.to_str()) + .expect("conv dir name"); + let key_prefix = format!("conversation:{conv_id}:"); + let output = Command::new("docker") + .args([ + "ps", + "-a", + "--filter", + "label=exo.sandbox.key", + "--format", + "{{json .}}", + ]) + .output() + .expect("docker ps"); + String::from_utf8_lossy(&output.stdout) + .lines() + .filter(|line| !line.is_empty()) + .filter_map(|line| { + let row: Value = serde_json::from_str(line).expect("docker ps row is json"); + let id = row.get("ID")?.as_str()?.to_string(); + let labels = row.get("Labels")?.as_str()?; + let key = labels + .split(',') + .filter_map(|kv| kv.split_once('=')) + .find_map(|(k, v)| (k == "exo.sandbox.key").then_some(v))?; + key.starts_with(&key_prefix).then_some(id) + }) + .collect() +} + +/// Two separate `conversation send` invocations on the same conversation must +/// hit the same docker container — the cross-process resume path. +#[tokio::test] +#[ignore = "spawns real exo binary + real sandbox + wiremock; run with cargo test -- --ignored"] +async fn cross_process_send_resumes_the_same_sandbox_container() { + let provider = SandboxProvider::from_env(); + if provider != SandboxProvider::Docker { + eprintln!("cross-process resume test only meaningful on docker; skipping"); + return; + } + if !provider.runtime_available() { + eprintln!("docker not available on this runner; skipping"); + return; + } + + let root_dir = TempDir::new().expect("tempdir for --root"); + let xdg_dir = TempDir::new().expect("tempdir for XDG_CONFIG_HOME"); + let root = root_dir.path().to_string_lossy().into_owned(); + let xdg = xdg_dir.path().to_string_lossy().into_owned(); + + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/responses")) + .respond_with(ResponseTemplate::new(200).set_body_json(canned_response_body())) + .mount(&mock_server) + .await; + + run_exo( + &["secret", "set", "test-key", "--env", "OPENAI_API_KEY"], + &root, + &xdg, + ); + run_exo( + &[ + "model", + "register", + "gpt-test", + "--secret", + "test-key", + "--base-url", + &mock_server.uri(), + ], + &root, + &xdg, + ); + run_exo( + &[ + "agent", + "create", + "--slug", + "test-agent", + "--model", + "gpt-test", + "Integration Test Agent", + ], + &root, + &xdg, + ); + run_exo( + &["conversation", "create", "test-agent", "first"], + &root, + &xdg, + ); + + // First send provisions the warm sandbox. + run_exo( + &[ + "conversation", + "send", + "test-agent", + "first", + "first message", + ], + &root, + &xdg, + ); + + // Second send: fresh exo process; should `try_resume` the same container. + run_exo( + &[ + "conversation", + "send", + "test-agent", + "first", + "second message", + ], + &root, + &xdg, + ); + + let conv_dir = root_dir + .path() + .join("exoharness/agents") + .read_dir() + .expect("agents dir") + .next() + .expect("agent") + .unwrap() + .path() + .join("conversations") + .read_dir() + .expect("conversations dir") + .next() + .expect("conversation") + .unwrap() + .path(); + + let containers = list_exo_containers_for_conversation(&conv_dir); + assert_eq!( + containers.len(), + 1, + "expected exactly one docker container after two cross-process sends \ + (resume should reuse, not create new); found: {containers:?}" + ); + + // Cleanup. + for id in &containers { + let _ = Command::new("docker").args(["rm", "-f", id]).output(); } } diff --git a/crates/cli/tests/lifecycle_resume.rs b/crates/cli/tests/lifecycle_resume.rs new file mode 100644 index 0000000..8f27c58 --- /dev/null +++ b/crates/cli/tests/lifecycle_resume.rs @@ -0,0 +1,439 @@ +//! End-to-end tests for the 3-tier resume fallback in `ensure_shell_sandbox`: +//! +//! 1. `try_resume` finds a labelled container → `docker start` + attach. +//! Same container, same `sandbox_id`. +//! 2. Container is gone but a snapshot exists → `acquire_from_snapshot`. +//! New docker id, same `sandbox_id`, filesystem restored. +//! 3. No container, no snapshot → `create_sandbox` fresh. +//! New docker id, new `sandbox_id`. +//! +//! Each test simulates two exo processes against the same `--root` and checks +//! which tier fired. Docker only; LocalProcess has no cross-process identity. + +use std::path::Path; +use std::process::Command; +use std::sync::Arc; + +use executor::{AgentConfig, AgentHarnessKind, BasicToolRuntime, ConversationConfig, ToolRuntime}; +use exoharness::{ + AgentId, BasicExoHarness, BasicExoHarnessConfig, ConversationHandle, ConversationId, EventData, + EventKind, EventQuery, EventQueryDirection, ExoHarness, NewAgentRequest, + NewConversationRequest, RunInSandboxRequest, SandboxBackendChoice, SandboxId, + SandboxProcessParts, SandboxProvider, SecretBackendChoice, +}; +use futures::io::AsyncReadExt; +use tempfile::TempDir; + +const SANDBOX_IMAGE: &str = "docker.io/library/ubuntu:24.04"; + +// Tier 1: harness #1 drops (Drop stops, doesn't rm) → harness #2 finds the +// stopped container via labels and `docker start`s it. Same container, marker +// survives. + +#[tokio::test] +#[ignore = "spawns real docker container; run with `cargo test -- --ignored`"] +async fn tier_1_stopped_container_is_resumed_same_id() { + if !preflight() { + return; + } + let root_dir = TempDir::new().expect("tempdir"); + + let (agent_id, conv_id, sandbox_id, container_id) = { + let harness = make_harness(root_dir.path()).await; + let (agent_id, conv_id) = make_agent_and_conv(&harness, "tier-1-agent").await; + let conv = open_conv(&harness, &agent_id, &conv_id).await; + prepare(conv.as_ref()).await; + + let sandbox_id = latest_sandbox_id(conv.as_ref()) + .await + .expect("ensure_shell_sandbox should have created a sandbox"); + let containers = list_containers_for_sandbox(&conv_id, &sandbox_id); + assert_eq!( + containers.len(), + 1, + "round 1 should have exactly one container, got: {containers:?}" + ); + let container_id = containers.into_iter().next().unwrap(); + + let (rc, _, _) = + exec_shell(conv.as_ref(), &sandbox_id, "echo 'tier-1-marker' > /tmp/x").await; + assert_eq!(rc, 0, "writing marker should succeed"); + + drop(conv); + (agent_id, conv_id, sandbox_id, container_id) + }; + + // Drop stops, doesn't rm. + let state = docker_container_state(&container_id); + assert_eq!( + state, "exited", + "container should be stopped (not rm'd) after harness drop; state={state:?}" + ); + + { + let harness = make_harness(root_dir.path()).await; + let conv = open_conv(&harness, &agent_id, &conv_id).await; + prepare(conv.as_ref()).await; + + let containers = list_containers_for_sandbox(&conv_id, &sandbox_id); + assert_eq!( + containers, + vec![container_id.clone()], + "round 2 should reuse the same container ID, not create a new one" + ); + + let (rc, stdout, _) = exec_shell(conv.as_ref(), &sandbox_id, "cat /tmp/x").await; + assert_eq!(rc, 0); + assert_eq!(stdout.trim(), "tier-1-marker"); + + let created = count_events_of_type(conv.as_ref(), "sandbox_created").await; + assert_eq!(created, 1, "tier 1 should not create a second sandbox"); + } + + rm_container(&container_id); +} + +// Tier 2: harness #1 takes a snapshot, then container is rm'd. Harness #2 +// finds no container, falls through to `acquire_from_snapshot`. New docker +// container, same `sandbox_id`, marker survives via the snapshot. + +#[tokio::test] +#[ignore = "spawns real docker container; run with `cargo test -- --ignored`"] +async fn tier_2_gone_container_with_snapshot_restores() { + if !preflight() { + return; + } + let root_dir = TempDir::new().expect("tempdir"); + + let (agent_id, conv_id, sandbox_id, container_id_round1) = { + let harness = make_harness(root_dir.path()).await; + let (agent_id, conv_id) = make_agent_and_conv(&harness, "tier-2-agent").await; + let conv = open_conv(&harness, &agent_id, &conv_id).await; + prepare(conv.as_ref()).await; + + let sandbox_id = latest_sandbox_id(conv.as_ref()).await.unwrap(); + let containers = list_containers_for_sandbox(&conv_id, &sandbox_id); + assert_eq!(containers.len(), 1); + let container_id = containers.into_iter().next().unwrap(); + + exec_shell(conv.as_ref(), &sandbox_id, "echo 'tier-2-marker' > /tmp/x").await; + + conv.snapshot_sandbox(sandbox_id.clone()) + .await + .expect("snapshot_sandbox should succeed while container is live"); + + drop(conv); + (agent_id, conv_id, sandbox_id, container_id) + }; + + rm_container(&container_id_round1); + assert!( + list_containers_for_sandbox(&conv_id, &sandbox_id).is_empty(), + "container should be gone after rm", + ); + + { + let harness = make_harness(root_dir.path()).await; + let conv = open_conv(&harness, &agent_id, &conv_id).await; + prepare(conv.as_ref()).await; + + let containers = list_containers_for_sandbox(&conv_id, &sandbox_id); + assert_eq!(containers.len(), 1); + assert_ne!( + containers[0], container_id_round1, + "tier 2 should create a NEW container (restored from snapshot)" + ); + + let (rc, stdout, _) = exec_shell(conv.as_ref(), &sandbox_id, "cat /tmp/x").await; + assert_eq!(rc, 0); + assert_eq!(stdout.trim(), "tier-2-marker"); + + assert_eq!( + count_events_of_type(conv.as_ref(), "sandbox_created").await, + 1, + "tier 2 must not create a new sandbox" + ); + assert!( + count_events_of_type(conv.as_ref(), "sandbox_started").await >= 1, + "tier 2 should emit a SandboxStarted event when restoring" + ); + + let containers = list_containers_for_sandbox(&conv_id, &sandbox_id); + for c in containers { + rm_container(&c); + } + } +} + +// Tier 3: container rm'd and no snapshot was ever taken. Harness #2 has +// nothing to resume from, falls through to `create_sandbox`. New `sandbox_id`, +// fresh container, no surviving state. + +#[tokio::test] +#[ignore = "spawns real docker container; run with `cargo test -- --ignored`"] +async fn tier_3_gone_container_without_snapshot_creates_fresh() { + if !preflight() { + return; + } + let root_dir = TempDir::new().expect("tempdir"); + + let (agent_id, conv_id, sandbox_id_round1, container_id_round1) = { + let harness = make_harness(root_dir.path()).await; + let (agent_id, conv_id) = make_agent_and_conv(&harness, "tier-3-agent").await; + let conv = open_conv(&harness, &agent_id, &conv_id).await; + prepare(conv.as_ref()).await; + + let sandbox_id = latest_sandbox_id(conv.as_ref()).await.unwrap(); + let containers = list_containers_for_sandbox(&conv_id, &sandbox_id); + assert_eq!(containers.len(), 1); + let container_id = containers.into_iter().next().unwrap(); + + exec_shell(conv.as_ref(), &sandbox_id, "echo 'tier-3-marker' > /tmp/x").await; + + drop(conv); + (agent_id, conv_id, sandbox_id, container_id) + }; + + rm_container(&container_id_round1); + + { + let harness = make_harness(root_dir.path()).await; + let conv = open_conv(&harness, &agent_id, &conv_id).await; + prepare(conv.as_ref()).await; + + let sandbox_id_round2 = latest_sandbox_id(conv.as_ref()).await.unwrap(); + assert_ne!( + sandbox_id_round2, sandbox_id_round1, + "tier 3 should create a new sandbox with a new id" + ); + + assert_eq!( + count_events_of_type(conv.as_ref(), "sandbox_created").await, + 2, + "tier 3 should record a second SandboxCreated event" + ); + + let (rc, _, _) = exec_shell(conv.as_ref(), &sandbox_id_round2, "test -f /tmp/x").await; + assert_ne!(rc, 0, "marker should NOT exist in the fresh container"); + + let containers = list_containers_for_sandbox(&conv_id, &sandbox_id_round2); + for c in containers { + rm_container(&c); + } + } +} + +// helpers + +fn preflight() -> bool { + let docker_ok = Command::new("docker") + .arg("info") + .output() + .map(|o| o.status.success()) + .unwrap_or(false); + if !docker_ok { + eprintln!("docker not available, skipping lifecycle test"); + return false; + } + let backend = std::env::var("EXO_TEST_SANDBOX_BACKEND").unwrap_or_else(|_| "docker".into()); + if backend != "docker" { + eprintln!("lifecycle test is docker-only, skipping (EXO_TEST_SANDBOX_BACKEND={backend})"); + return false; + } + true +} + +async fn make_harness(root: &Path) -> BasicExoHarness { + BasicExoHarness::new(BasicExoHarnessConfig { + root: root.to_path_buf(), + secret_backend: SecretBackendChoice::Static([9u8; 32]), + sandbox_backend: SandboxBackendChoice::Docker, + }) + .await + .expect("BasicExoHarness::new") +} + +async fn make_agent_and_conv( + harness: &BasicExoHarness, + agent_slug: &str, +) -> (AgentId, ConversationId) { + let agent = harness + .new_agent(NewAgentRequest { + slug: agent_slug.into(), + name: agent_slug.into(), + }) + .await + .expect("new_agent"); + let agent_id = agent.record().id; + let conv = agent + .new_conversation(NewConversationRequest { + slug: Some("conv".into()), + name: Some("conversation".into()), + }) + .await + .expect("new_conversation"); + let conv_id = conv.record().id; + (agent_id, conv_id) +} + +async fn open_conv( + harness: &BasicExoHarness, + agent_id: &AgentId, + conv_id: &ConversationId, +) -> Arc { + let agent = harness + .get_agent(agent_id) + .await + .expect("get_agent") + .expect("agent should exist on disk"); + agent + .get_conversation(conv_id) + .await + .expect("get_conversation") + .expect("conversation should exist on disk") +} + +fn test_agent_config() -> AgentConfig { + AgentConfig { + instructions: Vec::new(), + harness: AgentHarnessKind::Basic, + typescript: None, + enable_agent_tool_creation: false, + sandbox_image: Some(SANDBOX_IMAGE.into()), + sandbox_provider: SandboxProvider::Docker, + enable_networking: false, + model: "lifecycle-test-model".into(), + max_output_tokens: None, + max_tool_round_trips: None, + braintrust: None, + } +} + +fn test_conv_config() -> ConversationConfig { + ConversationConfig { + sandbox_image: None, + sandbox_provider: None, + // shell_program: Some(_) is the trigger for ensure_shell_sandbox. + shell_program: Some("bash".into()), + mounts: Vec::new(), + } +} + +/// Fire `BasicToolRuntime::prepare_conversation`, which is the public-ish +/// entry point that internally calls `ensure_shell_sandbox` — the +/// function that runs the 3-tier lifecycle fallback we're testing. +async fn prepare(conv: &dyn ConversationHandle) { + BasicToolRuntime + .prepare_conversation(conv, &test_agent_config(), &test_conv_config()) + .await + .expect("prepare_conversation"); +} + +async fn exec_shell( + conv: &dyn ConversationHandle, + sandbox_id: &SandboxId, + cmd: &str, +) -> (i32, String, String) { + let process = conv + .run_in_sandbox(RunInSandboxRequest { + id: sandbox_id.clone(), + command: vec!["/bin/bash".into(), "-c".into(), cmd.into()], + env: Default::default(), + }) + .await + .unwrap_or_else(|error| panic!("run_in_sandbox({cmd:?}) failed: {error:#}")); + let mut parts: SandboxProcessParts = process.into_parts(); + drop(parts.stdin); + let mut stdout = Vec::new(); + let mut stderr = Vec::new(); + let (a, b, c) = tokio::join!( + parts.stdout.read_to_end(&mut stdout), + parts.stderr.read_to_end(&mut stderr), + parts.wait, + ); + a.expect("read stdout"); + b.expect("read stderr"); + let exit_code = c.expect("wait"); + ( + exit_code, + String::from_utf8_lossy(&stdout).into_owned(), + String::from_utf8_lossy(&stderr).into_owned(), + ) +} + +async fn latest_sandbox_id(conv: &dyn ConversationHandle) -> Option { + let result = conv + .get_events(Some(EventQuery { + cursor: None, + direction: Some(EventQueryDirection::Desc), + limit: Some(50), + session_id: None, + turn_id: None, + types: Some(vec![EventKind::SANDBOX_CREATED]), + })) + .await + .ok()?; + for event in result.events { + if let EventData::SandboxCreated { sandbox_id, .. } = event.data { + return Some(sandbox_id); + } + } + None +} + +async fn count_events_of_type(conv: &dyn ConversationHandle, ty: &str) -> usize { + let mut cursor = None; + let mut count = 0; + loop { + let result = conv + .get_events(Some(EventQuery { + cursor, + direction: Some(EventQueryDirection::Asc), + limit: Some(100), + session_id: None, + turn_id: None, + types: Some(vec![EventKind::custom(ty.to_string())]), + })) + .await + .expect("get_events"); + let events_empty = result.events.is_empty(); + count += result.events.len(); + if events_empty || result.cursor.is_none() { + break; + } + cursor = result.cursor; + } + count +} + +fn list_containers_for_sandbox(conv_id: &ConversationId, sandbox_id: &SandboxId) -> Vec { + let label = format!("exo.sandbox.key=conversation:{conv_id}:{sandbox_id}"); + let output = Command::new("docker") + .args([ + "ps", + "-a", + "--filter", + &format!("label={label}"), + "--format", + "{{.ID}}", + ]) + .output() + .expect("docker ps"); + String::from_utf8_lossy(&output.stdout) + .lines() + .filter(|s| !s.trim().is_empty()) + .map(|s| s.trim().to_string()) + .collect() +} + +fn docker_container_state(id: &str) -> String { + let output = Command::new("docker") + .args(["inspect", "--format", "{{.State.Status}}", id]) + .output() + .expect("docker inspect"); + String::from_utf8_lossy(&output.stdout).trim().to_string() +} + +fn rm_container(id: &str) { + let _ = Command::new("docker").args(["rm", "-f", id]).output(); +} diff --git a/crates/executor/src/harness_tool.rs b/crates/executor/src/harness_tool.rs index 6d9ccd1..a88c214 100644 --- a/crates/executor/src/harness_tool.rs +++ b/crates/executor/src/harness_tool.rs @@ -2,8 +2,8 @@ use crate::{AgentConfig, ConversationConfig, ToolRuntime}; use async_trait::async_trait; use exoharness::{ ConversationHandle, CreateSandboxRequest, DEFAULT_SANDBOX_IMAGE, EventData, EventKind, - EventQuery, EventQueryDirection, FileSystemMount, FileSystemMountMode, Result, - RunInSandboxRequest, SandboxProvider, ToolRequest, ToolResult, + EventQueryDirection, FileSystemMount, FileSystemMountMode, Result, RunInSandboxRequest, + SandboxProvider, SnapshotId, StartSandboxRequest, ToolRequest, ToolResult, }; use futures::io::AsyncReadExt; use serde::{Deserialize, Serialize}; @@ -124,6 +124,8 @@ pub(crate) async fn ensure_shell_sandbox( return Ok(sandbox.id); }; + // Tier 1: container is still around — `run_in_sandbox` does the + // cross-process `try_resume` internally on cache miss. let healthcheck = conversation .run_in_sandbox(RunInSandboxRequest { id: sandbox.id.clone(), @@ -134,9 +136,26 @@ pub(crate) async fn ensure_shell_sandbox( if healthcheck.is_ok() { return Ok(sandbox.id); } + + // Tier 2: container is gone. If we ever took a snapshot of this + // sandbox in this conversation, restore from the latest one. + if let Some(snapshot_id) = + latest_snapshot_for_sandbox(conversation, &sandbox.id).await? + && conversation + .start_sandbox(StartSandboxRequest { + id: sandbox.id.clone(), + snapshot_id, + idle_seconds: Some(sandbox.idle_seconds), + }) + .await + .is_ok() + { + return Ok(sandbox.id); + } } } + // Tier 3: nothing reusable; create fresh. conversation .create_sandbox(CreateSandboxRequest { provider: desired_provider, @@ -149,6 +168,26 @@ pub(crate) async fn ensure_shell_sandbox( .await } +async fn latest_snapshot_for_sandbox( + conversation: &dyn ConversationHandle, + sandbox_id: &str, +) -> Result> { + exoharness::first_matching_event( + conversation, + EventKind::SANDBOX_SNAPSHOTTED, + EventQueryDirection::Desc, + 100, + |data| match data { + EventData::SandboxSnapshotted { + sandbox_id: sid, + snapshot_id, + } if sid == sandbox_id => Some(snapshot_id), + _ => None, + }, + ) + .await +} + fn normalize_mounts(mounts: &[FileSystemMount]) -> Vec { mounts .iter() @@ -178,47 +217,30 @@ async fn latest_shell_sandbox( conversation: &dyn ConversationHandle, desired_provider: SandboxProvider, ) -> Result> { - let events = conversation - .get_events(Some(EventQuery { - cursor: None, - direction: Some(EventQueryDirection::Desc), - limit: Some(50), - session_id: None, - turn_id: None, - types: Some(vec![EventKind::SANDBOX_CREATED]), - })) - .await? - .events; - - let Some(event) = events.into_iter().next() else { - return Ok(None); - }; - match event.data { - EventData::SandboxCreated { - sandbox_id, - provider, - image, - default_workdir, - file_system_mounts, - enable_networking, - idle_seconds, - } => { - if provider != desired_provider { - return Ok(None); - } - Ok(Some(ShellSandboxInfo { + exoharness::first_matching_event( + conversation, + EventKind::SANDBOX_CREATED, + EventQueryDirection::Desc, + 1, + |data| match data { + EventData::SandboxCreated { + sandbox_id, + provider, + image, + default_workdir, + file_system_mounts, + enable_networking, + idle_seconds, + } if provider == desired_provider => Some(ShellSandboxInfo { id: sandbox_id, image, default_workdir, file_system_mounts, enable_networking, idle_seconds, - })) - } - other => Err(anyhow::anyhow!( - "type-filtered query for {} returned unexpected variant {}", - EventKind::SANDBOX_CREATED.as_str(), - other.kind().as_str(), - )), - } + }), + _ => None, + }, + ) + .await } diff --git a/crates/executor/src/lib.rs b/crates/executor/src/lib.rs index f4b600f..cdbeeca 100644 --- a/crates/executor/src/lib.rs +++ b/crates/executor/src/lib.rs @@ -40,7 +40,7 @@ pub use exoharness::{ FileSystemMountMode, ForkConversationRequest, HTTP_EXOHARNESS_TRACING_TARGET, HttpExoHarness, PutSecretRequest, SANDBOX_MAIN_MOUNT_DIR, SandboxBackendChoice, SandboxId, SandboxProvider, Secret, SecretBackendChoice, SecretMetadata, SessionId, SnapshotId, StartSandboxRequest, - ToolRequest, Uuid7, serve_exoharness_http_listener, + ToolRequest, Uuid7, first_matching_event, serve_exoharness_http_listener, serve_exoharness_http_listener_with_options, }; pub use harness_basic::BasicHarness; diff --git a/crates/exoharness/src/basic.rs b/crates/exoharness/src/basic.rs index e2cdebf..a4e300e 100644 --- a/crates/exoharness/src/basic.rs +++ b/crates/exoharness/src/basic.rs @@ -656,12 +656,21 @@ impl BasicConversationHandle { return Ok(handle); } - let handle = self + // Cache miss: resume by SandboxKey, don't silently `acquire` a fresh + // container under the same logical id. + let req = sandbox_request(self.record.id, sandbox_id, sandbox); + let Some(handle) = self .harness .inner .sandbox_backend_for_provider(sandbox.provider)? - .acquire(sandbox_request(self.record.id, sandbox_id, sandbox)) - .await?; + .try_resume(req) + .await? + else { + bail!( + "sandbox {sandbox_id} no longer exists on the backend; \ + create a new one or restore from a snapshot" + ); + }; self.harness .inner .running_sandboxes diff --git a/crates/exoharness/src/basic_tests.rs b/crates/exoharness/src/basic_tests.rs index 0d204fd..6c3ed29 100644 --- a/crates/exoharness/src/basic_tests.rs +++ b/crates/exoharness/src/basic_tests.rs @@ -903,6 +903,13 @@ impl ManagedSandboxBackend for TestSandboxBackend { })) } + async fn try_resume( + &self, + _request: SandboxRequest, + ) -> crate::Result>> { + Ok(None) + } + async fn acquire_from_snapshot( &self, _request: SandboxRequest, diff --git a/crates/exoharness/src/sandbox.rs b/crates/exoharness/src/sandbox.rs index 006b1dd..a0f3a91 100644 --- a/crates/exoharness/src/sandbox.rs +++ b/crates/exoharness/src/sandbox.rs @@ -133,8 +133,21 @@ pub trait ManagedSandboxHandle: Send + Sync { #[async_trait] pub trait ManagedSandboxBackend: Send + Sync { + /// Bring up a sandbox for `request.key`. Reuses an in-process warm + /// sandbox if one matches; otherwise creates fresh from + /// `request.spec.image`. Cross-process reuse goes through + /// [`Self::try_resume`]. async fn acquire(&self, request: SandboxRequest) -> Result>; + /// Reattach to a sandbox from a previous exo process (e.g. a stopped + /// docker container labelled with `exo.sandbox.key`). Starts it if + /// stopped. Returns `Ok(None)` if no match. Backends with no + /// cross-process identity (local-process) always return `Ok(None)`. + async fn try_resume( + &self, + request: SandboxRequest, + ) -> Result>>; + /// Acquire a sandbox initialised from a previously-captured snapshot. /// The request is honoured for mounts, network, lifecycle, etc., but the /// container's filesystem is sourced from the payload instead of @@ -351,6 +364,7 @@ impl CliContainerSandboxBackend { impl Drop for CliContainerSandboxBackend { fn drop(&mut self) { + // Stop, don't delete — let the next process `try_resume` by label. let Ok(mut warm_sandboxes) = self.warm_sandboxes.try_lock() else { return; }; @@ -359,7 +373,7 @@ impl Drop for CliContainerSandboxBackend { .map(|(_, entry)| entry.name) .collect::>(); for name in names { - cleanup_named_container_blocking(&self.container_bin, self.cli, &name); + stop_named_container_blocking(&self.container_bin, self.cli, &name); } } } @@ -422,6 +436,77 @@ impl ManagedSandboxBackend for CliContainerSandboxBackend { })) } + async fn try_resume( + &self, + request: SandboxRequest, + ) -> Result>> { + let request = self.prepare_request(request).await?; + + // No warm pool, no cross-process identity. + let Some(idle_ttl) = request.lifecycle.idle_ttl else { + return Ok(None); + }; + + // In-process warm pool first. + { + let warm_sandboxes = self.warm_sandboxes.lock().await; + if let Some(entry) = warm_sandboxes.get(&request.key) + && entry.request.spec == request.spec + { + return Ok(Some(Arc::new(WarmSandboxHandle { + id: format!("warm:{}", request.key), + cli: self.cli, + container_bin: self.container_bin.clone(), + request, + warm_sandboxes: Arc::clone(&self.warm_sandboxes), + }))); + } + } + + // Cross-process: look up by label; stale spec-hashes are reaped. + let spec_hash = sandbox_spec_hash(&request.spec); + let key_str = request.key.to_string(); + let Some(existing) = + find_resumable_container(&self.container_bin, self.cli, &key_str, &spec_hash).await? + else { + return Ok(None); + }; + + // Drop containers idle past the TTL. + if !existing.is_running + && let Some(finished_at) = existing.finished_at + && let Ok(idle_for) = SystemTime::now().duration_since(finished_at) + && idle_for > idle_ttl + { + cleanup_named_container(&self.container_bin, self.cli, &existing.name).await?; + return Ok(None); + } + + if !existing.is_running { + start_named_container(&self.container_bin, self.cli, &existing.name).await?; + } + + { + let mut warm_sandboxes = self.warm_sandboxes.lock().await; + warm_sandboxes.insert( + request.key.clone(), + WarmSandboxEntry { + name: existing.name.clone(), + request: request.clone(), + last_used_at: Instant::now(), + }, + ); + } + + Ok(Some(Arc::new(WarmSandboxHandle { + id: format!("warm:{}", request.key), + cli: self.cli, + container_bin: self.container_bin.clone(), + request, + warm_sandboxes: Arc::clone(&self.warm_sandboxes), + }))) + } + async fn acquire_from_snapshot( &self, request: SandboxRequest, @@ -621,6 +706,14 @@ impl ManagedSandboxBackend for LocalProcessSandboxBackend { })) } + async fn try_resume( + &self, + _request: SandboxRequest, + ) -> Result>> { + // Local-process "sandboxes" don't outlive the exo invocation. + Ok(None) + } + async fn acquire_from_snapshot( &self, _request: SandboxRequest, @@ -1223,16 +1316,12 @@ fn owner_pid_is_alive(pid: &str) -> bool { .is_ok_and(|status| status.success()) } -fn cleanup_named_container_blocking(container_bin: &Path, cli: ContainerCliFlavor, name: &str) { - match cli { - ContainerCliFlavor::AppleContainer => { - run_container_admin_command_blocking(container_bin, ["stop", name]); - run_container_admin_command_blocking(container_bin, ["delete", name]); - } - ContainerCliFlavor::Docker => { - run_container_admin_command_blocking(container_bin, ["rm", "-f", name]); - } - } +/// Stop without deleting, so the next process can `try_resume` by label. +/// For destructive cleanup use `cleanup_named_container`. +fn stop_named_container_blocking(container_bin: &Path, cli: ContainerCliFlavor, name: &str) { + let _ = cli; // both flavors accept `stop ` + // `-t 0` = SIGKILL immediately on Docker (no 10s SIGTERM grace). + run_container_admin_command_blocking(container_bin, ["stop", "-t", "0", name]); } fn schedule_cleanup_named_container(container_bin: PathBuf, cli: ContainerCliFlavor, name: String) { @@ -1294,6 +1383,241 @@ fn is_missing_container_error(stderr: &str) -> bool { lower.contains("not found") || lower.contains("no such") } +#[derive(Debug)] +struct ResumableContainerInfo { + name: String, + is_running: bool, + /// `None` for running containers or backends without a `FinishedAt`. + finished_at: Option, +} + +/// Look up any container labelled with this `SandboxKey`. Stale spec-hash +/// containers are reaped on the way out. +async fn find_resumable_container( + container_bin: &Path, + cli: ContainerCliFlavor, + key: &str, + spec_hash: &str, +) -> Result> { + match cli { + ContainerCliFlavor::Docker => find_docker_resumable(container_bin, key, spec_hash).await, + ContainerCliFlavor::AppleContainer => { + find_apple_resumable(container_bin, key, spec_hash).await + } + } +} + +/// One row of `docker ps --format '{{json .}}'`. +#[derive(Debug, Deserialize)] +struct DockerPsItem { + #[serde(rename = "Names")] + names: String, + /// `"k1=v1,k2=v2"`; parse with [`parse_docker_labels`]. + #[serde(rename = "Labels")] + labels: String, + #[serde(rename = "State")] + state: String, + #[serde(rename = "CreatedAt")] + created_at: String, +} + +fn parse_docker_labels(raw: &str) -> HashMap<&str, &str> { + raw.split(',').filter_map(|kv| kv.split_once('=')).collect() +} + +async fn find_docker_resumable( + container_bin: &Path, + key: &str, + spec_hash: &str, +) -> Result> { + // `docker ps -a --filter label=...` already does an exact-match on + // the key label server-side. We still pull every candidate so we can + // pick the most recent if there are duplicates (concurrent races) + // and reap any whose spec hash has drifted. + let key_filter = format!("label={WARM_SANDBOX_KEY_LABEL}={key}"); + let mut command = Command::new(container_bin); + command + .arg("ps") + .arg("-a") + .arg("--filter") + .arg(&key_filter) + .arg("--format") + .arg("{{json .}}") + .kill_on_drop(true); + let output = match time::timeout(WARM_SANDBOX_CLEANUP_TIMEOUT, command.output()).await { + Ok(out) => out?, + Err(_) => { + return Err(anyhow!( + "docker ps timed out while listing resumable containers" + )); + } + }; + if !output.status.success() { + return Err(anyhow!( + "docker ps failed while listing resumable containers: {}", + render_command_error(&output.stderr) + )); + } + + let stdout = std::str::from_utf8(&output.stdout).context("docker ps output is not utf-8")?; + let mut matches: Vec<(String, String, String)> = Vec::new(); + let mut stale_names: Vec = Vec::new(); + for line in stdout.lines() { + if line.is_empty() { + continue; + } + let item: DockerPsItem = serde_json::from_str(line) + .with_context(|| format!("decoding docker ps json line: {line}"))?; + let hash = parse_docker_labels(&item.labels) + .get(WARM_SANDBOX_SPEC_HASH_LABEL) + .copied() + .unwrap_or(""); + if hash == spec_hash { + matches.push((item.names, item.state, item.created_at)); + } else { + stale_names.push(item.names); + } + } + for name in stale_names { + if let Err(err) = + cleanup_named_container(container_bin, ContainerCliFlavor::Docker, &name).await + { + eprintln!("failed to reap container {name} with stale spec hash: {err}"); + } + } + // Prefer the most-recent entry (Docker's CreatedAt sorts + // lexicographically as ISO-8601-ish text). + matches.sort_by(|a, b| b.2.cmp(&a.2)); + let Some((name, state, _created)) = matches.into_iter().next() else { + return Ok(None); + }; + let is_running = state.eq_ignore_ascii_case("running"); + let finished_at = if !is_running { + docker_container_finished_at(container_bin, &name).await? + } else { + None + }; + Ok(Some(ResumableContainerInfo { + name, + is_running, + finished_at, + })) +} + +async fn docker_container_finished_at( + container_bin: &Path, + name: &str, +) -> Result> { + let mut command = Command::new(container_bin); + command + .arg("inspect") + .arg("--format") + .arg("{{.State.FinishedAt}}") + .arg(name) + .kill_on_drop(true); + let output = match time::timeout(WARM_SANDBOX_CLEANUP_TIMEOUT, command.output()).await { + Ok(out) => out?, + Err(_) => return Ok(None), + }; + if !output.status.success() { + // Container disappeared between list and inspect — treat as + // unknown, the caller will create fresh. + return Ok(None); + } + let s = String::from_utf8_lossy(&output.stdout).trim().to_string(); + // "0001-01-01T00:00:00Z" is Docker's "never finished" sentinel. + if s.is_empty() || s.starts_with("0001-01-01") { + return Ok(None); + } + Ok(parse_rfc3339_to_system_time(&s)) +} + +fn parse_rfc3339_to_system_time(s: &str) -> Option { + let dt = chrono::DateTime::parse_from_rfc3339(s).ok()?; + let nanos = dt.timestamp_nanos_opt()?; + if nanos < 0 { + return None; + } + Some(SystemTime::UNIX_EPOCH + Duration::from_nanos(nanos as u64)) +} + +async fn find_apple_resumable( + container_bin: &Path, + key: &str, + spec_hash: &str, +) -> Result> { + // Apple's `container` CLI doesn't expose a label filter; we list + // everything and match in-process. + let output = run_container_admin_command( + container_bin, + WARM_SANDBOX_CLEANUP_TIMEOUT, + ["list", "--format", "json"], + ) + .await?; + if !output.status.success() { + return Err(anyhow!( + "container list failed: {}", + render_command_error(&output.stderr) + )); + } + let containers: Vec = serde_json::from_slice(&output.stdout)?; + let mut chosen: Option = None; + let mut stale_names: Vec = Vec::new(); + for c in containers { + let labels = &c.configuration.labels; + let Some(matched_key) = labels.get(WARM_SANDBOX_KEY_LABEL) else { + continue; + }; + if matched_key != key { + continue; + } + let hash_matches = labels + .get(WARM_SANDBOX_SPEC_HASH_LABEL) + .map(|s| s == spec_hash) + .unwrap_or(false); + if !hash_matches { + stale_names.push(c.configuration.id); + continue; + } + // We don't have a portable finished-at signal from the apple + // CLI's list output; trust the status field for run-state and + // leave finished_at as None (the TTL guard simply skips its + // staleness check when finished_at is unknown). + let is_running = c.status.as_deref() == Some("running"); + chosen = Some(ResumableContainerInfo { + name: c.configuration.id, + is_running, + finished_at: None, + }); + } + for name in stale_names { + if let Err(err) = + cleanup_named_container(container_bin, ContainerCliFlavor::AppleContainer, &name).await + { + eprintln!("failed to reap apple-container {name} with stale spec hash: {err}"); + } + } + Ok(chosen) +} + +async fn start_named_container( + container_bin: &Path, + cli: ContainerCliFlavor, + name: &str, +) -> Result<()> { + let _ = cli; // both flavors take `start ` + let output = + run_container_admin_command(container_bin, WARM_SANDBOX_CLEANUP_TIMEOUT, ["start", name]) + .await?; + if !output.status.success() { + return Err(anyhow!( + "failed to start container {name}: {}", + render_command_error(&output.stderr) + )); + } + Ok(()) +} + fn is_already_exists_error(message: &str) -> bool { let lower = message.to_ascii_lowercase(); lower.contains("already exists") @@ -1450,3 +1774,17 @@ async fn docker_load_image(container_bin: &Path, payload: &Bytes) -> Result>, } +/// Return the first event of `kind` (in `direction` order) for which +/// `extract` yields `Some`. `limit` caps the scan; set it generously +/// when `extract` may filter most candidates out. +pub async fn first_matching_event( + conversation: &dyn ConversationHandle, + kind: EventKind, + direction: EventQueryDirection, + limit: u32, + mut extract: impl FnMut(EventData) -> Option, +) -> Result> { + let result = conversation + .get_events(Some(EventQuery { + cursor: None, + direction: Some(direction), + limit: Some(limit), + session_id: None, + turn_id: None, + types: Some(vec![kind]), + })) + .await?; + Ok(result + .events + .into_iter() + .find_map(|event| extract(event.data))) +} + /// Tag identifying an `EventData` variant — used by `EventQuery::types` to /// filter events by kind without stringly comparing snake_case names. The /// constants below cover every built-in variant; `EventKind::custom(name)`