Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ zerocopy = { version = "0.8.24", features = ["derive"] }
parking_lot = { version = "0.12.4", features = ["send_guard"] }
fxhash = "0.2.1"
static_assertions = "1.1.0"
rayon = "1.10.0"

[dev-dependencies]
criterion = "0.6.0"
Expand Down
46 changes: 46 additions & 0 deletions cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 49 additions & 8 deletions src/database.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
context::TransactionContext,
executor::threadpool,
meta::{MetadataManager, OpenMetadataError},
metrics::DatabaseMetrics,
page::{PageError, PageId, PageManager},
Expand All @@ -8,9 +9,11 @@ use crate::{
};
use alloy_primitives::B256;
use parking_lot::Mutex;
use rayon::ThreadPoolBuildError;
use std::{
fs::File,
io,
num::NonZero,
ops::Deref,
path::{Path, PathBuf},
};
Expand All @@ -29,7 +32,9 @@ pub struct DatabaseOptions {
create_new: bool,
wipe: bool,
meta_path: Option<PathBuf>,
max_pages: u32,
max_pages: Option<NonZero<u32>>,
num_threads: Option<NonZero<usize>>,
io_parallelism: Option<NonZero<usize>>,
}

#[derive(Debug)]
Expand All @@ -42,6 +47,7 @@ pub enum Error {
/// Errors produced while opening a database.
pub enum OpenError {
/// The page manager could not be opened.
PageError(PageError),
/// Failure reported by the metadata layer.
// NOTE(review): the call that produces this variant is not visible here -- presumably opening
// the metadata file; confirm.
MetadataError(OpenMetadataError),
/// The thread pool handed to the storage engine could not be built.
ThreadPoolError(ThreadPoolBuildError),
/// An underlying I/O operation failed.
// NOTE(review): the call that produces this variant is not visible here -- confirm its origin.
IO(io::Error),
}

Expand Down Expand Up @@ -81,8 +87,32 @@ impl DatabaseOptions {
}

/// Sets the maximum number of pages that can be allocated.
pub fn max_pages(&mut self, max_pages: u32) -> &mut Self {
self.max_pages = max_pages;
pub fn max_pages(&mut self, max_pages: NonZero<u32>) -> &mut Self {
self.max_pages = Some(max_pages);
self
}

/// Sets the maximum number of threads used for CPU-intensive computations (like hashing).
///
/// By default, the number of threads is selected automatically based on the number of
/// available CPUs on the system. The algorithm for deciding the default number is not
/// specified and may change in the future.
///
/// Returns `&mut Self` so that option setters can be chained.
pub fn num_threads(&mut self, num_threads: NonZero<usize>) -> &mut Self {
self.num_threads = Some(num_threads);
self
}

/// Sets the maximum amount of I/O parallelism that can be used during writes.
///
/// When data is written to the database as part of a write transaction, the database may opt
/// to write data to storage in parallel, rather than sequentially. This can result in
/// significant performance gains on modern SSD devices over NVMe/PCIe, or over certain RAID
/// setups. The number specified through `io_parallelism` is the maximum number of *pages* that
/// can be written in parallel at any given time.
///
/// By default, `io_parallelism` is set to 128, although this default may change in the future.
///
/// Returns `&mut Self` so that option setters can be chained.
pub fn io_parallelism(&mut self, io_parallelism: NonZero<usize>) -> &mut Self {
self.io_parallelism = Some(io_parallelism);
self
}

Expand Down Expand Up @@ -136,15 +166,26 @@ impl Database {
}

let page_count = meta_manager.active_slot().page_count();
let page_manager = PageManager::options()
let mut page_manager_opts = PageManager::options();
page_manager_opts
.create(opts.create)
.create_new(opts.create_new)
.wipe(opts.wipe)
.page_count(page_count)
.open(db_path)
.map_err(OpenError::PageError)?;
.page_count(page_count);
if let Some(max_pages) = opts.max_pages {
page_manager_opts.max_pages(max_pages.get());
}
if let Some(io_parallelism) = opts.io_parallelism {
page_manager_opts.io_parallelism(io_parallelism);
}
let page_manager = page_manager_opts.open(db_path).map_err(OpenError::PageError)?;

let thread_pool = threadpool::builder()
.num_threads(opts.num_threads.map(NonZero::get).unwrap_or(0))
.build()
.map_err(OpenError::ThreadPoolError)?;

Ok(Self::new(StorageEngine::new(page_manager, meta_manager)))
Ok(Self::new(StorageEngine::new(page_manager, meta_manager, thread_pool)))
}

pub fn new(storage_engine: StorageEngine) -> Self {
Expand Down
213 changes: 213 additions & 0 deletions src/executor/futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
use crate::executor::Wait;
use std::{
fmt,
sync::{Arc, OnceLock},
};

/// Final state stored in a `Future`'s cell once it is no longer pending.
///
/// A pending `Future` has no `FutureStatus` at all: its cell is simply unset.
#[derive(Debug)]
enum FutureStatus<T> {
/// A value was stored through `Future::try_complete()`.
Completed(T),
/// No value is available; stored through `Future::try_poison()`.
Poisoned,
}

use FutureStatus::*;

/// A placeholder for a value that will be computed at a later time.
///
/// `Future`s are the result of running functions in separate threads using
/// [`Executor::spawn`](crate::executor::Executor::spawn): calling `spawn()` returns immediately,
/// even though the function will complete at a later time. The `Future` returned by `spawn()`
/// allows retrieving the result of the function once it completes.
///
/// Cloning a `Future` yields another handle to the same underlying cell, so a value stored
/// through one handle is observed through all of its clones.
///
/// # Poisoning
///
/// A `Future` may be in a "poisoned" status if the execution of the function that produced it
/// failed with a panic.
pub struct Future<T> {
/// Shared, write-once slot: unset while pending, then set exactly once to either a completed
/// value or the poisoned marker.
cell: Arc<OnceLock<FutureStatus<T>>>,
}

impl<T> Future<T> {
    /// Creates a `Future` whose value has not been produced yet.
    #[inline]
    #[must_use]
    pub(super) fn pending() -> Self {
        let cell = Arc::new(OnceLock::new());
        Self { cell }
    }

    /// Creates a new `Future` that already holds `value`.
    #[inline]
    #[must_use]
    pub fn ready(value: T) -> Self {
        // Seeding the cell at construction time leaves the `Future` in exactly the state that
        // completing a freshly created pending `Future` would.
        Self { cell: Arc::new(OnceLock::from(Completed(value))) }
    }

    /// Stores `value` as the result of this `Future`.
    ///
    /// # Panics
    ///
    /// If the `Future` was already completed or poisoned.
    #[inline]
    pub(super) fn complete(&self, value: T) {
        if let Err(err) = self.try_complete(value) {
            panic!("{err}");
        }
    }

    /// Stores `value` as the result of this `Future`, or reports a [`ReadyError`] if the
    /// `Future` was already completed or poisoned.
    #[inline]
    pub(super) fn try_complete(&self, value: T) -> Result<(), ReadyError> {
        match self.cell.set(Completed(value)) {
            Ok(()) => Ok(()),
            // `OnceLock::set` hands the rejected status back; it is dropped here.
            Err(_rejected) => Err(ReadyError),
        }
    }

    // There's no `poison()` method simply because it's not used internally.

    /// Marks this `Future` as poisoned, or reports a [`ReadyError`] if the `Future` was already
    /// completed or poisoned.
    #[inline]
    pub(super) fn try_poison(&self) -> Result<(), ReadyError> {
        match self.cell.set(Poisoned) {
            Ok(()) => Ok(()),
            Err(_rejected) => Err(ReadyError),
        }
    }

    /// Returns the value of the `Future`, or `None` while the `Future` is still pending.
    ///
    /// # Panics
    ///
    /// If the `Future` is poisoned. See [`Future::try_get()`] for a non-panicking version of this
    /// method.
    #[inline]
    #[must_use]
    pub fn get(&self) -> Option<&T> {
        let outcome = self.try_get()?;
        Some(outcome.expect("Future is poisoned"))
    }

    /// Returns `None` while the `Future` is still pending, `Some(Ok(_))` with the value once it
    /// has been completed, or `Some(Err(_))` if the `Future` is poisoned.
    #[inline]
    #[must_use]
    pub fn try_get(&self) -> Option<Result<&T, PoisonError>> {
        self.cell.get().map(|status| match status {
            Completed(value) => Ok(value),
            Poisoned => Err(PoisonError),
        })
    }
}

impl<T> Wait for Future<T> {
    type Output = T;

    /// Blocks the calling thread until this `Future` is completed or poisoned, then returns a
    /// reference to its value.
    ///
    /// # Panics
    ///
    /// If the `Future` is poisoned.
    #[inline]
    fn wait(&self) -> &Self::Output {
        let status = self.cell.wait();
        match status {
            Poisoned => panic!("{PoisonError}"),
            Completed(value) => value,
        }
    }
}

impl<T> Clone for Future<T> {
    /// Returns a new handle to the same underlying cell; no `T: Clone` bound is needed because
    /// only the `Arc` is cloned.
    fn clone(&self) -> Self {
        let cell = Arc::clone(&self.cell);
        Self { cell }
    }
}

impl<T: fmt::Debug> fmt::Debug for Future<T> {
    /// Formats as `Future(<value>)`, using the bare placeholders `<pending>` or `<poisoned>`
    /// when no value is available.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut tuple = f.debug_tuple("Future");
        // `fmt::Arguments` implements `Debug` by writing its text verbatim, which yields an
        // unquoted placeholder without needing a dedicated marker type.
        match self.try_get() {
            Some(Ok(value)) => tuple.field(value),
            Some(Err(PoisonError)) => tuple.field(&format_args!("<poisoned>")),
            None => tuple.field(&format_args!("<pending>")),
        };
        tuple.finish()
    }
}

/// Error reported when a `Future` that was already completed or poisoned is completed or
/// poisoned again.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct ReadyError;

impl std::error::Error for ReadyError {}

impl fmt::Display for ReadyError {
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(out, "attempted to complete or poison the Future twice")
    }
}

