diff --git a/Cargo.toml b/Cargo.toml
index fcafa3e5..ed0f05ca 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,6 +18,7 @@ zerocopy = { version = "0.8.24", features = ["derive"] }
 parking_lot = { version = "0.12.4", features = ["send_guard"] }
 fxhash = "0.2.1"
 static_assertions = "1.1.0"
+rayon = "1.10.0"
 
 [dev-dependencies]
 criterion = "0.6.0"
diff --git a/cli/Cargo.lock b/cli/Cargo.lock
index 8ad15197..f6a71082 100644
--- a/cli/Cargo.lock
+++ b/cli/Cargo.lock
@@ -494,6 +494,31 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
+
 [[package]]
 name = "crunchy"
 version = "0.2.3"
@@ -1272,6 +1297,26 @@ dependencies = [
  "rand_core 0.9.3",
 ]
 
+[[package]]
+name = "rayon"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.5.11"
@@ -1708,6 +1753,7 @@ dependencies = [
  "parking_lot",
  "proptest",
  "proptest-derive 0.6.0",
+ "rayon",
 "sealed",
 "static_assertions",
 "zerocopy 0.8.24",
diff --git a/src/database.rs b/src/database.rs
index f373e262..bc16c41e 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -1,5 +1,6 @@
 use crate::{
     context::TransactionContext,
+    executor::threadpool,
     meta::{MetadataManager, OpenMetadataError},
     metrics::DatabaseMetrics,
     page::{PageError, PageId, PageManager},
@@ -8,9 +9,11 @@ use crate::{
 };
 use alloy_primitives::B256;
 use parking_lot::Mutex;
+use rayon::ThreadPoolBuildError;
 use std::{
     fs::File,
     io,
+    num::NonZero,
     ops::Deref,
     path::{Path, PathBuf},
 };
@@ -29,7 +32,9 @@ pub struct DatabaseOptions {
     create_new: bool,
     wipe: bool,
     meta_path: Option<PathBuf>,
-    max_pages: u32,
+    max_pages: Option<NonZero<u32>>,
+    num_threads: Option<NonZero<usize>>,
+    io_parallelism: Option<NonZero<usize>>,
 }
 
 #[derive(Debug)]
 pub enum Error {
@@ -42,6 +47,7 @@ pub enum OpenError {
     PageError(PageError),
     MetadataError(OpenMetadataError),
+    ThreadPoolError(ThreadPoolBuildError),
     IO(io::Error),
 }
@@ -81,8 +87,32 @@ impl DatabaseOptions {
     }
 
     /// Sets the maximum number of pages that can be allocated.
-    pub fn max_pages(&mut self, max_pages: u32) -> &mut Self {
-        self.max_pages = max_pages;
+    pub fn max_pages(&mut self, max_pages: NonZero<u32>) -> &mut Self {
+        self.max_pages = Some(max_pages);
+        self
+    }
+
+    /// Sets the maximum number of threads used for CPU-intensive computations (like hashing).
+    ///
+    /// By default, the number of threads is selected automatically based on the number of
+    /// available CPUs on the system. The algorithm for deciding the default number is not
+    /// specified and may change in the future.
+    pub fn num_threads(&mut self, num_threads: NonZero<usize>) -> &mut Self {
+        self.num_threads = Some(num_threads);
+        self
+    }
+
+    /// Sets the maximum amount of I/O parallelism that can be used during writes.
+    ///
+    /// When data is written to the database as part of a write transaction, the database may opt
+    /// to write data to storage in parallel, rather than sequentially. This can result in
+    /// significant performance gains on modern SSD devices over NVMe/PCIe, or over certain RAID
+    /// setups. The value passed to `io_parallelism` specifies the maximum number of *pages* that
+    /// can be written in parallel at any given time.
+    ///
+    /// By default, `io_parallelism` is set to 128, although this default may change in the future.
+    pub fn io_parallelism(&mut self, io_parallelism: NonZero<usize>) -> &mut Self {
+        self.io_parallelism = Some(io_parallelism);
         self
     }
@@ -136,15 +166,26 @@ impl Database {
         }
 
         let page_count = meta_manager.active_slot().page_count();
-        let page_manager = PageManager::options()
+        let mut page_manager_opts = PageManager::options();
+        page_manager_opts
             .create(opts.create)
             .create_new(opts.create_new)
             .wipe(opts.wipe)
-            .page_count(page_count)
-            .open(db_path)
-            .map_err(OpenError::PageError)?;
+            .page_count(page_count);
+        if let Some(max_pages) = opts.max_pages {
+            page_manager_opts.max_pages(max_pages.get());
+        }
+        if let Some(io_parallelism) = opts.io_parallelism {
+            page_manager_opts.io_parallelism(io_parallelism);
+        }
+        let page_manager = page_manager_opts.open(db_path).map_err(OpenError::PageError)?;
+
+        let thread_pool = threadpool::builder()
+            .num_threads(opts.num_threads.map(NonZero::get).unwrap_or(0))
+            .build()
+            .map_err(OpenError::ThreadPoolError)?;
 
-        Ok(Self::new(StorageEngine::new(page_manager, meta_manager)))
+        Ok(Self::new(StorageEngine::new(page_manager, meta_manager, thread_pool)))
     }
 
     pub fn new(storage_engine: StorageEngine) -> Self {
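For orientation, here is a minimal sketch of how the new options look from a caller's point of view. This is a hypothetical usage: `Database::options()` and the `open()` entry point are assumed to exist (they are not shown in this diff); only `max_pages`, `num_threads`, and `io_parallelism` come from the change itself.

```rust
use std::num::NonZero;
use triedb::database::{Database, OpenError};

fn open_db() -> Result<Database, OpenError> {
    // `Database::options()` and `.open()` are assumed entry points, not part
    // of this diff; the three NonZero builder calls below are the new options.
    Database::options()
        .max_pages(NonZero::new(1 << 20).unwrap())   // cap the page file at ~1M pages
        .num_threads(NonZero::new(8).unwrap())       // CPU-bound work, e.g. hashing
        .io_parallelism(NonZero::new(256).unwrap())  // raise the default of 128
        .open("/tmp/triedb")
}
```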
diff --git a/src/executor/futures.rs b/src/executor/futures.rs
new file mode 100644
index 00000000..fd875182
--- /dev/null
+++ b/src/executor/futures.rs
@@ -0,0 +1,213 @@
+use crate::executor::Wait;
+use std::{
+    fmt,
+    sync::{Arc, OnceLock},
+};
+
+#[derive(Debug)]
+enum FutureStatus<T> {
+    Completed(T),
+    Poisoned,
+}
+
+use FutureStatus::*;
+
+/// A placeholder for a value that will be computed at a later time.
+///
+/// `Future`s are the result of running functions in separate threads using
+/// [`Executor::defer`](crate::executor::Executor::defer): calling `defer()` returns
+/// immediately, even though the function will complete at a later time. The `Future` returned by
+/// `defer()` allows retrieving the result of the function once it completes.
+///
+/// # Poisoning
+///
+/// A `Future` may be in a "poisoned" status if the execution of the function that produced it
+/// failed with a panic.
+pub struct Future<T> {
+    cell: Arc<OnceLock<FutureStatus<T>>>,
+}
+
+impl<T> Future<T> {
+    #[inline]
+    #[must_use]
+    pub(super) fn pending() -> Self {
+        Self { cell: Arc::new(OnceLock::new()) }
+    }
+
+    /// Creates a new `Future` that is already completed with the given value.
+    #[inline]
+    #[must_use]
+    pub fn ready(value: T) -> Self {
+        let this = Self::pending();
+        this.complete(value);
+        this
+    }
+
+    #[inline]
+    pub(super) fn complete(&self, value: T) {
+        self.try_complete(value).unwrap_or_else(|err| panic!("{err}"))
+    }
+
+    #[inline]
+    pub(super) fn try_complete(&self, value: T) -> Result<(), ReadyError> {
+        self.cell.set(Completed(value)).map_err(|_| ReadyError)
+    }
+
+    // There's no `poison()` method simply because it's not used internally.
+
+    #[inline]
+    pub(super) fn try_poison(&self) -> Result<(), ReadyError> {
+        self.cell.set(Poisoned).map_err(|_| ReadyError)
+    }
+
+    /// Returns the value of the `Future`, or `None` if this `Future` was not completed yet.
+    ///
+    /// # Panics
+    ///
+    /// If the `Future` is poisoned. See [`Future::try_get()`] for a non-panicking version of this
+    /// method.
+    #[inline]
+    #[must_use]
+    pub fn get(&self) -> Option<&T> {
+        self.try_get().map(|result| result.expect("Future is poisoned"))
+    }
+
+    /// Returns the value of the `Future`, `None` if this `Future` was not completed yet, or an
+    /// error if this `Future` is poisoned.
+    #[inline]
+    #[must_use]
+    pub fn try_get(&self) -> Option<Result<&T, PoisonError>> {
+        match self.cell.get() {
+            None => None,
+            Some(Completed(ref value)) => Some(Ok(value)),
+            Some(Poisoned) => Some(Err(PoisonError)),
+        }
+    }
+}
+
+impl<T> Wait for Future<T> {
+    type Output = T;
+
+    #[inline]
+    fn wait(&self) -> &Self::Output {
+        match self.cell.wait() {
+            Completed(value) => value,
+            Poisoned => panic!("{PoisonError}"),
+        }
+    }
+}
+
+impl<T> Clone for Future<T> {
+    fn clone(&self) -> Self {
+        Self { cell: Arc::clone(&self.cell) }
+    }
+}
+
+impl<T: fmt::Debug> fmt::Debug for Future<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        struct Pending;
+
+        impl fmt::Debug for Pending {
+            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+                f.write_str("<pending>")
+            }
+        }
+
+        struct Poisoned;
+
+        impl fmt::Debug for Poisoned {
+            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+                f.write_str("<poisoned>")
+            }
+        }
+
+        f.debug_tuple("Future")
+            .field(match self.try_get() {
+                None => &Pending,
+                Some(Ok(value)) => value,
+                Some(Err(PoisonError)) => &Poisoned,
+            })
+            .finish()
+    }
+}
+
+#[derive(Clone, PartialEq, Eq, Debug)]
+pub(super) struct ReadyError;
+
+impl fmt::Display for ReadyError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_str("attempted to complete or poison the Future twice")
+    }
+}
+
+impl std::error::Error for ReadyError {}
+
+#[derive(Clone, PartialEq, Eq, Debug)]
+pub struct PoisonError;
+
+impl fmt::Display for PoisonError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_str("execution of the closure for this Future resulted in a panic")
+    }
+}
+
+impl std::error::Error for PoisonError {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::{panic, sync::Barrier, thread, time::Duration};
+
+    #[test]
+    fn pending_to_completed() {
+        let f = Future::<i32>::pending();
+
+        assert_eq!(f.get(), None);
+        assert_eq!(f.try_get(), None);
+
+        f.complete(123);
+
+        assert_eq!(f.get(), Some(&123));
+        assert_eq!(f.try_get(), Some(Ok(&123)));
+    }
+
+    #[test]
+    fn pending_to_poisoned() {
+        let f = Future::<i32>::pending();
+
+        assert_eq!(f.get(), None);
+        assert_eq!(f.try_get(), None);
+
+        f.try_poison().expect("poison failed");
+
+        panic::catch_unwind(|| f.get()).expect_err("get() should have panicked");
+        assert_eq!(f.try_get(), Some(Err(PoisonError)));
+    }
+
+    #[test]
+    fn wait() {
+        let f = Future::<i32>::pending();
+        let g = f.clone();
+        let barrier = Barrier::new(2);
+
+        thread::scope(|s| {
+            s.spawn(|| {
+                barrier.wait();
+                thread::sleep(Duration::from_secs(1));
+                g.complete(123);
+            });
+
+            assert_eq!(f.get(), None);
+            assert_eq!(f.try_get(), None);
+
+            barrier.wait();
+
+            assert_eq!(f.wait(), &123);
+            assert_eq!(f.get(), Some(&123));
+            assert_eq!(f.try_get(), Some(Ok(&123)));
+
+            // Waiting twice or more should return the same value
+            assert_eq!(f.wait(), &123);
+        });
+    }
+}
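Since `Future` is the vocabulary type for everything that follows, here is a small sketch of its observable behavior, using only the public API added above:

```rust
use triedb::executor::{Future, Wait};

fn main() {
    // `ready()` produces an already-completed future.
    let f = Future::ready(42);

    // Clones share the same cell, so every handle observes the same result.
    let g = f.clone();
    assert_eq!(f.get(), Some(&42));
    assert_eq!(f.try_get(), Some(Ok(&42)));

    // `wait()` returns immediately once the value is set, no matter how
    // many times it is called.
    assert_eq!(g.wait(), &42);
    assert_eq!(g.wait(), &42);
}
```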
diff --git a/src/executor/inline.rs b/src/executor/inline.rs
new file mode 100644
index 00000000..607b1005
--- /dev/null
+++ b/src/executor/inline.rs
@@ -0,0 +1,29 @@
+use crate::executor::{Executor, Future};
+
+/// A dummy executor that executes all functions in the same thread.
+#[derive(Copy, Clone, Debug)]
+pub struct Inline;
+
+impl Executor for Inline {
+    #[inline]
+    fn defer<F, T>(&self, f: F) -> Future<T>
+    where
+        F: FnOnce() -> T + Send + 'static,
+        T: Send + Sync + 'static,
+    {
+        Future::ready(f())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::executor::Wait;
+
+    #[test]
+    fn defer() {
+        let inline = Inline;
+        let future = inline.defer(|| 123);
+        assert_eq!(future.wait(), &123);
+    }
+}
diff --git a/src/executor/mod.rs b/src/executor/mod.rs
new file mode 100644
index 00000000..1ebd661c
--- /dev/null
+++ b/src/executor/mod.rs
@@ -0,0 +1,33 @@
+//! Concurrent execution.
+//!
+//! This module provides structures and traits to run functions in separate threads and obtain
+//! their results later on. The implementation is currently based on the popular [`rayon`] crate.
+//! This module also provides a dummy implementation called [`Inline`] that does not spawn any
+//! actual thread but executes everything serially.
+//!
+//! # Examples
+//!
+//! ```
+//! use triedb::executor::{threadpool, Executor, Wait};
+//!
+//! // Create a thread pool.
+//! let pool = threadpool::builder().build().unwrap();
+//!
+//! // Run some closures in the background.
+//! let future1 = pool.defer(|| 1 + 1);
+//! let future2 = pool.defer(|| 2 + 2);
+//!
+//! // Wait for the closures to return a result.
+//! assert_eq!(future1.wait(), &2);
+//! assert_eq!(future2.wait(), &4);
+//! ```
+
+mod futures;
+mod inline;
+mod traits;
+
+pub mod threadpool;
+
+pub use futures::{Future, PoisonError};
+pub use inline::Inline;
+pub use traits::{Executor, Wait};
diff --git a/src/executor/threadpool.rs b/src/executor/threadpool.rs
new file mode 100644
index 00000000..437a3299
--- /dev/null
+++ b/src/executor/threadpool.rs
@@ -0,0 +1,77 @@
+use crate::executor::{Executor, Future};
+use rayon::{ThreadPool, ThreadPoolBuilder};
+use std::thread;
+
+/// A wrapper around [`ThreadPoolBuilder::new()`] which sets some default values to make the
+/// resulting [`ThreadPool`] play nicely with the [`Executor`] trait.
+pub fn builder() -> ThreadPoolBuilder {
+    ThreadPoolBuilder::new()
+        .thread_name(|num| format!("executor-thread-{num:03}"))
+        // The default behavior for `rayon` is to abort in case of panic, causing the whole program
+        // to crash. We instead want to catch individual panics and poison the relevant `Future`
+        // when those occur.
+        //
+        // This panic handler does nothing, so that the abort behavior is suppressed. We don't need
+        // to explicitly print an error message or the backtrace, because this will already be
+        // taken care of.
+        .panic_handler(|_| {})
+}
+
+impl Executor for ThreadPool {
+    fn defer<F, T>(&self, f: F) -> Future<T>
+    where
+        F: FnOnce() -> T + Send + 'static,
+        T: Send + Sync + 'static,
+    {
+        let sender_future = Future::pending();
+        let receiver_future = sender_future.clone();
+
+        self.spawn(move || {
+            // Create the panic guard first, then run the closure. The guard will be dropped at the
+            // end of this scope. If the function succeeds, the guard won't do anything; if it
+            // panics, the guard's `Drop` implementation will poison the future.
+            let _guard = PanicGuard { future: &sender_future };
+            sender_future.complete(f())
+        });
+
+        receiver_future
+    }
+}
+
+/// A "guard" to detect if this thread panics, and poison the `Future` in that case.
+#[derive(Debug)]
+struct PanicGuard<'a, T> {
+    future: &'a Future<T>,
+}
+
+impl<'a, T> Drop for PanicGuard<'a, T> {
+    fn drop(&mut self) {
+        if thread::panicking() {
+            // Unlikely, but if the future was already set, just ignore the error from
+            // `try_poison()` and carry on
+            let _ = self.future.try_poison();
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::executor::{PoisonError, Wait};
+    use std::panic;
+
+    #[test]
+    fn defer() {
+        let pool = builder().build().expect("building thread pool failed");
+        let future = pool.defer(|| 123);
+        assert_eq!(future.wait(), &123);
+    }
+
+    #[test]
+    fn poisoning() {
+        let pool = builder().build().expect("building thread pool failed");
+        let future = pool.defer(|| panic!("something went wrong"));
+        panic::catch_unwind(|| future.wait()).expect_err("wait() was expected to panic");
+        assert_eq!(future.try_get(), Some(Err(PoisonError)));
+    }
+}
diff --git a/src/executor/traits.rs b/src/executor/traits.rs
new file mode 100644
index 00000000..505f280b
--- /dev/null
+++ b/src/executor/traits.rs
@@ -0,0 +1,33 @@
+use crate::executor::Future;
+
+/// Trait for objects that can be awaited until they reach their final state.
+///
+/// The main structure that implements this trait is [`Future`], but any structure that wraps
+/// a `Future` may also implement this trait.
+pub trait Wait {
+    type Output;
+
+    /// Blocks execution until the final state is reached.
+    fn wait(&self) -> &Self::Output;
+}
+
+/// Trait for objects that can run functions concurrently.
+pub trait Executor {
+    /// Runs the given closure `f` in a separate thread, and returns a [`Future`] that can be used
+    /// to obtain the result of `f` once its execution is complete.
+    fn defer<F, T>(&self, f: F) -> Future<T>
+    where
+        F: FnOnce() -> T + Send + 'static,
+        T: Send + Sync + 'static;
+}
+
+impl<E: Executor> Executor for &E {
+    #[inline]
+    fn defer<F, T>(&self, f: F) -> Future<T>
+    where
+        F: FnOnce() -> T + Send + 'static,
+        T: Send + Sync + 'static,
+    {
+        (**self).defer(f)
+    }
+}
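Because `defer()` is generic over the executor, callers can be written once and run against either the rayon-backed pool or `Inline`. A hypothetical fan-out helper, just to illustrate how the two traits compose (the helper itself is not part of this diff):

```rust
use triedb::executor::{Executor, Future, Inline, Wait};

// Illustrative only: run `f` over each input on any executor, then gather
// the results in order. `Inline` makes this fully deterministic in tests.
fn map_concurrently<E, T>(exec: &E, inputs: Vec<u64>, f: fn(u64) -> T) -> Vec<T>
where
    E: Executor,
    T: Clone + Send + Sync + 'static,
{
    let futures: Vec<Future<T>> =
        inputs.into_iter().map(|x| exec.defer(move || f(x))).collect();
    futures.iter().map(|fut| fut.wait().clone()).collect()
}

fn main() {
    let squares = map_concurrently(&Inline, vec![1, 2, 3], |x| x * x);
    assert_eq!(squares, vec![1, 4, 9]);
}
```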
#[derive(Debug)]
@@ -16,6 +18,7 @@ pub enum PageError {
     PageIsFull,
     PageSplitLimitReached,
     IO(std::io::Error),
+    ThreadPoolError(ThreadPoolBuildError),
     InvalidValue,
     InvalidPageContents(PageId),
     // TODO: add more errors here for other cases.
diff --git a/src/page/manager/mmap.rs b/src/page/manager/mmap.rs
index 1595659c..0d7a13c9 100644
--- a/src/page/manager/mmap.rs
+++ b/src/page/manager/mmap.rs
@@ -1,5 +1,5 @@
 use crate::{
-    page::{Page, PageError, PageId, PageManagerOptions, PageMut},
+    page::{manager::syncer::PageSyncer, Page, PageError, PageId, PageManagerOptions, PageMut},
     snapshot::SnapshotId,
 };
 use memmap2::{Advice, MmapOptions, MmapRaw};
@@ -14,10 +14,10 @@ use std::{
 
 // Manages pages in a memory mapped file.
 #[derive(Debug)]
 pub struct PageManager {
-    mmap: MmapRaw,
     file: Mutex<File>,
     file_len: AtomicU64,
     page_count: AtomicU32,
+    syncer: PageSyncer,
 }
 
 impl PageManager {
@@ -96,14 +96,22 @@
             file_len / (Page::SIZE as u64)
         );
 
+        let syncer =
+            PageSyncer::new(mmap, opts.io_parallelism).map_err(PageError::ThreadPoolError)?;
+
         Ok(Self {
-            mmap,
             file: Mutex::new(file),
             file_len: AtomicU64::new(file_len),
             page_count: AtomicU32::new(opts.page_count),
+            syncer,
         })
     }
 
+    #[inline]
+    pub fn mmap(&self) -> &MmapRaw {
+        self.syncer.mmap()
+    }
+
     /// Returns the number of pages currently stored in the file.
     pub fn size(&self) -> u32 {
         self.page_count.load(Ordering::Relaxed)
@@ -111,7 +119,7 @@
     }
 
     /// Returns the maximum number of pages that can be allocated to the file.
     pub fn capacity(&self) -> u32 {
-        (self.mmap.len() / Page::SIZE).min(u32::MAX as usize) as u32
+        (self.mmap().len() / Page::SIZE).min(u32::MAX as usize) as u32
     }
 
     /// Grows the size of the underlying file to make room for additional pages.
@@ -124,7 +132,7 @@
         let cur_len = self.file_len.load(Ordering::Relaxed);
         let increment = (cur_len / 8).max(1024 * Page::SIZE as u64);
         let new_len = cur_len.checked_add(increment).ok_or(PageError::PageLimitReached)?;
-        let new_len = new_len.min(self.mmap.len() as u64);
+        let new_len = new_len.min(self.mmap().len() as u64);
         if new_len <= cur_len {
             return Err(PageError::PageLimitReached);
         }
@@ -192,7 +200,7 @@
         let offset = page_id.as_offset();
 
         // SAFETY: We have checked that the page fits inside the memory map.
-        let data = unsafe { self.mmap.as_mut_ptr().byte_add(offset).cast() };
+        let data = unsafe { self.mmap().as_mut_ptr().byte_add(offset).cast() };
 
         // SAFETY: All memory from the memory map is accessed through `Page` or `PageMut`, thus
         // respecting the page state access memory model.
@@ -211,11 +219,11 @@
         let offset = page_id.as_offset();
 
         // SAFETY: We have checked that the page fits inside the memory map.
-        let data = unsafe { self.mmap.as_mut_ptr().byte_add(offset).cast() };
+        let data = unsafe { self.mmap().as_mut_ptr().byte_add(offset).cast() };
 
         // TODO: This is actually unsafe, as it's possible to call `get()` arbitrary times before
         // calling this function (this will be fixed in a future commit).
-        unsafe { PageMut::from_ptr(page_id, snapshot_id, data) }
+        unsafe { PageMut::from_ptr(page_id, snapshot_id, data, Some(&self.syncer)) }
     }
 
     /// Adds a new page.
@@ -225,14 +233,14 @@
         let (page_id, new_count) = self.next_page_id().ok_or(PageError::PageLimitReached)?;
 
         let new_len = new_count as usize * Page::SIZE;
-        if new_len > self.mmap.len() {
+        if new_len > self.mmap().len() {
             return Err(PageError::PageLimitReached);
         }
         self.grow_if_needed(new_len as u64)?;
 
         let offset = page_id.as_offset();
         // SAFETY: We have checked that the page fits inside the memory map.
-        let data = unsafe { self.mmap.as_mut_ptr().byte_add(offset).cast() };
+        let data = unsafe { self.mmap().as_mut_ptr().byte_add(offset).cast() };
 
         // SAFETY:
         // - This is a newly created page at the end of the file, so we're guaranteed to have
@@ -240,16 +248,13 @@
         //   time, they would get a different `page_id`.
         // - All memory from the memory map is accessed through `Page` or `PageMut`, thus respecting
         //   the page state access memory model.
-        unsafe { PageMut::acquire_unchecked(page_id, snapshot_id, data) }
+        unsafe { PageMut::acquire_unchecked(page_id, snapshot_id, data, Some(&self.syncer)) }
     }
 
     /// Syncs pages to the backing file.
     pub fn sync(&self) -> io::Result<()> {
-        if cfg!(not(miri)) {
-            self.mmap.flush()
-        } else {
-            Ok(())
-        }
+        self.syncer.sync();
+        Ok(())
     }
 
     /// Syncs and closes the backing file.
diff --git a/src/page/manager/options.rs b/src/page/manager/options.rs
index f95cc20d..d2cd2674 100644
--- a/src/page/manager/options.rs
+++ b/src/page/manager/options.rs
@@ -1,6 +1,7 @@
 use crate::page::{Page, PageError, PageManager};
 use std::{
     fs::{File, OpenOptions},
+    num::NonZero,
     path::Path,
 };
 
@@ -9,6 +10,7 @@ pub struct PageManagerOptions {
     pub(super) open_options: OpenOptions,
     pub(super) page_count: u32,
     pub(super) max_pages: u32,
+    pub(super) io_parallelism: NonZero<usize>,
 }
 
 impl PageManagerOptions {
@@ -24,7 +26,7 @@
             Page::MAX_COUNT / 1024
         };
 
-        Self { open_options, page_count: 0, max_pages }
+        Self { open_options, page_count: 0, max_pages, io_parallelism: NonZero::new(128).unwrap() }
     }
 
     /// Sets the option to create a new file, or open it if it already exists.
@@ -70,6 +72,17 @@
         self
     }
 
+    /// Sets the maximum amount of I/O parallelism that can be used during writes.
+    ///
+    /// This specifies the maximum number of *pages* that can be written in parallel at any given
+    /// time.
+    ///
+    /// By default, `io_parallelism` is set to 128, although this default may change in the future.
+    pub fn io_parallelism(&mut self, io_parallelism: NonZero<usize>) -> &mut Self {
+        self.io_parallelism = io_parallelism;
+        self
+    }
+
     /// Opens the file at `path` with the options specified by `self`.
     pub fn open(&self, path: impl AsRef<Path>) -> Result<PageManager, PageError> {
         PageManager::open_with_options(self, path)
     }
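A quick sketch of the new option at the `PageManager` level. `PageManager::options()`, `max_pages()`, and `open_temp_file()` are all used by the test utilities later in this diff; the sketch assumes they are callable from outside the crate, which may not hold for crate-private helpers.

```rust
use std::num::NonZero;
use triedb::PageManager;

fn make_manager() -> PageManager {
    // A manager whose syncer runs up to 64 concurrent page writers
    // instead of the default 128.
    PageManager::options()
        .max_pages(4096)
        .io_parallelism(NonZero::new(64).unwrap())
        .open_temp_file()
        .expect("failed to create page manager")
}
```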
diff --git a/src/page/manager/syncer.rs b/src/page/manager/syncer.rs
new file mode 100644
index 00000000..4561e1db
--- /dev/null
+++ b/src/page/manager/syncer.rs
@@ -0,0 +1,259 @@
+use crate::{
+    executor::threadpool,
+    page::{Page, PageId},
+};
+use memmap2::MmapRaw;
+use parking_lot::{Condvar, Mutex, MutexGuard};
+use rayon::{ThreadPool, ThreadPoolBuildError};
+use std::{
+    collections::BTreeSet,
+    num::NonZero,
+    sync::{
+        atomic::{AtomicBool, AtomicUsize, Ordering},
+        Arc,
+    },
+};
+
+const MAX_PAGE_COUNT: u32 = 64;
+
+/// Writes pages to secondary storage using parallel I/O in background threads.
+#[derive(Debug)]
+pub(in crate::page) struct PageSyncer {
+    thread_pool: ThreadPool,
+    inner: Arc<PageSyncerInner>,
+}
+
+#[derive(Debug)]
+struct PageSyncerInner {
+    pub(super) mmap: MmapRaw,
+    /// Set of pages that are "dirty" (have been modified in-memory) and should be synced to
+    /// secondary storage.
+    dirty_pages: Mutex<BTreeSet<PageId>>,
+    /// Condition set whenever a new page is added to `dirty_pages`. Used to notify writer threads.
+    new_dirty_page: Condvar,
+    /// Condition set whenever `dirty_pages` is emptied out. Used to notify `sync()`.
+    dirty_pages_consumed: Condvar,
+    /// Set to `true` when `PageSyncer` is dropped, to signal all threads to quit.
+    quit: AtomicBool,
+    /// Tracks the number of active threads. Used by `stop_threads()` to know when to return.
+    active_threads: AtomicUsize,
+}
+
+impl PageSyncer {
+    pub(super) fn new(
+        mmap: MmapRaw,
+        num_threads: NonZero<usize>,
+    ) -> Result<Self, ThreadPoolBuildError> {
+        let thread_pool = threadpool::builder().num_threads(num_threads.get()).build()?;
+        let syncer = Self {
+            thread_pool,
+            inner: Arc::new(PageSyncerInner {
+                mmap,
+                dirty_pages: Mutex::new(BTreeSet::new()),
+                new_dirty_page: Condvar::new(),
+                dirty_pages_consumed: Condvar::new(),
+                quit: AtomicBool::new(false),
+                active_threads: AtomicUsize::new(0),
+            }),
+        };
+        syncer.spawn_threads();
+        Ok(syncer)
+    }
+
+    #[must_use]
+    pub(super) fn mmap(&self) -> &MmapRaw {
+        &self.inner.mmap
+    }
+
+    pub(in crate::page) fn mark_dirty(&self, page_id: PageId) {
+        let mut dirty_pages = self.inner.dirty_pages.lock();
+        if dirty_pages.insert(page_id) {
+            self.inner.new_dirty_page.notify_one();
+        }
+    }
+
+    pub(super) fn sync(&self) {
+        let mut dirty_pages = self.inner.dirty_pages.lock();
+        if !dirty_pages.is_empty() {
+            // Wait once for the writer threads to consume `dirty_pages`. It's possible that right
+            // after that happens, another worker thread adds pages to `dirty_pages`, but we don't
+            // care: here we want to flush out all the pages that were in the set at the time that
+            // `sync()` was called; if more pages get added later, it's not our responsibility to
+            // wait for those to be synced.
+            //
+            // The problem with waiting until `dirty_pages` is really empty (i.e. changing the `if`
+            // to a `while`) is that this method would *potentially* be blocked forever.
+            self.inner.dirty_pages_consumed.wait(&mut dirty_pages);
+        }
+    }
+
+    fn spawn_threads(&self) {
+        let num_threads = self.thread_pool.current_num_threads();
+        for _ in 0..num_threads {
+            let inner_clone = Arc::clone(&self.inner);
+            self.thread_pool.spawn(move || inner_clone.sync_thread());
+        }
+    }
+
+    fn stop_threads(&self) {
+        // Set the quit flag and then wake up all threads
+        self.inner.quit.store(true, Ordering::Relaxed);
+        self.inner.new_dirty_page.notify_all();
+
+        // Wait for all threads to quit
+        loop {
+            let mut dirty_pages = self.inner.dirty_pages.lock();
+            if self.inner.active_threads.load(Ordering::Relaxed) == 0 {
+                break;
+            }
+            self.inner.new_dirty_page.notify_all();
+            self.inner.dirty_pages_consumed.wait(&mut dirty_pages);
+        }
+    }
+}
+
+impl PageSyncerInner {
+    fn sync_thread(&self) {
+        self.active_threads.fetch_add(1, Ordering::Relaxed);
+
+        loop {
+            let range = {
+                let mut dirty_pages = self.dirty_pages.lock();
+                if dirty_pages.is_empty() {
+                    // Let `sync()` know that `dirty_pages` is empty
+                    self.dirty_pages_consumed.notify_all();
+                    if self.quit.load(Ordering::Relaxed) {
+                        // `dirty_pages` is empty, and we've been asked by `stop_threads()` to quit
+                        break;
+                    }
+                    // Wait for `dirty_pages` to be populated
+                    self.new_dirty_page.wait(&mut dirty_pages);
+                }
+                // Get a chunk of dirty pages, and drop the lock (this is done implicitly as we
+                // exit the scope)
+                self.pop_dirty_pages(&mut dirty_pages)
+            };
+            if let Some((start, count)) = range {
+                self.sync_range(start, count);
+            }
+        }
+
+        // Decrement the thread count and send an extra notification so that `stop_threads()` can
+        // verify if `active_threads` has reached 0.
+        //
+        // In order to successfully synchronize with `stop_threads()`, we must hold a lock to
+        // `dirty_pages`; otherwise the risk is that a thread could quit after `stop_threads()`
+        // has checked the thread count, but before it starts waiting, causing `stop_threads()`
+        // to be stuck forever.
+        let dirty_pages = self.dirty_pages.lock();
+        self.active_threads.fetch_sub(1, Ordering::Relaxed);
+        self.dirty_pages_consumed.notify_all();
+        drop(dirty_pages);
+    }
+
+    /// Pops a contiguous chunk of pages from the `dirty_pages` set.
+    ///
+    /// The returned value (if any) is a pair `(start, count)`, where `start` is the first page to
+    /// be synced, and `count` is the total number of pages to sync, including `start`.
+    #[must_use]
+    fn pop_dirty_pages(
+        &self,
+        dirty_pages: &mut MutexGuard<'_, BTreeSet<PageId>>,
+    ) -> Option<(PageId, usize)> {
+        let start = dirty_pages.pop_first()?;
+        let mut end = start;
+        while end.as_u32() - start.as_u32() < MAX_PAGE_COUNT {
+            if let Some(next_page_id) = dirty_pages.first() {
+                if next_page_id.as_u32() == end.as_u32() + 1 {
+                    end = *next_page_id;
+                    dirty_pages.pop_first();
+                    continue;
+                }
+            }
+            break;
+        }
+
+        let count = end.as_usize() - start.as_usize() + 1;
+        Some((start, count))
+    }
+
+    /// Calls `msync()` on a range of pages.
+    fn sync_range(&self, start: PageId, count: usize) {
+        let byte_start = start.as_offset();
+        let byte_len = count * Page::SIZE;
+        if cfg!(not(miri)) {
+            if let Err(err) = self.mmap.flush_range(byte_start, byte_len) {
+                let byte_end = byte_start.saturating_add(byte_len);
+                // TODO: use `log::error!` instead
+                eprintln!("failed to sync pages to secondary storage (range: 0x{byte_start:x}..0x{byte_end:x}): {err}");
+            }
+        }
+    }
+}
+
+impl Drop for PageSyncer {
+    fn drop(&mut self) {
+        self.stop_threads();
+        self.sync();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::page_id;
+    use memmap2::MmapOptions;
+
+    #[must_use]
+    fn new_mmap() -> MmapRaw {
+        MmapOptions::new()
+            .len(10 * Page::SIZE)
+            .map_anon()
+            .expect("failed to create memory map")
+            .into()
+    }
+
+    #[test]
+    fn drop_stops_all_threads() {
+        let num_threads = 16;
+        let syncer = PageSyncer::new(new_mmap(), NonZero::new(num_threads).unwrap())
+            .expect("syncer creation failed");
+
+        // Obtain a weak reference to the inner struct. This struct is used by all background
+        // threads, which hold a strong reference to it, so if this weak reference becomes invalid
+        // later on it means that all threads have correctly stopped.
+        let inner = Arc::downgrade(&syncer.inner);
+
+        assert_eq!(inner.strong_count(), num_threads + 1);
+
+        drop(syncer);
+
+        assert_eq!(inner.strong_count(), 0);
+    }
+
+    #[test]
+    fn pop_dirty_pages_returns_contiguous_ranges() {
+        let num_threads = 16;
+        let syncer = PageSyncer::new(new_mmap(), NonZero::new(num_threads).unwrap())
+            .expect("syncer creation failed");
+        syncer.stop_threads();
+
+        syncer.mark_dirty(page_id!(1));
+        syncer.mark_dirty(page_id!(3));
+        syncer.mark_dirty(page_id!(5));
+        syncer.mark_dirty(page_id!(7));
+        syncer.mark_dirty(page_id!(9));
+
+        syncer.mark_dirty(page_id!(4));
+        syncer.mark_dirty(page_id!(8));
+        syncer.mark_dirty(page_id!(10));
+
+        let mut dirty_pages = syncer.inner.dirty_pages.lock();
+
+        assert_eq!(syncer.inner.pop_dirty_pages(&mut dirty_pages), Some((page_id!(1), 1)));
+        assert_eq!(syncer.inner.pop_dirty_pages(&mut dirty_pages), Some((page_id!(3), 3)));
+        assert_eq!(syncer.inner.pop_dirty_pages(&mut dirty_pages), Some((page_id!(7), 4)));
+        assert_eq!(syncer.inner.pop_dirty_pages(&mut dirty_pages), None);
+    }
+}
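The coalescing in `pop_dirty_pages` is what keeps the number of `msync()` calls low: each worker takes the lowest dirty page and extends the range while page numbers stay contiguous, capped at `MAX_PAGE_COUNT`. A standalone model of that loop over plain `u32` page numbers, reproducing the expectations from the test above:

```rust
use std::collections::BTreeSet;

// Standalone model of `pop_dirty_pages`: pop the smallest page, then keep
// absorbing the next page while it is exactly `end + 1`, up to `cap` pages.
fn pop_range(dirty: &mut BTreeSet<u32>, cap: u32) -> Option<(u32, usize)> {
    let start = dirty.pop_first()?;
    let mut end = start;
    while end - start < cap {
        match dirty.first() {
            Some(&next) if next == end + 1 => {
                end = next;
                dirty.pop_first();
            }
            _ => break,
        }
    }
    Some((start, (end - start + 1) as usize))
}

fn main() {
    let mut dirty: BTreeSet<u32> = [1, 3, 4, 5, 7, 8, 9, 10].into_iter().collect();
    assert_eq!(pop_range(&mut dirty, 64), Some((1, 1))); // 1 stands alone
    assert_eq!(pop_range(&mut dirty, 64), Some((3, 3))); // 3, 4, 5
    assert_eq!(pop_range(&mut dirty, 64), Some((7, 4))); // 7, 8, 9, 10
    assert_eq!(pop_range(&mut dirty, 64), None);
}
```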
diff --git a/src/page/page.rs b/src/page/page.rs
index 6a28ad47..f3f99646 100644
--- a/src/page/page.rs
+++ b/src/page/page.rs
@@ -1,6 +1,6 @@
 use crate::{
     page::{
-        manager::PageError,
+        manager::{syncer::PageSyncer, PageError},
         state::{PageState, RawPageState, RawPageStateMut},
         PageId,
     },
@@ -22,22 +22,22 @@ compile_error!("This code only supports little-endian platforms");
 ///
 /// This struct mainly exists to allow safe transmutation from [`PageMut`] to [`Page`].
 #[derive(Copy, Clone)]
-struct UnsafePage {
+struct UnsafePage<'p> {
     id: PageId,
     ptr: *mut [u8; Page::SIZE],
+    syncer: Option<&'p PageSyncer>,
 }
 
 #[repr(transparent)]
 #[derive(Copy, Clone)]
 pub struct Page<'p> {
-    inner: UnsafePage,
-    phantom: PhantomData<&'p ()>,
+    inner: UnsafePage<'p>,
 }
 
 #[repr(transparent)]
 pub struct PageMut<'p> {
-    inner: UnsafePage,
-    phantom: PhantomData<&'p ()>,
+    inner: UnsafePage<'p>,
+    phantom: PhantomData<&'p mut ()>,
 }
 
 fn fmt_page(name: &str, p: &Page<'_>, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -71,9 +71,7 @@ impl Page<'_> {
     pub unsafe fn from_ptr(id: PageId, ptr: *mut [u8; Page::SIZE]) -> Result<Self, PageError> {
         // SAFETY: guaranteed by the caller
         match RawPageState::from_ptr(ptr.cast()).load() {
-            PageState::Occupied(_) => {
-                Ok(Self { inner: UnsafePage { id, ptr }, phantom: PhantomData })
-            }
+            PageState::Occupied(_) => Ok(Self { inner: UnsafePage { id, ptr, syncer: None } }),
             PageState::Unused => Err(PageError::PageNotFound(id)),
             PageState::Dirty(_) => Err(PageError::PageDirty(id)),
         }
@@ -139,6 +137,7 @@ impl<'p> PageMut<'p> {
         id: PageId,
         snapshot_id: SnapshotId,
         ptr: *mut [u8; Page::SIZE],
+        syncer: Option<&'p PageSyncer>,
     ) -> Result<Self, PageError> {
         let new_state = PageState::dirty(snapshot_id).expect("invalid value for `snapshot_id`");
 
@@ -147,7 +146,7 @@
             PageState::Unused | PageState::Occupied(_) => Some(new_state),
             PageState::Dirty(_) => None,
         }) {
-            Ok(_) => Ok(Self { inner: UnsafePage { id, ptr }, phantom: PhantomData }),
+            Ok(_) => Ok(Self { inner: UnsafePage { id, ptr, syncer }, phantom: PhantomData }),
             Err(PageState::Unused) => Err(PageError::PageNotFound(id)),
             Err(PageState::Dirty(_)) => Err(PageError::PageDirty(id)),
             Err(PageState::Occupied(_)) => unreachable!(),
@@ -175,13 +174,14 @@
         id: PageId,
         snapshot_id: SnapshotId,
         ptr: *mut [u8; Page::SIZE],
+        syncer: Option<&'p PageSyncer>,
     ) -> Result<Self, PageError> {
         let new_state = PageState::dirty(snapshot_id).expect("invalid value for `snapshot_id`");
 
         // SAFETY: guaranteed by the caller
         match RawPageStateMut::from_ptr(ptr.cast()).compare_exchange(PageState::Unused, new_state) {
             Ok(_) => {
-                let mut p = Self { inner: UnsafePage { id, ptr }, phantom: PhantomData };
+                let mut p = Self { inner: UnsafePage { id, ptr, syncer }, phantom: PhantomData };
                 p.raw_contents_mut().fill(0);
                 Ok(p)
             }
@@ -219,11 +219,12 @@
         id: PageId,
         snapshot_id: SnapshotId,
         ptr: *mut [u8; Page::SIZE],
+        syncer: Option<&'p PageSyncer>,
     ) -> Result<Self, PageError> {
         let new_state = PageState::dirty(snapshot_id).expect("invalid value for `snapshot_id`");
 
         RawPageStateMut::from_ptr(ptr.cast()).store(new_state);
-        let mut p = Self { inner: UnsafePage { id, ptr }, phantom: PhantomData };
+        let mut p = Self { inner: UnsafePage { id, ptr, syncer }, phantom: PhantomData };
         p.raw_contents_mut().fill(0);
 
         Ok(p)
@@ -233,10 +234,15 @@
     ///
     /// This method is safe because the mutable reference ensures that there cannot be any other
    /// living reference to this page.
-    pub fn new(id: PageId, snapshot_id: SnapshotId, data: &'p mut [u8; Page::SIZE]) -> Self {
+    pub fn new(
+        id: PageId,
+        snapshot_id: SnapshotId,
+        data: &'p mut [u8; Page::SIZE],
+        syncer: Option<&'p PageSyncer>,
+    ) -> Self {
         // SAFETY: `data` is behind a mutable reference, therefore we have exclusive access to the
         // data.
-        unsafe { Self::acquire(id, snapshot_id, data) }.unwrap()
+        unsafe { Self::acquire(id, snapshot_id, data, syncer) }.unwrap()
     }
 
     #[inline]
@@ -270,8 +276,8 @@
     /// Transitions the page state from *dirty* to *occupied*.
     ///
     /// This has the same effect as dropping the page, but it's more explicit.
-    pub fn commit(mut self) {
-        self.commit_internal();
+    pub fn commit(self) {
+        // Do nothing and just let `self` get dropped
     }
 
     /// Transitions the page state from *dirty* to *unused*.
@@ -289,6 +295,9 @@
 
     fn commit_internal(&mut self) {
         self.raw_state_mut().unset_dirty();
+        if let Some(syncer) = self.inner.syncer {
+            syncer.mark_dirty(self.id());
+        }
     }
 }
 
@@ -342,7 +351,7 @@ mod tests {
         let mut data = DataArray([0; Page::SIZE]);
 
         let page_mut = unsafe {
-            PageMut::from_ptr(id, snapshot, &mut data.0).expect("loading mutable page failed")
+            PageMut::from_ptr(id, snapshot, &mut data.0, None).expect("loading mutable page failed")
         };
 
         assert_eq!(page_mut.id(), 42);
diff --git a/src/page/slotted_page.rs b/src/page/slotted_page.rs
index 7082004d..940c6174 100644
--- a/src/page/slotted_page.rs
+++ b/src/page/slotted_page.rs
@@ -474,7 +474,7 @@
     #[test]
     fn test_insert_get_value() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut subtrie_page = SlottedPageMut::try_from(page).unwrap();
 
         let v1 = String::from("hello");
@@ -502,7 +502,7 @@
     #[test]
     fn test_insert_set_value() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut subtrie_page = SlottedPageMut::try_from(page).unwrap();
 
         let v1 = String::from("hello");
@@ -522,7 +522,7 @@
     #[test]
     fn test_set_value_same_length() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut subtrie_page = SlottedPageMut::try_from(page).unwrap();
 
         let v1 = String::from("hello");
@@ -552,7 +552,7 @@
     #[test]
     fn test_set_value_shrink() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut subtrie_page = SlottedPageMut::try_from(page).unwrap();
 
         let v1 = String::from("hello");
@@ -584,7 +584,7 @@
     #[test]
     fn test_set_value_shrink_with_neighbors() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut subtrie_page = SlottedPageMut::try_from(page).unwrap();
 
         let v1 = String::from("one");
@@ -644,7 +644,7 @@
     #[test]
     fn test_set_value_grow() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut subtrie_page = SlottedPageMut::try_from(page).unwrap();
 
         let v1 = String::from("this");
@@ -674,7 +674,7 @@
     #[test]
     fn test_set_value_grow_with_neighbors() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut subtrie_page = SlottedPageMut::try_from(page).unwrap();
 
         let v1 = String::from("one");
@@ -734,7 +734,7 @@
     #[test]
     fn test_allocate_get_delete_cell_pointer() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut subtrie_page = SlottedPageMut::try_from(page).unwrap();
         let cell_index = subtrie_page.insert_value(&String::from("foo")).unwrap();
         assert_eq!(cell_index, 0);
@@ -827,7 +827,7 @@
     #[test]
     fn test_allocate_reuse_deleted_space() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut subtrie_page = SlottedPageMut::try_from(page).unwrap();
 
         let i0 = subtrie_page.insert_value(&String::from_iter(&['a'; 1020])).unwrap();
@@ -856,7 +856,7 @@
     #[test]
     fn test_allocate_reuse_deleted_spaces() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut subtrie_page = SlottedPageMut::try_from(page).unwrap();
 
         // bytes 0-12 are used by the header, and the next 4072 are used by the first 4 cells
@@ -914,7 +914,7 @@
     #[test]
     fn test_defragment_page() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut subtrie_page = SlottedPageMut::try_from(page).unwrap();
 
         let i0 = subtrie_page.insert_value(&String::from_iter(&['a'; 814])).unwrap();
@@ -960,7 +960,7 @@
     #[test]
     fn test_defragment_page_cells_out_of_order() {
         let mut data = DataArray([0; Page::SIZE]);
-        let page = PageMut::new(page_id!(42), 123, &mut data.0);
+        let page = PageMut::new(page_id!(42), 123, &mut data.0, None);
         let mut slotted_page = SlottedPageMut::try_from(page).unwrap();
 
         slotted_page.set_num_cells(16);
diff --git a/src/storage/engine.rs b/src/storage/engine.rs
index e8cc9730..299c6e79 100644
--- a/src/storage/engine.rs
+++ b/src/storage/engine.rs
@@ -20,6 +20,7 @@ use crate::{
 use alloy_primitives::StorageValue;
 use alloy_trie::{nodes::RlpNode, nybbles, Nibbles, EMPTY_ROOT_HASH};
 use parking_lot::Mutex;
+use rayon::ThreadPool;
 use std::{
     fmt::Debug,
     io,
@@ -37,6 +38,8 @@ pub struct StorageEngine {
     pub(crate) page_manager: PageManager,
     pub(crate) meta_manager: Mutex<MetadataManager>,
     pub(crate) alive_snapshot: AtomicU64,
+    #[allow(dead_code)]
+    thread_pool: ThreadPool,
 }
 
 #[derive(Debug)]
@@ -47,12 +50,17 @@ enum PointerChange {
 }
 
 impl StorageEngine {
-    pub fn new(page_manager: PageManager, meta_manager: MetadataManager) -> Self {
+    pub fn new(
+        page_manager: PageManager,
+        meta_manager: MetadataManager,
+        thread_pool: ThreadPool,
+    ) -> Self {
         let alive_snapshot = meta_manager.active_slot().snapshot_id();
         Self {
             page_manager,
             meta_manager: Mutex::new(meta_manager),
             alive_snapshot: AtomicU64::new(alive_snapshot),
+            thread_pool,
         }
     }
diff --git a/src/storage/test_utils.rs b/src/storage/test_utils.rs
index 7469f45a..efd91a82 100644
--- a/src/storage/test_utils.rs
+++ b/src/storage/test_utils.rs
@@ -1,20 +1,23 @@
 #![cfg(test)]
 
-use alloy_primitives::U256;
-use alloy_trie::{EMPTY_ROOT_HASH, KECCAK_EMPTY};
-use rand::{rngs::StdRng, RngCore};
-
 use crate::{
-    account::Account, context::TransactionContext, meta::MetadataManager,
+    account::Account, context::TransactionContext, executor::threadpool, meta::MetadataManager,
     storage::engine::StorageEngine, PageManager,
 };
+use alloy_primitives::U256;
+use alloy_trie::{EMPTY_ROOT_HASH, KECCAK_EMPTY};
+use rand::{rngs::StdRng, RngCore};
 
 pub(crate) fn create_test_engine(max_pages: u32) -> (StorageEngine, TransactionContext) {
     let meta_manager =
         MetadataManager::from_file(tempfile::tempfile().expect("failed to create temporary file"))
             .expect("failed to open metadata file");
-    let page_manager = PageManager::options().max_pages(max_pages).open_temp_file().unwrap();
-    let storage_engine = StorageEngine::new(page_manager, meta_manager);
+    let page_manager = PageManager::options()
+        .max_pages(max_pages)
+        .open_temp_file()
+        .expect("failed to create page manager");
+    let thread_pool = threadpool::builder().build().expect("failed to create thread pool");
+    let storage_engine = StorageEngine::new(page_manager, meta_manager, thread_pool);
     let context = storage_engine.write_context();
     (storage_engine, context)
 }