Skip to content

Commit

Permalink
Add tests for manifest and db logic (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kai Mast authored Jul 25, 2024
1 parent e50fa73 commit f6cee16
Show file tree
Hide file tree
Showing 10 changed files with 870 additions and 241 deletions.
12 changes: 8 additions & 4 deletions NEWS
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
0.4:
0.4.1:
- Fix an error during compaction
- Ensure everything has been written to the write-ahead log before shutting down

0.4.0:
- Move sync API into a separate lsm-sync crate
- Removed KvTrait. The crate now only accept and returns bytes
- Get operations now return a reference to the data without copying
- Leverage zerocopy wherever possible to reduce serialization cost
- Update tokio-uring and kioto-uring-executor dependencies

0.3:
0.3.0:
- Write-Ahead logging moved to a dedicated thread (or async task)
- Support for io_uring
- Allow iterating in reverse order
- Add bloom filter support
- Various performance improvements
- Use tokio-condvar in more places

0.2:
0.2.0:
- Support for tracing to benchmark the library
- Removed custom Condition Variable implementation
- Databases can be reopened
Expand All @@ -22,5 +26,5 @@
- Implemented proper garbage collection for the value log
- The Write-Ahead-Log is properly truncated after writes are flushed to L0

0.1:
0.1.0:
- Basic key-value store functionality
1 change: 1 addition & 0 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl Database {

/// Stop all background tasks gracefully
pub async fn stop(&self) -> Result<(), Error> {
self.inner.stop().await?;
self.tasks.stop_all().await
}
}
Expand Down
71 changes: 54 additions & 17 deletions src/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ impl Level {
}
}

/// Set where to (try to) compact next
/// (only used for testing)
pub fn set_next_compaction_offset(&self, offset: usize) {
*self.next_compaction_offset.lock() = offset;
}

/// Table placeholder must be removed once compaction is done
pub async fn remove_table_placeholder(&self, id: TableId) {
let mut placeholders = self.table_placeholders.write().await;
for (pos, placeholder) in placeholders.iter().enumerate() {
Expand Down Expand Up @@ -92,7 +99,12 @@ impl Level {
)
}

pub fn get_index(&self) -> u32 {
self.index
}

pub async fn add_l0_table(&self, table: SortedTable) {
assert_eq!(self.index, 0);
let mut tables = self.tables.write().await;
tables.push(Arc::new(table));
}
Expand Down Expand Up @@ -153,10 +165,12 @@ impl Level {
result
}

/// Checks if any compaction can be done, and if so returns a list of tables to
/// be compacted
#[tracing::instrument(skip(self))]
pub async fn maybe_start_compaction(&self) -> Result<Option<Vec<Arc<SortedTable>>>, ()> {
log::trace!("Checking if we should compact level");
let all_tables = self.tables.write().await;
let all_tables = self.tables.read().await;

let (table, offset) = 'choice: {
let mut next_offset = self.next_compaction_offset.lock();
Expand Down Expand Up @@ -206,7 +220,8 @@ impl Level {
}
};

// Abort due to concurrency?
// Try to set the compaction flag
// otherwise, we abort (due to concurrency)
if !table.maybe_start_compaction() {
return Err(());
}
Expand Down Expand Up @@ -236,14 +251,22 @@ impl Level {
continue;
}

if table.overlaps(&min, &max) && table.maybe_start_compaction() {
min = std::cmp::min(&min[..], table.get_min()).to_vec();
max = std::cmp::max(&max[..], table.get_max()).to_vec();
if table.overlaps(&min, &max) {
if table.maybe_start_compaction() {
min = std::cmp::min(&min[..], table.get_min()).to_vec();
max = std::cmp::max(&max[..], table.get_max()).to_vec();

offsets.push(pos);
tables.push(table.clone());
change = true;
break;
offsets.push(pos);
tables.push(table.clone());
change = true;
break;
} else {
// Lock contention!
for table in tables {
table.abort_compaction();
}
return Err(());
}
}
}
}
Expand All @@ -252,14 +275,25 @@ impl Level {
Ok(Some(tables))
}

/// This is called on the target level at the beginning of compaction and does three things
///
/// 1. It checks for any tables that overlap and need to be compacted as well
/// 2. It will place a marker(lock) to prevent any concurrent compaction on the same range
/// 3. It checks for placeholders and aborts compaction if any are found
///
/// On success this returns the TableId of the placeholder
/// This id then must be used to creat on the lower level
///
/// Note, if fast_path is set, and no overlaps exist, the supplied id will be used for the
/// placeholder
#[tracing::instrument(skip(self))]
pub async fn get_overlaps(
&self,
min: &[u8],
max: &[u8],
fast_path: Option<TableId>,
) -> Option<(TableId, Vec<Arc<SortedTable>>)> {
let mut overlaps: Vec<Arc<SortedTable>> = Vec::new();
let mut tables_to_compact: Vec<Arc<SortedTable>> = Vec::new();
let tables = self.tables.read().await;

let mut min = min;
Expand All @@ -269,29 +303,32 @@ impl Level {
if table.overlaps(min, max) {
if !table.maybe_start_compaction() {
// Abort
for table in overlaps.into_iter() {
table.finish_compaction();
for table in tables_to_compact.into_iter() {
table.abort_compaction();
}
return None;
}

overlaps.push(table.clone());
tables_to_compact.push(table.clone());
min = table.get_min().min(min);
max = table.get_max().max(max);
}
}

// set placeholder to avoid race conditions
// and abort if one exists
let mut placeholders = self.table_placeholders.write().await;

for placeholder in placeholders.iter() {
if placeholder.overlaps(min, max) {
for table in tables_to_compact {
table.abort_compaction();
}
return None;
}
}

let table_id = if let Some(table_id) = fast_path
&& overlaps.is_empty()
&& tables_to_compact.is_empty()
{
table_id
} else {
Expand All @@ -304,12 +341,12 @@ impl Level {
max: max.to_vec(),
});

Some((table_id, overlaps))
Some((table_id, tables_to_compact))
}

/// Get a reference to all tables with an exclusive/write lock
#[inline]
pub async fn get_tables(&self) -> tokio::sync::RwLockWriteGuard<'_, TableVec> {
pub async fn get_tables_rw(&self) -> tokio::sync::RwLockWriteGuard<'_, TableVec> {
self.tables.write().await
}

Expand Down
Loading

0 comments on commit f6cee16

Please sign in to comment.