diff --git a/cli/src/fuse.rs b/cli/src/fuse.rs index 22e2dd5..f022ad9 100644 --- a/cli/src/fuse.rs +++ b/cli/src/fuse.rs @@ -19,7 +19,7 @@ use std::{ }, time::{Duration, SystemTime, UNIX_EPOCH}, }; -use tokio::runtime::Runtime; +use tokio::runtime::{Handle, Runtime}; /// Cache entries never expire - we explicitly invalidate on mutations. /// This is safe because we are the only writer to the filesystem. @@ -48,14 +48,20 @@ struct OpenFile { file: BoxedFile, } +/// FUSE filesystem adapter that dispatches operations to async tasks. +/// +/// Each FUSE callback spawns a tokio task to handle the operation concurrently, +/// allowing multiple filesystem operations to run in parallel instead of +/// serializing them through block_on. struct AgentFSFuse { fs: Arc, - runtime: Runtime, + /// Tokio runtime handle for spawning async tasks + handle: Handle, path_cache: Arc>>, /// Maps file handle -> open file state open_files: Arc>>, /// Next file handle to allocate - next_fh: AtomicU64, + next_fh: Arc, /// User ID to report for all files (set at mount time) uid: u32, /// Group ID to report for all files (set at mount time) @@ -100,19 +106,20 @@ impl Filesystem for AgentFSFuse { return; }; let fs = self.fs.clone(); - let (result, path) = self.runtime.block_on(async move { - let result = fs.lstat(&path).await; - (result, path) - }); - match result { - Ok(Some(stats)) => { - let attr = fillattr(&stats, self.uid, self.gid); - self.add_path(attr.ino, path); - reply.entry(&TTL, &attr, 0); + let path_cache = self.path_cache.clone(); + let uid = self.uid; + let gid = self.gid; + self.handle.spawn(async move { + match fs.lstat(&path).await { + Ok(Some(stats)) => { + let attr = fillattr(&stats, uid, gid); + path_cache.lock().insert(attr.ino, path); + reply.entry(&TTL, &attr, 0); + } + Ok(None) => reply.error(libc::ENOENT), + Err(_) => reply.error(libc::EIO), } - Ok(None) => reply.error(libc::ENOENT), - Err(_) => reply.error(libc::EIO), - } + }); } /// Retrieves file attributes for a given inode. @@ -126,13 +133,15 @@ impl Filesystem for AgentFSFuse { }; let fs = self.fs.clone(); - let result = self.runtime.block_on(async move { fs.lstat(&path).await }); - - match result { - Ok(Some(stats)) => reply.attr(&TTL, &fillattr(&stats, self.uid, self.gid)), - Ok(None) => reply.error(libc::ENOENT), - Err(_) => reply.error(libc::EIO), - } + let uid = self.uid; + let gid = self.gid; + self.handle.spawn(async move { + match fs.lstat(&path).await { + Ok(Some(stats)) => reply.attr(&TTL, &fillattr(&stats, uid, gid)), + Ok(None) => reply.error(libc::ENOENT), + Err(_) => reply.error(libc::EIO), + } + }); } /// Reads the target of a symbolic link. @@ -146,15 +155,13 @@ impl Filesystem for AgentFSFuse { }; let fs = self.fs.clone(); - let result = self - .runtime - .block_on(async move { fs.readlink(&path).await }); - - match result { - Ok(Some(target)) => reply.data(target.as_bytes()), - Ok(None) => reply.error(libc::ENOENT), - Err(_) => reply.error(libc::EIO), - } + self.handle.spawn(async move { + match fs.readlink(&path).await { + Ok(Some(target)) => reply.data(target.as_bytes()), + Ok(None) => reply.error(libc::ENOENT), + Err(_) => reply.error(libc::EIO), + } + }); } /// Sets file attributes, primarily handling truncate operations. @@ -179,56 +186,53 @@ impl Filesystem for AgentFSFuse { _flags: Option, reply: ReplyAttr, ) { - // Handle truncate - if let Some(new_size) = size { - let result = if let Some(fh) = fh { - // Use file handle if available (ftruncate) - let file = { - let open_files = self.open_files.lock(); - open_files.get(&fh).map(|f| f.file.clone()) - }; + // Get file handle or path for truncate, and path for final stat + let file_for_truncate = + fh.and_then(|fh| self.open_files.lock().get(&fh).map(|f| f.file.clone())); + let path = self.get_path(ino); - if let Some(file) = file { - self.runtime - .block_on(async move { file.truncate(new_size).await }) - } else { - reply.error(libc::EBADF); - return; - } - } else { - // Open file and truncate via file handle - let Some(path) = self.path_cache.lock().get(&ino).cloned() else { - reply.error(libc::ENOENT); - return; - }; - - let fs = self.fs.clone(); - self.runtime.block_on(async move { - let file = fs.open(&path).await?; - file.truncate(new_size).await - }) - }; - - if result.is_err() { - reply.error(libc::EIO); - return; - } + // Early return if we need a file handle but don't have one + if size.is_some() && fh.is_some() && file_for_truncate.is_none() { + reply.error(libc::EBADF); + return; } - // Return updated attributes - let Some(path) = self.get_path(ino) else { + let Some(path) = path else { reply.error(libc::ENOENT); return; }; let fs = self.fs.clone(); - let result = self.runtime.block_on(async move { fs.stat(&path).await }); + let uid = self.uid; + let gid = self.gid; + self.handle.spawn(async move { + // Handle truncate if requested + if let Some(new_size) = size { + let truncate_result = if let Some(file) = file_for_truncate { + file.truncate(new_size).await + } else { + match fs.open(&path).await { + Ok(file) => file.truncate(new_size).await, + Err(_) => { + reply.error(libc::EIO); + return; + } + } + }; - match result { - Ok(Some(stats)) => reply.attr(&TTL, &fillattr(&stats, self.uid, self.gid)), - Ok(None) => reply.error(libc::ENOENT), - Err(_) => reply.error(libc::EIO), - } + if truncate_result.is_err() { + reply.error(libc::EIO); + return; + } + } + + // Return updated attributes + match fs.stat(&path).await { + Ok(Some(stats)) => reply.attr(&TTL, &fillattr(&stats, uid, gid)), + Ok(None) => reply.error(libc::ENOENT), + Err(_) => reply.error(libc::EIO), + } + }); } // ───────────────────────────────────────────────────────────── @@ -256,84 +260,79 @@ impl Filesystem for AgentFSFuse { }; let fs = self.fs.clone(); - let (entries_result, path) = self.runtime.block_on(async move { - let result = fs.readdir_plus(&path).await; - (result, path) - }); - - let entries = match entries_result { - Ok(Some(entries)) => entries, - Ok(None) => { - reply.error(libc::ENOENT); - return; - } - Err(_) => { - reply.error(libc::EIO); - return; - } - }; - - // Determine parent inode for ".." entry - let parent_ino = if ino == 1 { - 1 // Root's parent is itself - } else { - let parent_path = Path::new(&path) - .parent() - .map(|p| { - let s = p.to_string_lossy().to_string(); - if s.is_empty() { - "/".to_string() - } else { - s - } - }) - .unwrap_or_else(|| "/".to_string()); + let path_cache = self.path_cache.clone(); + self.handle.spawn(async move { + let entries = match fs.readdir_plus(&path).await { + Ok(Some(entries)) => entries, + Ok(None) => { + reply.error(libc::ENOENT); + return; + } + Err(_) => { + reply.error(libc::EIO); + return; + } + }; - if parent_path == "/" { - 1 + // Determine parent inode for ".." entry + let parent_ino = if ino == 1 { + 1 // Root's parent is itself } else { - let fs = self.fs.clone(); - match self - .runtime - .block_on(async move { fs.stat(&parent_path).await }) - { - Ok(Some(stats)) => stats.ino as u64, - _ => 1, // Fallback to root if parent lookup fails + let parent_path = Path::new(&path) + .parent() + .map(|p| { + let s = p.to_string_lossy().to_string(); + if s.is_empty() { + "/".to_string() + } else { + s + } + }) + .unwrap_or_else(|| "/".to_string()); + + if parent_path == "/" { + 1 + } else { + match fs.stat(&parent_path).await { + Ok(Some(stats)) => stats.ino as u64, + _ => 1, // Fallback to root if parent lookup fails + } } - } - }; + }; - let mut all_entries = vec![ - (ino, FileType::Directory, "."), - (parent_ino, FileType::Directory, ".."), - ]; + // Build entries list: ".", "..", then directory contents + let mut all_entries: Vec<(u64, FileType, String)> = vec![ + (ino, FileType::Directory, ".".to_string()), + (parent_ino, FileType::Directory, "..".to_string()), + ]; - // Process entries with stats already available (no N+1 queries!) - for entry in &entries { - let entry_path = if path == "/" { - format!("/{}", entry.name) - } else { - format!("{}/{}", path, entry.name) - }; + // Process entries with stats already available (no N+1 queries!) + for entry in &entries { + let entry_path = if path == "/" { + format!("/{}", entry.name) + } else { + format!("{}/{}", path, entry.name) + }; - let kind = if entry.stats.is_directory() { - FileType::Directory - } else if entry.stats.is_symlink() { - FileType::Symlink - } else { - FileType::RegularFile - }; + let kind = if entry.stats.is_directory() { + FileType::Directory + } else if entry.stats.is_symlink() { + FileType::Symlink + } else { + FileType::RegularFile + }; - self.add_path(entry.stats.ino as u64, entry_path); - all_entries.push((entry.stats.ino as u64, kind, entry.name.as_str())); - } + path_cache.lock().insert(entry.stats.ino as u64, entry_path); + all_entries.push((entry.stats.ino as u64, kind, entry.name.clone())); + } - for (i, entry) in all_entries.iter().enumerate().skip(offset as usize) { - if reply.add(entry.0, (i + 1) as i64, entry.1, entry.2) { - break; + for (i, entry) in all_entries.iter().enumerate().skip(offset as usize) { + if reply.add(entry.0, (i + 1) as i64, entry.1, &entry.2) { + break; + } } - } - reply.ok(); + reply.ok(); + }); } /// Reads directory entries with full attributes for the given inode. @@ -355,126 +354,105 @@ impl Filesystem for AgentFSFuse { }; let fs = self.fs.clone(); - let (entries_result, path) = self.runtime.block_on(async move { - let result = fs.readdir_plus(&path).await; - (result, path) - }); + let path_cache = self.path_cache.clone(); + let uid = self.uid; + let gid = self.gid; + self.handle.spawn(async move { + let entries = match fs.readdir_plus(&path).await { + Ok(Some(entries)) => entries, + Ok(None) => { + reply.error(libc::ENOENT); + return; + } + Err(_) => { + reply.error(libc::EIO); + return; + } + }; - let entries = match entries_result { - Ok(Some(entries)) => entries, - Ok(None) => { - reply.error(libc::ENOENT); - return; - } - Err(_) => { - reply.error(libc::EIO); - return; - } - }; + // Get current directory stats for "." + let dir_stats = fs.stat(&path).await.ok().flatten(); - // Get current directory stats for "." - let fs = self.fs.clone(); - let path_for_stat = path.clone(); - let dir_stats = self - .runtime - .block_on(async move { fs.stat(&path_for_stat).await }) - .ok() - .flatten(); - - // Determine parent inode and stats for ".." entry - let (parent_ino, parent_stats) = if ino == 1 { - (1u64, dir_stats.clone()) // Root's parent is itself - } else { - let parent_path = Path::new(&path) - .parent() - .map(|p| { - let s = p.to_string_lossy().to_string(); - if s.is_empty() { - "/".to_string() - } else { - s - } - }) - .unwrap_or_else(|| "/".to_string()); - - if parent_path == "/" { - let fs = self.fs.clone(); - let parent_stats = self - .runtime - .block_on(async move { fs.stat(&parent_path).await }) - .ok() - .flatten(); - (1u64, parent_stats) + // Determine parent inode and stats for ".." entry + let (parent_ino, parent_stats) = if ino == 1 { + (1u64, dir_stats.clone()) // Root's parent is itself } else { - let fs = self.fs.clone(); - let parent_stats = self - .runtime - .block_on(async move { fs.stat(&parent_path).await }) - .ok() - .flatten(); - let parent_ino = parent_stats.as_ref().map(|s| s.ino as u64).unwrap_or(1); + let parent_path = Path::new(&path) + .parent() + .map(|p| { + let s = p.to_string_lossy().to_string(); + if s.is_empty() { + "/".to_string() + } else { + s + } + }) + .unwrap_or_else(|| "/".to_string()); + + let parent_stats = fs.stat(&parent_path).await.ok().flatten(); + let parent_ino = if parent_path == "/" { + 1u64 + } else { + parent_stats.as_ref().map(|s| s.ino as u64).unwrap_or(1) + }; (parent_ino, parent_stats) - } - }; - - // Build the entries list with full attributes - let uid = self.uid; - let gid = self.gid; + }; - let mut offset_counter = 0i64; + let mut offset_counter = 0i64; - // Add "." entry - if offset <= offset_counter { - if let Some(ref stats) = dir_stats { - let attr = fillattr(stats, uid, gid); - if reply.add(ino, offset_counter + 1, ".", &TTL, &attr, 0) { - reply.ok(); - return; + // Add "." entry + if offset <= offset_counter { + if let Some(ref stats) = dir_stats { + let attr = fillattr(stats, uid, gid); + if reply.add(ino, offset_counter + 1, ".", &TTL, &attr, 0) { + reply.ok(); + return; + } } } - } - offset_counter += 1; + offset_counter += 1; - // Add ".." entry - if offset <= offset_counter { - if let Some(ref stats) = parent_stats { - let attr = fillattr(stats, uid, gid); - if reply.add(parent_ino, offset_counter + 1, "..", &TTL, &attr, 0) { - reply.ok(); - return; + // Add ".." entry + if offset <= offset_counter { + if let Some(ref stats) = parent_stats { + let attr = fillattr(stats, uid, gid); + if reply.add(parent_ino, offset_counter + 1, "..", &TTL, &attr, 0) { + reply.ok(); + return; + } } } - } - offset_counter += 1; - - // Add directory entries with their attributes - for entry in &entries { - if offset <= offset_counter { - let entry_path = if path == "/" { - format!("/{}", entry.name) - } else { - format!("{}/{}", path, entry.name) - }; + offset_counter += 1; - let attr = fillattr(&entry.stats, uid, gid); - self.add_path(entry.stats.ino as u64, entry_path); - - if reply.add( - entry.stats.ino as u64, - offset_counter + 1, - &entry.name, - &TTL, - &attr, - 0, - ) { - reply.ok(); - return; + // Add directory entries with their attributes + for entry in &entries { + if offset <= offset_counter { + let entry_path = if path == "/" { + format!("/{}", entry.name) + } else { + format!("{}/{}", path, entry.name) + }; + + let attr = fillattr(&entry.stats, uid, gid); + path_cache.lock().insert(entry.stats.ino as u64, entry_path); + + if reply.add( + entry.stats.ino as u64, + offset_counter + 1, + &entry.name, + &TTL, + &attr, + 0, + ) { + reply.ok(); + return; + } } + offset_counter += 1; } - offset_counter += 1; - } - reply.ok(); + reply.ok(); + }); } /// Creates a new directory. @@ -496,34 +474,25 @@ impl Filesystem for AgentFSFuse { }; let fs = self.fs.clone(); - let (result, path) = self.runtime.block_on(async move { - let result = fs.mkdir(&path).await; - (result, path) - }); - - if result.is_err() { - reply.error(libc::EIO); - return; - } - - // Get the new directory's stats - let fs = self.fs.clone(); - let (stat_result, path) = self.runtime.block_on(async move { - let result = fs.stat(&path).await; - (result, path) - }); - - match stat_result { - Ok(Some(stats)) => { - let attr = fillattr(&stats, self.uid, self.gid); - self.add_path(attr.ino, path); - reply.entry(&TTL, &attr, 0); - } - _ => { - // Fail the operation if we cannot stat the new directory + let path_cache = self.path_cache.clone(); + let uid = self.uid; + let gid = self.gid; + self.handle.spawn(async move { + if fs.mkdir(&path).await.is_err() { reply.error(libc::EIO); + return; } - } + + // Get the new directory's stats + match fs.stat(&path).await { + Ok(Some(stats)) => { + let attr = fillattr(&stats, uid, gid); + path_cache.lock().insert(attr.ino, path); + reply.entry(&TTL, &attr, 0); + } + _ => reply.error(libc::EIO), + } + }); } /// Removes an empty directory. @@ -536,65 +505,54 @@ impl Filesystem for AgentFSFuse { return; }; - // Verify target is a directory let fs = self.fs.clone(); - let (stat_result, path) = self.runtime.block_on(async move { - let result = fs.lstat(&path).await; - (result, path) - }); + let path_cache = self.path_cache.clone(); + self.handle.spawn(async move { + // Verify target is a directory + let stats = match fs.lstat(&path).await { + Ok(Some(s)) => s, + Ok(None) => { + reply.error(libc::ENOENT); + return; + } + Err(_) => { + reply.error(libc::EIO); + return; + } + }; - let stats = match stat_result { - Ok(Some(s)) => s, - Ok(None) => { - reply.error(libc::ENOENT); - return; - } - Err(_) => { - reply.error(libc::EIO); + if !stats.is_directory() { + reply.error(libc::ENOTDIR); return; } - }; - if !stats.is_directory() { - reply.error(libc::ENOTDIR); - return; - } - - // Verify directory is empty - let fs = self.fs.clone(); - let (readdir_result, path) = self.runtime.block_on(async move { - let result = fs.readdir(&path).await; - (result, path) - }); - - match readdir_result { - Ok(Some(entries)) if !entries.is_empty() => { - reply.error(libc::ENOTEMPTY); - return; - } - Ok(None) => { - reply.error(libc::ENOENT); - return; - } - Err(_) => { - reply.error(libc::EIO); - return; + // Verify directory is empty + match fs.readdir(&path).await { + Ok(Some(entries)) if !entries.is_empty() => { + reply.error(libc::ENOTEMPTY); + return; + } + Ok(None) => { + reply.error(libc::ENOENT); + return; + } + Err(_) => { + reply.error(libc::EIO); + return; + } + Ok(Some(_)) => {} // Empty directory, proceed } - Ok(Some(_)) => {} // Empty directory, proceed - } - - // Remove the directory - let ino = stats.ino as u64; - let fs = self.fs.clone(); - let result = self.runtime.block_on(async move { fs.remove(&path).await }); - match result { - Ok(()) => { - self.drop_path(ino); - reply.ok(); + // Remove the directory + let ino = stats.ino as u64; + match fs.remove(&path).await { + Ok(()) => { + path_cache.lock().remove(&ino); + reply.ok(); + } + Err(_) => reply.error(libc::EIO), } - Err(_) => reply.error(libc::EIO), - } + }); } // ───────────────────────────────────────────────────────────── @@ -620,57 +578,46 @@ impl Filesystem for AgentFSFuse { return; }; - // Create empty file - let fs = self.fs.clone(); - let (result, path) = self.runtime.block_on(async move { - let result = fs.write_file(&path, &[]).await; - (result, path) - }); - - if result.is_err() { - reply.error(libc::EIO); - return; - } - - // Get the new file's stats let fs = self.fs.clone(); - let (stat_result, path) = self.runtime.block_on(async move { - let result = fs.stat(&path).await; - (result, path) - }); - - let attr = match stat_result { - Ok(Some(stats)) => { - let attr = fillattr(&stats, self.uid, self.gid); - self.add_path(attr.ino, path.clone()); - attr - } - _ => { - // Fail the operation if we cannot stat the new file + let path_cache = self.path_cache.clone(); + let open_files = self.open_files.clone(); + let next_fh = self.next_fh.clone(); + let uid = self.uid; + let gid = self.gid; + self.handle.spawn(async move { + // Create empty file + if fs.write_file(&path, &[]).await.is_err() { reply.error(libc::EIO); return; } - }; - // Open the file to get a file handle - let fs = self.fs.clone(); - let path_clone = path.clone(); - let open_result = self - .runtime - .block_on(async move { fs.open(&path_clone).await }); - - let file = match open_result { - Ok(file) => file, - Err(_) => { - reply.error(libc::EIO); - return; - } - }; + // Get the new file's stats + let attr = match fs.stat(&path).await { + Ok(Some(stats)) => { + let attr = fillattr(&stats, uid, gid); + path_cache.lock().insert(attr.ino, path.clone()); + attr + } + _ => { + reply.error(libc::EIO); + return; + } + }; + + // Open the file to get a file handle + let file = match fs.open(&path).await { + Ok(file) => file, + Err(_) => { + reply.error(libc::EIO); + return; + } + }; - let fh = self.alloc_fh(); - self.open_files.lock().insert(fh, OpenFile { file }); + let fh = next_fh.fetch_add(1, Ordering::SeqCst); + open_files.lock().insert(fh, OpenFile { file }); - reply.created(&TTL, &attr, 0, fh, 0); + reply.created(&TTL, &attr, 0, fh, 0); + }); } /// Creates a symbolic link. @@ -695,34 +642,26 @@ impl Filesystem for AgentFSFuse { }; let fs = self.fs.clone(); + let path_cache = self.path_cache.clone(); let target_owned = target_str.to_string(); - let (result, path) = self.runtime.block_on(async move { - let result = fs.symlink(&target_owned, &path).await; - (result, path) - }); - - if result.is_err() { - reply.error(libc::EIO); - return; - } - - // Get the new symlink's stats - let fs = self.fs.clone(); - let (stat_result, path) = self.runtime.block_on(async move { - let result = fs.lstat(&path).await; - (result, path) - }); - - match stat_result { - Ok(Some(stats)) => { - let attr = fillattr(&stats, self.uid, self.gid); - self.add_path(attr.ino, path); - reply.entry(&TTL, &attr, 0); - } - _ => { + let uid = self.uid; + let gid = self.gid; + self.handle.spawn(async move { + if fs.symlink(&target_owned, &path).await.is_err() { reply.error(libc::EIO); + return; } - } + + // Get the new symlink's stats + match fs.lstat(&path).await { + Ok(Some(stats)) => { + let attr = fillattr(&stats, uid, gid); + path_cache.lock().insert(attr.ino, path); + reply.entry(&TTL, &attr, 0); + } + _ => reply.error(libc::EIO), + } + }); } /// Removes a file (unlinks it from the directory). @@ -734,42 +673,36 @@ impl Filesystem for AgentFSFuse { return; }; - // Get inode before removing so we can uncache let fs = self.fs.clone(); - let (stat_result, path) = self.runtime.block_on(async move { - let result = fs.lstat(&path).await; - (result, path) - }); + let path_cache = self.path_cache.clone(); + self.handle.spawn(async move { + // Get inode before removing so we can uncache + let stats = match fs.lstat(&path).await { + Ok(Some(s)) => s, + Ok(None) => { + reply.error(libc::ENOENT); + return; + } + Err(_) => { + reply.error(libc::EIO); + return; + } + }; - let stats = match &stat_result { - Ok(Some(s)) => s, - Ok(None) => { - reply.error(libc::ENOENT); + if stats.is_directory() { + reply.error(libc::EISDIR); return; } - Err(_) => { - reply.error(libc::EIO); - return; - } - }; - if stats.is_directory() { - reply.error(libc::EISDIR); - return; - } - - let ino = stats.ino as u64; - - let fs = self.fs.clone(); - let result = self.runtime.block_on(async move { fs.remove(&path).await }); - - match result { - Ok(()) => { - self.drop_path(ino); - reply.ok(); + let ino = stats.ino as u64; + match fs.remove(&path).await { + Ok(()) => { + path_cache.lock().remove(&ino); + reply.ok(); + } + Err(_) => reply.error(libc::EIO), } - Err(_) => reply.error(libc::EIO), - } + }); } /// Renames a file or directory. @@ -796,52 +729,44 @@ impl Filesystem for AgentFSFuse { return; }; - // Get source inode before rename so we can update cache let fs = self.fs.clone(); - let (src_stat, from_path) = self.runtime.block_on(async move { - let result = fs.stat(&from_path).await; - (result, from_path) - }); - - let src_ino = src_stat.ok().flatten().map(|s| s.ino as u64); - - // Check if destination exists and get its inode for cache cleanup - let fs = self.fs.clone(); - let (dst_stat, to_path) = self.runtime.block_on(async move { - let result = fs.stat(&to_path).await; - (result, to_path) - }); - - let dst_ino = dst_stat.ok().flatten().map(|s| s.ino as u64); - - // Perform the rename - let fs = self.fs.clone(); - let (result, to_path) = self.runtime.block_on(async move { - let result = fs.rename(&from_path, &to_path).await; - (result, to_path) - }); - - match result { - Ok(()) => { - // Update path cache: remove old path, add new path - if let Some(ino) = src_ino { - self.drop_path(ino); - self.add_path(ino, to_path); + let path_cache = self.path_cache.clone(); + self.handle.spawn(async move { + // Get source inode before rename so we can update cache + let src_ino = fs + .stat(&from_path) + .await + .ok() + .flatten() + .map(|s| s.ino as u64); + + // Check if destination exists and get its inode for cache cleanup + let dst_ino = fs.stat(&to_path).await.ok().flatten().map(|s| s.ino as u64); + + // Perform the rename + match fs.rename(&from_path, &to_path).await { + Ok(()) => { + let mut cache = path_cache.lock(); + // Update path cache: remove old path, add new path + if let Some(ino) = src_ino { + cache.remove(&ino); + cache.insert(ino, to_path); + } + // Remove destination from cache if it was replaced + if let Some(ino) = dst_ino { + cache.remove(&ino); + } + reply.ok(); } - // Remove destination from cache if it was replaced - if let Some(ino) = dst_ino { - self.drop_path(ino); + Err(e) => { + let errno = e + .downcast_ref::() + .map(|fs_err| fs_err.to_errno()) + .unwrap_or(libc::EIO); + reply.error(errno); } - reply.ok(); - } - Err(e) => { - let errno = e - .downcast_ref::() - .map(|fs_err| fs_err.to_errno()) - .unwrap_or(libc::EIO); - reply.error(errno); } - } + }); } // ───────────────────────────────────────────────────────────── @@ -858,19 +783,18 @@ impl Filesystem for AgentFSFuse { }; let fs = self.fs.clone(); - let path_clone = path.clone(); - let result = self - .runtime - .block_on(async move { fs.open(&path_clone).await }); - - match result { - Ok(file) => { - let fh = self.alloc_fh(); - self.open_files.lock().insert(fh, OpenFile { file }); - reply.opened(fh, 0); + let open_files = self.open_files.clone(); + let next_fh = self.next_fh.clone(); + self.handle.spawn(async move { + match fs.open(&path).await { + Ok(file) => { + let fh = next_fh.fetch_add(1, Ordering::SeqCst); + open_files.lock().insert(fh, OpenFile { file }); + reply.opened(fh, 0); + } + Err(_) => reply.error(libc::EIO), } - Err(_) => reply.error(libc::EIO), - } + }); } /// Reads data using the file handle. @@ -894,14 +818,12 @@ impl Filesystem for AgentFSFuse { open_file.file.clone() }; - let result = self - .runtime - .block_on(async move { file.pread(offset as u64, size as u64).await }); - - match result { - Ok(data) => reply.data(&data), - Err(_) => reply.error(libc::EIO), - } + self.handle.spawn(async move { + match file.pread(offset as u64, size as u64).await { + Ok(data) => reply.data(&data), + Err(_) => reply.error(libc::EIO), + } + }); } /// Writes data using the file handle. @@ -928,14 +850,12 @@ impl Filesystem for AgentFSFuse { let data_len = data.len(); let data_vec = data.to_vec(); - let result = self - .runtime - .block_on(async move { file.pwrite(offset as u64, &data_vec).await }); - - match result { - Ok(()) => reply.written(data_len as u32), - Err(_) => reply.error(libc::EIO), - } + self.handle.spawn(async move { + match file.pwrite(offset as u64, &data_vec).await { + Ok(()) => reply.written(data_len as u32), + Err(_) => reply.error(libc::EIO), + } + }); } /// Flushes data to the backend storage. @@ -966,12 +886,12 @@ impl Filesystem for AgentFSFuse { } }; - let result = self.runtime.block_on(async move { file.fsync().await }); - - match result { - Ok(()) => reply.ok(), - Err(_) => reply.error(libc::EIO), - } + self.handle.spawn(async move { + match file.fsync().await { + Ok(()) => reply.ok(), + Err(_) => reply.error(libc::EIO), + } + }); } /// Releases (closes) an open file handle. @@ -996,54 +916,54 @@ impl Filesystem for AgentFSFuse { /// /// Queries actual usage from the SDK and reports it to tools like `df`. fn statfs(&mut self, _req: &Request, _ino: u64, reply: ReplyStatfs) { - const BLOCK_SIZE: u64 = 4096; - const TOTAL_INODES: u64 = 1_000_000; // Virtual limit - const MAX_NAMELEN: u32 = 255; - let fs = self.fs.clone(); - let result = self.runtime.block_on(async move { fs.statfs().await }); - - let (used_blocks, used_inodes) = match result { - Ok(stats) => { - let used_blocks = stats.bytes_used.div_ceil(BLOCK_SIZE); - (used_blocks, stats.inodes) - } - Err(_) => (0, 1), // Fallback: just root inode - }; + self.handle.spawn(async move { + const BLOCK_SIZE: u64 = 4096; + const TOTAL_INODES: u64 = 1_000_000; // Virtual limit + const MAX_NAMELEN: u32 = 255; + + let (used_blocks, used_inodes) = match fs.statfs().await { + Ok(stats) => { + let used_blocks = stats.bytes_used.div_ceil(BLOCK_SIZE); + (used_blocks, stats.inodes) + } + Err(_) => (0, 1), // Fallback: just root inode + }; - // Report a large virtual capacity so tools don't think we're out of space - const TOTAL_BLOCKS: u64 = 1024 * 1024 * 1024; // ~4TB virtual size - let free_blocks = TOTAL_BLOCKS.saturating_sub(used_blocks); - let free_inodes = TOTAL_INODES.saturating_sub(used_inodes); - - reply.statfs( - TOTAL_BLOCKS, - free_blocks, - free_blocks, - TOTAL_INODES, - free_inodes, - BLOCK_SIZE as u32, - MAX_NAMELEN, // namelen: maximum filename length - BLOCK_SIZE as u32, // frsize: fragment size - ); + // Report a large virtual capacity so tools don't think we're out of space + const TOTAL_BLOCKS: u64 = 1024 * 1024 * 1024; // ~4TB virtual size + let free_blocks = TOTAL_BLOCKS.saturating_sub(used_blocks); + let free_inodes = TOTAL_INODES.saturating_sub(used_inodes); + + reply.statfs( + TOTAL_BLOCKS, + free_blocks, + free_blocks, + TOTAL_INODES, + free_inodes, + BLOCK_SIZE as u32, + MAX_NAMELEN, // namelen: maximum filename length + BLOCK_SIZE as u32, // frsize: fragment size + ); + }); } } impl AgentFSFuse { /// Create a new FUSE filesystem adapter wrapping a FileSystem instance. /// - /// The provided Tokio runtime is used to execute async FileSystem operations - /// from within synchronous FUSE callbacks via `block_on`. + /// The provided Tokio runtime handle is used to spawn async FileSystem operations + /// from within synchronous FUSE callbacks, allowing concurrent execution. /// /// The uid and gid are used for all file ownership to avoid "dubious ownership" /// errors from tools like git that check file ownership. - fn new(fs: Arc, runtime: Runtime, uid: u32, gid: u32) -> Self { + fn new(fs: Arc, handle: Handle, uid: u32, gid: u32) -> Self { Self { fs, - runtime, + handle, path_cache: Arc::new(Mutex::new(HashMap::new())), open_files: Arc::new(Mutex::new(HashMap::new())), - next_fh: AtomicU64::new(1), + next_fh: Arc::new(AtomicU64::new(1)), uid, gid, } @@ -1087,23 +1007,6 @@ impl AgentFSFuse { let mut path_cache = self.path_cache.lock(); path_cache.insert(ino, path); } - - /// Remove an inode from the path cache. - /// - /// Similar to the Linux kernel's `d_drop()`, this removes the inode's - /// pathname mapping when the file or directory is deleted or renamed. - fn drop_path(&self, ino: u64) { - let mut path_cache = self.path_cache.lock(); - path_cache.remove(&ino); - } - - /// Allocate a new file handle for tracking open files. - /// - /// Similar to the Linux kernel's `get_unused_fd()`, this returns a unique - /// handle that identifies an open file throughout its lifetime. - fn alloc_fh(&self) -> u64 { - self.next_fh.fetch_add(1, Ordering::SeqCst) - } } // ───────────────────────────────────────────────────────────── @@ -1155,7 +1058,11 @@ pub fn mount( let uid = opts.uid.unwrap_or_else(|| unsafe { libc::getuid() }); let gid = opts.gid.unwrap_or_else(|| unsafe { libc::getgid() }); - let fs = AgentFSFuse::new(fs, runtime, uid, gid); + // Get handle from runtime and enter the runtime context so spawned tasks can run + let handle = runtime.handle().clone(); + let _guard = runtime.enter(); + + let fs = AgentFSFuse::new(fs, handle, uid, gid); fs.add_path(1, "/".to_string()); diff --git a/sdk/rust/src/connection_pool.rs b/sdk/rust/src/connection_pool.rs new file mode 100644 index 0000000..575407d --- /dev/null +++ b/sdk/rust/src/connection_pool.rs @@ -0,0 +1,139 @@ +//! Connection pool for concurrent database access. +//! +//! The turso database connection doesn't support concurrent transactions on a single +//! connection. This module provides a pool of connections that can be borrowed for +//! individual operations, enabling concurrent filesystem access. + +use anyhow::Result; +use std::ops::Deref; +use std::sync::Arc; +use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore}; +use turso::{Builder, Connection, Database}; + +/// A pool of database connections for concurrent access. +/// +/// Each connection can only be used by one task at a time. When a task needs +/// database access, it borrows a connection from the pool. When done, the +/// connection is returned to the pool for reuse. +/// +/// If all connections are in use, `get()` will asynchronously wait until +/// a connection becomes available. +pub struct ConnectionPool { + /// The database instance used to create new connections + db: Database, + /// Available connections ready to be borrowed (Arc-wrapped for sharing with PooledConnection) + available: Arc>>>, + /// Semaphore to limit concurrent connections + semaphore: Arc, + /// Maximum number of connections + max_connections: usize, +} + +impl ConnectionPool { + /// Create a new connection pool for the given database path. + /// + /// # Arguments + /// * `db_path` - Path to the SQLite database file + /// * `max_connections` - Maximum number of concurrent connections + pub async fn new(db_path: &str, max_connections: usize) -> Result { + let db = Builder::new_local(db_path).build().await?; + + Ok(Self { + db, + available: Arc::new(Mutex::new(Vec::with_capacity(max_connections))), + semaphore: Arc::new(Semaphore::new(max_connections)), + max_connections, + }) + } + + /// Get a connection from the pool. + /// + /// If a connection is available in the pool, it is returned immediately. + /// If all connections are in use, this method will asynchronously wait + /// until one becomes available. + /// + /// The returned `PooledConnection` will automatically return the connection + /// to the pool when dropped. + pub async fn get(&self) -> Result { + // Acquire a permit - this will wait if all connections are in use + let permit = self + .semaphore + .clone() + .acquire_owned() + .await + .map_err(|e| anyhow::anyhow!("Semaphore closed: {}", e))?; + + // Try to get an existing connection from the pool + let conn = { + let mut available = self.available.lock().await; + available.pop() + }; + + let conn = match conn { + Some(conn) => conn, + None => { + // No available connection, create a new one + Arc::new(self.db.connect()?) + } + }; + + Ok(PooledConnection { + conn: Some(conn), + available: self.available.clone(), + _permit: permit, + max_connections: self.max_connections, + }) + } + + /// Get the number of available connections in the pool. + pub async fn available_count(&self) -> usize { + self.available.lock().await.len() + } + + /// Get the number of permits available (connections not in use). + pub fn permits_available(&self) -> usize { + self.semaphore.available_permits() + } +} + +/// A connection borrowed from the pool. +/// +/// When dropped, the connection is automatically returned to the pool +/// and the semaphore permit is released. +pub struct PooledConnection { + conn: Option>, + available: Arc>>>, + _permit: OwnedSemaphorePermit, + max_connections: usize, +} + +impl Deref for PooledConnection { + type Target = Connection; + + fn deref(&self) -> &Self::Target { + self.conn.as_ref().unwrap() + } +} + +impl Drop for PooledConnection { + fn drop(&mut self) { + if let Some(conn) = self.conn.take() { + // Return connection to pool synchronously + // We use try_lock to avoid blocking in drop + if let Ok(mut available) = self.available.try_lock() { + if available.len() < self.max_connections { + available.push(conn); + } + } + // If we can't get the lock or pool is full, connection is dropped + // The semaphore permit is released automatically when _permit is dropped + } + } +} + +impl PooledConnection { + /// Get a reference to the underlying connection. + pub fn connection(&self) -> &Connection { + self.conn.as_ref().unwrap() + } +} diff --git a/sdk/rust/src/filesystem/agentfs.rs b/sdk/rust/src/filesystem/agentfs.rs index 4aac458..8ce8a54 100644 --- a/sdk/rust/src/filesystem/agentfs.rs +++ b/sdk/rust/src/filesystem/agentfs.rs @@ -3,20 +3,25 @@ use async_trait::async_trait; use std::path::Path; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use turso::{Builder, Connection, Value}; +use turso::{Connection, Value}; use super::{ BoxedFile, DirEntry, File, FileSystem, FilesystemStats, FsError, Stats, DEFAULT_DIR_MODE, DEFAULT_FILE_MODE, S_IFLNK, S_IFMT, }; +use crate::connection_pool::ConnectionPool; const ROOT_INO: i64 = 1; const DEFAULT_CHUNK_SIZE: usize = 4096; +const DEFAULT_POOL_SIZE: usize = 16; -/// A filesystem backed by SQLite +/// A filesystem backed by SQLite with connection pooling for concurrent access. +/// +/// Each filesystem operation borrows a connection from the pool, enabling +/// concurrent access without transaction conflicts. #[derive(Clone)] pub struct AgentFS { - conn: Arc, + pool: Arc, chunk_size: usize, } @@ -24,8 +29,9 @@ pub struct AgentFS { /// /// This struct holds the inode number resolved at open time, allowing /// efficient read/write/fsync operations without path lookups. +/// Each operation borrows a connection from the pool. pub struct AgentFSFile { - conn: Arc, + pool: Arc, ino: i64, chunk_size: usize, } @@ -33,12 +39,12 @@ pub struct AgentFSFile { #[async_trait] impl File for AgentFSFile { async fn pread(&self, offset: u64, size: u64) -> Result> { + let conn = self.pool.get().await?; let chunk_size = self.chunk_size as u64; let start_chunk = offset / chunk_size; let end_chunk = (offset + size).saturating_sub(1) / chunk_size; - let mut rows = self - .conn + let mut rows = conn .query( "SELECT chunk_index, data FROM fs_data WHERE ino = ? AND chunk_index >= ? AND chunk_index <= ? ORDER BY chunk_index", (self.ino, start_chunk as i64, end_chunk as i64), @@ -72,9 +78,10 @@ impl File for AgentFSFile { return Ok(()); } + let conn = self.pool.get().await?; + // Get current file size - let mut rows = self - .conn + let mut rows = conn .query("SELECT size FROM fs_inode WHERE ino = ?", (self.ino,)) .await?; let current_size = if let Some(row) = rows.next().await? { @@ -89,29 +96,31 @@ impl File for AgentFSFile { // If writing beyond current size, extend with zeros first if offset > current_size { let zeros = vec![0u8; (offset - current_size) as usize]; - self.write_data_at_offset(current_size, &zeros).await?; + self.write_data_at_offset_with_conn(&conn, current_size, &zeros) + .await?; } // Write the actual data - self.write_data_at_offset(offset, data).await?; + self.write_data_at_offset_with_conn(&conn, offset, data) + .await?; // Update file size and mtime let new_size = std::cmp::max(current_size, offset + data.len() as u64); let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; - self.conn - .execute( - "UPDATE fs_inode SET size = ?, mtime = ? WHERE ino = ?", - (new_size as i64, now, self.ino), - ) - .await?; + conn.execute( + "UPDATE fs_inode SET size = ?, mtime = ? WHERE ino = ?", + (new_size as i64, now, self.ino), + ) + .await?; Ok(()) } async fn truncate(&self, new_size: u64) -> Result<()> { + let conn = self.pool.get().await?; + // Get current size - let mut rows = self - .conn + let mut rows = conn .query("SELECT size FROM fs_inode WHERE ino = ?", (self.ino,)) .await?; let current_size = if let Some(row) = rows.next().await? { @@ -125,31 +134,28 @@ impl File for AgentFSFile { let chunk_size = self.chunk_size as u64; - self.conn.execute("BEGIN IMMEDIATE", ()).await?; + conn.execute("BEGIN IMMEDIATE", ()).await?; let result: Result<()> = async { if new_size == 0 { // Special case: truncate to zero - just delete all chunks - self.conn - .execute("DELETE FROM fs_data WHERE ino = ?", (self.ino,)) + conn.execute("DELETE FROM fs_data WHERE ino = ?", (self.ino,)) .await?; } else if new_size < current_size { // Shrinking: delete excess chunks and truncate last chunk if needed let last_chunk_idx = (new_size - 1) / chunk_size; // Delete all chunks beyond the last one we need - self.conn - .execute( - "DELETE FROM fs_data WHERE ino = ? AND chunk_index > ?", - (self.ino, last_chunk_idx as i64), - ) - .await?; + conn.execute( + "DELETE FROM fs_data WHERE ino = ? AND chunk_index > ?", + (self.ino, last_chunk_idx as i64), + ) + .await?; // Truncate the last chunk if needed let offset_in_chunk = (new_size % chunk_size) as usize; if offset_in_chunk > 0 { - let mut rows = self - .conn + let mut rows = conn .query( "SELECT data FROM fs_data WHERE ino = ? AND chunk_index = ?", (self.ino, last_chunk_idx as i64), @@ -160,12 +166,11 @@ impl File for AgentFSFile { if let Ok(Value::Blob(mut chunk_data)) = row.get_value(0) { if chunk_data.len() > offset_in_chunk { chunk_data.truncate(offset_in_chunk); - self.conn - .execute( - "UPDATE fs_data SET data = ? WHERE ino = ? AND chunk_index = ?", - (Value::Blob(chunk_data), self.ino, last_chunk_idx as i64), - ) - .await?; + conn.execute( + "UPDATE fs_data SET data = ? WHERE ino = ? AND chunk_index = ?", + (Value::Blob(chunk_data), self.ino, last_chunk_idx as i64), + ) + .await?; } } } @@ -176,37 +181,37 @@ impl File for AgentFSFile { // Update the inode size and mtime let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; - self.conn - .execute( - "UPDATE fs_inode SET size = ?, mtime = ? WHERE ino = ?", - (new_size as i64, now, self.ino), - ) - .await?; + conn.execute( + "UPDATE fs_inode SET size = ?, mtime = ? WHERE ino = ?", + (new_size as i64, now, self.ino), + ) + .await?; Ok(()) } .await; if result.is_err() { - let _ = self.conn.execute("ROLLBACK", ()).await; + let _ = conn.execute("ROLLBACK", ()).await; return result; } - self.conn.execute("COMMIT", ()).await?; + conn.execute("COMMIT", ()).await?; Ok(()) } async fn fsync(&self) -> Result<()> { - self.conn.execute("PRAGMA synchronous = FULL", ()).await?; - self.conn.execute("BEGIN", ()).await?; - self.conn.execute("COMMIT", ()).await?; - self.conn.execute("PRAGMA synchronous = OFF", ()).await?; + let conn = self.pool.get().await?; + conn.execute("PRAGMA synchronous = FULL", ()).await?; + conn.execute("BEGIN", ()).await?; + conn.execute("COMMIT", ()).await?; + conn.execute("PRAGMA synchronous = OFF", ()).await?; Ok(()) } async fn fstat(&self) -> Result { - let mut rows = self - .conn + let conn = self.pool.get().await?; + let mut rows = conn .query( "SELECT ino, mode, nlink, uid, gid, size, atime, mtime, ctime FROM fs_inode WHERE ino = ?", (self.ino,), @@ -223,7 +228,12 @@ impl File for AgentFSFile { impl AgentFSFile { /// Write data at a specific offset, handling chunk boundaries. - async fn write_data_at_offset(&self, offset: u64, data: &[u8]) -> Result<()> { + async fn write_data_at_offset_with_conn( + &self, + conn: &Connection, + offset: u64, + data: &[u8], + ) -> Result<()> { let chunk_size = self.chunk_size as u64; let mut written = 0usize; @@ -238,8 +248,7 @@ impl AgentFSFile { let to_write = std::cmp::min(remaining_in_chunk, remaining_data); // Get existing chunk data (if any) - let mut rows = self - .conn + let mut rows = conn .query( "SELECT data FROM fs_data WHERE ino = ? AND chunk_index = ?", (self.ino, chunk_index), @@ -271,12 +280,11 @@ impl AgentFSFile { .copy_from_slice(&data[written..written + to_write]); // Save chunk - self.conn - .execute( - "INSERT OR REPLACE INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)", - (self.ino, chunk_index, Value::Blob(chunk_data)), - ) - .await?; + conn.execute( + "INSERT OR REPLACE INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)", + (self.ino, chunk_index, Value::Blob(chunk_data)), + ) + .await?; written += to_write; } @@ -286,26 +294,30 @@ impl AgentFSFile { } impl AgentFS { - /// Create a new filesystem + /// Create a new filesystem with default pool size. pub async fn new(db_path: &str) -> Result { - let db = Builder::new_local(db_path).build().await?; - let conn = Arc::new(db.connect()?); - Self::from_connection(conn).await + Self::with_pool_size(db_path, DEFAULT_POOL_SIZE).await } - /// Create a filesystem from an existing connection - pub async fn from_connection(conn: Arc) -> Result { - // Initialize schema first - Self::initialize_schema(&conn).await?; + /// Create a new filesystem with a specific pool size. + pub async fn with_pool_size(db_path: &str, pool_size: usize) -> Result { + let pool = Arc::new(ConnectionPool::new(db_path, pool_size).await?); - // Disable synchronous mode for filesystem fsync() semantics. - conn.execute("PRAGMA synchronous = OFF", ()).await?; + // Initialize schema using one connection from the pool + { + let conn = pool.get().await?; + Self::initialize_schema(&conn).await?; + // Disable synchronous mode for filesystem fsync() semantics. + conn.execute("PRAGMA synchronous = OFF", ()).await?; + } // Get chunk_size from config (or use default) - let chunk_size = Self::read_chunk_size(&conn).await?; + let chunk_size = { + let conn = pool.get().await?; + Self::read_chunk_size(&conn).await? + }; - let fs = Self { conn, chunk_size }; - Ok(fs) + Ok(Self { pool, chunk_size }) } /// Get the configured chunk size @@ -313,9 +325,9 @@ impl AgentFS { self.chunk_size } - /// Get the underlying database connection - pub fn get_connection(&self) -> Arc { - self.conn.clone() + /// Get the connection pool + pub fn pool(&self) -> Arc { + self.pool.clone() } /// Initialize the database schema @@ -497,9 +509,8 @@ impl AgentFS { } /// Get link count for an inode - async fn get_link_count(&self, ino: i64) -> Result { - let mut rows = self - .conn + async fn get_link_count_with_conn(conn: &Connection, ino: i64) -> Result { + let mut rows = conn .query("SELECT nlink FROM fs_inode WHERE ino = ?", (ino,)) .await?; @@ -570,7 +581,7 @@ impl AgentFS { } /// Resolve a path to an inode number - async fn resolve_path(&self, path: &str) -> Result> { + async fn resolve_path_with_conn(&self, conn: &Connection, path: &str) -> Result> { let components = self.split_path(path); if components.is_empty() { return Ok(Some(ROOT_INO)); @@ -578,8 +589,7 @@ impl AgentFS { let mut current_ino = ROOT_INO; for component in components { - let mut rows = self - .conn + let mut rows = conn .query( "SELECT ino FROM fs_dentry WHERE parent_ino = ? AND name = ?", (current_ino, component.as_str()), @@ -600,16 +610,22 @@ impl AgentFS { Ok(Some(current_ino)) } + /// Resolve a path to an inode number (public API for tests) + pub async fn resolve_path(&self, path: &str) -> Result> { + let conn = self.pool.get().await?; + self.resolve_path_with_conn(&conn, path).await + } + /// Get file statistics without following symlinks pub async fn lstat(&self, path: &str) -> Result> { + let conn = self.pool.get().await?; let path = self.normalize_path(path); - let ino = match self.resolve_path(&path).await? { + let ino = match self.resolve_path_with_conn(&conn, &path).await? { Some(ino) => ino, None => return Ok(None), }; - let mut rows = self - .conn + let mut rows = conn .query( "SELECT ino, mode, nlink, uid, gid, size, atime, mtime, ctime FROM fs_inode WHERE ino = ?", (ino,), @@ -626,6 +642,7 @@ impl AgentFS { /// Get file statistics, following symlinks pub async fn stat(&self, path: &str) -> Result> { + let conn = self.pool.get().await?; let path = self.normalize_path(path); // Follow symlinks with a maximum depth to prevent infinite loops @@ -633,13 +650,12 @@ impl AgentFS { let max_symlink_depth = 40; // Standard limit for symlink following for _ in 0..max_symlink_depth { - let ino = match self.resolve_path(¤t_path).await? { + let ino = match self.resolve_path_with_conn(&conn, ¤t_path).await? { Some(ino) => ino, None => return Ok(None), }; - let mut rows = self - .conn + let mut rows = conn .query( "SELECT ino, mode, nlink, uid, gid, size, atime, mtime, ctime FROM fs_inode WHERE ino = ?", (ino,), @@ -657,7 +673,7 @@ impl AgentFS { if (mode & S_IFMT) == S_IFLNK { // Read the symlink target let target = self - .readlink(¤t_path) + .readlink_with_conn(&conn, ¤t_path) .await? .ok_or_else(|| anyhow::anyhow!("Symlink has no target"))?; @@ -689,6 +705,7 @@ impl AgentFS { /// Create a directory pub async fn mkdir(&self, path: &str) -> Result<()> { + let conn = self.pool.get().await?; let path = self.normalize_path(path); let components = self.split_path(&path); @@ -703,21 +720,20 @@ impl AgentFS { }; let parent_ino = self - .resolve_path(&parent_path) + .resolve_path_with_conn(&conn, &parent_path) .await? .ok_or_else(|| anyhow::anyhow!("Parent directory does not exist"))?; let name = components.last().unwrap(); // Check if already exists - if (self.resolve_path(&path).await?).is_some() { + if (self.resolve_path_with_conn(&conn, &path).await?).is_some() { anyhow::bail!("Directory already exists"); } // Create inode let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; - let mut stmt = self - .conn + let mut stmt = conn .prepare( "INSERT INTO fs_inode (mode, uid, gid, size, atime, mtime, ctime) VALUES (?, 0, 0, 0, ?, ?, ?) RETURNING ino", @@ -734,26 +750,25 @@ impl AgentFS { .ok_or_else(|| anyhow::anyhow!("Failed to get inode"))?; // Create directory entry - self.conn - .execute( - "INSERT INTO fs_dentry (name, parent_ino, ino) VALUES (?, ?, ?)", - (name.as_str(), parent_ino, ino), - ) - .await?; + conn.execute( + "INSERT INTO fs_dentry (name, parent_ino, ino) VALUES (?, ?, ?)", + (name.as_str(), parent_ino, ino), + ) + .await?; // Increment link count - self.conn - .execute( - "UPDATE fs_inode SET nlink = nlink + 1 WHERE ino = ?", - (ino,), - ) - .await?; + conn.execute( + "UPDATE fs_inode SET nlink = nlink + 1 WHERE ino = ?", + (ino,), + ) + .await?; Ok(()) } /// Write data to a file pub async fn write_file(&self, path: &str, data: &[u8]) -> Result<()> { + let conn = self.pool.get().await?; let path = self.normalize_path(path); let components = self.split_path(&path); @@ -768,27 +783,25 @@ impl AgentFS { }; let parent_ino = self - .resolve_path(&parent_path) + .resolve_path_with_conn(&conn, &parent_path) .await? .ok_or_else(|| anyhow::anyhow!("Parent directory does not exist"))?; let name = components.last().unwrap(); - self.conn.execute("BEGIN IMMEDIATE", ()).await?; + conn.execute("BEGIN IMMEDIATE", ()).await?; let result: Result<()> = async { // Check if file exists - let ino = if let Some(ino) = self.resolve_path(&path).await? { + let ino = if let Some(ino) = self.resolve_path_with_conn(&conn, &path).await? { // Delete existing data - self.conn - .execute("DELETE FROM fs_data WHERE ino = ?", (ino,)) + conn.execute("DELETE FROM fs_data WHERE ino = ?", (ino,)) .await?; ino } else { // Create new inode let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; - let mut stmt = self - .conn + let mut stmt = conn .prepare( "INSERT INTO fs_inode (mode, uid, gid, size, atime, mtime, ctime) VALUES (?, 0, 0, ?, ?, ?, ?) RETURNING ino", @@ -805,42 +818,38 @@ impl AgentFS { .ok_or_else(|| anyhow::anyhow!("Failed to get inode"))?; // Create directory entry - self.conn - .execute( - "INSERT INTO fs_dentry (name, parent_ino, ino) VALUES (?, ?, ?)", - (name.as_str(), parent_ino, ino), - ) - .await?; + conn.execute( + "INSERT INTO fs_dentry (name, parent_ino, ino) VALUES (?, ?, ?)", + (name.as_str(), parent_ino, ino), + ) + .await?; // Increment link count - self.conn - .execute( - "UPDATE fs_inode SET nlink = nlink + 1 WHERE ino = ?", - (ino,), - ) - .await?; + conn.execute( + "UPDATE fs_inode SET nlink = nlink + 1 WHERE ino = ?", + (ino,), + ) + .await?; ino }; // Write data in chunks for (chunk_index, chunk) in data.chunks(self.chunk_size).enumerate() { - self.conn - .execute( - "INSERT INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)", - (ino, chunk_index as i64, chunk), - ) - .await?; + conn.execute( + "INSERT INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)", + (ino, chunk_index as i64, chunk), + ) + .await?; } // Update mode (to regular file), size and mtime let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; - self.conn - .execute( - "UPDATE fs_inode SET mode = ?, size = ?, mtime = ? WHERE ino = ?", - (DEFAULT_FILE_MODE as i64, data.len() as i64, now, ino), - ) - .await?; + conn.execute( + "UPDATE fs_inode SET mode = ?, size = ?, mtime = ? WHERE ino = ?", + (DEFAULT_FILE_MODE as i64, data.len() as i64, now, ino), + ) + .await?; Ok(()) } @@ -848,11 +857,11 @@ impl AgentFS { match result { Ok(()) => { - self.conn.execute("COMMIT", ()).await?; + conn.execute("COMMIT", ()).await?; Ok(()) } Err(e) => { - let _ = self.conn.execute("ROLLBACK", ()).await; + let _ = conn.execute("ROLLBACK", ()).await; Err(e) } } @@ -860,13 +869,13 @@ impl AgentFS { /// Read data from a file pub async fn read_file(&self, path: &str) -> Result>> { - let ino = match self.resolve_path(path).await? { + let conn = self.pool.get().await?; + let ino = match self.resolve_path_with_conn(&conn, path).await? { Some(ino) => ino, None => return Ok(None), }; - let mut rows = self - .conn + let mut rows = conn .query( "SELECT data FROM fs_data WHERE ino = ? ORDER BY chunk_index", (ino,), @@ -890,7 +899,8 @@ impl AgentFS { /// /// Returns `Ok(None)` if the file does not exist. pub async fn pread(&self, path: &str, offset: u64, size: u64) -> Result>> { - let ino = match self.resolve_path(path).await? { + let conn = self.pool.get().await?; + let ino = match self.resolve_path_with_conn(&conn, path).await? { Some(ino) => ino, None => return Ok(None), }; @@ -900,8 +910,7 @@ impl AgentFS { let start_chunk = offset / chunk_size; let end_chunk = (offset + size).saturating_sub(1) / chunk_size; - let mut rows = self - .conn + let mut rows = conn .query( "SELECT chunk_index, data FROM fs_data WHERE ino = ? AND chunk_index >= ? AND chunk_index <= ? ORDER BY chunk_index", (ino, start_chunk as i64, end_chunk as i64), @@ -938,6 +947,7 @@ impl AgentFS { /// If the offset is beyond the current file size, the file is extended with zeros. /// If the file does not exist, it will be created. pub async fn pwrite(&self, path: &str, offset: u64, data: &[u8]) -> Result<()> { + let conn = self.pool.get().await?; let path = self.normalize_path(path); let components = self.split_path(&path); @@ -952,75 +962,71 @@ impl AgentFS { }; let parent_ino = self - .resolve_path(&parent_path) + .resolve_path_with_conn(&conn, &parent_path) .await? .ok_or_else(|| anyhow::anyhow!("Parent directory does not exist"))?; let name = components.last().unwrap(); - self.conn.execute("BEGIN IMMEDIATE", ()).await?; + conn.execute("BEGIN IMMEDIATE", ()).await?; let result: Result<()> = async { // Get or create the inode - let (ino, current_size) = if let Some(ino) = self.resolve_path(&path).await? { - // Get current file size - let mut rows = self - .conn - .query("SELECT size FROM fs_inode WHERE ino = ?", (ino,)) - .await?; - let size = if let Some(row) = rows.next().await? { - row.get_value(0) - .ok() - .and_then(|v| v.as_integer().copied()) - .unwrap_or(0) as u64 + let (ino, current_size) = + if let Some(ino) = self.resolve_path_with_conn(&conn, &path).await? { + // Get current file size + let mut rows = conn + .query("SELECT size FROM fs_inode WHERE ino = ?", (ino,)) + .await?; + let size = if let Some(row) = rows.next().await? { + row.get_value(0) + .ok() + .and_then(|v| v.as_integer().copied()) + .unwrap_or(0) as u64 + } else { + 0 + }; + (ino, size) } else { - 0 - }; - (ino, size) - } else { - // Create new inode - let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; - let mut stmt = self - .conn - .prepare( - "INSERT INTO fs_inode (mode, uid, gid, size, atime, mtime, ctime) + // Create new inode + let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; + let mut stmt = conn + .prepare( + "INSERT INTO fs_inode (mode, uid, gid, size, atime, mtime, ctime) VALUES (?, 0, 0, 0, ?, ?, ?) RETURNING ino", - ) - .await?; - let row = stmt - .query_row((DEFAULT_FILE_MODE as i64, now, now, now)) - .await?; + ) + .await?; + let row = stmt + .query_row((DEFAULT_FILE_MODE as i64, now, now, now)) + .await?; - let ino = row - .get_value(0) - .ok() - .and_then(|v| v.as_integer().copied()) - .ok_or_else(|| anyhow::anyhow!("Failed to get inode"))?; + let ino = row + .get_value(0) + .ok() + .and_then(|v| v.as_integer().copied()) + .ok_or_else(|| anyhow::anyhow!("Failed to get inode"))?; - // Create directory entry - self.conn - .execute( + // Create directory entry + conn.execute( "INSERT INTO fs_dentry (name, parent_ino, ino) VALUES (?, ?, ?)", (name.as_str(), parent_ino, ino), ) .await?; - // Increment link count - self.conn - .execute( + // Increment link count + conn.execute( "UPDATE fs_inode SET nlink = nlink + 1 WHERE ino = ?", (ino,), ) .await?; - (ino, 0) - }; + (ino, 0) + }; // Handle empty writes - just update mtime if data.is_empty() { let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; - self.conn - .execute("UPDATE fs_inode SET mtime = ? WHERE ino = ?", (now, ino)) + conn.execute("UPDATE fs_inode SET mtime = ? WHERE ino = ?", (now, ino)) .await?; return Ok(()); } @@ -1056,8 +1062,7 @@ impl AgentFS { // Read existing chunk if we need to preserve some data let needs_read = data_start > 0 || data_end < chunk_size as usize; let mut chunk_data = if needs_read { - let mut rows = self - .conn + let mut rows = conn .query( "SELECT data FROM fs_data WHERE ino = ? AND chunk_index = ?", (ino, chunk_idx as i64), @@ -1095,29 +1100,26 @@ impl AgentFS { }; // Write the chunk - delete existing then insert - self.conn - .execute( - "DELETE FROM fs_data WHERE ino = ? AND chunk_index = ?", - (ino, chunk_idx as i64), - ) - .await?; - self.conn - .execute( - "INSERT INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)", - (ino, chunk_idx as i64, &chunk_data[..actual_len]), - ) - .await?; + conn.execute( + "DELETE FROM fs_data WHERE ino = ? AND chunk_index = ?", + (ino, chunk_idx as i64), + ) + .await?; + conn.execute( + "INSERT INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)", + (ino, chunk_idx as i64, &chunk_data[..actual_len]), + ) + .await?; } // Update size and mtime let new_size = std::cmp::max(current_size, write_end); let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; - self.conn - .execute( - "UPDATE fs_inode SET size = ?, mtime = ? WHERE ino = ?", - (new_size as i64, now, ino), - ) - .await?; + conn.execute( + "UPDATE fs_inode SET size = ?, mtime = ? WHERE ino = ?", + (new_size as i64, now, ino), + ) + .await?; Ok(()) } @@ -1125,11 +1127,11 @@ impl AgentFS { match result { Ok(()) => { - self.conn.execute("COMMIT", ()).await?; + conn.execute("COMMIT", ()).await?; Ok(()) } Err(e) => { - let _ = self.conn.execute("ROLLBACK", ()).await; + let _ = conn.execute("ROLLBACK", ()).await; Err(e) } } @@ -1141,15 +1143,15 @@ impl AgentFS { /// - Shrinking: deletes chunks beyond new size, truncates the last chunk if needed /// - Extending: pads with zeros up to the new size pub async fn truncate(&self, path: &str, new_size: u64) -> Result<()> { + let conn = self.pool.get().await?; let path = self.normalize_path(path); let ino = self - .resolve_path(&path) + .resolve_path_with_conn(&conn, &path) .await? .ok_or_else(|| anyhow::anyhow!("File not found"))?; // Get current size - let mut rows = self - .conn + let mut rows = conn .query("SELECT size FROM fs_inode WHERE ino = ?", (ino,)) .await?; let current_size = if let Some(row) = rows.next().await? { @@ -1163,25 +1165,23 @@ impl AgentFS { let chunk_size = self.chunk_size as u64; - self.conn.execute("BEGIN IMMEDIATE", ()).await?; + conn.execute("BEGIN IMMEDIATE", ()).await?; let result: Result<()> = async { if new_size == 0 { // Special case: truncate to zero - just delete all chunks - self.conn - .execute("DELETE FROM fs_data WHERE ino = ?", (ino,)) + conn.execute("DELETE FROM fs_data WHERE ino = ?", (ino,)) .await?; } else if new_size < current_size { // Shrinking: delete excess chunks and truncate last chunk if needed let last_chunk_idx = (new_size - 1) / chunk_size; // Delete all chunks beyond the last one we need - self.conn - .execute( - "DELETE FROM fs_data WHERE ino = ? AND chunk_index > ?", - (ino, last_chunk_idx as i64), - ) - .await?; + conn.execute( + "DELETE FROM fs_data WHERE ino = ? AND chunk_index > ?", + (ino, last_chunk_idx as i64), + ) + .await?; // Calculate where in the last chunk the file should end let end_in_last_chunk = ((new_size - 1) % chunk_size) + 1; @@ -1189,8 +1189,7 @@ impl AgentFS { // If the last chunk needs to be truncated (not a full chunk), // read it, truncate, and rewrite if end_in_last_chunk < chunk_size { - let mut rows = self - .conn + let mut rows = conn .query( "SELECT data FROM fs_data WHERE ino = ? AND chunk_index = ?", (ino, last_chunk_idx as i64), @@ -1201,12 +1200,11 @@ impl AgentFS { if let Ok(Value::Blob(chunk_data)) = row.get_value(0) { if chunk_data.len() > end_in_last_chunk as usize { let truncated = &chunk_data[..end_in_last_chunk as usize]; - self.conn - .execute( - "UPDATE fs_data SET data = ? WHERE ino = ? AND chunk_index = ?", - (truncated, ino, last_chunk_idx as i64), - ) - .await?; + conn.execute( + "UPDATE fs_data SET data = ? WHERE ino = ? AND chunk_index = ?", + (truncated, ino, last_chunk_idx as i64), + ) + .await?; } } } @@ -1222,8 +1220,7 @@ impl AgentFS { // Pad the last existing chunk with zeros if it's not full if let Some(last_idx) = last_existing_chunk { - let mut rows = self - .conn + let mut rows = conn .query( "SELECT data FROM fs_data WHERE ino = ? AND chunk_index = ?", (ino, last_idx as i64), @@ -1244,12 +1241,11 @@ impl AgentFS { if needed_len > current_chunk_len { let mut padded = chunk_data.clone(); padded.resize(needed_len, 0); - self.conn - .execute( - "UPDATE fs_data SET data = ? WHERE ino = ? AND chunk_index = ?", - (&padded[..], ino, last_idx as i64), - ) - .await?; + conn.execute( + "UPDATE fs_data SET data = ? WHERE ino = ? AND chunk_index = ?", + (&padded[..], ino, last_idx as i64), + ) + .await?; } } } @@ -1264,24 +1260,22 @@ impl AgentFS { chunk_size as usize }; let zeros = vec![0u8; chunk_len]; - self.conn - .execute( - "INSERT INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)", - (ino, chunk_idx as i64, &zeros[..]), - ) - .await?; + conn.execute( + "INSERT INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)", + (ino, chunk_idx as i64, &zeros[..]), + ) + .await?; } } // else: new_size == current_size, nothing to do for data // Update size and mtime let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; - self.conn - .execute( - "UPDATE fs_inode SET size = ?, mtime = ? WHERE ino = ?", - (new_size as i64, now, ino), - ) - .await?; + conn.execute( + "UPDATE fs_inode SET size = ?, mtime = ? WHERE ino = ?", + (new_size as i64, now, ino), + ) + .await?; Ok(()) } @@ -1289,11 +1283,11 @@ impl AgentFS { match result { Ok(()) => { - self.conn.execute("COMMIT", ()).await?; + conn.execute("COMMIT", ()).await?; Ok(()) } Err(e) => { - let _ = self.conn.execute("ROLLBACK", ()).await; + let _ = conn.execute("ROLLBACK", ()).await; Err(e) } } @@ -1301,13 +1295,13 @@ impl AgentFS { /// List directory contents pub async fn readdir(&self, path: &str) -> Result>> { - let ino = match self.resolve_path(path).await? { + let conn = self.pool.get().await?; + let ino = match self.resolve_path_with_conn(&conn, path).await? { Some(ino) => ino, None => return Ok(None), }; - let mut rows = self - .conn + let mut rows = conn .query( "SELECT name FROM fs_dentry WHERE parent_ino = ? ORDER BY name", (ino,), @@ -1339,14 +1333,14 @@ impl AgentFS { /// /// Returns entries with their stats in a single JOIN query, avoiding N+1 queries. pub async fn readdir_plus(&self, path: &str) -> Result>> { - let ino = match self.resolve_path(path).await? { + let conn = self.pool.get().await?; + let ino = match self.resolve_path_with_conn(&conn, path).await? { Some(ino) => ino, None => return Ok(None), }; // Single JOIN query to get all entry names and their stats (including link count) - let mut rows = self - .conn + let mut rows = conn .query( "SELECT d.name, i.ino, i.mode, i.nlink, i.uid, i.gid, i.size, i.atime, i.mtime, i.ctime FROM fs_dentry d @@ -1435,6 +1429,7 @@ impl AgentFS { /// Create a symbolic link pub async fn symlink(&self, target: &str, linkpath: &str) -> Result<()> { + let conn = self.pool.get().await?; let linkpath = self.normalize_path(linkpath); let components = self.split_path(&linkpath); @@ -1450,14 +1445,14 @@ impl AgentFS { }; let parent_ino = self - .resolve_path(&parent_path) + .resolve_path_with_conn(&conn, &parent_path) .await? .ok_or_else(|| anyhow::anyhow!("Parent directory does not exist"))?; let name = components.last().unwrap(); // Check if entry already exists - if (self.resolve_path(&linkpath).await?).is_some() { + if (self.resolve_path_with_conn(&conn, &linkpath).await?).is_some() { anyhow::bail!("Path already exists"); } @@ -1470,8 +1465,7 @@ impl AgentFS { let mode = S_IFLNK | 0o777; // Symlinks typically have 777 permissions let size = target.len() as i64; - let mut stmt = self - .conn + let mut stmt = conn .prepare( "INSERT INTO fs_inode (mode, uid, gid, size, atime, mtime, ctime) VALUES (?, 0, 0, ?, ?, ?, ?) RETURNING ino", @@ -1487,44 +1481,40 @@ impl AgentFS { .unwrap_or(0); // Store symlink target - self.conn - .execute( - "INSERT INTO fs_symlink (ino, target) VALUES (?, ?)", - (ino, target), - ) - .await?; + conn.execute( + "INSERT INTO fs_symlink (ino, target) VALUES (?, ?)", + (ino, target), + ) + .await?; // Create directory entry - self.conn - .execute( - "INSERT INTO fs_dentry (name, parent_ino, ino) VALUES (?, ?, ?)", - (name.as_str(), parent_ino, ino), - ) - .await?; + conn.execute( + "INSERT INTO fs_dentry (name, parent_ino, ino) VALUES (?, ?, ?)", + (name.as_str(), parent_ino, ino), + ) + .await?; // Increment link count - self.conn - .execute( - "UPDATE fs_inode SET nlink = nlink + 1 WHERE ino = ?", - (ino,), - ) - .await?; + conn.execute( + "UPDATE fs_inode SET nlink = nlink + 1 WHERE ino = ?", + (ino,), + ) + .await?; Ok(()) } - /// Read the target of a symbolic link - pub async fn readlink(&self, path: &str) -> Result> { + /// Internal readlink implementation that uses a provided connection + async fn readlink_with_conn(&self, conn: &Connection, path: &str) -> Result> { let path = self.normalize_path(path); - let ino = match self.resolve_path(&path).await? { + let ino = match self.resolve_path_with_conn(conn, &path).await? { Some(ino) => ino, None => return Ok(None), }; // Check if it's a symlink by querying the inode - let mut rows = self - .conn + let mut rows = conn .query("SELECT mode FROM fs_inode WHERE ino = ?", (ino,)) .await?; @@ -1544,8 +1534,7 @@ impl AgentFS { } // Read target from fs_symlink table - let mut rows = self - .conn + let mut rows = conn .query("SELECT target FROM fs_symlink WHERE ino = ?", (ino,)) .await?; @@ -1564,8 +1553,15 @@ impl AgentFS { } } + /// Read the target of a symbolic link + pub async fn readlink(&self, path: &str) -> Result> { + let conn = self.pool.get().await?; + self.readlink_with_conn(&conn, path).await + } + /// Remove a file or empty directory pub async fn remove(&self, path: &str) -> Result<()> { + let conn = self.pool.get().await?; let path = self.normalize_path(path); let components = self.split_path(&path); @@ -1574,7 +1570,7 @@ impl AgentFS { } let ino = self - .resolve_path(&path) + .resolve_path_with_conn(&conn, &path) .await? .ok_or_else(|| anyhow::anyhow!("Path does not exist"))?; @@ -1583,8 +1579,7 @@ impl AgentFS { } // Check if directory is empty - let mut rows = self - .conn + let mut rows = conn .query( "SELECT COUNT(*) FROM fs_dentry WHERE parent_ino = ?", (ino,), @@ -1610,45 +1605,40 @@ impl AgentFS { }; let parent_ino = self - .resolve_path(&parent_path) + .resolve_path_with_conn(&conn, &parent_path) .await? .ok_or_else(|| anyhow::anyhow!("Parent directory does not exist"))?; let name = components.last().unwrap(); // Delete the specific directory entry (not all entries pointing to this inode) - self.conn - .execute( - "DELETE FROM fs_dentry WHERE parent_ino = ? AND name = ?", - (parent_ino, name.as_str()), - ) - .await?; + conn.execute( + "DELETE FROM fs_dentry WHERE parent_ino = ? AND name = ?", + (parent_ino, name.as_str()), + ) + .await?; // Decrement link count - self.conn - .execute( - "UPDATE fs_inode SET nlink = nlink - 1 WHERE ino = ?", - (ino,), - ) - .await?; + conn.execute( + "UPDATE fs_inode SET nlink = nlink - 1 WHERE ino = ?", + (ino,), + ) + .await?; // Check if this was the last link to the inode - let link_count = self.get_link_count(ino).await?; + let link_count = Self::get_link_count_with_conn(&conn, ino).await?; if link_count == 0 { // Manually handle cascading deletes since we don't use foreign keys // Delete data blocks - self.conn - .execute("DELETE FROM fs_data WHERE ino = ?", (ino,)) + conn.execute("DELETE FROM fs_data WHERE ino = ?", (ino,)) .await?; // Delete symlink if exists - self.conn - .execute("DELETE FROM fs_symlink WHERE ino = ?", (ino,)) + conn.execute("DELETE FROM fs_symlink WHERE ino = ?", (ino,)) .await?; // Delete inode - self.conn - .execute("DELETE FROM fs_inode WHERE ino = ?", (ino,)) + conn.execute("DELETE FROM fs_inode WHERE ino = ?", (ino,)) .await?; } @@ -1659,6 +1649,7 @@ impl AgentFS { /// /// This operation is atomic - either all changes succeed or none do. pub async fn rename(&self, from: &str, to: &str) -> Result<()> { + let conn = self.pool.get().await?; let from_path = self.normalize_path(from); let to_path = self.normalize_path(to); @@ -1669,12 +1660,12 @@ impl AgentFS { // Get source inode let src_ino = self - .resolve_path(&from_path) + .resolve_path_with_conn(&conn, &from_path) .await? .ok_or(FsError::NotFound)?; // Get source stats to check if it's a directory - let src_stats = self.stat(&from_path).await?.ok_or(FsError::NotFound)?; + let src_stats = self.lstat(&from_path).await?.ok_or(FsError::NotFound)?; // Prevent renaming a directory into its own subtree (would create a cycle) if src_stats.is_directory() { @@ -1696,7 +1687,7 @@ impl AgentFS { ) }; let src_parent_ino = self - .resolve_path(&src_parent_path) + .resolve_path_with_conn(&conn, &src_parent_path) .await? .ok_or(FsError::NotFound)?; @@ -1712,7 +1703,7 @@ impl AgentFS { format!("/{}", to_components[..to_components.len() - 1].join("/")) }; let dst_parent_ino = self - .resolve_path(&dst_parent_path) + .resolve_path_with_conn(&conn, &dst_parent_path) .await? .ok_or(FsError::NotFound)?; @@ -1720,27 +1711,38 @@ impl AgentFS { let src_name = src_name.clone(); let dst_name = dst_name.clone(); - self.conn.execute("BEGIN IMMEDIATE", ()).await?; + conn.execute("BEGIN IMMEDIATE", ()).await?; let result: Result<()> = async { // Check if destination exists (inside transaction for atomicity) - if let Some(dst_ino) = self.resolve_path(&to_path).await? { - let dst_stats = self.stat(&to_path).await?.ok_or(FsError::NotFound)?; + if let Some(dst_ino) = self.resolve_path_with_conn(&conn, &to_path).await? { + // Get destination stats inline + let mut rows = conn + .query("SELECT mode FROM fs_inode WHERE ino = ?", (dst_ino,)) + .await?; + let dst_mode = if let Some(row) = rows.next().await? { + row.get_value(0) + .ok() + .and_then(|v| v.as_integer().copied()) + .unwrap_or(0) as u32 + } else { + return Err(FsError::NotFound.into()); + }; + let dst_is_dir = (dst_mode & S_IFMT) == super::S_IFDIR; // Can't replace directory with non-directory - if dst_stats.is_directory() && !src_stats.is_directory() { + if dst_is_dir && !src_stats.is_directory() { return Err(FsError::IsADirectory.into()); } // Can't replace non-directory with directory - if !dst_stats.is_directory() && src_stats.is_directory() { + if !dst_is_dir && src_stats.is_directory() { return Err(FsError::NotADirectory.into()); } // If destination is directory, it must be empty - if dst_stats.is_directory() { - let mut rows = self - .conn + if dst_is_dir { + let mut rows = conn .query( "SELECT COUNT(*) FROM fs_dentry WHERE parent_ino = ?", (dst_ino,), @@ -1760,48 +1762,42 @@ impl AgentFS { } // Remove destination entry - self.conn - .execute( - "DELETE FROM fs_dentry WHERE parent_ino = ? AND name = ?", - (dst_parent_ino, dst_name.as_str()), - ) - .await?; + conn.execute( + "DELETE FROM fs_dentry WHERE parent_ino = ? AND name = ?", + (dst_parent_ino, dst_name.as_str()), + ) + .await?; // Decrement link count - self.conn - .execute( - "UPDATE fs_inode SET nlink = nlink - 1 WHERE ino = ?", - (dst_ino,), - ) - .await?; + conn.execute( + "UPDATE fs_inode SET nlink = nlink - 1 WHERE ino = ?", + (dst_ino,), + ) + .await?; // Clean up destination inode if no more links - let link_count = self.get_link_count(dst_ino).await?; + let link_count = Self::get_link_count_with_conn(&conn, dst_ino).await?; if link_count == 0 { - self.conn - .execute("DELETE FROM fs_data WHERE ino = ?", (dst_ino,)) + conn.execute("DELETE FROM fs_data WHERE ino = ?", (dst_ino,)) .await?; - self.conn - .execute("DELETE FROM fs_symlink WHERE ino = ?", (dst_ino,)) + conn.execute("DELETE FROM fs_symlink WHERE ino = ?", (dst_ino,)) .await?; - self.conn - .execute("DELETE FROM fs_inode WHERE ino = ?", (dst_ino,)) + conn.execute("DELETE FROM fs_inode WHERE ino = ?", (dst_ino,)) .await?; } } // Update the dentry: change parent and/or name - self.conn - .execute( - "UPDATE fs_dentry SET parent_ino = ?, name = ? WHERE parent_ino = ? AND name = ?", - ( - dst_parent_ino, - dst_name.as_str(), - src_parent_ino, - src_name.as_str(), - ), - ) - .await?; + conn.execute( + "UPDATE fs_dentry SET parent_ino = ?, name = ? WHERE parent_ino = ? AND name = ?", + ( + dst_parent_ino, + dst_name.as_str(), + src_parent_ino, + src_name.as_str(), + ), + ) + .await?; // Update ctime of the inode let now = SystemTime::now() @@ -1809,12 +1805,11 @@ impl AgentFS { .unwrap_or_default() .as_secs() as i64; - self.conn - .execute( - "UPDATE fs_inode SET ctime = ? WHERE ino = ?", - (now, src_ino), - ) - .await?; + conn.execute( + "UPDATE fs_inode SET ctime = ? WHERE ino = ?", + (now, src_ino), + ) + .await?; Ok(()) } @@ -1822,11 +1817,11 @@ impl AgentFS { match result { Ok(()) => { - self.conn.execute("COMMIT", ()).await?; + conn.execute("COMMIT", ()).await?; Ok(()) } Err(e) => { - let _ = self.conn.execute("ROLLBACK", ()).await; + let _ = conn.execute("ROLLBACK", ()).await; Err(e) } } @@ -1836,8 +1831,9 @@ impl AgentFS { /// /// Returns the total number of inodes and bytes used by file contents. pub async fn statfs(&self) -> Result { + let conn = self.pool.get().await?; // Count total inodes - let mut rows = self.conn.query("SELECT COUNT(*) FROM fs_inode", ()).await?; + let mut rows = conn.query("SELECT COUNT(*) FROM fs_inode", ()).await?; let inodes = if let Some(row) = rows.next().await? { row.get_value(0) @@ -1849,8 +1845,7 @@ impl AgentFS { }; // Sum total bytes used (from file sizes in inodes) - let mut rows = self - .conn + let mut rows = conn .query("SELECT COALESCE(SUM(size), 0) FROM fs_inode", ()) .await?; @@ -1874,10 +1869,11 @@ impl AgentFS { /// /// Note: The path parameter is ignored since all data is in a single database. pub async fn fsync(&self, _path: &str) -> Result<()> { - self.conn.execute("PRAGMA synchronous = FULL", ()).await?; - self.conn.execute("BEGIN", ()).await?; - self.conn.execute("COMMIT", ()).await?; - self.conn.execute("PRAGMA synchronous = OFF", ()).await?; + let conn = self.pool.get().await?; + conn.execute("PRAGMA synchronous = FULL", ()).await?; + conn.execute("BEGIN", ()).await?; + conn.execute("COMMIT", ()).await?; + conn.execute("PRAGMA synchronous = OFF", ()).await?; Ok(()) } @@ -1886,14 +1882,15 @@ impl AgentFS { /// The returned handle can be used for efficient read/write/fsync operations /// without requiring path lookups on each operation. pub async fn open(&self, path: &str) -> Result { + let conn = self.pool.get().await?; let path = self.normalize_path(path); let ino = self - .resolve_path(&path) + .resolve_path_with_conn(&conn, &path) .await? .ok_or_else(|| anyhow::anyhow!("File not found: {}", path))?; Ok(Arc::new(AgentFSFile { - conn: self.conn.clone(), + pool: self.pool.clone(), ino, chunk_size: self.chunk_size, })) @@ -1902,8 +1899,8 @@ impl AgentFS { /// Get the number of chunks for a given inode (for testing) #[cfg(test)] async fn get_chunk_count(&self, ino: i64) -> Result { - let mut rows = self - .conn + let conn = self.pool.get().await?; + let mut rows = conn .query("SELECT COUNT(*) FROM fs_data WHERE ino = ?", (ino,)) .await?; @@ -2283,8 +2280,8 @@ mod tests { let (fs, _dir) = create_test_fs().await?; // Query fs_config table directly - let mut rows = fs - .conn + let conn = fs.pool().get().await?; + let mut rows = conn .query("SELECT value FROM fs_config WHERE key = 'chunk_size'", ()) .await?; @@ -2317,8 +2314,8 @@ mod tests { let ino = fs.resolve_path("/unique.txt").await?.unwrap(); // Try to insert a duplicate chunk - should fail due to PRIMARY KEY constraint - let result = fs - .conn + let conn = fs.pool().get().await?; + let result = conn .execute( "INSERT INTO fs_data (ino, chunk_index, data) VALUES (?, 0, ?)", (ino, vec![1u8; 10]), @@ -2343,8 +2340,8 @@ mod tests { let ino = fs.resolve_path("/ordered.bin").await?.unwrap(); // Query chunks in order - let mut rows = fs - .conn + let conn = fs.pool().get().await?; + let mut rows = conn .query( "SELECT chunk_index FROM fs_data WHERE ino = ? ORDER BY chunk_index", (ino,), @@ -2384,8 +2381,8 @@ mod tests { fs.remove("/deleteme.txt").await?; // Verify all chunks are gone - let mut rows = fs - .conn + let conn = fs.pool().get().await?; + let mut rows = conn .query("SELECT COUNT(*) FROM fs_data WHERE ino = ?", (ino,)) .await?; diff --git a/sdk/rust/src/filesystem/overlayfs.rs b/sdk/rust/src/filesystem/overlayfs.rs index 8ab8eb5..c01e48e 100644 --- a/sdk/rust/src/filesystem/overlayfs.rs +++ b/sdk/rust/src/filesystem/overlayfs.rs @@ -207,7 +207,8 @@ impl OverlayFS { /// base layer represents. This is stored in the delta database so that /// tools like `agentfs diff` can determine what files were modified. pub async fn init(&self, base_path: &str) -> Result<()> { - Self::init_schema(&self.delta.get_connection(), base_path).await + let conn = self.delta.pool().get().await?; + Self::init_schema(&conn, base_path).await } /// Extract the parent path from a normalized path @@ -250,7 +251,7 @@ impl OverlayFS { /// then /foo/bar is also considered deleted. async fn is_whiteout(&self, path: &str) -> Result { let normalized = self.normalize_path(path); - let conn = self.delta.get_connection(); + let conn = self.delta.pool().get().await?; // Check the path itself and all parent paths let mut check_path = normalized.clone(); @@ -290,7 +291,7 @@ impl OverlayFS { async fn create_whiteout(&self, path: &str) -> Result<()> { let normalized = self.normalize_path(path); let parent = Self::parent_path(&normalized); - let conn = self.delta.get_connection(); + let conn = self.delta.pool().get().await?; let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64; conn.execute( @@ -305,7 +306,7 @@ impl OverlayFS { /// Remove a whiteout (un-delete a path) async fn remove_whiteout(&self, path: &str) -> Result<()> { let normalized = self.normalize_path(path); - let conn = self.delta.get_connection(); + let conn = self.delta.pool().get().await?; conn.execute( "DELETE FROM fs_whiteout WHERE path = ?", @@ -318,7 +319,7 @@ impl OverlayFS { /// Get all whiteouts that are direct children of a directory async fn get_child_whiteouts(&self, dir_path: &str) -> Result> { let normalized = self.normalize_path(dir_path); - let conn = self.delta.get_connection(); + let conn = self.delta.pool().get().await?; let mut whiteouts = HashSet::new(); // Use parent_path index for O(1) lookup instead of LIKE which compiles regex diff --git a/sdk/rust/src/lib.rs b/sdk/rust/src/lib.rs index 5fcc3fb..b5b9447 100644 --- a/sdk/rust/src/lib.rs +++ b/sdk/rust/src/lib.rs @@ -1,3 +1,4 @@ +pub mod connection_pool; pub mod filesystem; pub mod kvstore; pub mod toolcalls; @@ -169,7 +170,7 @@ impl AgentFS { let conn = Arc::new(conn); let kv = KvStore::from_connection(conn.clone()).await?; - let fs = filesystem::AgentFS::from_connection(conn.clone()).await?; + let fs = filesystem::AgentFS::new(&db_path).await?; let tools = ToolCalls::from_connection(conn.clone()).await?; Ok(Self { @@ -194,7 +195,7 @@ impl AgentFS { let conn = Arc::new(conn); let kv = KvStore::from_connection(conn.clone()).await?; - let fs = filesystem::AgentFS::from_connection(conn.clone()).await?; + let fs = filesystem::AgentFS::new(db_path).await?; let tools = ToolCalls::from_connection(conn.clone()).await?; Ok(Self {