2 changes: 2 additions & 0 deletions Cargo.toml
@@ -28,6 +28,7 @@ metrics = []
byteorder = "1.5.0"
crossbeam-skiplist = "0.1.3"
enum_dispatch = "0.3.13"
equivalent = "1.0.2"
interval-heap = "0.0.5"
log = "0.4.27"
lz4_flex = { version = "0.11.5", optional = true, default-features = false }
@@ -44,6 +45,7 @@ xxhash-rust = { version = "0.8.15", features = ["xxh3"] }
criterion = { version = "0.5.1", features = ["html_reports"] }
fs_extra = "1.3.0"
nanoid = "0.4.0"
quickcheck = "1.0.3"
rand = "0.9.2"
test-log = "0.2.18"

14 changes: 10 additions & 4 deletions benches/memtable.rs
@@ -1,5 +1,5 @@
use criterion::{criterion_group, criterion_main, Criterion};
use lsm_tree::{InternalValue, Memtable};
use lsm_tree::{InternalValue, Memtable, SeqNo};
use nanoid::nanoid;

fn memtable_get_hit(c: &mut Criterion) {
@@ -25,7 +25,10 @@ fn memtable_get_hit(c: &mut Criterion) {
b.iter(|| {
assert_eq!(
[1, 2, 3],
&*memtable.get(b"abc_w5wa35aw35naw", None).unwrap().value,
&*memtable
.get(b"abc_w5wa35aw35naw", SeqNo::MAX)
.unwrap()
.value,
)
});
});
@@ -60,7 +63,10 @@ fn memtable_get_snapshot(c: &mut Criterion) {
b.iter(|| {
assert_eq!(
[1, 2, 3],
&*memtable.get(b"abc_w5wa35aw35naw", Some(1)).unwrap().value,
&*memtable
.get(b"abc_w5wa35aw35naw", SeqNo::MAX)
.unwrap()
.value,
);
});
});
@@ -79,7 +85,7 @@ fn memtable_get_miss(c: &mut Criterion) {
}

c.bench_function("memtable get miss", |b| {
b.iter(|| assert!(memtable.get(b"abc_564321", None).is_none()));
b.iter(|| assert!(memtable.get(b"abc_564321", SeqNo::MAX).is_none()));
});
}

49 changes: 26 additions & 23 deletions src/key.rs
@@ -7,6 +7,7 @@ use crate::{
SeqNo, UserKey, ValueType,
};
use byteorder::{ReadBytesExt, WriteBytesExt};
use equivalent::{Comparable, Equivalent};
use std::{
cmp::Reverse,
io::{Read, Write},
@@ -131,37 +132,27 @@ impl Ord for InternalKey {
}
}

// TODO: wait for new crossbeam-skiplist
// TODO: https://github.com/crossbeam-rs/crossbeam/pull/1162
//
// impl Equivalent<InternalKeyRef<'_>> for InternalKey {
// fn equivalent(&self, other: &InternalKeyRef<'_>) -> bool {
// self.user_key == other.user_key && self.seqno == other.seqno
// }
// }
impl Equivalent<InternalKeyRef<'_>> for InternalKey {
fn equivalent(&self, other: &InternalKeyRef<'_>) -> bool {
self.user_key == other.user_key && self.seqno == other.seqno
}
}

// impl Comparable<InternalKeyRef<'_>> for InternalKey {
// fn compare(&self, other: &InternalKeyRef<'_>) -> std::cmp::Ordering {
// (&*self.user_key, Reverse(self.seqno)).cmp(&(other.user_key, Reverse(other.seqno)))
// }
// }
impl Comparable<InternalKeyRef<'_>> for InternalKey {
fn compare(&self, other: &InternalKeyRef<'_>) -> std::cmp::Ordering {
(&*self.user_key, Reverse(self.seqno)).cmp(&(other.user_key, Reverse(other.seqno)))
}
}

/* /// Temporary internal key without heap allocation
#[derive(Clone, Debug, Eq)]
/// Temporary internal key without heap allocation
#[derive(Debug, Eq)]
pub struct InternalKeyRef<'a> {
pub user_key: &'a [u8],
pub seqno: SeqNo,
pub value_type: ValueType,
}

impl<'a> AsRef<[u8]> for InternalKeyRef<'a> {
fn as_ref(&self) -> &[u8] {
self.user_key
}
}

impl<'a> InternalKeyRef<'a> {
// Constructor for InternalKeyRef
pub fn new(user_key: &'a [u8], seqno: u64, value_type: ValueType) -> Self {
InternalKeyRef {
user_key,
@@ -187,4 +178,16 @@ impl<'a> Ord for InternalKeyRef<'a> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
(&self.user_key, Reverse(self.seqno)).cmp(&(&other.user_key, Reverse(other.seqno)))
}
} */
}

impl Equivalent<InternalKey> for InternalKeyRef<'_> {
fn equivalent(&self, other: &InternalKey) -> bool {
self.user_key == other.user_key && self.seqno == other.seqno
}
}

impl Comparable<InternalKey> for InternalKeyRef<'_> {
fn compare(&self, other: &InternalKey) -> std::cmp::Ordering {
(self.user_key, Reverse(self.seqno)).cmp(&(&other.user_key, Reverse(other.seqno)))
}
}
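
These paired `Equivalent`/`Comparable` impls allow the skiplist to be probed with a borrowed `InternalKeyRef` without first allocating an owned `InternalKey`, provided both directions agree with the `(user_key, Reverse(seqno))` ordering used by `Ord`. A minimal sketch of that agreement (one direction only, with simplified stand-in types rather than the crate's own):

    use equivalent::{Comparable, Equivalent};
    use std::cmp::Reverse;

    // Simplified stand-ins for InternalKey / InternalKeyRef.
    struct OwnedKey {
        user_key: Vec<u8>,
        seqno: u64,
    }

    struct KeyRef<'a> {
        user_key: &'a [u8],
        seqno: u64,
    }

    impl Equivalent<OwnedKey> for KeyRef<'_> {
        fn equivalent(&self, other: &OwnedKey) -> bool {
            self.user_key == other.user_key.as_slice() && self.seqno == other.seqno
        }
    }

    // Must match the owned key's Ord: user keys ascending, seqnos descending.
    impl Comparable<OwnedKey> for KeyRef<'_> {
        fn compare(&self, other: &OwnedKey) -> std::cmp::Ordering {
            (self.user_key, Reverse(self.seqno))
                .cmp(&(other.user_key.as_slice(), Reverse(other.seqno)))
        }
    }

    fn main() {
        let owned = OwnedKey { user_key: b"abc".to_vec(), seqno: 5 };

        // A newer version (higher seqno) of the same user key sorts first.
        let newer = KeyRef { user_key: b"abc", seqno: 7 };
        assert_eq!(newer.compare(&owned), std::cmp::Ordering::Less);

        // Same key and seqno compare as equivalent, with no allocation on the probe side.
        assert!(KeyRef { user_key: b"abc", seqno: 5 }.equivalent(&owned));
    }
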
17 changes: 12 additions & 5 deletions src/memtable/mod.rs
@@ -2,9 +2,12 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::key::InternalKey;
#[allow(unsafe_code)]
mod skiplist;

use crate::key::{InternalKey, InternalKeyRef};
use crate::value::{InternalValue, SeqNo, UserValue, ValueType};
use crossbeam_skiplist::SkipMap;
use skiplist::SkipMap;
use std::ops::RangeBounds;
use std::sync::atomic::AtomicU64;

@@ -31,7 +34,7 @@ pub struct Memtable {
impl Memtable {
/// Clears the memtable.
pub fn clear(&mut self) {
self.items.clear();
self.items = SkipMap::default();
self.highest_seqno = AtomicU64::new(0);
self.approximate_size
.store(0, std::sync::atomic::Ordering::Release);
@@ -81,7 +84,7 @@ impl Memtable {
// abcdef -> 6
// abcdef -> 5
//
let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
let lower_bound = InternalKeyRef::new(key, seqno - 1, ValueType::Value);

let mut iter = self
.items
@@ -126,7 +129,11 @@ impl Memtable {
.fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);

let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
self.items.insert(key, item.value);
// TODO(ajwerner): Decide what we want to do here. The panic is sort of
// extreme, but also seems right given the invariants.
if let Err((key, _value)) = self.items.insert(key, item.value) {
panic!("duplicate insert of {key:?} into memtable")
}

self.highest_seqno
.fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
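
The `seqno - 1` lower bound in `get` above leans on that same `(user_key, Reverse(seqno))` order: for a snapshot read at `seqno`, the first entry at or after `(key, seqno - 1)` is the newest version with a strictly smaller seqno. A minimal sketch of the seek, assuming only that key layout; a std `BTreeMap` stands in for the skiplist:

    use std::cmp::Reverse;
    use std::collections::BTreeMap;

    // (user_key, Reverse(seqno)): same ordering as InternalKey.
    type Key = (Vec<u8>, Reverse<u64>);

    // An entry is visible to a snapshot iff its seqno is strictly below the snapshot's.
    fn get<'a>(
        map: &'a BTreeMap<Key, Vec<u8>>,
        key: &[u8],
        snapshot_seqno: u64,
    ) -> Option<&'a Vec<u8>> {
        // Seek to (key, seqno - 1); the first entry at or after it is the
        // newest version the snapshot may see.
        let lower: Key = (key.to_vec(), Reverse(snapshot_seqno - 1));
        map.range(lower..)
            .next()
            .filter(|((user_key, _), _)| user_key.as_slice() == key)
            .map(|(_, value)| value)
    }

    fn main() {
        let mut map = BTreeMap::new();
        map.insert((b"abc".to_vec(), Reverse(5)), b"old".to_vec());
        map.insert((b"abc".to_vec(), Reverse(6)), b"new".to_vec());

        // A snapshot taken at seqno 6 sees only what was written before it.
        assert_eq!(get(&map, b"abc", 6).unwrap(), b"old");
        // A read at u64::MAX (no snapshot) sees the newest version.
        assert_eq!(get(&map, b"abc", u64::MAX).unwrap(), b"new");
    }
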
120 changes: 120 additions & 0 deletions src/memtable/skiplist/arena.rs
@@ -0,0 +1,120 @@
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use std::{
alloc::Layout,
mem::offset_of,
sync::{
atomic::{AtomicPtr, AtomicUsize, Ordering},
Mutex,
},
};

// DEFAULT_BUFFER_SIZE needs to be at least big enough for one fully-aligned node
// for the crate to work correctly. Anything larger than that will work.
//
// TODO: Justify this size.
const DEFAULT_BUFFER_SIZE: usize = (32 << 10) - size_of::<AtomicUsize>();
Contributor:

Need to play with this a bit - but should probably be much higher by default: 1 MB or so?

Author:

Yeah, it should be bigger than 32 KiB, but 1 MiB might be too big. The keys and values are not inline; it's just the metadata. The questions I'd have are how expensive allocating a new block is and how expensive inserting into the skip map is. My guess is that the alloc is not likely worse than 10 µs (it's probably way less) and the inserts are ~100 ns. If you can fit 1000 nodes in here (say the average number of links is 32 and the key and value are each 32 bytes), then you'll have spent at least 10x as long doing the inserts as doing the allocation. In practice I think the mallocs, even with zeroing, are a lot cheaper. The benchmarks I was playing with don't show much win above 256 KiB.
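
A rough back-of-envelope for that sizing argument, with the assumed numbers spelled out (8-byte links, 32 links per node on average, 32-byte key and value handles); all of these are illustrative, not measured:

    fn main() {
        // Assumptions from the discussion above, purely illustrative.
        let link = 8usize;        // one forward pointer
        let avg_links = 32usize;  // average tower height
        let key = 32usize;        // per-node key handle (data lives out of line)
        let value = 32usize;      // per-node value handle (data lives out of line)

        let node = avg_links * link + key + value; // 320 bytes of metadata per node
        let buffer = 256usize << 10;               // 256 KiB arena buffer

        // ~800 nodes per buffer; at ~100 ns per insert that is ~80 µs of insert
        // work amortized against one buffer allocation (estimated at <= 10 µs).
        println!("{} nodes per buffer", buffer / node);
    }
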


impl<const BUFFER_SIZE: usize> Default for Arenas<BUFFER_SIZE> {
fn default() -> Self {
Self::new()
}
}

unsafe impl<const N: usize> Send for Arenas<N> {}
unsafe impl<const N: usize> Sync for Arenas<N> {}

pub(crate) struct Arenas<const BUFFER_SIZE: usize = DEFAULT_BUFFER_SIZE> {
Contributor:

Eventually, for write transactions, the size should be much smaller (so that small transactions don't overallocate too much) - so this needs to be a non-generic parameter.

Author:

Okay, I can do that.

// The current set of Arenas
arenas: Mutex<Vec<*mut Buffer<BUFFER_SIZE>>>,
// Cache of the currently open Arena. It'll be the last item in the buffers
// vec. This atomic is only ever written while holding the buffers Mutex.
open_arena: AtomicPtr<Buffer<BUFFER_SIZE>>,
}

impl<const BUFFER_SIZE: usize> Arenas<BUFFER_SIZE> {
pub(crate) fn new() -> Self {
Self {
arenas: Mutex::default(),
open_arena: AtomicPtr::default(),
}
}
}

impl<const BUFFER_SIZE: usize> Arenas<BUFFER_SIZE> {
pub(crate) fn alloc(&self, layout: Layout) -> *mut u8 {
loop {
let buffer_tail = self.open_arena.load(Ordering::Acquire);
if !buffer_tail.is_null() {
if let Some(offset) = try_alloc(buffer_tail, layout) {
return offset;
}
}

let mut buffers = self.arenas.lock().expect("lock is poisoned");
let buffer = buffers.last().unwrap_or(&std::ptr::null_mut());
if *buffer != buffer_tail {
// Lost the race with somebody else.
continue;
}

let new_buffer: Box<Buffer<BUFFER_SIZE>> = Box::default();
let new_buffer = Box::into_raw(new_buffer);
self.open_arena.store(new_buffer, Ordering::Release);
buffers.push(new_buffer);
}
}
}

struct Buffer<const N: usize> {
offset: AtomicUsize,
data: [u8; N],
}

impl<const N: usize> Default for Buffer<N> {
fn default() -> Self {
Self {
offset: AtomicUsize::default(),
data: [0; N],
}
}
}

impl<const N: usize> Drop for Arenas<N> {
fn drop(&mut self) {
let mut buffers = self.arenas.lock().expect("lock is poisoned");

for buffer in buffers.drain(..) {
drop(unsafe { Box::from_raw(buffer) });
}
}
}

fn try_alloc<const N: usize>(buf: *mut Buffer<N>, layout: Layout) -> Option<*mut u8> {
let mut cur_offset = unsafe { &(*buf).offset }.load(Ordering::Relaxed);

loop {
// Bump allocation: round the free pointer up to the layout's alignment,
// then make sure the padded allocation still fits within this buffer.
let buf_start = unsafe { buf.byte_add(offset_of!(Buffer<N>, data)) as *mut u8 };
let free_start = unsafe { buf_start.byte_add(cur_offset) };
let start_addr = unsafe { free_start.byte_add(free_start.align_offset(layout.align())) };
let new_offset = ((start_addr as usize) + layout.size()) - (buf_start as usize);
if new_offset > N {
return None;
}

// Note that we can get away with using relaxed ordering here because we're not
// asserting anything about the contents of the buffer. We're just trying to
// allocate a new node.
match unsafe { &(*buf).offset }.compare_exchange(
cur_offset,
new_offset,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_offset) => return Some(start_addr),
Err(offset) => cur_offset = offset,
}
}
}
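
A minimal sketch of how the allocator is meant to be driven, as it might appear in a test next to this code; the `Node` type and its field sizes are made up, and only `Arenas::alloc` and `Layout` come from the code above:

    use std::alloc::Layout;

    // Hypothetical fixed-size node, purely to exercise the allocator.
    #[repr(C)]
    struct Node {
        value: u64,
        links: [usize; 4],
    }

    fn demo(arenas: &Arenas) {
        // Each call bumps the open buffer's offset with a CAS; a new buffer is
        // pushed (under the mutex) only when the current one cannot fit the layout.
        let ptr = arenas.alloc(Layout::new::<Node>()).cast::<Node>();

        // Buffers are zero-initialized and not reclaimed until the Arenas is
        // dropped, so the freshly allocated node can be written in place.
        unsafe {
            (*ptr).value = 42;
            (*ptr).links = [0; 4];
        }
    }
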
30 changes: 30 additions & 0 deletions src/memtable/skiplist/mod.rs
@@ -0,0 +1,30 @@
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

// This implementation was heavily inspired by:
// * https://github.com/andy-kimball/arenaskl/tree/f7010085
// * https://github.com/crossbeam-rs/crossbeam/tree/983d56b6/crossbeam-skiplist

//! This module is a purpose-built concurrent skiplist intended for use
//! by the memtable.
//!
//! Due to the requirements of the memtable, there are a number of notable
//! features it lacks:
//! - Updates
//! - Deletes
//! - Overwrites
//!
//! The main reasons for its existence are that it
//! - provides concurrent reads and inserts, and
//! - batches memory allocations
//!
//! Prior to this implementation, `crossbeam_skiplist` was used.

mod arena;
mod skipmap;

pub use skipmap::SkipMap;

#[cfg(test)]
mod test;
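
For context, a sketch of the usage pattern this design targets, written against lsm-tree's public `Memtable` API rather than the skiplist itself; `Memtable::default`, `InternalValue::from_components` and a re-exported `ValueType` are assumptions here, not shown in this diff:

    use lsm_tree::{InternalValue, Memtable, SeqNo, ValueType};

    fn main() {
        let memtable = Memtable::default();

        // Two versions of the same user key become two distinct (key, seqno)
        // entries, so the skiplist never needs updates, deletes or overwrites.
        memtable.insert(InternalValue::from_components(
            b"abc".as_slice(), b"old".as_slice(), 1, ValueType::Value,
        ));
        memtable.insert(InternalValue::from_components(
            b"abc".as_slice(), b"new".as_slice(), 2, ValueType::Value,
        ));

        // A read at SeqNo::MAX sees the newest version ...
        assert_eq!(&*memtable.get(b"abc", SeqNo::MAX).unwrap().value, b"new");
        // ... while a snapshot at seqno 2 only sees versions written before it.
        assert_eq!(&*memtable.get(b"abc", 2).unwrap().value, b"old");
    }
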