Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: SST deletion #95

Merged
merged 3 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tracing-futures = { version = "0.2.5", features = ["futures-03"] }
num-traits = "0.2.19"
maplit = "1.0.2"
rocksdb = { version = "0.22" }
atomic_enum = "0.3.0"

[dev-dependencies]
tempfile = "3"
Expand Down
16 changes: 16 additions & 0 deletions src/persistent/dummy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::persistent::SstHandle;
use anyhow::anyhow;

impl SstHandle for () {
async fn read(&self, _offset: u64, _len: usize) -> anyhow::Result<Vec<u8>> {
Err(anyhow!("unimplemented"))
}

fn size(&self) -> u64 {
unreachable!()
}

async fn delete(&self) -> anyhow::Result<()> {
Ok(())
}
}
30 changes: 20 additions & 10 deletions src/persistent/file_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl LocalFs {
Self { dir: dir.into() }
}

fn build_sst_path(&self, id: usize) -> PathBuf {
pub fn build_sst_path(&self, id: usize) -> PathBuf {
self.dir.join(format!("{}.sst", id))
}

Expand All @@ -47,14 +47,18 @@ impl Persistent for LocalFs {
async fn create_sst(&self, id: usize, data: Vec<u8>) -> anyhow::Result<Self::SstHandle> {
let size = data.len().try_into()?;
let path = self.build_sst_path(id);
let file = spawn_blocking(move || {
std::fs::write(&path, &data)?;
File::open(&path)?.sync_all()?;
let file = File::options().read(true).append(true).open(&path)?;
Ok::<_, anyhow::Error>(Arc::new(file))
})
.await??;
let handle = FileObject { file, size };
let file = {
let path = path.clone();
spawn_blocking(move || {
// todo: avoid clone
std::fs::write(&path, &data)?;
File::open(&path)?.sync_all()?;
let file = File::options().read(true).append(true).open(&path)?;
Ok::<_, anyhow::Error>(Arc::new(file))
})
.await??
};
let handle = FileObject { file, size, path };
Ok(handle)
}

Expand All @@ -68,7 +72,7 @@ impl Persistent for LocalFs {
.with_context(|| format!("id: {}, path: {:?}", id, &path))?;
let file = Arc::new(file);
let size = file.metadata()?.len();
let handle = FileObject { file, size };
let handle = FileObject { file, size, path };
Ok::<_, anyhow::Error>(handle)
})
.await??;
Expand Down Expand Up @@ -110,6 +114,7 @@ impl Persistent for LocalFs {
/// A file object.
pub struct FileObject {
file: Arc<File>,
path: PathBuf,
size: u64,
}

Expand All @@ -129,6 +134,11 @@ impl SstHandle for FileObject {
fn size(&self) -> u64 {
self.size
}

async fn delete(&self) -> anyhow::Result<()> {
tokio::fs::remove_file(&self.path).await?;
Ok(())
}
}

impl FileObject {}
11 changes: 1 addition & 10 deletions src/persistent/interface.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::future::Future;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};

pub trait Persistent: Send + Sync + Clone + 'static {
Expand Down Expand Up @@ -30,16 +29,8 @@ pub trait SstHandle: Send + Sync + 'static {
-> impl Future<Output = anyhow::Result<Vec<u8>>> + Send;

fn size(&self) -> u64;
}

impl<T: SstHandle> SstHandle for Arc<T> {
async fn read(&self, offset: u64, len: usize) -> anyhow::Result<Vec<u8>> {
self.as_ref().read(offset, len).await
}

fn size(&self) -> u64 {
self.as_ref().size()
}
fn delete(&self) -> impl Future<Output = anyhow::Result<()>> + Send;
}

pub trait WalHandle: AsyncWrite + AsyncRead + Send + Sync + Unpin + 'static {
Expand Down
1 change: 1 addition & 0 deletions src/persistent/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod dummy;
pub mod file_object;
pub mod interface;
mod manifest_handle;
Expand Down
45 changes: 3 additions & 42 deletions src/sst/compact/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,48 +133,6 @@ where
}
}

// pub async fn compact_with_task<P: Persistent>(
// sstables: &mut Sstables<P::SstHandle>,
// next_sst_id: SstIdGeneratorImpl,
// options: Arc<SstOptions>,
// persistent: P,
// task: &CompactionTask,
// watermark: Option<u64>,
// ) -> anyhow::Result<Vec<usize>> {
// let source = task.source();
// let source_level: Vec<_> = match task.source_index() {
// SourceIndex::Index { index } => {
// let source_id = *sstables.table_ids(source).get(index).unwrap();
// let source_level = sstables.sstables.get(&source_id).unwrap().as_ref();
// let source = iter::once(source_level);
// source.collect()
// }
// SourceIndex::Full { .. } => {
// let source = sstables.tables(source);
// source.collect()
// }
// };
//
// let destination = task.destination();
//
// let new_sst = assert_send(compact_generate_new_sst(
// source_level,
// sstables.tables(destination),
// next_sst_id,
// options,
// persistent,
// watermark,
// ))
// .await?;
//
// let new_sst_ids: Vec<_> = new_sst.iter().map(|table| table.id()).copied().collect();
//
// sstables.apply_compaction_sst(new_sst, task);
// sstables.apply_compaction_sst_ids(task, new_sst_ids.clone());
//
// Ok(new_sst_ids)
// }

