diff --git a/src/level.rs b/src/level.rs
index ff3e64f..f6bb967 100644
--- a/src/level.rs
+++ b/src/level.rs
@@ -60,6 +60,12 @@ impl Level {
         }
     }
 
+    /// Set where to (try to) compact next
+    /// (only used for testing)
+    pub fn set_next_compaction_offset(&self, offset: usize) {
+        *self.next_compaction_offset.lock() = offset;
+    }
+
     pub async fn remove_table_placeholder(&self, id: TableId) {
         let mut placeholders = self.table_placeholders.write().await;
         for (pos, placeholder) in placeholders.iter().enumerate() {
@@ -92,7 +98,13 @@ impl Level {
         )
     }
 
+    /// Returns the index of this level (L0 has index 0)
+    pub fn get_index(&self) -> u32 {
+        self.index
+    }
+
     pub async fn add_l0_table(&self, table: SortedTable) {
+        assert_eq!(self.index, 0);
         let mut tables = self.tables.write().await;
         tables.push(Arc::new(table));
     }
@@ -158,7 +170,7 @@ impl Level {
     #[tracing::instrument(skip(self))]
     pub async fn maybe_start_compaction(&self) -> Result<Option<Vec<Arc<SortedTable>>>, ()> {
         log::trace!("Checking if we should compact level");
-        let all_tables = self.tables.write().await;
+        let all_tables = self.tables.read().await;
 
         let (table, offset) = 'choice: {
             let mut next_offset = self.next_compaction_offset.lock();
@@ -238,14 +250,22 @@ impl Level {
                         continue;
                     }
 
-                    if table.overlaps(&min, &max) && table.maybe_start_compaction() {
-                        min = std::cmp::min(&min[..], table.get_min()).to_vec();
-                        max = std::cmp::max(&max[..], table.get_max()).to_vec();
+                    if table.overlaps(&min, &max) {
+                        if table.maybe_start_compaction() {
+                            min = std::cmp::min(&min[..], table.get_min()).to_vec();
+                            max = std::cmp::max(&max[..], table.get_max()).to_vec();
 
-                        offsets.push(pos);
-                        tables.push(table.clone());
-                        change = true;
-                        break;
+                            offsets.push(pos);
+                            tables.push(table.clone());
+                            change = true;
+                            break;
+                        } else {
+                            // Lock contention!
+                            for table in tables {
+                                table.abort_compaction();
+                            }
+                            return Err(());
+                        }
                     }
                 }
             }
diff --git a/src/logic.rs b/src/logic.rs
index 6f25753..653ca8a 100644
--- a/src/logic.rs
+++ b/src/logic.rs
@@ -16,7 +16,7 @@ use crate::manifest::{LevelId, Manifest};
 use crate::memtable::{
     ImmMemtableRef, Memtable, MemtableEntry, MemtableEntryRef, MemtableIterator, MemtableRef,
 };
-use crate::sorted_table::{InternalIterator, Key, TableIterator};
+use crate::sorted_table::{InternalIterator, Key, TableId, TableIterator};
 use crate::wal::WriteAheadLog;
 use crate::{Error, Params, StartMode, WriteBatch, WriteOp, WriteOptions};
 
@@ -629,7 +629,10 @@ impl DbLogic {
         for (level_pos, level) in self.levels.iter().enumerate() {
             // Last level cannot be compacted
             if level_pos < self.params.num_levels - 1 {
-                match self.compact_level(level_pos as LevelId, level).await? {
+                match self
+                    .compact_level(level, &self.levels[level_pos + 1])
+                    .await?
+                {
                     CompactResult::DidWork => {
                         log::trace!("Compacted level {level_pos}");
                         return Ok(true);
@@ -655,20 +658,22 @@ impl DbLogic {
     /// 1. A "fast" compaction where a table simply gets moved down one level
     /// 2. Regular compaction where one or multiple tables get merged with a table on a level below
     /// 3. Abort due to concurrency
-    #[tracing::instrument(skip(self, level))]
+    #[tracing::instrument(skip(self, parent_level, child_level))]
     async fn compact_level(
         &self,
-        level_pos: LevelId,
-        level: &Level,
+        parent_level: &Level,
+        child_level: &Level,
     ) -> Result<CompactResult, Error> {
-        let parent_tables = match level.maybe_start_compaction().await {
+        assert_eq!(parent_level.get_index() + 1, child_level.get_index());
+
+        let parent_tables = match parent_level.maybe_start_compaction().await {
             Ok(Some(result)) => result,
             Ok(None) => return Ok(CompactResult::NothingToDo),
             Err(()) => return Ok(CompactResult::Locked),
         };
 
         assert!(!parent_tables.is_empty());
-        log::trace!("Starting compaction on level {level_pos}");
+        log::trace!("Starting compaction on level {}", parent_level.get_index());
 
         let mut min = parent_tables[0].get_min();
         let mut max = parent_tables[0].get_max();
@@ -680,9 +685,6 @@ impl DbLogic {
             }
         }
 
-        let parent_level = &self.levels[level_pos as usize];
-        let child_level = &self.levels[(level_pos + 1) as usize];
-
         let overlap_result = if parent_tables.len() == 1 {
             child_level
                 .get_overlaps(min, max, Some(parent_tables[0].get_id()))
@@ -702,72 +704,18 @@ impl DbLogic {
 
         // Fast path
         if parent_tables.len() == 1 && child_tables.is_empty() {
-            let mut all_parent_tables = level.get_tables().await;
-            let mut all_child_tables = child_level.get_tables().await;
-
             assert_eq!(parent_tables[0].get_id(), table_id);
-
-            // Remove table entry from parent level
-            let table = {
-                let mut iter = all_parent_tables.iter().enumerate();
-
-                loop {
-                    let (pos, other_table) = iter.next().expect("Entry for parent table not found");
-                    if other_table.get_id() == table_id {
-                        break all_parent_tables.remove(pos);
-                    }
-                }
-            };
-
-            log::debug!(
-                "Moving table #{} from level {} to level {}",
-                table_id,
-                level_pos,
-                level_pos + 1
-            );
-
-            // Figure out where to place the table on the child lavel
-            let mut new_pos = 0;
-            for (pos, other_table) in all_child_tables.iter().enumerate() {
-                if other_table.get_min() > table.get_min() {
-                    new_pos = pos;
-                    break;
-                }
-            }
-
-            // Add table to child level
-            all_child_tables.insert(new_pos, table.clone());
-            child_level.remove_table_placeholder(table_id).await;
-
-            for (pos, other_table) in all_parent_tables.iter().enumerate() {
-                if table.get_id() == other_table.get_id() {
-                    all_parent_tables.remove(pos);
-                    break;
-                }
-            }
-
-            // Update manifest
-            let add_set = vec![(level_pos + 1, table.get_id())];
-            let remove_set = vec![(level_pos, table.get_id())];
-            self.manifest.update_table_set(add_set, remove_set).await;
-
-            if let Some(logger) = &self.level_logger {
-                logger.compaction(level_pos, 1, 1);
-            }
-
-            // Unlock table
-            table.finish_fast_compaction();
-
-            log::trace!("Done moving table #{table_id}");
+            self.fast_compaction(parent_level, child_level, table_id)
+                .await;
             return Ok(CompactResult::DidWork);
         }
 
         log::debug!(
             "Compacting {} table(s) in level {} with {} table(s) in level {} into table #{table_id}",
             parent_tables.len(),
-            level_pos,
+            parent_level.get_index(),
             child_tables.len(),
-            level_pos + 1
+            child_level.get_index(),
         );
 
         for table in child_tables.iter() {
@@ -886,7 +834,7 @@ impl DbLogic {
 
         let new_table = table_builder.finish().await?;
 
-        let add_set = vec![(level_pos + 1, new_table.get_id())];
+        let add_set = vec![(child_level.get_index(), new_table.get_id())];
         let mut remove_set = vec![];
 
         // Install new tables atomically
@@ -898,7 +846,7 @@ impl DbLogic {
                 let mut found = false;
                 for (pos, other_table) in all_child_tables.iter().enumerate() {
                     if other_table.get_id() == table.get_id() {
-                        remove_set.push((level_pos, table.get_id()));
+                        remove_set.push((child_level.get_index(), table.get_id()));
                         all_child_tables.remove(pos);
                         found = true;
                         break;
@@ -907,7 +855,8 @@ impl DbLogic {
                 assert!(found);
             }
 
-            let mut new_pos = all_child_tables.len(); // insert at the end by default
+            // Find position for new child table (insert at the end by default)
+            let mut new_pos = all_child_tables.len();
             for (pos, other_table) in all_child_tables.iter().enumerate() {
                 if other_table.get_min() > new_table.get_min() {
                     new_pos = pos;
@@ -924,7 +873,7 @@ impl DbLogic {
                 let mut found = false;
                 for (pos, other_table) in all_parent_tables.iter().enumerate() {
                     if other_table.get_id() == table.get_id() {
-                        remove_set.push((level_pos, table.get_id()));
+                        remove_set.push((parent_level.get_index(), table.get_id()));
                         all_parent_tables.remove(pos);
                         found = true;
                         break;
@@ -939,7 +888,7 @@ impl DbLogic {
             }
 
             if let Some(logger) = &self.level_logger {
-                logger.compaction(level_pos, add_set.len(), remove_set.len());
+                logger.compaction(parent_level.get_index(), add_set.len(), remove_set.len());
             }
 
             self.manifest.update_table_set(add_set, remove_set).await;
@@ -951,6 +900,57 @@ impl DbLogic {
     pub async fn stop(&self) -> Result<(), Error> {
         self.wal.stop().await
     }
+
+    /// Moves a single table from `parent_level` to `child_level` without
+    /// rewriting it (the "fast" compaction path).
+    ///
+    /// Updates the table lists of both levels and the manifest, then
+    /// unlocks the table.
+    ///
+    /// Panics if `parent_level` contains no table with id `table_id`.
+    async fn fast_compaction(&self, parent_level: &Level, child_level: &Level, table_id: TableId) {
+        let mut all_parent_tables = parent_level.get_tables().await;
+        let mut all_child_tables = child_level.get_tables().await;
+
+        // Remove table entry from parent level
+        let pos = all_parent_tables
+            .iter()
+            .position(|other_table| other_table.get_id() == table_id)
+            .expect("Entry for parent table not found");
+        let table = all_parent_tables.remove(pos);
+
+        log::debug!(
+            "Moving table #{} from level {} to level {}",
+            table_id,
+            parent_level.get_index(),
+            child_level.get_index(),
+        );
+
+        // Place the table in front of the first child table with a larger
+        // minimum key, or at the end of the level if there is none
+        let new_pos = all_child_tables
+            .iter()
+            .position(|other_table| other_table.get_min() > table.get_min())
+            .unwrap_or(all_child_tables.len());
+
+        // Add table to child level
+        all_child_tables.insert(new_pos, table.clone());
+        child_level.remove_table_placeholder(table_id).await;
+
+        // Update manifest
+        let add_set = vec![(child_level.get_index(), table.get_id())];
+        let remove_set = vec![(parent_level.get_index(), table.get_id())];
+        self.manifest.update_table_set(add_set, remove_set).await;
+
+        if let Some(logger) = &self.level_logger {
+            logger.compaction(parent_level.get_index(), 1, 1);
+        }
+
+        // Unlock table
+        table.finish_fast_compaction();
+
+        log::trace!("Done moving table #{table_id}");
+    }
 }
 
 #[cfg(all(test, not(feature = "wisckey")))]
@@ -1121,6 +1121,13 @@ mod tests {
         assert_eq!(logic.levels[1].get_tables_ro().await.len(), 2);
         assert_eq!(logic.manifest.get_tables().await[1].len(), 2);
 
+        // Ensure no tables exist on both levels
+        for table0 in logic.levels[0].get_tables_ro().await.iter() {
+            for table1 in logic.levels[1].get_tables_ro().await.iter() {
+                assert_ne!(table0.get_id(), table1.get_id());
+            }
+        }
+
         test_cleanup(tempdir, logic).await;
     }
 
@@ -1173,9 +1180,95 @@ mod tests {
         assert_eq!(logic.manifest.get_tables().await[0].len(), num_tables);
         assert!(logic.manifest.get_tables().await[1].is_empty());
 
-        let result = logic.compact_level(0, &logic.levels[0]).await.unwrap();
+        let result = logic
+            .compact_level(&logic.levels[0], &logic.levels[1])
+            .await
+            .unwrap();
         assert_eq!(result, CompactResult::Locked);
 
         test_cleanup(tempdir, logic).await;
     }
+
+    #[async_test]
+    async fn fast_compaction_with_offset() {
+        let (tempdir, logic) = test_init().await;
+
+        let num_tables = 10;
+
+        // Create tables with consecutive, non-overlapping key ranges
+        for idx in 0..num_tables {
+            let l0 = logic.levels.first().unwrap();
+            let table_id = logic.manifest.next_table_id().await;
+
+            let pos = idx * 100;
+            let next_pos = (idx + 1) * 100 - 1;
+
+            let min_key = format!("{pos:04}").into_bytes();
+            let max_key = format!("{next_pos:04}").into_bytes();
+
+            let mut table_builder = l0.build_table(table_id, min_key, max_key);
+            let mut seq_offset = 1;
+
+            for num in pos..next_pos {
+                let key = format!("{num:04}").into_bytes();
+                let value = "somevalue".to_string().into_bytes();
+                let seq_number = seq_offset;
+                seq_offset += 1;
+
+                table_builder
+                    .add_value(&key, seq_number, &value)
+                    .await
+                    .unwrap();
+            }
+
+            let table = table_builder.finish().await.unwrap();
+            let table_id = table.get_id();
+            l0.add_l0_table(table).await;
+
+            // Then update manifest and flush WAL
+            logic
+                .manifest
+                .update_table_set(vec![(0, table_id)], vec![])
+                .await;
+        }
+
+        // Check that compaction works fine if it is not the first table that gets pushed down
+        logic.levels[0].set_next_compaction_offset(3);
+
+        assert_eq!(logic.levels[0].get_tables_ro().await.len(), num_tables);
+        assert_eq!(logic.manifest.get_tables().await[0].len(), num_tables);
+        assert!(logic.manifest.get_tables().await[1].is_empty());
+
+        let did_work = logic.do_level_compaction().await.unwrap();
+        assert!(did_work);
+
+        // One table should have moved down
+        assert_eq!(logic.levels[0].get_tables_ro().await.len(), num_tables - 1);
+        assert_eq!(logic.manifest.get_tables().await[0].len(), num_tables - 1);
+        assert_eq!(logic.levels[1].get_tables_ro().await.len(), 1);
+        assert_eq!(logic.manifest.get_tables().await[1].len(), 1);
+
+        let did_work = logic.do_level_compaction().await.unwrap();
+        assert!(did_work);
+
+        assert_eq!(logic.manifest.get_tables().await[0].len(), num_tables - 2);
+        assert_eq!(logic.levels[1].get_tables_ro().await.len(), 2);
+        assert_eq!(logic.manifest.get_tables().await[1].len(), 2);
+
+        // Ensure no tables exist on both levels
+        for table0 in logic.levels[0].get_tables_ro().await.iter() {
+            for table1 in logic.levels[1].get_tables_ro().await.iter() {
+                assert_ne!(table0.get_id(), table1.get_id());
+            }
+        }
+
+        // Tables on the child level must be ordered by their minimum key
+        {
+            let child_tables = logic.levels[1].get_tables_ro().await;
+            let min_keys: Vec<_> = child_tables.iter().map(|table| table.get_min()).collect();
+            assert!(min_keys.windows(2).all(|pair| pair[0] < pair[1]));
+        }
+
+        test_cleanup(tempdir, logic).await;
+    }
 }