diff --git a/sdk/rust/src/filesystem/agentfs.rs b/sdk/rust/src/filesystem/agentfs.rs index 1bab7f5..3a6418d 100644 --- a/sdk/rust/src/filesystem/agentfs.rs +++ b/sdk/rust/src/filesystem/agentfs.rs @@ -1,10 +1,12 @@ use anyhow::Result; use async_trait::async_trait; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use turso::{Builder, Connection, Value}; +use crate::AgentFSOptions; + use super::{ DirEntry, FileSystem, FilesystemStats, FsError, Stats, DEFAULT_DIR_MODE, DEFAULT_FILE_MODE, S_IFLNK, S_IFMT, @@ -20,6 +22,14 @@ pub struct AgentFS { chunk_size: usize, } +pub struct SnapshotArgs { + /// The agent filesystem ID or path + pub id_or_path: String, + + /// Create snapshot in this file + pub output_file: PathBuf +} + impl AgentFS { /// Create a new filesystem pub async fn new(db_path: &str) -> Result { @@ -1576,6 +1586,135 @@ impl AgentFS { Ok(()) } + /// Create a consistent point-in-time snapshot of the database into a new file. The output file must not already exist. + pub async fn snapshot(&self, args: SnapshotArgs) -> Result<()> { + if args.output_file.exists() { + anyhow::bail!("Output file already exists: {}", args.output_file.display()); + } + + let opts = AgentFSOptions::resolve(&args.id_or_path)?; + let db_path = AgentFSOptions::opts_to_db_path(opts)?; + let source_db = AgentFS::new(&db_path).await?; + let source_conn = source_db.get_connection(); + + let target_path = args.output_file.to_string_lossy(); + let target_db = Builder::new_local(&target_path).build().await?; + let target_conn = target_db.connect()?; + + source_conn.execute("BEGIN", ()).await?; + + // Speed up snapshot writes; if snapshot fails mid-way, caller can retry. + target_conn.execute("PRAGMA synchronous = OFF", ()).await?; + target_conn.execute("BEGIN IMMEDIATE", ()).await?; + + Self::clone_schema(&source_conn, &target_conn).await?; + Self::clone_all_tables_data(&source_conn, &target_conn).await?; + + target_conn.execute("COMMIT", ()).await?; + source_conn.execute("COMMIT", ()).await?; + + Ok(()) + } + + async fn clone_schema(source: &Connection, target: &Connection) -> Result<()> { + // tables need to be created first, before creating indexes + let query = r#" + SELECT sql + FROM sqlite_schema + WHERE sql IS NOT NULL + AND name NOT LIKE 'sqlite_%' + AND type IN ('table', 'index') + ORDER BY + CASE type + WHEN 'table' THEN 0 + WHEN 'index' THEN 1 + END, + name + "#; + + let mut rows = source.query(query, ()).await?; + while let Some(row) = rows.next().await? { + let ddl = match row.get_value(0).ok() { + Some(Value::Text(s)) => s, + _ => continue, // corrupted db + }; + + let ddl = format!("{ddl};"); + target.execute(&ddl, ()).await?; + } + Ok(()) + } + + async fn clone_all_tables_data(source: &Connection, target: &Connection) -> Result<()> { + let mut rows = source.query( + r#" + SELECT name + FROM sqlite_schema + WHERE type = 'table' + AND name NOT LIKE 'sqlite_%' + ORDER BY name + "#, + (), + ).await?; + + while let Some(row) = rows.next().await? { + let table = match row.get_value(0).ok() { + Some(Value::Text(s)) => s, + _ => continue, // corrupted db + }; + + Self::copy_table(source, target, &table).await?; + } + + Ok(()) + } + + async fn copy_table(source: &Connection, target: &Connection, table: &str) -> Result<()> { + let cols = Self::table_columns(source, table).await?; + if cols.is_empty() { + return Ok(()); // corrupted db + } + + let table_q = quote_ident(table); + let col_list = cols + .iter() + .map(|c| quote_ident(c)) + .collect::>() + .join(", "); + + let query = format!("SELECT {col_list} FROM {table_q}"); + let mut rows = source.query(&query, ()).await?; + + while let Some(row) = rows.next().await? { + let mut values_sql = Vec::with_capacity(cols.len()); + for i in 0..cols.len() { + let v = row.get_value(i).ok(); + values_sql.push(value_to_sql(v.as_ref())); + } + + let query = format!( + "INSERT INTO {table_q} ({col_list}) VALUES ({});", + values_sql.join(", ") + ); + target.execute(&query, ()).await?; + } + + Ok(()) + } + + async fn table_columns(conn: &Connection, table: &str) -> Result> { + let query = format!("PRAGMA table_info({})", quote_ident(table)); + let mut rows = conn.query(&query, ()).await?; + + let mut cols = Vec::new(); + while let Some(row) = rows.next().await? { + if let Some(Value::Text(name)) = row.get_value(1).ok() { + cols.push(name); + } + } + Ok(cols) + } + /// Get the number of chunks for a given inode (for testing) #[cfg(test)] async fn get_chunk_count(&self, ino: i64) -> Result { @@ -1596,6 +1735,52 @@ impl AgentFS { } } +// quote_ident safely quotes a SQL identifier by wrapping in double quotes and escaping embedded quotes. +fn quote_ident(s: &str) -> String { + let mut out = String::with_capacity(s.len() + 2); + out.push('"'); + for ch in s.chars() { + if ch == '"' { + out.push('"'); + } + out.push(ch); + } + out.push('"'); + out +} + +// value_to_sql converts value type to SQL string literals format. +fn value_to_sql(v: Option<&Value>) -> String { + match v { + None | Some(Value::Null) => "NULL".to_string(), + Some(Value::Integer(i)) => i.to_string(), + Some(Value::Real(f)) => f.to_string(), + Some(Value::Text(s)) => { + let mut out = String::with_capacity(s.len() + 2); + out.push('\''); + for ch in s.chars() { + if ch == '\'' { + out.push('\''); + } + out.push(ch); + } + out.push('\''); + out + }, + Some(Value::Blob(b)) => { + const HEX: &[u8; 16] = b"0123456789abcdef"; + let mut out = String::with_capacity(2 + b.len()*2 + 1); + out.push_str("X'"); + for &byte in b { + out.push(HEX[(byte >> 4) as usize] as char); + out.push(HEX[(byte & 0x0F) as usize] as char); + } + out.push('\''); + out + } + } +} + #[async_trait] impl FileSystem for AgentFS { async fn stat(&self, path: &str) -> Result> { diff --git a/sdk/rust/src/lib.rs b/sdk/rust/src/lib.rs index 2a3846d..5ccd0c8 100644 --- a/sdk/rust/src/lib.rs +++ b/sdk/rust/src/lib.rs @@ -102,6 +102,16 @@ impl AgentFSOptions { })?)) } } + + pub fn opts_to_db_path(opts: AgentFSOptions) -> Result { + if let Some(path) = opts.path { + Ok(path) + } else if let Some(id) = opts.id { + Ok(format!("{}/{}.db", agentfs_dir().display(), id)) + } else { + Ok(":memory:".to_string()) + } + } } /// The main AgentFS SDK struct