diff --git a/benches/value_log.rs b/benches/value_log.rs index a802730..4dfdad3 100644 --- a/benches/value_log.rs +++ b/benches/value_log.rs @@ -29,7 +29,6 @@ fn load_value(c: &mut Criterion) { let value_log = ValueLog::open( vl_path, Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))), - index.clone(), ) .unwrap(); @@ -76,7 +75,6 @@ fn load_value(c: &mut Criterion) { vl_path, Config::default() .blob_cache(Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024))), - index.clone(), ) .unwrap(); @@ -128,7 +126,6 @@ fn compression(c: &mut Criterion) { let value_log = ValueLog::open( vl_path, Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))), - index.clone(), ) .unwrap(); diff --git a/src/manifest.rs b/src/manifest.rs index d78cdeb..c869659 100644 --- a/src/manifest.rs +++ b/src/manifest.rs @@ -34,7 +34,7 @@ fn rewrite_atomic>(path: P, content: &[u8]) -> std::io::Result<() #[allow(clippy::module_name_repetitions)] pub struct SegmentManifestInner { path: PathBuf, - pub(crate) segments: RwLock>>, + pub segments: RwLock>>, } #[allow(clippy::module_name_repetitions)] @@ -141,12 +141,16 @@ impl SegmentManifest { } pub fn drop_segments(&self, ids: &[u64]) -> crate::Result<()> { + // TODO: atomic swap + let mut lock = self.segments.write().expect("lock is poisoned"); lock.retain(|x, _| !ids.contains(x)); Self::write_to_disk(&self.path, &lock.keys().copied().collect::>()) } pub fn register(&self, writer: MultiWriter) -> crate::Result<()> { + // TODO: atomic swap + let mut lock = self.segments.write().expect("lock is poisoned"); let writers = writer.finish()?; @@ -162,6 +166,7 @@ impl SegmentManifest { stats: Stats { item_count: writer.item_count.into(), total_bytes: writer.written_blob_bytes.into(), + total_uncompressed_bytes: writer.uncompressed_bytes.into(), stale_items: AtomicU64::default(), stale_bytes: AtomicU64::default(), }, diff --git a/src/segment/merge.rs b/src/segment/merge.rs index 16bc5e3..e733322 100644 --- a/src/segment/merge.rs +++ b/src/segment/merge.rs @@ -1,7 +1,7 @@ use crate::{id::SegmentId, SegmentReader}; use std::cmp::Reverse; -// TODO: replace with MinHeap +// TODO: replace with MinHeap... use min_max_heap::MinMaxHeap; type IteratorIndex = usize; diff --git a/src/segment/mod.rs b/src/segment/mod.rs index 15c28e1..9ac234f 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -10,12 +10,6 @@ use std::path::PathBuf; /// A disk segment is an immutable, sorted, contiguous file /// that contains key-value pairs. -/// -/// ### File format -/// -/// KV: \ \ \ \ \ -/// -/// Segment: { KV } + #[derive(Debug)] pub struct Segment { /// Segment ID diff --git a/src/segment/stats.rs b/src/segment/stats.rs index b354578..163d404 100644 --- a/src/segment/stats.rs +++ b/src/segment/stats.rs @@ -3,10 +3,11 @@ use std::sync::atomic::AtomicU64; #[derive(Debug, Default)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] pub struct Stats { - pub(crate) item_count: AtomicU64, + pub(crate) item_count: AtomicU64, // TODO: u64? pub(crate) stale_items: AtomicU64, - pub total_bytes: AtomicU64, + pub total_uncompressed_bytes: AtomicU64, // TODO: u64? + pub total_bytes: AtomicU64, // TODO: u64? pub(crate) stale_bytes: AtomicU64, // TODO: key range } @@ -24,6 +25,11 @@ impl Stats { self.item_count.load(std::sync::atomic::Ordering::Acquire) } + pub fn total_uncompressed_bytes(&self) -> u64 { + self.total_uncompressed_bytes + .load(std::sync::atomic::Ordering::Acquire) + } + pub fn total_bytes(&self) -> u64 { self.total_bytes.load(std::sync::atomic::Ordering::Acquire) } diff --git a/src/segment/writer.rs b/src/segment/writer.rs index 358a85c..e1615bb 100644 --- a/src/segment/writer.rs +++ b/src/segment/writer.rs @@ -17,6 +17,7 @@ pub struct Writer { pub(crate) item_count: u64, pub(crate) written_blob_bytes: u64, + pub(crate) uncompressed_bytes: u64, } impl Writer { @@ -40,6 +41,7 @@ impl Writer { offset: 0, item_count: 0, written_blob_bytes: 0, + uncompressed_bytes: 0, }) } @@ -71,6 +73,8 @@ impl Writer { assert!(key.len() <= u16::MAX.into()); assert!(u32::try_from(value.len()).is_ok()); + self.uncompressed_bytes += value.len() as u64; + #[cfg(feature = "lz4")] let value = lz4_flex::compress_prepend_size(value); diff --git a/src/value_log.rs b/src/value_log.rs index 09ba534..5f6127e 100644 --- a/src/value_log.rs +++ b/src/value_log.rs @@ -6,7 +6,7 @@ use crate::{ path::absolute_path, segment::merge::MergeReader, version::Version, - Config, ExternalIndex, SegmentWriter, ValueHandle, + Config, SegmentWriter, ValueHandle, }; use byteorder::{BigEndian, ReadBytesExt}; use std::{ @@ -18,10 +18,10 @@ use std::{ /// A disk-resident value log #[derive(Clone)] -pub struct ValueLog(Arc>); +pub struct ValueLog(Arc); -impl std::ops::Deref for ValueLog { - type Target = ValueLogInner; +impl std::ops::Deref for ValueLog { + type Target = ValueLogInner; fn deref(&self) -> &Self::Target { &self.0 @@ -29,13 +29,14 @@ impl std::ops::Deref for ValueLog { } #[allow(clippy::module_name_repetitions)] -pub struct ValueLogInner { +pub struct ValueLogInner { config: Config, path: PathBuf, + // TODO: maybe not needed persistently... /// External index - pub index: I, + // pub index: I, /// In-memory blob cache blob_cache: Arc, @@ -48,7 +49,7 @@ pub struct ValueLogInner { rollover_guard: Mutex<()>, } -impl ValueLog { +impl ValueLog { /// Creates or recovers a value log in the given directory. /// /// # Errors @@ -57,23 +58,18 @@ impl ValueLog { pub fn open>( path: P, // TODO: move path into config? config: Config, - index: I, ) -> crate::Result { let path = path.into(); if path.join(VLOG_MARKER).try_exists()? { - Self::recover(path, config, index) + Self::recover(path, config) } else { - Self::create_new(path, config, index) + Self::create_new(path, config) } } /// Creates a new empty value log in a directory. - pub(crate) fn create_new>( - path: P, - config: Config, - index: I, - ) -> crate::Result { + pub(crate) fn create_new>(path: P, config: Config) -> crate::Result { let path = absolute_path(path.into()); log::trace!("Creating value-log at {}", path.display()); @@ -109,18 +105,13 @@ impl ValueLog { config, path, blob_cache, - index, manifest, id_generator: IdGenerator::default(), rollover_guard: Mutex::new(()), }))) } - pub(crate) fn recover>( - path: P, - config: Config, - index: I, - ) -> crate::Result { + pub(crate) fn recover>(path: P, config: Config) -> crate::Result { let path = path.into(); log::info!("Recovering value-log at {}", path.display()); @@ -143,7 +134,6 @@ impl ValueLog { config, path, blob_cache, - index, manifest, // TODO: recover ID, test!!!, maybe store next ID in manifest as u64 id_generator: IdGenerator::default(), @@ -215,7 +205,7 @@ impl ValueLog { )?) } - /// Scans through a segment, refreshing its statistics + /* /// Scans through a segment, refreshing its statistics /// /// This function is blocking. /// @@ -277,7 +267,7 @@ impl ValueLog { // TODO: changing stats doesn't happen **too** often, so the I/O is fine Ok(()) - } + } */ /// Drops stale segments pub fn drop_stale_segments(&self) -> crate::Result<()> { diff --git a/tests/basic_gc.rs b/tests/basic_gc.rs index 81725fa..fa1d313 100644 --- a/tests/basic_gc.rs +++ b/tests/basic_gc.rs @@ -9,7 +9,7 @@ fn basic_gc() -> value_log::Result<()> { let vl_path = folder.path(); std::fs::create_dir_all(vl_path)?; - let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default())?; { let items = ["a", "b", "c", "d", "e"]; diff --git a/tests/basic_kv.rs b/tests/basic_kv.rs index 21e49e5..7622987 100644 --- a/tests/basic_kv.rs +++ b/tests/basic_kv.rs @@ -9,7 +9,7 @@ fn basic_kv() -> value_log::Result<()> { let vl_path = folder.path(); std::fs::create_dir_all(vl_path)?; - let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default())?; let items = ["a", "b", "c", "d", "e"]; diff --git a/tests/recovery.rs b/tests/recovery.rs index 28ee059..b34c378 100644 --- a/tests/recovery.rs +++ b/tests/recovery.rs @@ -13,7 +13,7 @@ fn basic_recovery() -> value_log::Result<()> { let items = ["a", "b", "c", "d", "e"]; { - let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default())?; { let mut writer = value_log.get_writer()?; @@ -47,7 +47,7 @@ fn basic_recovery() -> value_log::Result<()> { } { - let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default())?; // TODO: should be recovered for id in value_log.manifest.list_segment_ids() { @@ -76,8 +76,6 @@ fn basic_recovery() -> value_log::Result<()> { fn delete_unfinished_segment_folders() -> value_log::Result<()> { let folder = tempfile::tempdir()?; - let index = MockIndex::default(); - let vl_path = folder.path(); std::fs::create_dir_all(vl_path)?; @@ -86,12 +84,12 @@ fn delete_unfinished_segment_folders() -> value_log::Result<()> { assert!(mock_path.try_exists()?); { - let _value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; + let _value_log = ValueLog::open(vl_path, Config::default())?; assert!(mock_path.try_exists()?); } { - let _value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; + let _value_log = ValueLog::open(vl_path, Config::default())?; assert!(!mock_path.try_exists()?); } diff --git a/tests/rollover_index_fail_finish.rs b/tests/rollover_index_fail_finish.rs index 3a75eba..a20f625 100644 --- a/tests/rollover_index_fail_finish.rs +++ b/tests/rollover_index_fail_finish.rs @@ -61,7 +61,7 @@ fn rollover_index_fail_finish() -> value_log::Result<()> { let vl_path = folder.path(); std::fs::create_dir_all(vl_path)?; - let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default())?; let items = ["a", "b", "c", "d", "e"]; diff --git a/tests/space_amp.rs b/tests/space_amp.rs index 6eedcfc..4f932bf 100644 --- a/tests/space_amp.rs +++ b/tests/space_amp.rs @@ -9,7 +9,7 @@ fn worst_case_space_amp() -> value_log::Result<()> { let vl_path = folder.path(); std::fs::create_dir_all(vl_path)?; - let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default())?; assert_eq!(0.0, value_log.manifest.space_amp()); assert_eq!(0.0, value_log.manifest.stale_ratio()); @@ -52,7 +52,7 @@ fn no_overlap_space_amp() -> value_log::Result<()> { let vl_path = folder.path(); std::fs::create_dir_all(vl_path)?; - let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?; + let value_log = ValueLog::open(vl_path, Config::default())?; assert_eq!(0.0, value_log.manifest.stale_ratio()); assert_eq!(0.0, value_log.manifest.space_amp());