diff --git a/README.md b/README.md index 8fafc1c..b51f892 100644 --- a/README.md +++ b/README.md @@ -2,14 +2,14 @@ Generic value log implementation for key-value separated storage, inspired by RocksDB's BlobDB [[1]](#footnotes) and implemented in safe, stable Rust. -> This crate is intended for key-value separated LSM storage. +> This crate is intended as a building block for key-value separated LSM storage. > You probably want to use https://github.com/fjall-rs/fjall instead. ## Features - Thread-safe API - 100% safe & stable Rust -- Supports generic index structures (LSM-tree, ...) +- Supports generic KV-index structures (LSM-tree, ...) - Built-in per-blob compression (LZ4) - In-memory blob cache for hot data @@ -25,6 +25,14 @@ All source code is licensed under MIT OR Apache-2.0. All contributions are to be licensed as MIT OR Apache-2.0. +## Development + +### Run benchmarks + +```bash +cargo bench --features bloom +``` + ## Footnotes [1] https://github.com/facebook/rocksdb/wiki/BlobDB diff --git a/src/config.rs b/src/config.rs index 41d811e..a2b0cdf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -20,8 +20,7 @@ impl Default for Config { impl Config { /// Sets the blob cache. /// - /// Defaults to a blob cache 16 MiB of capacity shared - /// between all partitions inside this keyspace. + /// Defaults to a blob cache with 16 MiB of capacity. #[must_use] pub fn blob_cache(mut self, blob_cache: Arc) -> Self { self.blob_cache = blob_cache; diff --git a/src/index.rs b/src/index.rs index 791150f..2345c59 100644 --- a/src/index.rs +++ b/src/index.rs @@ -4,7 +4,7 @@ use crate::ValueHandle; /// /// An index should point into the value log using [`ValueHandle`]. pub trait Index { - /// Returns a value habdle for a given key. + /// Returns a value handle for a given key. /// /// This method is used to index back into the index to check for /// stale values when scanning through the value log's segments. 
@@ -27,11 +27,11 @@ pub trait Index { fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()>; } -/// Trait that allows writing into an index +/// Trait that allows writing into an external index +/// +/// The write process should be atomic. pub trait Writer { - /// Inserts an value handle into the index. - /// - /// This method is called during value log garbage collection. + /// Inserts a value handle into the index. /// /// # Errors /// @@ -40,8 +40,6 @@ pub trait Writer { /// Finishes the write batch. /// - /// This operation should be atomic. - /// /// # Errors /// /// Will return `Err` if an IO error occurs. diff --git a/src/main.rs b/src/main.rs index 37c8107..160f4da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,7 +52,7 @@ fn main() -> value_log::Result<()> { let mut writer = value_log.get_writer()?; let segment_id = writer.segment_id(); - for key in ["a", "b", "c", "d", "e"] { + for key in ["a", "b", "c", "d", "e", "html"] { let offset = writer.offset(key.as_bytes()); index.insert_indirection( @@ -113,6 +113,52 @@ fn main() -> value_log::Result<()> { value_log.register(writer)?; } + { + let mut writer = value_log.get_writer()?; + let segment_id = writer.segment_id(); + + let key = "html"; + let offset = writer.offset(key.as_bytes()); + + index.insert_indirection( + key.as_bytes(), + ValueHandle { + offset, + segment_id: segment_id.clone(), + }, + )?; + + writer.write( + key.as_bytes(), + b" + +
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+
Hello World
+ ", + )?; + + value_log.register(writer)?; + } + /* { let mut writer = value_log.get_writer()?; let segment_id = writer.segment_id(); @@ -241,7 +287,25 @@ fn main() -> value_log::Result<()> { eprintln!("=== rollover ==="); value_log.rollover(&value_log.list_segments(), DebugIndexWriter(index.clone()))?; */ + eprintln!("=== before refresh ==="); eprintln!("{:#?}", value_log.segments.read().unwrap()); + eprintln!( + "space amp: {}, wasted bytes: {}", + value_log.space_amp(), + value_log.reclaimable_bytes() + ); + + for id in value_log.segments.read().unwrap().keys() { + value_log.refresh_stats(id)?; + } + + eprintln!("=== after refresh ==="); + eprintln!("{:#?}", value_log.segments.read().unwrap()); + eprintln!( + "space amp: {}, wasted bytes: {}", + value_log.space_amp(), + value_log.reclaimable_bytes() + ); let handle = index.get(b"html")?.unwrap(); eprintln!( diff --git a/src/value_log.rs b/src/value_log.rs index fe2903d..5b1e1b9 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -14,7 +14,7 @@ use std::{ sync::{atomic::AtomicU64, Arc, Mutex, RwLock}, }; -/// A disk-resident value log. +/// A disk-resident value log #[derive(Clone)] pub struct ValueLog(Arc); @@ -41,7 +41,7 @@ pub struct ValueLogInner { /// Segment manifest pub segments: RwLock, Arc>>, - semaphore: Mutex<()>, + rollover_guard: Mutex<()>, } impl ValueLog { @@ -97,7 +97,7 @@ impl ValueLog { blob_cache, index, segments: RwLock::new(BTreeMap::default()), - semaphore: Mutex::new(()), + rollover_guard: Mutex::new(()), }))) } @@ -197,13 +197,6 @@ impl ValueLog { Ok(Some(val)) } - /* pub fn get_multiple( - &self, - handles: &[ValueHandle], - ) -> crate::Result>>> { - handles.iter().map(|vr| self.get(vr)).collect() - } */ - /// Initializes a new segment writer /// /// # Errors @@ -248,21 +241,31 @@ impl ValueLog { Ok(()) } + /// Returns the amount of bytes on disk that are occupied by blobs. + /// + /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. 
+ #[must_use] + pub fn disk_space_used(&self) -> u64 { + self.segments + .read() + .expect("lock is poisoned") + .values() + .map(|x| x.stats.total_bytes) + .sum::<u64>() + } + /// Returns the amount of bytes that can be freed on disk /// if all segments were to be defragmented /// /// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`]. #[must_use] pub fn reclaimable_bytes(&self) -> u64 { - let segments = self.segments.read().expect("lock is poisoned"); - - let dead_bytes = segments + self.segments + .read() + .expect("lock is poisoned") .values() .map(|x| x.stats.get_dead_bytes()) - .sum::<u64>(); - drop(segments); - - dead_bytes + .sum::<u64>() } /// Returns the percent of dead bytes in the value log @@ -383,7 +386,7 @@ impl ValueLog { index_writer: &W, ) -> crate::Result<()> { // IMPORTANT: Only allow 1 rollover at any given time - let _guard = self.semaphore.lock().expect("lock is poisoned"); + let _guard = self.rollover_guard.lock().expect("lock is poisoned"); let lock = self.segments.read().expect("lock is poisoned"); @@ -409,7 +412,6 @@ impl ValueLog { for item in reader { let (k, v, _) = item?; - eprintln!("{k:?} => {:?}", String::from_utf8_lossy(&v)); let segment_id = writer.segment_id(); let offset = writer.offset(&k);