Skip to content

Commit

Permalink
prepare for v1
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Jun 2, 2024
1 parent c09084f commit 03656d1
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 48 deletions.
3 changes: 0 additions & 3 deletions benches/value_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ fn load_value(c: &mut Criterion) {
let value_log = ValueLog::open(
vl_path,
Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
index.clone(),
)
.unwrap();

Expand Down Expand Up @@ -76,7 +75,6 @@ fn load_value(c: &mut Criterion) {
vl_path,
Config::default()
.blob_cache(Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024))),
index.clone(),
)
.unwrap();

Expand Down Expand Up @@ -128,7 +126,6 @@ fn compression(c: &mut Criterion) {
let value_log = ValueLog::open(
vl_path,
Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
index.clone(),
)
.unwrap();

Expand Down
7 changes: 6 additions & 1 deletion src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn rewrite_atomic<P: AsRef<Path>>(path: P, content: &[u8]) -> std::io::Result<()
#[allow(clippy::module_name_repetitions)]
pub struct SegmentManifestInner {
path: PathBuf,
pub(crate) segments: RwLock<HashMap<SegmentId, Arc<Segment>>>,
pub segments: RwLock<HashMap<SegmentId, Arc<Segment>>>,
}

#[allow(clippy::module_name_repetitions)]
Expand Down Expand Up @@ -141,12 +141,16 @@ impl SegmentManifest {
}

pub fn drop_segments(&self, ids: &[u64]) -> crate::Result<()> {
// TODO: atomic swap

let mut lock = self.segments.write().expect("lock is poisoned");
lock.retain(|x, _| !ids.contains(x));
Self::write_to_disk(&self.path, &lock.keys().copied().collect::<Vec<_>>())
}

pub fn register(&self, writer: MultiWriter) -> crate::Result<()> {
// TODO: atomic swap

let mut lock = self.segments.write().expect("lock is poisoned");
let writers = writer.finish()?;

Expand All @@ -162,6 +166,7 @@ impl SegmentManifest {
stats: Stats {
item_count: writer.item_count.into(),
total_bytes: writer.written_blob_bytes.into(),
total_uncompressed_bytes: writer.uncompressed_bytes.into(),
stale_items: AtomicU64::default(),
stale_bytes: AtomicU64::default(),
},
Expand Down
2 changes: 1 addition & 1 deletion src/segment/merge.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{id::SegmentId, SegmentReader};
use std::cmp::Reverse;

// TODO: replace with MinHeap
// TODO: replace with MinHeap...
use min_max_heap::MinMaxHeap;

type IteratorIndex = usize;
Expand Down
6 changes: 0 additions & 6 deletions src/segment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ use std::path::PathBuf;

/// A disk segment is an immutable, sorted, contiguous file
/// that contains key-value pairs.
///
/// ### File format
///
/// KV: \<key length: u16\> \<key: N\> \<crc hash: u32\> \<value length: u32\> \<value: N\>
///
/// Segment: { KV } +
#[derive(Debug)]
pub struct Segment {
/// Segment ID
Expand Down
10 changes: 8 additions & 2 deletions src/segment/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use std::sync::atomic::AtomicU64;
#[derive(Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct Stats {
pub(crate) item_count: AtomicU64,
pub(crate) item_count: AtomicU64, // TODO: u64?
pub(crate) stale_items: AtomicU64,

pub total_bytes: AtomicU64,
pub total_uncompressed_bytes: AtomicU64, // TODO: u64?
pub total_bytes: AtomicU64, // TODO: u64?
pub(crate) stale_bytes: AtomicU64,
// TODO: key range
}
Expand All @@ -24,6 +25,11 @@ impl Stats {
self.item_count.load(std::sync::atomic::Ordering::Acquire)
}

pub fn total_uncompressed_bytes(&self) -> u64 {
self.total_uncompressed_bytes
.load(std::sync::atomic::Ordering::Acquire)
}

pub fn total_bytes(&self) -> u64 {
self.total_bytes.load(std::sync::atomic::Ordering::Acquire)
}
Expand Down
4 changes: 4 additions & 0 deletions src/segment/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct Writer {
pub(crate) item_count: u64,

pub(crate) written_blob_bytes: u64,
pub(crate) uncompressed_bytes: u64,
}

impl Writer {
Expand All @@ -40,6 +41,7 @@ impl Writer {
offset: 0,
item_count: 0,
written_blob_bytes: 0,
uncompressed_bytes: 0,
})
}

Expand Down Expand Up @@ -71,6 +73,8 @@ impl Writer {
assert!(key.len() <= u16::MAX.into());
assert!(u32::try_from(value.len()).is_ok());

self.uncompressed_bytes += value.len() as u64;

#[cfg(feature = "lz4")]
let value = lz4_flex::compress_prepend_size(value);

Expand Down
38 changes: 14 additions & 24 deletions src/value_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
path::absolute_path,
segment::merge::MergeReader,
version::Version,
Config, ExternalIndex, SegmentWriter, ValueHandle,
Config, SegmentWriter, ValueHandle,
};
use byteorder::{BigEndian, ReadBytesExt};
use std::{
Expand All @@ -18,24 +18,25 @@ use std::{

/// A disk-resident value log
#[derive(Clone)]
pub struct ValueLog<I: ExternalIndex + Clone + Send + Sync>(Arc<ValueLogInner<I>>);
pub struct ValueLog(Arc<ValueLogInner>);

impl<I: ExternalIndex + Clone + Send + Sync> std::ops::Deref for ValueLog<I> {
type Target = ValueLogInner<I>;
impl std::ops::Deref for ValueLog {
type Target = ValueLogInner;

fn deref(&self) -> &Self::Target {
&self.0
}
}

#[allow(clippy::module_name_repetitions)]
pub struct ValueLogInner<I: ExternalIndex + Clone + Send + Sync> {
pub struct ValueLogInner {
config: Config,

path: PathBuf,

// TODO: maybe not needed persistently...
/// External index
pub index: I,
// pub index: I,

/// In-memory blob cache
blob_cache: Arc<BlobCache>,
Expand All @@ -48,7 +49,7 @@ pub struct ValueLogInner<I: ExternalIndex + Clone + Send + Sync> {
rollover_guard: Mutex<()>,
}

impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
impl ValueLog {
/// Creates or recovers a value log in the given directory.
///
/// # Errors
Expand All @@ -57,23 +58,18 @@ impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
pub fn open<P: Into<PathBuf>>(
path: P, // TODO: move path into config?
config: Config,
index: I,
) -> crate::Result<Self> {
let path = path.into();

if path.join(VLOG_MARKER).try_exists()? {
Self::recover(path, config, index)
Self::recover(path, config)
} else {
Self::create_new(path, config, index)
Self::create_new(path, config)
}
}

/// Creates a new empty value log in a directory.
pub(crate) fn create_new<P: Into<PathBuf>>(
path: P,
config: Config,
index: I,
) -> crate::Result<Self> {
pub(crate) fn create_new<P: Into<PathBuf>>(path: P, config: Config) -> crate::Result<Self> {
let path = absolute_path(path.into());
log::trace!("Creating value-log at {}", path.display());

Expand Down Expand Up @@ -109,18 +105,13 @@ impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
config,
path,
blob_cache,
index,
manifest,
id_generator: IdGenerator::default(),
rollover_guard: Mutex::new(()),
})))
}

pub(crate) fn recover<P: Into<PathBuf>>(
path: P,
config: Config,
index: I,
) -> crate::Result<Self> {
pub(crate) fn recover<P: Into<PathBuf>>(path: P, config: Config) -> crate::Result<Self> {
let path = path.into();
log::info!("Recovering value-log at {}", path.display());

Expand All @@ -143,7 +134,6 @@ impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
config,
path,
blob_cache,
index,
manifest,
// TODO: recover ID, test!!!, maybe store next ID in manifest as u64
id_generator: IdGenerator::default(),
Expand Down Expand Up @@ -215,7 +205,7 @@ impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
)?)
}

/// Scans through a segment, refreshing its statistics
/* /// Scans through a segment, refreshing its statistics
///
/// This function is blocking.
///
Expand Down Expand Up @@ -277,7 +267,7 @@ impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
// TODO: changing stats doesn't happen **too** often, so the I/O is fine
Ok(())
}
} */

/// Drops stale segments
pub fn drop_stale_segments(&self) -> crate::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion tests/basic_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn basic_gc() -> value_log::Result<()> {

let vl_path = folder.path();
std::fs::create_dir_all(vl_path)?;
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
let value_log = ValueLog::open(vl_path, Config::default())?;

{
let items = ["a", "b", "c", "d", "e"];
Expand Down
2 changes: 1 addition & 1 deletion tests/basic_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn basic_kv() -> value_log::Result<()> {

let vl_path = folder.path();
std::fs::create_dir_all(vl_path)?;
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
let value_log = ValueLog::open(vl_path, Config::default())?;

let items = ["a", "b", "c", "d", "e"];

Expand Down
10 changes: 4 additions & 6 deletions tests/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fn basic_recovery() -> value_log::Result<()> {
let items = ["a", "b", "c", "d", "e"];

{
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
let value_log = ValueLog::open(vl_path, Config::default())?;

{
let mut writer = value_log.get_writer()?;
Expand Down Expand Up @@ -47,7 +47,7 @@ fn basic_recovery() -> value_log::Result<()> {
}

{
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
let value_log = ValueLog::open(vl_path, Config::default())?;

// TODO: should be recovered
for id in value_log.manifest.list_segment_ids() {
Expand Down Expand Up @@ -76,8 +76,6 @@ fn basic_recovery() -> value_log::Result<()> {
fn delete_unfinished_segment_folders() -> value_log::Result<()> {
let folder = tempfile::tempdir()?;

let index = MockIndex::default();

let vl_path = folder.path();
std::fs::create_dir_all(vl_path)?;

Expand All @@ -86,12 +84,12 @@ fn delete_unfinished_segment_folders() -> value_log::Result<()> {
assert!(mock_path.try_exists()?);

{
let _value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
let _value_log = ValueLog::open(vl_path, Config::default())?;
assert!(mock_path.try_exists()?);
}

{
let _value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
let _value_log = ValueLog::open(vl_path, Config::default())?;
assert!(!mock_path.try_exists()?);
}

Expand Down
2 changes: 1 addition & 1 deletion tests/rollover_index_fail_finish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn rollover_index_fail_finish() -> value_log::Result<()> {

let vl_path = folder.path();
std::fs::create_dir_all(vl_path)?;
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
let value_log = ValueLog::open(vl_path, Config::default())?;

let items = ["a", "b", "c", "d", "e"];

Expand Down
4 changes: 2 additions & 2 deletions tests/space_amp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn worst_case_space_amp() -> value_log::Result<()> {

let vl_path = folder.path();
std::fs::create_dir_all(vl_path)?;
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
let value_log = ValueLog::open(vl_path, Config::default())?;

assert_eq!(0.0, value_log.manifest.space_amp());
assert_eq!(0.0, value_log.manifest.stale_ratio());
Expand Down Expand Up @@ -52,7 +52,7 @@ fn no_overlap_space_amp() -> value_log::Result<()> {

let vl_path = folder.path();
std::fs::create_dir_all(vl_path)?;
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
let value_log = ValueLog::open(vl_path, Config::default())?;

assert_eq!(0.0, value_log.manifest.stale_ratio());
assert_eq!(0.0, value_log.manifest.space_amp());
Expand Down

0 comments on commit 03656d1

Please sign in to comment.