diff --git a/src/lib.rs b/src/lib.rs
index 331bb4c2..de0f452b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,3 +4,5 @@ pub use storage::kv::error::{Error, Result};
 pub use storage::kv::option::{IsolationLevel, Options};
 pub use storage::kv::store::Store;
 pub use storage::kv::transaction::{Durability, Mode, Transaction};
+
+pub mod vfs;
diff --git a/src/storage/kv/compaction.rs b/src/storage/kv/compaction.rs
index cc1ed941..a6dfbe62 100644
--- a/src/storage/kv/compaction.rs
+++ b/src/storage/kv/compaction.rs
@@ -1,8 +1,8 @@
-use std::fs::{self, File};
 use std::io::{Read, Write};
 use std::path::Path;
 use std::sync::atomic::{AtomicBool, Ordering};
 
+use crate::vfs::FileSystem;
 use bytes::BytesMut;
 
 use crate::storage::{
@@ -34,7 +34,7 @@ impl<'a> Drop for CompactionGuard<'a> {
     }
 }
 
-impl StoreInner {
+impl<V: FileSystem> StoreInner<V> {
     pub async fn compact(&self) -> Result<()> {
         // Early return if the store is closed or compaction is already in progress
         if self.is_closed.load(Ordering::SeqCst) || !self.core.opts.should_persist_data() {
@@ -49,16 +49,16 @@ impl StoreInner {
         // Clear files before starting compaction if a .merge or .tmp.merge directory exists
         let tmp_merge_dir = self.core.opts.dir.join(".tmp.merge");
         if tmp_merge_dir.exists() {
-            fs::remove_dir_all(&tmp_merge_dir)?;
+            self.vfs.remove_dir_all(&tmp_merge_dir)?;
         }
         let merge_dir = self.core.opts.dir.join(".merge");
         if merge_dir.exists() {
-            fs::remove_dir_all(&merge_dir)?;
+            self.vfs.remove_dir_all(&merge_dir)?;
         }
 
         // Clean recovery state before starting compaction
-        RecoveryState::clear(&self.core.opts.dir)?;
+        RecoveryState::clear(&self.core.opts.dir, &self.vfs)?;
 
         // Clear compaction stats before starting compaction
         self.stats.compaction_stats.reset();
@@ -77,10 +77,10 @@ impl StoreInner {
         drop(clog); // Explicitly drop the lock
 
         // Create a temporary directory for compaction
-        fs::create_dir_all(&tmp_merge_dir)?;
+        self.vfs.create_dir_all(&tmp_merge_dir)?;
 
         // Initialize a new manifest in the temporary directory
-        let mut manifest = Core::initialize_manifest(&tmp_merge_dir)?;
+        let mut manifest = Core::initialize_manifest(&tmp_merge_dir, &self.vfs)?;
         // Add the last updated segment ID to the manifest
         let changeset = Manifest::with_compacted_up_to_segment(last_updated_segment_id);
         manifest.append(&changeset.serialize()?)?;
@@ -91,7 +91,7 @@ impl StoreInner {
         let tm_opts = LogOptions::default()
             .with_max_file_size(self.core.opts.max_compaction_segment_size)
             .with_file_extension("clog".to_string());
-        let mut temp_writer = Aol::open(&temp_clog_dir, &tm_opts)?;
+        let mut temp_writer = Aol::open(&temp_clog_dir, &tm_opts, &self.vfs)?;
 
         // TODO: Check later to add a new way for compaction by reading from the files first and then
         // check in files for the keys that are not found in memory to handle deletion
@@ -205,7 +205,7 @@ impl StoreInner {
         temp_writer.close()?;
 
         // Finalize compaction by renaming the temporary directory
-        fs::rename(tmp_merge_dir, merge_dir)?;
+        self.vfs.rename(tmp_merge_dir, merge_dir)?;
 
         Ok(())
     }
@@ -218,10 +218,10 @@ pub(crate) enum RecoveryState {
 }
 
 impl RecoveryState {
-    pub(crate) fn load(dir: &Path) -> Result<Self> {
+    pub(crate) fn load<V: FileSystem>(dir: &Path, vfs: &V) -> Result<Self> {
         let path = dir.join(".recovery_state");
         if path.exists() {
-            let mut file = File::open(path)?;
+            let mut file = vfs.open(path)?;
             let mut contents = String::new();
             file.read_to_string(&mut contents)?;
             match contents.as_str() {
@@ -233,9 +233,9 @@ impl RecoveryState {
         }
     }
 
-    pub(crate) fn save(&self, dir: &Path) -> Result<()> {
+    pub(crate) fn save<V: FileSystem>(&self, dir: &Path, vfs: &V) -> Result<()> {
         let path = dir.join(".recovery_state");
-        let mut file = File::create(path)?;
+        let mut file = vfs.create(path)?;
         match self {
             RecoveryState::ClogDeleted => {
                 write!(file, "ClogDeleted")
@@ -246,16 +246,16 @@ impl RecoveryState {
             }
         }
     }
 
-    pub(crate) fn clear(dir: &Path) -> Result<()> {
+    pub(crate) fn clear<V: FileSystem>(dir: &Path, vfs: &V) -> Result<()> {
         let path = dir.join(".recovery_state");
         if path.exists() {
-            fs::remove_file(path)?;
+            vfs.remove_file(path)?;
         }
         Ok(())
     }
 }
 
-fn perform_recovery(opts: &Options) -> Result<()> {
+fn perform_recovery<V: FileSystem>(opts: &Options, vfs: &V) -> Result<()> {
     // Encapsulate operations in a closure for easier rollback
     let result = || -> Result<()> {
         let merge_dir = opts.dir.join(".merge");
@@ -263,9 +263,9 @@ fn perform_recovery(opts: &Options) -> Result<()> {
         let merge_clog_subdir = merge_dir.join("clog");
 
         // If there is a .merge directory, try reading manifest from it
-        let manifest = Core::initialize_manifest(&merge_dir)?;
+        let manifest = Core::initialize_manifest(&merge_dir, vfs)?;
         let existing_manifest = if manifest.size()? > 0 {
-            Core::read_manifest(&merge_dir)?
+            Core::read_manifest(&merge_dir, vfs)?
         } else {
             return Err(Error::MergeManifestMissing);
         };
@@ -276,18 +276,18 @@ fn perform_recovery(opts: &Options) -> Result<()> {
         }
         let compacted_upto_segment_id = compacted_upto_segments[0];
 
-        let segs = SegmentRef::read_segments_from_directory(&clog_dir)?;
+        let segs = SegmentRef::read_segments_from_directory(&clog_dir, vfs)?;
 
         // Step 4: Copy files from clog dir to merge clog dir
         for seg in segs.iter() {
             if seg.id > compacted_upto_segment_id {
                 // Check if the path points to a regular file
-                match fs::metadata(&seg.file_path) {
+                match vfs.metadata(&seg.file_path) {
                     Ok(metadata) => {
                         if metadata.is_file() {
                             // Proceed to copy the file
                             let dest_path = merge_clog_subdir.join(seg.file_path.file_name().unwrap());
-                            match fs::copy(&seg.file_path, &dest_path) {
+                            match vfs.copy(&seg.file_path, &dest_path) {
                                 Ok(_) => println!("File copied successfully"),
                                 Err(e) => {
                                     println!("Error copying file: {:?}", e);
@@ -307,24 +307,24 @@ fn perform_recovery(opts: &Options) -> Result<()> {
         }
 
         // Clear any previous recovery state before setting a new one
-        RecoveryState::clear(&opts.dir)?;
+        RecoveryState::clear(&opts.dir, vfs)?;
 
         // After successful operation, update recovery state to indicate clog can be deleted
-        RecoveryState::ClogDeleted.save(&opts.dir)?;
+        RecoveryState::ClogDeleted.save(&opts.dir, vfs)?;
 
         // Delete the `clog` directory
-        if let Err(e) = fs::remove_dir_all(&clog_dir) {
+        if let Err(e) = vfs.remove_dir_all(&clog_dir) {
             println!("Error deleting clog directory: {:?}", e);
             return Err(Error::from(e));
         }
 
         // Rename `merge_clog_subdir` to `clog`
-        if let Err(e) = fs::rename(&merge_clog_subdir, &clog_dir) {
+        if let Err(e) = vfs.rename(&merge_clog_subdir, &clog_dir) {
             println!("Error renaming merge_clog_subdir to clog: {:?}", e);
             return Err(Error::from(e));
        }
 
         // Clear recovery state after successful completion
-        RecoveryState::clear(&opts.dir)?;
+        RecoveryState::clear(&opts.dir, vfs)?;
         Ok(())
     };
@@ -334,22 +334,32 @@ fn perform_recovery(opts: &Options) -> Result<()> {
         Err(e) => {
             let merge_dir = opts.dir.join(".merge");
             let clog_dir = opts.dir.join("clog");
-            rollback(&merge_dir, &clog_dir, RecoveryState::load(&opts.dir)?)?;
+            rollback(
+                &merge_dir,
+                &clog_dir,
+                RecoveryState::load(&opts.dir, vfs)?,
+                vfs,
+            )?;
             Err(e)
         }
     }
 }
 
-fn cleanup_after_recovery(opts: &Options) -> Result<()> {
+fn cleanup_after_recovery<V: FileSystem>(opts: &Options, vfs: &V) -> Result<()> {
     let merge_dir = opts.dir.join(".merge");
     if merge_dir.exists() {
-        fs::remove_dir_all(&merge_dir)?;
+        vfs.remove_dir_all(&merge_dir)?;
     }
     Ok(())
 }
 
-fn rollback(merge_dir: &Path, clog_dir: &Path, checkpoint: RecoveryState) -> Result<()> {
+fn rollback<V: FileSystem>(
+    merge_dir: &Path,
+    clog_dir: &Path,
+    checkpoint: RecoveryState,
+    vfs: &V,
+) -> Result<()> {
     if checkpoint == RecoveryState::ClogDeleted {
         // Restore the clog directory from merge directory if it exists
         // At this point the merge directory should exist and the clog directory should not
@@ -357,7 +367,7 @@ fn rollback(merge_dir: &Path, clog_dir: &Path, checkpoint: RecoveryState) -> Res
         if !clog_dir.exists() && merge_dir.exists() {
             let merge_clog_subdir = merge_dir.join("clog");
             if merge_clog_subdir.exists() {
-                fs::rename(&merge_clog_subdir, clog_dir)?;
+                vfs.rename(&merge_clog_subdir, clog_dir)?;
             }
         }
     }
@@ -369,21 +379,21 @@ fn needs_recovery(opts: &Options) -> Result<bool> {
     Ok(opts.dir.join(".merge").exists())
 }
 
-fn handle_clog_deleted_state(opts: &Options) -> Result<()> {
+fn handle_clog_deleted_state<V: FileSystem>(opts: &Options, vfs: &V) -> Result<()> {
     let merge_dir = opts.dir.join(".merge");
     let clog_dir = opts.dir.join("clog");
-    rollback(&merge_dir, &clog_dir, RecoveryState::ClogDeleted)
+    rollback(&merge_dir, &clog_dir, RecoveryState::ClogDeleted, vfs)
 }
 
 /// Restores the store from a compaction process by handling .tmp.merge and .merge directories.
 /// TODO: This should happen post repair
-pub fn restore_from_compaction(opts: &Options) -> Result<()> {
+pub fn restore_from_compaction<V: FileSystem>(opts: &Options, vfs: &V) -> Result<()> {
     let tmp_merge_dir = opts.dir.join(".tmp.merge");
     // 1) Check if there is a .tmp.merge directory, delete it
     if tmp_merge_dir.exists() {
         // This means there was a previous compaction process that failed
         // so we don't need to do anything here and just return
-        fs::remove_dir_all(&tmp_merge_dir)?;
+        vfs.remove_dir_all(&tmp_merge_dir)?;
         return Ok(());
     }
 
@@ -391,14 +401,14 @@ pub fn restore_from_compaction(opts: &Options) -> Result<()> {
         return Ok(());
     }
 
-    match RecoveryState::load(&opts.dir)? {
-        RecoveryState::ClogDeleted => handle_clog_deleted_state(opts)?,
+    match RecoveryState::load(&opts.dir, vfs)? {
+        RecoveryState::ClogDeleted => handle_clog_deleted_state(opts, vfs)?,
         RecoveryState::None => (),
     }
 
-    perform_recovery(opts)?;
+    perform_recovery(opts, vfs)?;
 
     // Clean up merge directory after successful operation
-    cleanup_after_recovery(opts)
+    cleanup_after_recovery(opts, vfs)
 }
 
 #[cfg(test)]
@@ -428,30 +438,37 @@ mod tests {
         let temp_dir = &temp_dir.to_path_buf();
 
         // Clear state and re-setup for next test
-        RecoveryState::clear(temp_dir).unwrap();
+        RecoveryState::clear(temp_dir, &crate::vfs::Dummy).unwrap();
         assert!(!path.exists());
 
         // Test saving and loading ClogDeleted
-        RecoveryState::ClogDeleted.save(temp_dir).unwrap();
+        RecoveryState::ClogDeleted
+            .save(temp_dir, &crate::vfs::Dummy)
+            .unwrap();
         assert_eq!(
-            RecoveryState::load(temp_dir).unwrap(),
+            RecoveryState::load(temp_dir, &crate::vfs::Dummy).unwrap(),
             RecoveryState::ClogDeleted
         );
 
         // Clear state and re-setup for next test
-        RecoveryState::clear(temp_dir).unwrap();
+        RecoveryState::clear(temp_dir, &crate::vfs::Dummy).unwrap();
         assert!(!path.exists());
 
         // Test loading None when no state is saved
-        assert_eq!(RecoveryState::load(temp_dir).unwrap(), RecoveryState::None);
+        assert_eq!(
+            RecoveryState::load(temp_dir, &crate::vfs::Dummy).unwrap(),
+            RecoveryState::None
+        );
 
         // Test save contents for ClogDeleted
-        RecoveryState::ClogDeleted.save(temp_dir).unwrap();
+        RecoveryState::ClogDeleted
+            .save(temp_dir, &crate::vfs::Dummy)
+            .unwrap();
         let contents = read_to_string(&path).unwrap();
         assert_eq!(contents, "ClogDeleted");
 
         // Final clear to clean up
-        RecoveryState::clear(temp_dir).unwrap();
+        RecoveryState::clear(temp_dir, &crate::vfs::Dummy).unwrap();
         assert!(!path.exists());
     }
diff --git a/src/storage/kv/manifest.rs b/src/storage/kv/manifest.rs
index 50601ee7..d4c95ed6 100644
--- a/src/storage/kv/manifest.rs
+++ b/src/storage/kv/manifest.rs
@@ -3,6 +3,7 @@ use std::path::Path;
 
 use super::reader::Reader;
 use crate::storage::log::{write_field, Error as LogError, MultiSegmentReader, SegmentRef};
+use crate::vfs::FileSystem;
 use crate::{Error, Options, Result};
 
 #[revisioned(revision = 1)]
@@ -81,13 +82,13 @@ impl Manifest {
 
     // Load Vec<Manifest> from a dir
     #[allow(unused)]
-    pub fn load_from_dir(path: &Path) -> Result<Self> {
+    pub fn load_from_dir<V: FileSystem>(path: &Path, vfs: &V) -> Result<Self> {
         let mut manifests = Manifest::new();
         if !path.exists() {
             return Ok(manifests);
         }
 
-        let sr = SegmentRef::read_segments_from_directory(path)?;
+        let sr = SegmentRef::read_segments_from_directory(path, vfs)?;
         let reader = MultiSegmentReader::new(sr)?;
         let mut reader = Reader::new_from(reader);
@@ -132,7 +133,8 @@ mod tests {
         // Create a temporary directory
         let temp_dir = create_temp_directory();
         let opts = LogOptions::default();
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         let manifest = Manifest {
             changes: vec![ManifestChangeType::Options(Options::default())],
@@ -144,7 +146,7 @@ mod tests {
         a.close().expect("should close aol");
 
         // Load the manifests from the file
-        let loaded_manifest = Manifest::load_from_dir(temp_dir.path()).unwrap();
+        let loaded_manifest = Manifest::load_from_dir(temp_dir.path(), &crate::vfs::Dummy).unwrap();
 
         // Assert that the loaded manifests contain exactly one manifest
         assert_eq!(loaded_manifest.changes.len(), 1);
@@ -155,7 +157,8 @@ mod tests {
         // Step 1: Create a temporary directory
         let temp_dir = create_temp_directory();
         let log_opts = LogOptions::default();
-        let mut a = Aol::open(temp_dir.path(), &log_opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &log_opts, &crate::vfs::Dummy).expect("should create aol");
 
         // Step 2: Create the first Manifest instance and append it to the file
         let first_manifest = Manifest {
@@ -181,7 +184,7 @@ mod tests {
         a.close().expect("should close aol");
 
         // Step 5: Load the manifests from the file
-        let loaded_manifest = Manifest::load_from_dir(temp_dir.path()).unwrap();
+        let loaded_manifest = Manifest::load_from_dir(temp_dir.path(), &crate::vfs::Dummy).unwrap();
 
         // Step 6: Assert that the loaded manifests contain exactly two manifests
         assert_eq!(loaded_manifest.changes.len(), 2);
diff --git a/src/storage/kv/reader.rs b/src/storage/kv/reader.rs
index f8ded677..f24bf888 100644
--- a/src/storage/kv/reader.rs
+++ b/src/storage/kv/reader.rs
@@ -280,7 +280,8 @@ mod tests {
             max_file_size: 4,
             ..Options::default()
         };
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         // Test initial offset
         let sz = (a.active_segment_id, a.active_segment.offset());
@@ -299,7 +300,7 @@ mod tests {
 
         a.close().expect("should close aol");
 
-        let sr = SegmentRef::read_segments_from_directory(temp_dir.path())
+        let sr = SegmentRef::read_segments_from_directory(temp_dir.path(), &crate::vfs::Dummy)
             .expect("should read segments");
         let sr = MultiSegmentReader::new(sr).expect("should create segment reader");
@@ -337,7 +338,8 @@ mod tests {
             ..Options::default()
         };
 
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         // Append 10 records
         for i in 0..num_items {
@@ -348,7 +350,7 @@ mod tests {
 
         a.close().expect("should close aol");
 
-        let sr = SegmentRef::read_segments_from_directory(temp_dir.path())
+        let sr = SegmentRef::read_segments_from_directory(temp_dir.path(), &crate::vfs::Dummy)
             .expect("should read segments");
         let sr = MultiSegmentReader::new(sr).expect("should create segment reader");
diff --git a/src/storage/kv/repair.rs b/src/storage/kv/repair.rs
index 422259c9..991ac68a 100644
--- a/src/storage/kv/repair.rs
+++ b/src/storage/kv/repair.rs
@@ -1,7 +1,8 @@
-use std::fs;
 use std::path::Path;
 use std::path::PathBuf;
 
+use crate::vfs::FileSystem;
+
 use crate::storage::{
     kv::{
         error::{Error, Result},
@@ -15,14 +16,15 @@ use crate::storage::{
 /// The last active segment being written to in the append-only log (AOL) is usually the WAL in database terminology.
 /// Corruption in the last segment can happen due to various reasons such as a power failure or a bug in the system,
 /// and also due to the asynchronous nature of calling close on the store.
-pub(crate) fn repair_last_corrupted_segment(
+pub(crate) fn repair_last_corrupted_segment<V: FileSystem>(
     aol: &mut Aol,
     db_opts: &Options,
     corrupted_segment_id: u64,
     corrupted_offset_marker: u64,
+    vfs: &V,
 ) -> Result<()> {
     // Read the list of segments from the directory
-    let segs = SegmentRef::read_segments_from_directory(&aol.dir)?;
+    let segs = SegmentRef::read_segments_from_directory(&aol.dir, vfs)?;
 
     // Get the last segment
     let last_segment = segs
@@ -51,24 +53,26 @@ pub(crate) fn repair_last_corrupted_segment(
         corrupted_offset_marker,
         last_segment_path,
         last_segment_header_offset,
+        vfs,
     )
 }
 
 /// This function is used to repair a corrupted any given segment in the append-only log (AOL).
 /// Currently it is only being used for testing purposes.
 #[allow(unused)]
-pub(crate) fn repair_corrupted_segment(
+pub(crate) fn repair_corrupted_segment<V: FileSystem>(
     aol: &mut Aol,
     db_opts: &Options,
     corrupted_segment_id: u64,
     corrupted_offset_marker: u64,
+    vfs: &V,
 ) -> Result<()> {
     let mut file_path = None;
     let mut file_header_offset = 0;
 
     {
         // Read the list of segments from the directory
-        let segs = SegmentRef::read_segments_from_directory(&aol.dir)?;
+        let segs = SegmentRef::read_segments_from_directory(&aol.dir, vfs)?;
 
         // Loop through the segments
         for s in &segs {
@@ -94,6 +98,7 @@ pub(crate) fn repair_corrupted_segment(
         corrupted_offset_marker,
         file_path,
         file_header_offset,
+        vfs,
     )?;
 
     Ok(())
@@ -125,13 +130,14 @@ pub(crate) fn repair_corrupted_segment(
 /// Finally, the function opens the next segment and makes it active.
 ///
 /// If any of these operations fail, the function returns an error.
-fn repair_segment(
+fn repair_segment<V: FileSystem>(
     aol: &mut Aol,
     db_opts: &Options,
     corrupted_segment_id: u64,
     corrupted_offset_marker: u64,
     corrupted_segment_file_path: PathBuf,
     corrupted_segment_file_header_offset: u64,
+    vfs: &V,
 ) -> Result<()> {
     // Close the active segment if its ID matches
     if aol.active_segment_id == corrupted_segment_id {
@@ -142,7 +148,7 @@ fn repair_segment(
     let repaired_segment_path = corrupted_segment_file_path.with_extension("repair");
 
     // Rename the corrupted segment to the repaired segment
-    std::fs::rename(&corrupted_segment_file_path, &repaired_segment_path)?;
+    vfs.rename(&corrupted_segment_file_path, &repaired_segment_path)?;
 
     // Open a new segment as the active segment
     let mut new_segment = Segment::open(&aol.dir, corrupted_segment_id, &aol.opts)?;
@@ -174,12 +180,12 @@ fn repair_segment(
     new_segment.close()?;
 
     // Remove the repaired segment file
-    std::fs::remove_file(&repaired_segment_path)?;
+    vfs.remove_file(&repaired_segment_path)?;
 
     // Open the next segment and make it active
     if count == 0 {
         println!("deleting empty file {:?}", corrupted_segment_file_path);
-        std::fs::remove_file(&corrupted_segment_file_path)?;
+        vfs.remove_file(&corrupted_segment_file_path)?;
     }
     let new_segment = Segment::open(&aol.dir, aol.active_segment_id, &aol.opts)?;
     aol.active_segment = new_segment;
@@ -195,7 +201,7 @@ fn repair_segment(
 //
 // Parameters:
 // directory: A string slice that holds the path to the directory.
-pub(crate) fn restore_repair_files(directory: &str) -> std::io::Result<()> {
+pub(crate) fn restore_repair_files<V: FileSystem>(directory: &str, vfs: &V) -> std::io::Result<()> {
     // Check if the directory exists
     if !Path::new(directory).exists() {
         return Ok(());
@@ -205,7 +211,7 @@ pub(crate) fn restore_repair_files(directory: &str) -> std::io::Result<()> {
     let directory = sanitize_directory(directory)?;
 
     // Read the directory
-    let entries = fs::read_dir(directory.clone())?;
+    let entries = vfs.read_dir(directory.clone())?;
 
     // Iterate over each entry in the directory
     for entry in entries.flatten() {
@@ -222,10 +228,10 @@ pub(crate) fn restore_repair_files(directory: &str) -> std::io::Result<()> {
                 // If the '.clog' file exists
                 if clog_path.exists() {
                     // Remove the '.clog' file
-                    fs::remove_file(&clog_path)?;
+                    vfs.remove_file(&clog_path)?;
                 }
                 // Rename the '.repair' file back to '.clog'
-                fs::rename(path, clog_path)?;
+                vfs.rename(path, clog_path)?;
             }
         }
     }
@@ -298,8 +304,8 @@ mod tests {
 
     fn corrupt_at_offset(opts: Options, segment_num: usize, corruption_offset: u64) {
         let clog_subdir = opts.dir.join("clog");
-        let sr =
-            SegmentRef::read_segments_from_directory(&clog_subdir).expect("should read segments");
+        let sr = SegmentRef::read_segments_from_directory(&clog_subdir, &crate::vfs::Dummy)
+            .expect("should read segments");
 
         // Open the nth segment file for corrupting
         let file_path = &sr[segment_num - 1].file_path;
@@ -310,11 +316,12 @@ mod tests {
     // This is useful for testing the repair process because when opening
     // the store, the repair process will only repair the corrupted segment
     // if it is the last segment in the directory.
-    fn corrupt_and_repair(
+    fn corrupt_and_repair<V: FileSystem>(
         store: &Store,
         opts: Options,
         segment_num: usize,
         corruption_offset: u64,
+        vfs: &V,
     ) {
         let mut clog = store
             .inner
@@ -326,8 +333,8 @@ mod tests {
             .unwrap()
             .write();
         let clog_subdir = opts.dir.join("clog");
-        let sr =
-            SegmentRef::read_segments_from_directory(&clog_subdir).expect("should read segments");
+        let sr = SegmentRef::read_segments_from_directory(&clog_subdir, &crate::vfs::Dummy)
+            .expect("should read segments");
 
         // Open the nth segment file for corruption
        corrupt_at_offset(opts.clone(), segment_num, corruption_offset);
@@ -340,6 +347,7 @@ mod tests {
             &opts,
             corrupted_segment_id,
             corrupted_offset_marker,
+            vfs,
         )
         .unwrap();
 
@@ -426,7 +434,13 @@ mod tests {
         let store = setup_store_with_data(opts.clone(), keys, default_value.clone()).await;
 
         let corruption_offset = 29; // 24bytes is length of header
-        corrupt_and_repair(&store, opts.clone(), 2, corruption_offset);
+        corrupt_and_repair(
+            &store,
+            opts.clone(),
+            2,
+            corruption_offset,
+            &crate::vfs::Dummy,
+        );
 
         // Close the store
         store.close().await.expect("should close store");
@@ -467,7 +481,13 @@ mod tests {
         let store = setup_store_with_data(opts.clone(), keys, default_value.clone()).await;
 
         let corruption_offset = 29; // 24bytes is length of txn header
-        corrupt_and_repair(&store, opts.clone(), 3, corruption_offset);
+        corrupt_and_repair(
+            &store,
+            opts.clone(),
+            3,
+            corruption_offset,
+            &crate::vfs::Dummy,
+        );
 
         // Close the store
         store.close().await.expect("should close store");
@@ -518,7 +538,13 @@ mod tests {
         let store = setup_store_with_data(opts.clone(), keys, default_value.clone()).await;
 
         let corruption_offset = 29; // 24bytes is length of txn header
-        corrupt_and_repair(&store, opts.clone(), 1, corruption_offset);
+        corrupt_and_repair(
+            &store,
+            opts.clone(),
+            1,
+            corruption_offset,
+            &crate::vfs::Dummy,
+        );
 
         // Close the store
         store.close().await.expect("should close store");
@@ -565,7 +591,13 @@ mod tests {
         let store = setup_store_with_data(opts.clone(), keys, default_value.clone()).await;
 
         let corruption_offset = 29; // 24bytes is length of txn header
-        corrupt_and_repair(&store, opts.clone(), 3, corruption_offset);
+        corrupt_and_repair(
+            &store,
+            opts.clone(),
+            3,
+            corruption_offset,
+            &crate::vfs::Dummy,
+        );
 
         // Close the store
         store.close().await.expect("should close store");
@@ -671,7 +703,13 @@ mod tests {
         let store = setup_store_with_data(opts.clone(), keys, default_value.clone()).await;
 
         let corruption_offset = 37 + 29; // 24bytes is length of txn header
-        corrupt_and_repair(&store, opts.clone(), 1, corruption_offset);
+        corrupt_and_repair(
+            &store,
+            opts.clone(),
+            1,
+            corruption_offset,
+            &crate::vfs::Dummy,
+        );
 
         // Close the store
         store.close().await.expect("should close store");
@@ -713,7 +751,13 @@ mod tests {
         let store = setup_store_with_data(opts.clone(), keys, default_value.clone()).await;
 
         let corruption_offset = 37 + 29; // 24bytes is length of txn header
-        corrupt_and_repair(&store, opts.clone(), 2, corruption_offset);
+        corrupt_and_repair(
+            &store,
+            opts.clone(),
+            2,
+            corruption_offset,
+            &crate::vfs::Dummy,
+        );
 
         // Close the store
         store.close().await.expect("should close store");
@@ -795,7 +839,7 @@ mod tests {
         let mut file2 = File::create(dir.path().join("0002.clog")).unwrap();
         file1.write_all(b"Data for 0001.clog").unwrap();
         file2.write_all(b"Data for 0002.clog").unwrap();
-        assert!(restore_repair_files(dir.path().to_str().unwrap()).is_ok());
+        assert!(restore_repair_files(dir.path().to_str().unwrap(), &crate::vfs::Dummy).is_ok());
         assert!(dir.path().join("0001.clog").exists());
         assert!(dir.path().join("0002.clog").exists());
     }
@@ -805,7 +849,7 @@ mod tests {
         let dir = create_temp_directory();
         File::create(dir.path().join("001.repair")).unwrap();
         File::create(dir.path().join("002.repair")).unwrap();
-        assert!(restore_repair_files(dir.path().to_str().unwrap()).is_ok());
+        assert!(restore_repair_files(dir.path().to_str().unwrap(), &crate::vfs::Dummy).is_ok());
         assert!(dir.path().join("001.clog").exists());
         assert!(dir.path().join("002.clog").exists());
         assert!(!dir.path().join("001.repair").exists());
@@ -819,7 +863,7 @@ mod tests {
         File::create(dir.path().join("0002.clog")).unwrap();
         File::create(dir.path().join("0001.repair")).unwrap();
         File::create(dir.path().join("0002.repair")).unwrap();
-        assert!(restore_repair_files(dir.path().to_str().unwrap()).is_ok());
+        assert!(restore_repair_files(dir.path().to_str().unwrap(), &crate::vfs::Dummy).is_ok());
         assert!(dir.path().join("0001.clog").exists());
         assert!(dir.path().join("0002.clog").exists());
         assert!(!dir.path().join("0001.repair").exists());
@@ -832,7 +876,7 @@ mod tests {
         File::create(dir.path().join("0001.clog")).unwrap();
         File::create(dir.path().join("0002.clog")).unwrap();
         File::create(dir.path().join("0001.repair")).unwrap();
-        assert!(restore_repair_files(dir.path().to_str().unwrap()).is_ok());
+        assert!(restore_repair_files(dir.path().to_str().unwrap(), &crate::vfs::Dummy).is_ok());
         assert!(dir.path().join("0001.clog").exists());
         assert!(dir.path().join("0002.clog").exists());
         assert!(!dir.path().join("0001.repair").exists());
@@ -849,7 +893,7 @@ mod tests {
         file2.write_all(b"Data for 0002.clog").unwrap();
         repair_file1.write_all(b"Data for 0001.repair").unwrap();
         repair_file2.write_all(b"Data for 0002.repair").unwrap();
-        assert!(restore_repair_files(dir.path().to_str().unwrap()).is_ok());
+        assert!(restore_repair_files(dir.path().to_str().unwrap(), &crate::vfs::Dummy).is_ok());
         assert!(dir.path().join("0001.clog").exists());
         assert!(dir.path().join("0002.clog").exists());
         assert!(!dir.path().join("0001.repair").exists());
diff --git a/src/storage/kv/store.rs b/src/storage/kv/store.rs
index 08c23068..ac777fb9 100644
--- a/src/storage/kv/store.rs
+++ b/src/storage/kv/store.rs
@@ -17,6 +17,7 @@ use parking_lot::RwLock;
 use quick_cache::sync::Cache;
 use revision::Revisioned;
 
+use crate::vfs::FileSystem;
 use vart::art::KV;
 
 use crate::storage::{
@@ -38,26 +39,27 @@ use crate::storage::{
     log::{Aol, Error as LogError, MultiSegmentReader, Options as LogOptions, SegmentRef},
 };
 
-pub(crate) struct StoreInner {
+pub(crate) struct StoreInner<V: FileSystem> {
     pub(crate) core: Arc<Core>,
     pub(crate) is_closed: AtomicBool,
     pub(crate) is_compacting: AtomicBool,
     stop_tx: Sender<()>,
     task_runner_handle: Arc<AsyncMutex<Option<JoinHandle<()>>>>,
     pub(crate) stats: Arc<StorageStats>,
+    pub(crate) vfs: V,
 }
 
 // Inner representation of the store. The wrapper will handle the asynchronous closing of the store.
-impl StoreInner {
+impl<V: FileSystem> StoreInner<V> {
     /// Creates a new MVCC key-value store with the given options.
     /// It creates a new core with the options and wraps it in an atomic reference counter.
     /// It returns the store.
-    pub fn new(opts: Options) -> Result<Self> {
+    pub fn new(opts: Options, vfs: V) -> Result<Self> {
         // TODO: make this channel size configurable
         let (writes_tx, writes_rx) = bounded(10000);
         let (stop_tx, stop_rx) = bounded(1);
 
-        let core = Arc::new(Core::new(opts, writes_tx)?);
+        let core = Arc::new(Core::new(opts, writes_tx, &vfs)?);
         let task_runner_handle = TaskRunner::new(core.clone(), writes_rx, stop_rx).spawn();
 
         Ok(Self {
@@ -67,6 +69,7 @@ impl StoreInner {
             is_compacting: AtomicBool::new(false),
             task_runner_handle: Arc::new(AsyncMutex::new(Some(task_runner_handle))),
             stats: Arc::new(StorageStats::new()),
+            vfs,
         })
     }
 
@@ -104,6 +107,8 @@ impl StoreInner {
     }
 }
 
+pub type Store = StoreImpl<crate::vfs::Dummy>;
+
 /// An MVCC-based transactional key-value store.
 ///
 /// The store is closed asynchronously when it is dropped.
@@ -111,15 +116,21 @@ impl StoreInner {
 // This is a wrapper around the inner store to allow for asynchronous closing of the store.
 #[derive(Default)]
-pub struct Store {
-    pub(crate) inner: Option<StoreInner>,
+pub struct StoreImpl<V: FileSystem> {
+    pub(crate) inner: Option<StoreInner<V>>,
 }
 
-impl Store {
-    /// Creates a new MVCC key-value store with the given options.
+impl StoreImpl<crate::vfs::Dummy> {
     pub fn new(opts: Options) -> Result<Self> {
+        Self::with_vfs(opts, crate::vfs::Dummy)
+    }
+}
+
+impl<V: FileSystem> StoreImpl<V> {
+    /// Creates a new MVCC key-value store with the given options.
+    pub fn with_vfs(opts: Options, vfs: V) -> Result<Self> {
         Ok(Self {
-            inner: Some(StoreInner::new(opts)?),
+            inner: Some(StoreInner::<V>::new(opts, vfs)?),
         })
     }
 
@@ -189,7 +200,7 @@ impl Store {
     }
 }
 
-impl Drop for Store {
+impl<V: FileSystem> Drop for StoreImpl<V> {
     fn drop(&mut self) {
         if let Some(inner) = self.inner.take() {
             // Close the store asynchronously
@@ -288,14 +299,14 @@ impl Core {
     }
 
     // This function initializes the manifest log for the database to store all settings.
-    pub(crate) fn initialize_manifest(dir: &Path) -> Result<Aol> {
+    pub(crate) fn initialize_manifest<V: FileSystem>(dir: &Path, vfs: &V) -> Result<Aol> {
         let manifest_subdir = dir.join("manifest");
         let mopts = LogOptions::default().with_file_extension("manifest".to_string());
-        Aol::open(&manifest_subdir, &mopts).map_err(Error::from)
+        Aol::open(&manifest_subdir, &mopts, vfs).map_err(Error::from)
     }
 
     // This function initializes the commit log (clog) for the database.
-    fn initialize_clog(opts: &Options) -> Result<Aol> {
+    fn initialize_clog<V: FileSystem>(opts: &Options, vfs: &V) -> Result<Aol> {
         // It first constructs the path to the clog subdirectory within the database directory.
         let clog_subdir = opts.dir.join("clog");
 
@@ -313,11 +324,11 @@ impl Core {
         //
         // Even though we are restoring the corrupted files, it will get repaired
         // during in the load_index function.
-        restore_repair_files(clog_subdir.as_path().to_str().unwrap())?;
+        restore_repair_files(clog_subdir.as_path().to_str().unwrap(), vfs)?;
 
         // Finally, it attempts to open the clog with the specified options.
         // If this fails, the error is converted to a database error and then propagated up to the caller of the function.
-        Aol::open(&clog_subdir, &copts).map_err(Error::from)
+        Aol::open(&clog_subdir, &copts, vfs).map_err(Error::from)
     }
 
     /// Creates a new Core with the given options.
@@ -326,7 +337,7 @@ impl Core {
     /// opens or creates the commit log file, loads the index from the commit log if it exists, creates
     /// and initializes an Oracle, creates and initializes a value cache, and constructs and returns
     /// the Core instance.
-    pub fn new(opts: Options, writes_tx: Sender<Task>) -> Result<Self> {
+    pub fn new<V: FileSystem>(opts: Options, writes_tx: Sender<Task>, vfs: &V) -> Result<Self> {
         // Initialize a new Indexer with the provided options.
         let mut indexer = Self::initialize_indexer();
 
@@ -336,20 +347,20 @@ impl Core {
         let mut num_entries = 0;
         if opts.should_persist_data() {
             // Determine options for the manifest file and open or create it.
-            manifest = Some(Self::initialize_manifest(&opts.dir)?);
+            manifest = Some(Self::initialize_manifest(&opts.dir, vfs)?);
 
             // Load options from the manifest file.
-            let opts = Core::load_manifest(&opts, manifest.as_mut().unwrap())?;
+            let opts = Core::load_manifest(&opts, manifest.as_mut().unwrap(), vfs)?;
 
             // Determine options for the commit log file and open or create it.
-            clog = Some(Self::initialize_clog(&opts)?);
+            clog = Some(Self::initialize_clog(&opts, vfs)?);
 
             // Restore the store from a compaction process if necessary.
-            restore_from_compaction(&opts)?;
+            restore_from_compaction(&opts, vfs)?;
 
             // Load the index from the commit log if it exists.
             if clog.as_ref().unwrap().size()? > 0 {
-                num_entries = Core::load_index(&opts, clog.as_mut().unwrap(), &mut indexer)?;
+                num_entries = Core::load_index(&opts, clog.as_mut().unwrap(), &mut indexer, vfs)?;
             }
         }
 
@@ -383,12 +394,17 @@ impl Core {
     }
 
     // The load_index function is responsible for loading the index from the log.
-    fn load_index(opts: &Options, clog: &mut Aol, indexer: &mut Indexer) -> Result<u64> {
+    fn load_index<V: FileSystem>(
+        opts: &Options,
+        clog: &mut Aol,
+        indexer: &mut Indexer,
+        vfs: &V,
+    ) -> Result<u64> {
         // The directory where the log segments are stored is determined.
         let clog_subdir = opts.dir.join("clog");
 
         // The segments are read from the directory.
-        let sr = SegmentRef::read_segments_from_directory(clog_subdir.as_path())
+        let sr = SegmentRef::read_segments_from_directory(clog_subdir.as_path(), vfs)
             .expect("should read segments");
 
         // A MultiSegmentReader is created to read from multiple segments.
@@ -444,7 +460,7 @@ impl Core {
                 "Repairing corrupted segment with id: {} and offset: {}",
                 corrupted_segment_id, corrupted_offset
             );
-            repair_last_corrupted_segment(clog, opts, corrupted_segment_id, corrupted_offset)?;
+            repair_last_corrupted_segment(clog, opts, corrupted_segment_id, corrupted_offset, vfs)?;
         }
 
         Ok(num_entries)
@@ -485,10 +501,14 @@ impl Core {
         Ok(())
     }
 
-    fn load_manifest(current_opts: &Options, manifest: &mut Aol) -> Result<Options> {
+    fn load_manifest<V: FileSystem>(
+        current_opts: &Options,
+        manifest: &mut Aol,
+        vfs: &V,
+    ) -> Result<Options> {
         // Load existing manifests if any, else create a new one
         let existing_manifest = if manifest.size()? > 0 {
-            Core::read_manifest(&current_opts.dir)?
+            Core::read_manifest(&current_opts.dir, vfs)?
         } else {
             Manifest::new()
         };
@@ -549,9 +569,9 @@ impl Core {
     }
 
     /// Loads the latest options from the manifest log.
-    pub(crate) fn read_manifest(dir: &Path) -> Result<Manifest> {
+    pub(crate) fn read_manifest<V: FileSystem>(dir: &Path, vfs: &V) -> Result<Manifest> {
         let manifest_subdir = dir.join("manifest");
-        let sr = SegmentRef::read_segments_from_directory(manifest_subdir.as_path())
+        let sr = SegmentRef::read_segments_from_directory(manifest_subdir.as_path(), vfs)
             .expect("should read segments");
         let reader = MultiSegmentReader::new(sr)?;
         let mut reader = Reader::new_from(reader);
diff --git a/src/storage/kv/transaction.rs b/src/storage/kv/transaction.rs
index f13cce9d..c3445c5b 100644
--- a/src/storage/kv/transaction.rs
+++ b/src/storage/kv/transaction.rs
@@ -2514,7 +2514,9 @@ mod tests {
         store.close().await.unwrap();
 
         let clog_subdir = tmp_dir.path().join("clog");
-        let sr = SegmentRef::read_segments_from_directory(clog_subdir.as_path()).unwrap();
+        let sr =
+            SegmentRef::read_segments_from_directory(clog_subdir.as_path(), &crate::vfs::Dummy)
+                .unwrap();
         let reader = MultiSegmentReader::new(sr).unwrap();
         let reader = Reader::new_from(reader);
         let mut tx_reader = RecordReader::new(reader, 100, 100);
diff --git a/src/storage/kv/util.rs b/src/storage/kv/util.rs
index 31be96e3..858b8df0 100644
--- a/src/storage/kv/util.rs
+++ b/src/storage/kv/util.rs
@@ -1,14 +1,11 @@
 use std::{
-    fs,
     ops::{Bound, RangeBounds},
-    path::{Path, PathBuf},
+    path::PathBuf,
 };
 
 use chrono::Utc;
 use vart::VariableSizeKey;
 
-use crate::Result;
-
 /// Gets the current time in nanoseconds since the Unix epoch.
 /// It gets the current time in UTC, extracts the timestamp in nanoseconds, and asserts that the timestamp is positive.
 /// It returns the timestamp as a 64-bit unsigned integer.
@@ -48,24 +45,6 @@ pub(crate) fn sanitize_directory(directory: &str) -> std::io::Result<PathBuf> {
     Ok(path)
 }
 
-// Utility function to recursively copy a directory
-#[allow(unused)]
-pub(crate) fn copy_dir_all(src: &Path, dst: &Path) -> Result<()> {
-    if !dst.exists() {
-        fs::create_dir_all(dst)?;
-    }
-    for entry in fs::read_dir(src)? {
-        let entry = entry?;
-        let ty = entry.file_type()?;
-        if ty.is_dir() {
-            copy_dir_all(&entry.path(), &dst.join(entry.file_name()))?;
-        } else {
-            fs::copy(entry.path(), dst.join(entry.file_name()))?;
-        }
-    }
-    Ok(())
-}
-
 pub(crate) fn convert_range_bounds<'a, R>(
     range: R,
 ) -> (Bound<VariableSizeKey>, Bound<VariableSizeKey>)
diff --git a/src/storage/log/aol.rs b/src/storage/log/aol.rs
index d04e54c7..889d424b 100644
--- a/src/storage/log/aol.rs
+++ b/src/storage/log/aol.rs
@@ -1,4 +1,3 @@
-use std::fs;
 use std::io;
 use std::mem;
 use std::num::NonZeroUsize;
@@ -7,6 +6,7 @@ use std::path::Path;
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicBool, Ordering};
 
+use crate::vfs::FileSystem;
 use lru::LruCache;
 use parking_lot::{Mutex, RwLock};
 
@@ -47,15 +47,15 @@ impl Aol {
     ///
     /// This function prepares the AOL instance by creating the necessary directory,
     /// determining the active segment ID, and initializing the active segment.
-    pub fn open(dir: &Path, opts: &Options) -> Result<Self> {
+    pub fn open<V: FileSystem>(dir: &Path, opts: &Options, vfs: &V) -> Result<Self> {
         // Ensure the options are valid
         opts.validate()?;
 
         // Ensure the directory exists with proper permissions
-        Self::prepare_directory(dir, opts)?;
+        Self::prepare_directory(dir, opts, vfs)?;
 
         // Determine the active segment ID
-        let active_segment_id = Self::calculate_current_write_segment_id(dir)?;
+        let active_segment_id = Self::calculate_current_write_segment_id(dir, vfs)?;
 
         // Open the active segment
         let active_segment = Segment::open(dir, active_segment_id, opts)?;
@@ -77,10 +77,10 @@ impl Aol {
     }
 
     // Helper function to prepare the directory with proper permissions
-    fn prepare_directory(dir: &Path, opts: &Options) -> Result<()> {
-        fs::create_dir_all(dir)?;
+    fn prepare_directory<V: FileSystem>(dir: &Path, opts: &Options, vfs: &V) -> Result<()> {
+        vfs.create_dir_all(dir)?;
 
-        if let Ok(metadata) = fs::metadata(dir) {
+        if let Ok(metadata) = vfs.metadata(dir) {
             let mut permissions = metadata.permissions();
 
             #[cfg(unix)]
@@ -94,15 +94,15 @@ impl Aol {
                 permissions.set_readonly(false);
             }
 
-            fs::set_permissions(dir, permissions)?;
+            vfs.set_permissions(dir, permissions)?;
         }
 
         Ok(())
     }
 
     // Helper function to calculate the active segment ID
-    fn calculate_current_write_segment_id(dir: &Path) -> Result<u64> {
-        let (_, last) = get_segment_range(dir)?;
+    fn calculate_current_write_segment_id<V: FileSystem>(dir: &Path, vfs: &V) -> Result<u64> {
+        let (_, last) = get_segment_range(dir, vfs)?;
         Ok(last)
     }
 
@@ -299,7 +299,8 @@ mod tests {
         // Create aol options and open a aol file
         let opts = Options::default();
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         // Test initial offset
         let sz = (a.active_segment_id, a.active_segment.offset());
@@ -370,7 +371,8 @@ mod tests {
         // Create aol options and open a aol file
         let opts = Options::default();
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         // Create two slices of bytes of different sizes
         let data1 = vec![1; 31 * 1024];
@@ -415,7 +417,8 @@ mod tests {
         // Create aol options and open a aol file
         let opts = Options::default();
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         // Create two slices of bytes of different sizes
         let data1 = vec![1; 31 * 1024];
@@ -484,7 +487,8 @@ mod tests {
             max_file_size: 1024,
             ..Default::default()
         };
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         let large_record = vec![1; 1025];
         let small_record = vec![1; 1024];
@@ -511,7 +515,8 @@ mod tests {
             max_file_size: 1024,
             ..Default::default()
         };
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         let small_record = vec![1; 1024];
         let r = a.append(&small_record);
@@ -542,7 +547,8 @@ mod tests {
             max_file_size: 1024,
             ..Default::default()
         };
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         let large_record = vec![1; 1024];
         let small_record = vec![1; 512];
@@ -556,7 +562,8 @@ mod tests {
 
         a.close().expect("should close");
 
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
         assert_eq!(0, a.active_segment_id);
 
         let r = a.append(&small_record);
@@ -577,7 +584,8 @@ mod tests {
             max_file_size: 1024, // Small enough to ensure the second append creates a new file
             ..Default::default()
         };
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         // Append a record that fits within the first file
         let first_record = vec![1; 512];
@@ -613,7 +621,8 @@ mod tests {
         // Setup: Create a temporary directory and initialize the log with default options
         let temp_dir = create_temp_directory();
         let opts = Options::default();
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         // Append a single record to ensure there is some data in the log
         let record = vec![1; 512];
@@ -639,13 +648,15 @@ mod tests {
 
         // Step 1: Open the log, append a record, and then close it
         {
-            let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+            let mut a =
+                Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
             a.append(&record).expect("append should succeed");
         } // Log is closed here as `a` goes out of scope
 
         // Step 2: Reopen the log and append another record
         {
-            let mut a = Aol::open(temp_dir.path(), &opts).expect("should reopen aol");
+            let mut a =
+                Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should reopen aol");
             a.append(&record)
                 .expect("append after reopen should succeed");
 
@@ -673,14 +684,16 @@ mod tests {
 
         // Step 1: Open the log, append a record, and then close it
         {
-            let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+            let mut a =
+                Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
             a.append(&record).expect("first append should succeed");
             a.append(&record).expect("first append should succeed");
         } // Log is closed here as `a` goes out of scope
 
         // Step 2: Reopen the log and append another record, which should create a new file
        {
-            let mut a = Aol::open(temp_dir.path(), &opts).expect("should reopen aol");
+            let mut a =
+                Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should reopen aol");
 
             // Verify: Ensure the first record is in a new file by reading it back
             let mut read_buf = vec![0; 512];
@@ -712,7 +725,8 @@ mod tests {
     fn sequential_read_performance() {
         let temp_dir = create_temp_directory();
         let opts = Options::default();
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         // Append 1000 records to ensure we have enough data
         let record = vec![1; 512]; // Each record is 512 bytes
@@ -739,7 +753,8 @@ mod tests {
     fn random_access_read_performance() {
         let temp_dir = create_temp_directory();
         let opts = Options::default();
-        let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut a =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         // Append 1000 records to ensure we have enough data
         let record = vec![1; 512]; // Each record is 512 bytes
@@ -772,7 +787,8 @@ mod tests {
     fn test_rotate_functionality() {
         let temp_dir = create_temp_directory();
         let opts = Options::default();
-        let mut aol = Aol::open(temp_dir.path(), &opts).expect("should create aol");
+        let mut aol =
+            Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");
 
         // Ensure there's data in the current segment to necessitate a rotation
         aol.append(b"data 1").unwrap();
diff --git a/src/storage/log/mod.rs b/src/storage/log/mod.rs
index 415bbeee..090bdad9 100644
--- a/src/storage/log/mod.rs
+++ b/src/storage/log/mod.rs
@@ -1,10 +1,9 @@
 mod aol;
 pub use aol::Aol;
 
+use crate::vfs::{File, FileSystem, OpenOptions};
 use ahash::{HashMap, HashMapExt};
 use std::fmt;
-use std::fs::File;
-use std::fs::{read_dir, OpenOptions};
 use std::io::BufReader;
 use std::io::{self, BufRead, Read, Seek, SeekFrom, Write};
 use std::path::{Path, PathBuf};
@@ -526,8 +525,8 @@ pub(crate) fn segment_name(index: u64, ext: &str) -> String {
 ///
 /// This function returns a tuple containing the minimum and maximum segment IDs
 /// found in the directory. If no segments are found, the tuple will contain (0, 0).
-fn get_segment_range(dir: &Path) -> Result<(u64, u64)> {
-    let refs = list_segment_ids(dir)?;
+fn get_segment_range<V: FileSystem>(dir: &Path, vfs: &V) -> Result<(u64, u64)> {
+    let refs = list_segment_ids(dir, vfs)?;
     if refs.is_empty() {
         return Ok((0, 0));
     }
@@ -539,15 +538,15 @@ fn get_segment_range(dir: &Path) -> Result<(u64, u64)> {
 /// This function reads the names of segment files in the directory and extracts the segment IDs.
 /// The segment IDs are returned as a sorted vector. If no segment files are found, an empty
 /// vector is returned.
-fn list_segment_ids(dir: &Path) -> Result<Vec<u64>> {
+fn list_segment_ids<V: FileSystem>(dir: &Path, vfs: &V) -> Result<Vec<u64>> {
     let mut refs: Vec<u64> = Vec::new();
-    let entries = read_dir(dir)?;
+    let entries = vfs.read_dir(dir)?;
 
     for entry in entries {
         let file = entry?;
 
         // Check if the entry is a file
-        if std::fs::metadata(file.path())?.is_file() {
+        if vfs.metadata(file.path())?.is_file() {
             let fn_name = file.file_name();
             let fn_str = fn_name.to_string_lossy();
             let (index, _) = parse_segment_name(&fn_str)?;
@@ -572,11 +571,14 @@ pub(crate) struct SegmentRef {
 
 impl SegmentRef {
     /// Creates a vector of SegmentRef instances by reading segments in the specified directory.
-    pub fn read_segments_from_directory(directory_path: &Path) -> Result<Vec<SegmentRef>> {
+    pub fn read_segments_from_directory<V: FileSystem>(
+        directory_path: &Path,
+        vfs: &V,
+    ) -> Result<Vec<SegmentRef>> {
         let mut segment_refs = Vec::new();
 
         // Read the directory and iterate through its entries
-        let files = read_dir(directory_path)?;
+        let files = vfs.read_dir(directory_path)?;
         for file in files {
             let entry = file?;
             if entry.file_type()?.is_file() {
@@ -1249,7 +1251,7 @@ mod tests {
         let temp_dir = create_temp_directory();
         let dir = temp_dir.path().to_path_buf();
 
-        let result = get_segment_range(&dir).unwrap();
+        let result = get_segment_range(&dir, &crate::vfs::Dummy).unwrap();
         assert_eq!(result, (0, 0));
     }
 
@@ -1263,7 +1265,7 @@ mod tests {
         create_segment_file(&dir, "00000000000000000002.log");
         create_segment_file(&dir, "00000000000000000004.log");
 
-        let result = get_segment_range(&dir).unwrap();
+        let result = get_segment_range(&dir, &crate::vfs::Dummy).unwrap();
         assert_eq!(result, (1, 4));
     }
 
@@ -1639,7 +1641,7 @@ mod tests {
         create_segment_file(dir_path, &segment_name(10, ""));
 
         // Call the function under test
-        let segment_ids = list_segment_ids(dir_path).unwrap();
+        let segment_ids = list_segment_ids(dir_path, &crate::vfs::Dummy).unwrap();
 
         // Verify the output
         assert_eq!(segment_ids, vec![1, 2, 10]);
diff --git a/src/vfs.rs b/src/vfs.rs
new file mode 100644
index 00000000..f0259051
--- /dev/null
+++ b/src/vfs.rs
@@ -0,0 +1,68 @@
+pub use std::fs::{File, OpenOptions};
+use std::{
+    fs::{Metadata, Permissions, ReadDir},
+    io,
+    path::Path,
+};
+
+pub trait FileSystem: Send + Sync + 'static {
+    fn rename<P: AsRef<Path>, Q: AsRef<Path>>(&self, from: P, to: Q) -> io::Result<()>;
+    fn remove_file<P: AsRef<Path>>(&self, path: P) -> io::Result<()>;
+    fn read_dir<P: AsRef<Path>>(&self, path: P) -> io::Result<ReadDir>;
+    fn remove_dir_all<P: AsRef<Path>>(&self, path: P) -> io::Result<()>;
+    fn create_dir_all<P: AsRef<Path>>(&self, path: P) -> io::Result<()>;
+    fn open<P: AsRef<Path>>(&self, path: P) -> io::Result<File>;
+    fn metadata<P: AsRef<Path>>(&self, path: P) -> io::Result<Metadata>;
+    fn copy<P: AsRef<Path>, Q: AsRef<Path>>(&self, from: P, to: Q) -> io::Result<u64>;
+    fn set_permissions<P: AsRef<Path>>(&self, path: P, perm: Permissions) -> io::Result<()>;
+    fn create<P: AsRef<Path>>(&self, path: P) -> io::Result<File>;
+    fn copy_dir_all(&self, src: &Path, dst: &Path) -> io::Result<()> {
+        if !dst.exists() {
+            self.create_dir_all(dst)?;
+        }
+        for entry in self.read_dir(src)? {
+            let entry = entry?;
+            let ty = entry.file_type()?;
+            if ty.is_dir() {
+                self.copy_dir_all(&entry.path(), &dst.join(entry.file_name()))?;
+            } else {
+                self.copy(entry.path(), dst.join(entry.file_name()))?;
+            }
+        }
+        Ok(())
+    }
+}
+
+pub struct Dummy;
+impl FileSystem for Dummy {
+    fn rename<P: AsRef<Path>, Q: AsRef<Path>>(&self, from: P, to: Q) -> io::Result<()> {
+        std::fs::rename(from, to)
+    }
+    fn remove_file<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
+        std::fs::remove_file(path)
+    }
+    fn read_dir<P: AsRef<Path>>(&self, path: P) -> io::Result<ReadDir> {
+        std::fs::read_dir(path)
+    }
+    fn remove_dir_all<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
+        std::fs::remove_dir_all(path)
+    }
+    fn create_dir_all<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
+        std::fs::create_dir_all(path)
+    }
+    fn open<P: AsRef<Path>>(&self, path: P) -> io::Result<File> {
+        std::fs::File::open(path)
+    }
+    fn metadata<P: AsRef<Path>>(&self, path: P) -> io::Result<Metadata> {
+        std::fs::metadata(path)
+    }
+    fn copy<P: AsRef<Path>, Q: AsRef<Path>>(&self, from: P, to: Q) -> io::Result<u64> {
+        std::fs::copy(from, to)
+    }
+    fn set_permissions<P: AsRef<Path>>(&self, path: P, perm: Permissions) -> io::Result<()> {
+        std::fs::set_permissions(path, perm)
+    }
+    fn create<P: AsRef<Path>>(&self, path: P) -> io::Result<File> {
+        std::fs::File::create(path)
+    }
+}
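Reviewer note: the sketch below is a hypothetical illustration of how an alternative backend could be plugged into the `FileSystem` trait introduced in src/vfs.rs; `CountingFs` is not part of this change. It delegates to `std::fs` exactly like `Dummy`, but counts `rename` calls, which is the kind of instrumentation the abstraction is meant to enable. Assuming the visibility introduced in this diff, it would be wired in through `StoreImpl::with_vfs(opts, CountingFs::default())`, while `Store::new(opts)` keeps the existing behaviour via the `Dummy` passthrough.

```rust
// Hypothetical example (not part of this diff): a FileSystem backend that
// delegates to std::fs, like vfs::Dummy, but counts rename() calls so a test
// could assert on compaction/repair behaviour. The import path assumes
// in-crate use (e.g. a test module).
use std::fs::{File, Metadata, Permissions, ReadDir};
use std::io;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::vfs::FileSystem;

#[derive(Default)]
pub struct CountingFs {
    pub renames: AtomicUsize,
}

impl FileSystem for CountingFs {
    fn rename<P: AsRef<Path>, Q: AsRef<Path>>(&self, from: P, to: Q) -> io::Result<()> {
        // Record the rename before delegating to the real filesystem.
        self.renames.fetch_add(1, Ordering::Relaxed);
        std::fs::rename(from, to)
    }
    fn remove_file<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
        std::fs::remove_file(path)
    }
    fn read_dir<P: AsRef<Path>>(&self, path: P) -> io::Result<ReadDir> {
        std::fs::read_dir(path)
    }
    fn remove_dir_all<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
        std::fs::remove_dir_all(path)
    }
    fn create_dir_all<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
        std::fs::create_dir_all(path)
    }
    fn open<P: AsRef<Path>>(&self, path: P) -> io::Result<File> {
        std::fs::File::open(path)
    }
    fn metadata<P: AsRef<Path>>(&self, path: P) -> io::Result<Metadata> {
        std::fs::metadata(path)
    }
    fn copy<P: AsRef<Path>, Q: AsRef<Path>>(&self, from: P, to: Q) -> io::Result<u64> {
        std::fs::copy(from, to)
    }
    fn set_permissions<P: AsRef<Path>>(&self, path: P, perm: Permissions) -> io::Result<()> {
        std::fs::set_permissions(path, perm)
    }
    fn create<P: AsRef<Path>>(&self, path: P) -> io::Result<File> {
        std::fs::File::create(path)
    }
}
```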