Skip to content

Commit

Permalink
Improve WAL shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
kaimast committed Jul 25, 2024
1 parent 88ab8cc commit 1e38606
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 135 deletions.
1 change: 1 addition & 0 deletions NEWS
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.4.1:
- Fix an error during compaction
- Ensure everything has been written to the write-ahead log before shutting down

0.4.0:
- Move sync API into a separate lsm-sync crate
Expand Down
1 change: 1 addition & 0 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl Database {

/// Stop all background tasks gracefully
pub async fn stop(&self) -> Result<(), Error> {
self.inner.stop().await?;
self.tasks.stop_all().await
}
}
Expand Down
123 changes: 101 additions & 22 deletions src/logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::values::{ValueLog, ValueRef};

use crate::data_blocks::DataEntry;

#[derive(Debug, PartialEq, Eq)]
enum CompactResult {
NothingToDo,
DidWork,
Expand Down Expand Up @@ -615,7 +616,10 @@ impl DbLogic {
}

/// Do compaction if necessary
/// Returns true if any work was done
///
/// Returns true if we should try again. This can happen for two reasons:
/// 1. Compaction succeded and there might be more to compact
/// 2. Compaction failed due to locks and we should try to grab the locks again
#[tracing::instrument(skip(self))]
pub async fn do_level_compaction(&self) -> Result<bool, Error> {
let mut was_locked = false;
Expand Down Expand Up @@ -943,11 +947,15 @@ impl DbLogic {
log::trace!("Done compacting tables");
Ok(CompactResult::DidWork)
}

pub async fn stop(&self) -> Result<(), Error> {
self.wal.stop().await
}
}

#[cfg(all(test, not(feature = "wisckey")))]
mod tests {
use tempfile::tempdir;
use tempfile::TempDir;

#[cfg(feature = "async-io")]
use tokio_uring_executor::test as async_test;
Expand All @@ -958,23 +966,43 @@ mod tests {
use crate::params::Params;
use crate::StartMode;

use super::DbLogic;
use super::{CompactResult, DbLogic};

async fn test_init() -> (TempDir, DbLogic) {
let _ = env_logger::builder().is_test(true).try_init();

let tmpdir = tempfile::Builder::new()
.prefix("lsm-logic-test-")
.tempdir()
.unwrap();

// Test that compaction works as expected
//
// Note: This test makes some assumptions about the inner workings of
// DbLogic and might need to be adjusted with future changes
#[async_test]
async fn compaction() {
let dir = tempdir().unwrap();
let params = Params {
db_path: dir.path().to_path_buf(),
db_path: tmpdir.path().to_path_buf(),
..Default::default()
};

let logic = DbLogic::new(StartMode::CreateOrOverride, params)
.await
.unwrap();

(tmpdir, logic)
}

async fn test_cleanup(tmpdir: TempDir, logic: DbLogic) {
logic.stop().await.unwrap();

drop(logic);
drop(tmpdir);
}

// Test that compaction works as expected
//
// Note: This test makes some assumptions about the inner workings of
// DbLogic and might need to be adjusted with future changes
#[async_test]
async fn compaction() {
let (tempdir, logic) = test_init().await;

let num_tables = 5;

// Create five tables with the exact same key entries
Expand All @@ -990,7 +1018,7 @@ mod tests {

for num in 0..=100 {
let key = format!("{num:03}").into_bytes();
let value = format!("somevalue").into_bytes();
let value = "somevalue".to_string().into_bytes();
let seq_number = seq_offset;
seq_offset += 1;

Expand Down Expand Up @@ -1022,6 +1050,8 @@ mod tests {
assert!(logic.manifest.get_tables().await[0].is_empty());
assert_eq!(logic.levels[1].get_tables_ro().await.len(), 1);
assert_eq!(logic.manifest.get_tables().await[1].len(), 1);

test_cleanup(tempdir, logic).await;
}

// Test that fast compaction (simply moving a table down) works as expected
Expand All @@ -1030,15 +1060,8 @@ mod tests {
// DbLogic and might need to be adjusted with future changes
#[async_test]
async fn fast_compaction() {
let dir = tempdir().unwrap();
let params = Params {
db_path: dir.path().to_path_buf(),
..Default::default()
};
let (tempdir, logic) = test_init().await;

let logic = DbLogic::new(StartMode::CreateOrOverride, params)
.await
.unwrap();
let num_tables = 10;

// Create five tables with the exact same key entries
Expand All @@ -1057,7 +1080,7 @@ mod tests {

for num in pos..next_pos {
let key = format!("{num:04}").into_bytes();
let value = format!("somevalue").into_bytes();
let value = "somevalue".to_string().into_bytes();
let seq_number = seq_offset;
seq_offset += 1;

Expand Down Expand Up @@ -1094,9 +1117,65 @@ mod tests {
let did_work = logic.do_level_compaction().await.unwrap();
assert!(did_work);

assert_eq!(logic.levels[0].get_tables_ro().await.len(), num_tables - 2);
assert_eq!(logic.manifest.get_tables().await[0].len(), num_tables - 2);
assert_eq!(logic.levels[1].get_tables_ro().await.len(), 2);
assert_eq!(logic.manifest.get_tables().await[1].len(), 2);

test_cleanup(tempdir, logic).await;
}

// Test that no compaction happens if tables are already marked with a compaction flag
#[async_test]
async fn compaction_flag() {
let (tempdir, logic) = test_init().await;

let num_tables = 5;

// Create five tables with the exact same key entries
for _ in 0..num_tables {
let l0 = logic.levels.first().unwrap();
let table_id = logic.manifest.next_table_id().await;

let min_key = "000".to_string().into_bytes();
let max_key = "100".to_string().into_bytes();

let mut table_builder = l0.build_table(table_id, min_key, max_key);
let mut seq_offset = 1;

for num in 0..=100 {
let key = format!("{num:03}").into_bytes();
let value = "somevalue".to_string().into_bytes();
let seq_number = seq_offset;
seq_offset += 1;

table_builder
.add_value(&key, seq_number, &value)
.await
.unwrap();
}

let table = table_builder.finish().await.unwrap();
let table_id = table.get_id();

let could_set_flag = table.maybe_start_compaction();
assert!(could_set_flag);

l0.add_l0_table(table).await;

// Then update manifest and flush WAL
logic
.manifest
.update_table_set(vec![(0, table_id)], vec![])
.await;
}

assert_eq!(logic.levels[0].get_tables_ro().await.len(), num_tables);
assert_eq!(logic.manifest.get_tables().await[0].len(), num_tables);
assert!(logic.manifest.get_tables().await[1].is_empty());

let result = logic.compact_level(0, &logic.levels[0]).await.unwrap();
assert_eq!(result, CompactResult::Locked);

test_cleanup(tempdir, logic).await;
}
}
Loading

0 comments on commit 1e38606

Please sign in to comment.