Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 186 additions & 1 deletion sdk/rust/src/filesystem/agentfs.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use anyhow::Result;
use async_trait::async_trait;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use turso::{Builder, Connection, Value};

use crate::AgentFSOptions;

use super::{
DirEntry, FileSystem, FilesystemStats, FsError, Stats, DEFAULT_DIR_MODE, DEFAULT_FILE_MODE,
S_IFLNK, S_IFMT,
Expand All @@ -20,6 +22,14 @@ pub struct AgentFS {
chunk_size: usize,
}

pub struct SnapshotArgs {
/// The agent filesystem ID or path
pub id_or_path: String,

/// Create snapshot in this file
pub output_file: PathBuf
}

impl AgentFS {
/// Create a new filesystem
pub async fn new(db_path: &str) -> Result<Self> {
Expand Down Expand Up @@ -1576,6 +1586,135 @@ impl AgentFS {
Ok(())
}

/// Create a consistent point-in-time snapshot of the database into a new file. The output file must not already exist.
pub async fn snapshot(&self, args: SnapshotArgs) -> Result<()> {
if args.output_file.exists() {
anyhow::bail!("Output file already exists: {}", args.output_file.display());
}

let opts = AgentFSOptions::resolve(&args.id_or_path)?;
let db_path = AgentFSOptions::opts_to_db_path(opts)?;
let source_db = AgentFS::new(&db_path).await?;
let source_conn = source_db.get_connection();

let target_path = args.output_file.to_string_lossy();
let target_db = Builder::new_local(&target_path).build().await?;
let target_conn = target_db.connect()?;

source_conn.execute("BEGIN", ()).await?;

// Speed up snapshot writes; if snapshot fails mid-way, caller can retry.
target_conn.execute("PRAGMA synchronous = OFF", ()).await?;
target_conn.execute("BEGIN IMMEDIATE", ()).await?;

Self::clone_schema(&source_conn, &target_conn).await?;
Self::clone_all_tables_data(&source_conn, &target_conn).await?;

target_conn.execute("COMMIT", ()).await?;
source_conn.execute("COMMIT", ()).await?;

Ok(())
}

async fn clone_schema(source: &Connection, target: &Connection) -> Result<()> {
// tables need to be created first, before creating indexes
let query = r#"
SELECT sql
FROM sqlite_schema
WHERE sql IS NOT NULL
AND name NOT LIKE 'sqlite_%'
AND type IN ('table', 'index')
ORDER BY
CASE type
WHEN 'table' THEN 0
WHEN 'index' THEN 1
END,
name
"#;

let mut rows = source.query(query, ()).await?;
while let Some(row) = rows.next().await? {
let ddl = match row.get_value(0).ok() {
Some(Value::Text(s)) => s,
_ => continue, // corrupted db
};

let ddl = format!("{ddl};");
target.execute(&ddl, ()).await?;
}
Ok(())
}

async fn clone_all_tables_data(source: &Connection, target: &Connection) -> Result<()> {
let mut rows = source.query(
r#"
SELECT name
FROM sqlite_schema
WHERE type = 'table'
AND name NOT LIKE 'sqlite_%'
ORDER BY name
"#,
(),
).await?;

while let Some(row) = rows.next().await? {
let table = match row.get_value(0).ok() {
Some(Value::Text(s)) => s,
_ => continue, // corrupted db
};

Self::copy_table(source, target, &table).await?;
}

Ok(())
}

async fn copy_table(source: &Connection, target: &Connection, table: &str) -> Result<()> {
let cols = Self::table_columns(source, table).await?;
if cols.is_empty() {
return Ok(()); // corrupted db
}

let table_q = quote_ident(table);
let col_list = cols
.iter()
.map(|c| quote_ident(c))
.collect::<Vec<String>>()
.join(", ");

let query = format!("SELECT {col_list} FROM {table_q}");
let mut rows = source.query(&query, ()).await?;

while let Some(row) = rows.next().await? {
let mut values_sql = Vec::with_capacity(cols.len());
for i in 0..cols.len() {
let v = row.get_value(i).ok();
values_sql.push(value_to_sql(v.as_ref()));
}

let query = format!(
"INSERT INTO {table_q} ({col_list}) VALUES ({});",
values_sql.join(", ")
);
target.execute(&query, ()).await?;
}

Ok(())
}

async fn table_columns(conn: &Connection, table: &str) -> Result<Vec<String>> {
let query = format!("PRAGMA table_info({})", quote_ident(table));
let mut rows = conn.query(&query, ()).await?;

let mut cols = Vec::new();
while let Some(row) = rows.next().await? {
if let Some(Value::Text(name)) = row.get_value(1).ok() {
cols.push(name);
}
}
Ok(cols)
}

/// Get the number of chunks for a given inode (for testing)
#[cfg(test)]
async fn get_chunk_count(&self, ino: i64) -> Result<i64> {
Expand All @@ -1596,6 +1735,52 @@ impl AgentFS {
}
}

// quote_ident safely quotes a SQL identifier by wrapping in double quotes and escaping embedded quotes.
fn quote_ident(s: &str) -> String {
let mut out = String::with_capacity(s.len() + 2);
out.push('"');
for ch in s.chars() {
if ch == '"' {
out.push('"');
}
out.push(ch);
}
out.push('"');
out
}

// value_to_sql converts value type to SQL string literals format.
fn value_to_sql(v: Option<&Value>) -> String {
match v {
None | Some(Value::Null) => "NULL".to_string(),
Some(Value::Integer(i)) => i.to_string(),
Some(Value::Real(f)) => f.to_string(),
Some(Value::Text(s)) => {
let mut out = String::with_capacity(s.len() + 2);
out.push('\'');
for ch in s.chars() {
if ch == '\'' {
out.push('\'');
}
out.push(ch);
}
out.push('\'');
out
},
Some(Value::Blob(b)) => {
const HEX: &[u8; 16] = b"0123456789abcdef";
let mut out = String::with_capacity(2 + b.len()*2 + 1);
out.push_str("X'");
for &byte in b {
out.push(HEX[(byte >> 4) as usize] as char);
out.push(HEX[(byte & 0x0F) as usize] as char);
}
out.push('\'');
out
}
}
}

#[async_trait]
impl FileSystem for AgentFS {
async fn stat(&self, path: &str) -> Result<Option<Stats>> {
Expand Down
10 changes: 10 additions & 0 deletions sdk/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ impl AgentFSOptions {
})?))
}
}

pub fn opts_to_db_path(opts: AgentFSOptions) -> Result<String> {
if let Some(path) = opts.path {
Ok(path)
} else if let Some(id) = opts.id {
Ok(format!("{}/{}.db", agentfs_dir().display(), id))
} else {
Ok(":memory:".to_string())
}
}
}

/// The main AgentFS SDK struct
Expand Down
Loading