Skip to content

Commit

Permalink
minor adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Mar 19, 2024
1 parent a9c26c5 commit 35db09b
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 31 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

Generic value log implementation for key-value separated storage, inspired by RocksDB's BlobDB [[1]](#footnotes) and implemented in safe, stable Rust.

> This crate is intended for key-value separated LSM storage.
> This crate is intended as a building block for key-value separated LSM storage.
> You probably want to use https://github.com/fjall-rs/fjall instead.
## Features

- Thread-safe API
- 100% safe & stable Rust
- Supports generic index structures (LSM-tree, ...)
- Supports generic KV-index structures (LSM-tree, ...)
- Built-in per-blob compression (LZ4)
- In-memory blob cache for hot data

Expand All @@ -25,6 +25,14 @@ All source code is licensed under MIT OR Apache-2.0.

All contributions are to be licensed as MIT OR Apache-2.0.

## Development

### Run benchmarks

```bash
cargo bench --features bloom
```

## Footnotes

[1] https://github.com/facebook/rocksdb/wiki/BlobDB
3 changes: 1 addition & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ impl Default for Config {
impl Config {
/// Sets the blob cache.
///
/// Defaults to a blob cache 16 MiB of capacity shared
/// between all partitions inside this keyspace.
/// Defaults to a blob cache with 16 MiB of capacity.
#[must_use]
pub fn blob_cache(mut self, blob_cache: Arc<BlobCache>) -> Self {
self.blob_cache = blob_cache;
Expand Down
12 changes: 5 additions & 7 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::ValueHandle;
///
/// An index should point into the value log using [`ValueHandle`].
pub trait Index {
/// Returns a value habdle for a given key.
/// Returns a value handle for a given key.
///
/// This method is used to index back into the index to check for
/// stale values when scanning through the value log's segments.
Expand All @@ -27,11 +27,11 @@ pub trait Index {
fn insert_indirection(&self, key: &[u8], value: ValueHandle) -> std::io::Result<()>;
}

/// Trait that allows writing into an index
/// Trait that allows writing into an external index
///
/// The write process should be atomic.
pub trait Writer {
/// Inserts an value handle into the index.
///
/// This method is called during value log garbage collection.
/// Inserts a value handle into the index.
///
/// # Errors
///
Expand All @@ -40,8 +40,6 @@ pub trait Writer {

/// Finishes the write batch.
///
/// This operation should be atomic.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
Expand Down
66 changes: 65 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn main() -> value_log::Result<()> {
let mut writer = value_log.get_writer()?;
let segment_id = writer.segment_id();

for key in ["a", "b", "c", "d", "e"] {
for key in ["a", "b", "c", "d", "e", "html"] {
let offset = writer.offset(key.as_bytes());

index.insert_indirection(
Expand Down Expand Up @@ -113,6 +113,52 @@ fn main() -> value_log::Result<()> {
value_log.register(writer)?;
}

{
let mut writer = value_log.get_writer()?;
let segment_id = writer.segment_id();

let key = "html";
let offset = writer.offset(key.as_bytes());

index.insert_indirection(
key.as_bytes(),
ValueHandle {
offset,
segment_id: segment_id.clone(),
},
)?;

writer.write(
key.as_bytes(),
b"
<html>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-0\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-1\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-2\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-3\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-4\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-5\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-6\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-7\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-8\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-9\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-10\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-11\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-12\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-13\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-14\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-15\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-16\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-17\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-18\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-19\">Hello <i>World</i></div>
<div data-hk=\"0-0-0-0-0-0-0-0-0-0-0-0-0-0-20\">Hello <i>World</i></div>
</html>",
)?;

value_log.register(writer)?;
}

/* {
let mut writer = value_log.get_writer()?;
let segment_id = writer.segment_id();
Expand Down Expand Up @@ -241,7 +287,25 @@ fn main() -> value_log::Result<()> {
eprintln!("=== rollover ===");
value_log.rollover(&value_log.list_segments(), DebugIndexWriter(index.clone()))?; */

eprintln!("=== before refresh ===");
eprintln!("{:#?}", value_log.segments.read().unwrap());
eprintln!(
"space amp: {}, wasted bytes: {}",
value_log.space_amp(),
value_log.reclaimable_bytes()
);

for id in value_log.segments.read().unwrap().keys() {
value_log.refresh_stats(id)?;
}

eprintln!("=== after refresh ===");
eprintln!("{:#?}", value_log.segments.read().unwrap());
eprintln!(
"space amp: {}, wasted bytes: {}",
value_log.space_amp(),
value_log.reclaimable_bytes()
);

let handle = index.get(b"html")?.unwrap();
eprintln!(
Expand Down
40 changes: 21 additions & 19 deletions src/value_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
sync::{atomic::AtomicU64, Arc, Mutex, RwLock},
};

/// A disk-resident value log.
/// A disk-resident value log
#[derive(Clone)]
pub struct ValueLog(Arc<ValueLogInner>);

Expand All @@ -41,7 +41,7 @@ pub struct ValueLogInner {
/// Segment manifest
pub segments: RwLock<BTreeMap<Arc<str>, Arc<Segment>>>,

semaphore: Mutex<()>,
rollover_guard: Mutex<()>,
}

impl ValueLog {
Expand Down Expand Up @@ -97,7 +97,7 @@ impl ValueLog {
blob_cache,
index,
segments: RwLock::new(BTreeMap::default()),
semaphore: Mutex::new(()),
rollover_guard: Mutex::new(()),
})))
}

Expand Down Expand Up @@ -197,13 +197,6 @@ impl ValueLog {
Ok(Some(val))
}

/* pub fn get_multiple(
&self,
handles: &[ValueHandle],
) -> crate::Result<Vec<Option<Vec<u8>>>> {
handles.iter().map(|vr| self.get(vr)).collect()
} */

/// Initializes a new segment writer
///
/// # Errors
Expand Down Expand Up @@ -248,21 +241,31 @@ impl ValueLog {
Ok(())
}

/// Returns the amount of bytes on disk that are occupied by blobs.
///
/// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`].
#[must_use]
pub fn disk_space_used(&self) -> u64 {
self.segments
.read()
.expect("lock is poisoned")
.values()
.map(|x| x.stats.total_bytes)
.sum::<u64>()
}

/// Returns the amount of bytes that can be freed on disk
/// if all segments were to be defragmented
///
/// This value may not be fresh, as it is only set after running [`ValueLog::refresh_stats`].
#[must_use]
pub fn reclaimable_bytes(&self) -> u64 {
let segments = self.segments.read().expect("lock is poisoned");

let dead_bytes = segments
self.segments
.read()
.expect("lock is poisoned")
.values()
.map(|x| x.stats.get_dead_bytes())
.sum::<u64>();
drop(segments);

dead_bytes
.sum::<u64>()
}

/// Returns the percent of dead bytes in the value log
Expand Down Expand Up @@ -383,7 +386,7 @@ impl ValueLog {
index_writer: &W,
) -> crate::Result<()> {
// IMPORTANT: Only allow 1 rollover at any given time
let _guard = self.semaphore.lock().expect("lock is poisoned");
let _guard = self.rollover_guard.lock().expect("lock is poisoned");

let lock = self.segments.read().expect("lock is poisoned");

Expand All @@ -409,7 +412,6 @@ impl ValueLog {

for item in reader {
let (k, v, _) = item?;
eprintln!("{k:?} => {:?}", String::from_utf8_lossy(&v));

let segment_id = writer.segment_id();
let offset = writer.offset(&k);
Expand Down

0 comments on commit 35db09b

Please sign in to comment.