pub async fn force_compact<P: Persistent + Clone>(
old_sstables: Arc<Sstables<P::SstHandle>>,
sstables: &mut Sstables<P::SstHandle>,
Expand Down Expand Up @@ -240,6 +198,9 @@ pub async fn force_compact<P: Persistent + Clone>(
.iter()
.chain(record.task.destination_ids.iter())
{
if let Some(sst) = sstables.sstables.get(old_id) {
sst.set_to_delete();
}
sstables.sstables.remove(old_id);
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/sst/compact/full.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::persistent::SstHandle;
use crate::sst::compact::common::NewCompactionTask;
use crate::sst::Sstables;

#[derive(Debug, Clone, Copy)]
pub struct LeveledCompactionOptions;

pub fn generate_full_compaction_task<File>(sstables: &Sstables<File>) -> Option<NewCompactionTask> {
pub fn generate_full_compaction_task<File: SstHandle>(
sstables: &Sstables<File>,
) -> Option<NewCompactionTask> {
let source_ids = sstables.table_ids(0).clone();
let destination_level = 1;
let destination_ids = sstables.table_ids(1).clone();
Expand Down
40 changes: 26 additions & 14 deletions src/sst/compact/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub fn generate_tasks<File: SstHandle>(
}
}

fn generate_tasks_for_other_level<'a, File>(
fn generate_tasks_for_other_level<'a, File: SstHandle>(
source_level: usize,
sstables: &'a Sstables<File>,
tables_in_compaction: &'a mut HashSet<usize>,
Expand Down Expand Up @@ -203,7 +203,7 @@ fn generate_tasks_for_other_level<'a, File>(
.flatten()
}

fn generate_task_for_l0<'a, File>(
fn generate_task_for_l0<'a, File: SstHandle>(
source_level: usize,
sstables: &'a Sstables<File>,
target_sizes: &[u64],
Expand Down Expand Up @@ -258,7 +258,7 @@ pub async fn compact_task<'a, P: Persistent>(
.await
}

fn generate_next_level_table_ids<File>(
fn generate_next_level_table_ids<File: SstHandle>(
tables_in_compaction: &mut HashSet<usize>,
sstables: &Sstables<File>,
source_key_range: &MinMax<KeyBytes>,
Expand All @@ -284,7 +284,6 @@ mod tests {

use std::collections::HashSet;

use nom::AsBytes;
use std::sync::Arc;
use tempfile::{tempdir, TempDir};
use tokio::sync::Mutex;
Expand All @@ -299,7 +298,7 @@ mod tests {
use crate::sst::compact::{CompactionOptions, LeveledCompactionOptions};

use crate::sst::{SsTable, SstOptions, Sstables};
use crate::state::{LsmStorageState, Map};
use crate::state::LsmStorageState;
use crate::test_utils::insert_sst;

#[test]
Expand Down Expand Up @@ -407,7 +406,7 @@ mod tests {
}
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_force_compaction() {
let dir = tempdir().unwrap();
let (state, mut sstables) = prepare_sstables(&dir).await;
Expand All @@ -427,6 +426,10 @@ mod tests {
)
.await
.unwrap();

let persistent = state.persistent.clone();
drop(state);

{
assert_eq!(sstables.l0_sstables, Vec::<usize>::new());
assert_eq!(
Expand All @@ -436,16 +439,25 @@ mod tests {
assert_eq!(sstables.sstables.len(), 8);
}

for i in 0..5 {
let begin = i * 100;
let range = begin..begin + 100;
for i in range {
let key = format!("key-{:04}", i);
let expected_value = format!("value-{:04}", i);
let value = state.get(key.as_bytes()).await.unwrap().unwrap();
assert_eq!(expected_value.as_bytes(), value.as_bytes());
// check old sst deleted
{
for id in 0..9 {
let path = persistent.build_sst_path(id);
assert!(!path.as_path().exists(), "sst {} still exists", id);
}
}

// todo: check keys
// for i in 0..5 {
// let begin = i * 100;
// let range = begin..begin + 100;
// for i in range {
// let key = format!("key-{:04}", i);
// let expected_value = format!("value-{:04}", i);
// let value = state.get(key.as_bytes()).await.unwrap().unwrap();
// assert_eq!(expected_value.as_bytes(), value.as_bytes());
// }
// }
}

async fn prepare_sstables(dir: &TempDir) -> (LsmStorageState<LocalFs>, Sstables<FileObject>) {
Expand Down
6 changes: 3 additions & 3 deletions src/sst/iterator/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ where
}

#[pin_project]
pub struct SsTableIterator<'a, File> {
pub struct SsTableIterator<'a, File: SstHandle> {
table: &'a SsTable<File>,
#[pin]
inner: InnerIter<'a>,
bloom: Option<&'a Bloom>,
}

impl<'a, File> SsTableIterator<'a, File> {
impl<'a, File: SstHandle> SsTableIterator<'a, File> {
pub fn may_contain(&self, key: &[u8]) -> bool {
bloom::may_contain(self.bloom, key)
}
Expand Down Expand Up @@ -161,7 +161,7 @@ where
}

// todo: 感觉没必要 impl Stream,使用 (Bloom, InnerIter) 比较好?
impl<'a, File> Stream for SsTableIterator<'a, File> {
impl<'a, File: SstHandle> Stream for SsTableIterator<'a, File> {
type Item = anyhow::Result<InnerEntry>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
Loading