/// Error reported when the value of a `Future` is requested but the closure that should have
/// produced it panicked.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PoisonError;

impl std::error::Error for PoisonError {}

impl fmt::Display for PoisonError {
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(out, "execution of the closure for this Future resulted in a panic")
    }
}

// Unit tests covering the state transitions of `Future` (pending -> completed, pending ->
// poisoned) and the blocking behavior of `wait()`.
#[cfg(test)]
mod tests {
use super::*;
use std::{panic, sync::Barrier, thread, time::Duration};

/// A pending `Future` reports `None` until it is completed, then exposes the stored value.
#[test]
fn pending_to_completed() {
let f = Future::<u32>::pending();

assert_eq!(f.get(), None);
assert_eq!(f.try_get(), None);

f.complete(123);

assert_eq!(f.get(), Some(&123));
assert_eq!(f.try_get(), Some(Ok(&123)));
}

/// A poisoned `Future` makes `get()` panic and `try_get()` return `PoisonError`.
#[test]
fn pending_to_poisoned() {
let f = Future::<u32>::pending();

assert_eq!(f.get(), None);
assert_eq!(f.try_get(), None);

f.try_poison().expect("poison failed");

// `get()` is expected to panic here, so the panic is caught and turned into an `Err`.
panic::catch_unwind(|| f.get()).expect_err("get() should have panicked");
assert_eq!(f.try_get(), Some(Err(PoisonError)));
}

/// `wait()` returns the value that another thread stores through a clone of the `Future`.
#[test]
fn wait() {
let f = Future::<u32>::pending();
let g = f.clone();
// Two parties: this thread and the spawned one. The spawned thread cannot complete the
// `Future` before this thread has reached its own `barrier.wait()` below.
let barrier = Barrier::new(2);

thread::scope(|s| {
s.spawn(|| {
barrier.wait();
// NOTE(review): presumably the delay is there so that `f.wait()` below really has to
// block before the value arrives -- confirm.
thread::sleep(Duration::from_secs(1));
g.complete(123);
});

// These run before the barrier is released, so the `Future` must still be pending.
assert_eq!(f.get(), None);
assert_eq!(f.try_get(), None);

barrier.wait();

assert_eq!(f.wait(), &123);
assert_eq!(f.get(), Some(&123));
assert_eq!(f.try_get(), Some(Ok(&123)));

// Waiting twice or more should return the same value
assert_eq!(f.wait(), &123);
});
}
}
Loading
Loading