From 3ccd3bb230f816d0f7df53127910e2d61051a3eb Mon Sep 17 00:00:00 2001 From: Gregor Peach Date: Sun, 28 Nov 2021 15:38:21 -0800 Subject: [PATCH] Refactor the collector to use reference counting This is around a ~4x speedup, but required a lot of code changes. In terms of public API, only `AtomicGc` has a significant diff. (The old API would still work, but the changes make it work more efficently with the new internals.) --- Cargo.toml | 6 +- README.md | 1 - src/atomic.rs | 267 +++++++++++------------ src/collector/alloc.rs | 2 +- src/collector/collect_impl.rs | 184 +++++++--------- src/collector/data.rs | 41 +--- src/collector/dropper.rs | 56 +++-- src/collector/mod.rs | 291 ++++++++++++------------- src/collector/ref_cnt.rs | 92 ++++++++ src/concurrency/chunked_ll.rs | 10 +- src/concurrency/cross_thread_buffer.rs | 38 ++++ src/concurrency/lockout.rs | 94 +++++--- src/concurrency/mod.rs | 1 + src/lib.rs | 4 +- src/marker/gc_drop.rs | 4 +- src/plumbing.rs | 2 +- src/r.rs | 4 +- src/scan.rs | 7 +- src/smart_ptr/deref_gc.rs | 4 +- src/smart_ptr/gc.rs | 28 ++- src/std_impls/mod.rs | 2 +- tests/integration.rs | 58 ++--- 22 files changed, 659 insertions(+), 537 deletions(-) create mode 100644 src/collector/ref_cnt.rs create mode 100644 src/concurrency/cross_thread_buffer.rs diff --git a/Cargo.toml b/Cargo.toml index 6d2978d..fdabddf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ crossbeam = "0.8.1" dynqueue = { version = "0.3.0", features = ["crossbeam-queue"] } log = "0.4.14" once_cell = "1.8" +num_cpus = "1.0" parking_lot = "0.11.2" rayon = "1.5" rental = "0.5.6" @@ -25,14 +26,15 @@ shredder_derive = "0.2.0" #shredder_derive = { git = "https://github.com/Others/shredder_derive.git" } #shredder_derive = { path = "../shredder_derive" } stable_deref_trait = "1.2" +thread_local = "1.1" [dev-dependencies] paste = "1.0" rand = "0.8.3" trybuild = "1.0" -#[profile.release] -#debug = true +[profile.release] +debug = true [features] default = [] diff --git a/README.md b/README.md index 828d00c..4099950 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,6 @@ of the data has unpredictable cycles in it. (So Arc would not be appropriate.) - guarded access: accessing `Gc` data requires acquiring a guard (although you can use `DerefGc` in many cases to avoid this) - multiple collectors: only a single global collector is supported - can't handle `Rc`/`Arc`: requires all `Gc` objects have straightforward ownership semantics -- collection optimized for speed, not memory use: `Gc` and internal metadata is small, but there is bloat during collection (will fix!) - no no-std support: The collector requires threading and other `std` features (will fix!) Getting Started diff --git a/src/atomic.rs b/src/atomic.rs index 83d19c2..7f81e10 100644 --- a/src/atomic.rs +++ b/src/atomic.rs @@ -4,26 +4,31 @@ use std::ptr::drop_in_place; use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Arc; -use crate::collector::{GcData, InternalGcRef, COLLECTOR}; +use crate::collector::{GcData, InternalGcRef, RefCountPolicy, COLLECTOR}; use crate::marker::{GcDeref, GcSafe}; use crate::{Finalize, Gc, Scan, Scanner}; /// An atomic `Gc`, useful for concurrent algorithms /// -/// This has more overhead than an `AtomicPtr`, but cleanly handles memory management. It also is -/// similar to `Gc` in that it can be cloned, and therefore easily shared. -/// -/// A good analogy would be to the excellent `arc-swap` crate. However, we can be more performant, -/// as relying on the collector lets us avoid some synchronization. 
+/// This has more overhead than an `AtomicPtr`, but cleanly handles memory management. (Similar +/// to the excellent `arc-swap` crate or crossbeam's `Atomic`.) /// /// `AtomicGc` should be fairly fast, but you may not assume it does not block. In fact in the /// presence of an active garbage collection operation, all operations will block. Otherwise /// it shouldn't block. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct AtomicGc { - // It is only safe to read the data here if a collection is not happening - atomic_ptr: Arc>, - backing_handle: InternalGcRef, + // This is a pointer to the data that this "AtomicGc" is pointing to. This is taken from an `Arc` + // and is only valid as long as that `Arc` is valid. However, we know that the collector must + // hold arcs to the data, so as long as the data is live, this pointer is valid. + // + // Only in a `drop` or `finalize` call (in the background thread) will the data no longer be + // live. But the contracts of `GcDrop` and `Finalize` require that no methods on `AtomicGc` are + // called. + // + // Taken together, this means that this pointer is always valid when executing a method on + // `AtomicGc`. + atomic_ptr: AtomicPtr, _mark: PhantomData>, } @@ -32,7 +37,7 @@ impl AtomicGc { /// /// The created `AtomicGc` will point to the same data as `data` #[must_use] - pub fn new(data: &Gc) -> Self { + pub fn new(data: Gc) -> Self { // Ensure we don't create an atomic out of dead data... data.assert_live(); @@ -41,19 +46,27 @@ impl AtomicGc { // Carefully craft a ptr to store atomically let data_arc = data.internal_handle_ref().data(); - let data_ptr = Arc::as_ptr(data_arc); - - let atomic_ptr = Arc::new(AtomicPtr::new(data_ptr as _)); + let atomic_ptr = AtomicPtr::new(Arc::as_ptr(data_arc) as _); + // Forget the initial data, we will absorb its reference counts + data.drop_preserving_reference_counts(); Self { - atomic_ptr: atomic_ptr.clone(), - backing_handle: COLLECTOR.new_handle_for_atomic(atomic_ptr), + atomic_ptr, _mark: PhantomData, } } - pub(crate) fn internal_handle(&self) -> InternalGcRef { - self.backing_handle.clone() + // NOTE: Throughout the methods here, the `collection_blocker_spinlock` is used to protect + // against concurrently changing the graph while the collector is running. + // + // TODO: Validate if we could make the collector work without this + + #[inline] + unsafe fn arc_ptr_to_new_arc(v: *const GcData) -> Arc { + let temp = Arc::from_raw(v); + let new = temp.clone(); + mem::forget(temp); + new } /// `load` the data from this `AtomicGc`, getting back a `Gc` @@ -61,26 +74,16 @@ impl AtomicGc { /// The ordering/atomicity guarantees are identical to `AtomicPtr::load` #[must_use] pub fn load(&self, ordering: Ordering) -> Gc { - let ptr; - let internal_handle; - { - let _collection_blocker = COLLECTOR.get_collection_blocker_spinlock(); - - // Safe to manipulate this ptr only because we have the `_collection_blocker` - // (And we know this `Arc` still has a pointer in the collector data structures, - // otherwise someone would be accessing an `AtomicGc` pointing to freed data--which - // is impossible in safe code.) - let gc_data_ptr = self.atomic_ptr.load(ordering); - let gc_data_temp = unsafe { Arc::from_raw(gc_data_ptr) }; + // No need for collection blocker, as we're not modifying the graph + // This is safe. 
See comment on the `atomic_ptr` field + let gc_data_ptr = self.atomic_ptr.load(ordering); - // Create a new `Arc` pointing to the same data, but don't invalidate the existing `Arc` - // (which is effectively "behind" the pointer) - let new_gc_data_ref = gc_data_temp.clone(); - mem::forget(gc_data_temp); + // Create a new `Arc` pointing to the same data, but don't invalidate the existing `Arc` + // (which is actually stored in the collector metadata struct) + let data = unsafe { Self::arc_ptr_to_new_arc(gc_data_ptr) }; - ptr = new_gc_data_ref.scan_ptr().cast(); - internal_handle = COLLECTOR.handle_from_data(new_gc_data_ref); - } + let ptr = data.scan_ptr().cast(); + let internal_handle = InternalGcRef::new(data, RefCountPolicy::FromExistingHandle); Gc::new_raw(internal_handle, ptr) } @@ -88,151 +91,139 @@ impl AtomicGc { /// `store` new data into this `AtomicGc` /// /// The ordering/atomicity guarantees are identical to `AtomicPtr::store` - pub fn store(&self, v: &Gc, ordering: Ordering) { + pub fn store(&self, new: Gc, ordering: Ordering) { // Ensure we're not storing dead data... - v.assert_live(); - - let data = v.internal_handle_ref().data(); - let raw_data_ptr = Arc::as_ptr(data); + new.assert_live(); + let raw_data_ptr = Arc::as_ptr(new.internal_handle_ref().data()); { + // Need the collection blocker as we are mutating the graph let _collection_blocker = COLLECTOR.get_collection_blocker_spinlock(); - // Safe to manipulate this ptr only because we have the `_collection_blocker` - // (And we know this `Arc` still has a pointer in the collector data structures, - // otherwise someone would be accessing an `AtomicGc` pointing to freed data--which - // is impossible in safe code.) - self.atomic_ptr.store(raw_data_ptr as _, ordering); - } - } + // We absorb the reference counts of the data we're storing + // TODO: Is this actually more efficient that taking by reference and incrementing? Do we want to support both? + new.drop_preserving_reference_counts(); - /// `swap` what data is stored in this `AtomicGc`, getting a `Gc` to the old data back - /// - /// The ordering/atomicity guarantees are identical to `AtomicPtr::swap` - #[must_use] - pub fn swap(&self, v: &Gc, ordering: Ordering) -> Gc { - // Ensure we're not storing dead data... - v.assert_live(); - - let data = v.internal_handle_ref().data(); - let raw_data_ptr = Arc::as_ptr(data); - - let ptr; - let internal_handle; - { - let _collection_blocker = COLLECTOR.get_collection_blocker_spinlock(); - let old_data_ptr = self.atomic_ptr.swap(raw_data_ptr as _, ordering); + // Safe to change this ptr only because we have the `_collection_blocker` + let old_data = self.atomic_ptr.swap(raw_data_ptr as _, ordering); + let old_arc = unsafe { Arc::from_raw(old_data) }; - // Safe to manipulate this ptr only because we have the `_collection_blocker` - // (And we know this `Arc` still has a pointer in the collector data structures, - // otherwise someone would be accessing an `AtomicGc` pointing to freed data--which - // is impossible in safe code.) - let old_data_arc = unsafe { Arc::from_raw(old_data_ptr) }; - let gc_data = old_data_arc.clone(); - mem::forget(old_data_arc); - - ptr = gc_data.scan_ptr().cast(); - internal_handle = COLLECTOR.handle_from_data(gc_data); + // The count of the data going out decreases + COLLECTOR.decrement_reference_count(&old_arc); + mem::forget(old_arc); } - - Gc::new_raw(internal_handle, ptr) } - /// Do a CAS operation. 
If this `AtomicGc` points to the same data as `current` then after this - /// operation it will point to the same data as `new`. (And this happens atomically.) - /// - /// Data is compared for pointer equality. NOT `Eq` equality. (A swap will only happen if - /// `current` and this `AtomicGc` point to the same underlying allocation.) - /// - /// The ordering/atomicity guarantees are identical to `AtomicPtr::compare_and_swap` + /// `swap` new data with the data in this `AtomicGc` /// - /// # Returns - /// Returns `true` if the swap happened and this `AtomicGc` now points to `new` - /// Returns `false` if the swap failed / this `AtomicGc` was not pointing to `current` - #[allow(clippy::must_use_candidate)] - #[allow(deprecated)] - pub fn compare_and_swap(&self, current: &Gc, new: &Gc, ordering: Ordering) -> bool { + /// The ordering/atomicity guarantees are identical to `AtomicPtr::swap` + pub fn swap(&self, new: Gc, ordering: Ordering) -> Gc { // Ensure we're not storing dead data... new.assert_live(); - // Turn guess data into a raw ptr - let guess_data = current.internal_handle_ref().data(); - let guess_data_raw = Arc::as_ptr(guess_data) as _; - - // Turn new data into a raw ptr - let new_data = new.internal_handle_ref().data(); - let new_data_raw = Arc::as_ptr(new_data) as _; + let raw_data_ptr = Arc::as_ptr(new.internal_handle_ref().data()); - let compare_res; { + // Need the collection blocker as we are mutating the graph let _collection_blocker = COLLECTOR.get_collection_blocker_spinlock(); - // Safe to manipulate this ptr only because we have the `_collection_blocker` - // (And we know this `Arc` still has a pointer in the collector data structures, - // otherwise someone would be accessing an `AtomicGc` pointing to freed data--which - // is impossible in safe code.) - compare_res = self - .atomic_ptr - .compare_and_swap(guess_data_raw, new_data_raw, ordering); - } - compare_res == guess_data_raw + let old_data_ptr = self.atomic_ptr.swap(raw_data_ptr as _, ordering); + // We absorb the reference counts of the data we're storing + new.drop_preserving_reference_counts(); + + // Then we return out the old data + let old_data = unsafe { Self::arc_ptr_to_new_arc(old_data_ptr) }; + let old_ptr = old_data.underlying_allocation.scan_ptr.cast(); + let internal_handle = + InternalGcRef::new(old_data, RefCountPolicy::InheritExistingCounts); + Gc::new_raw(internal_handle, old_ptr) + } } - /// Do a CAE operation. If this `AtomicGc` points to the same data as `current` then after this - /// operation it will point to the same data as `new`. (And this happens atomically.) - /// - /// Data is compared for pointer equality. NOT `Eq` equality. (A swap will only happen if - /// `current` and this `AtomicGc` point to the same underlying allocation.) + /// Execute a `compare_exchange` operation /// - /// The ordering/atomicity guarantees are identical to `AtomicPtr::compare_exchange`, refer to - /// that documentation for documentation about `success` and `failure` orderings. 
+ /// The ordering/atomicity guarantees are identical to `AtomicPtr::compare_exchange` /// - /// # Returns - /// Returns `true` if the swap happened and this `AtomicGc` now points to `new` - /// Returns `false` if the swap failed / this `AtomicGc` was not pointing to `current` - #[allow(clippy::must_use_candidate)] + /// # Errors + /// On success returns `Ok(previous_value)` (which is guaranteed to be the same as `current`) + /// On failure returns an error containing the current value, and the `new` value passed in pub fn compare_exchange( &self, current: &Gc, - new: &Gc, + new: Gc, success: Ordering, failure: Ordering, - ) -> bool { + ) -> Result, CompareExchangeError> { // Ensure we're not storing dead data... new.assert_live(); - let guess_data = current.internal_handle_ref().data(); - let guess_data_raw = Arc::as_ptr(guess_data) as _; - - let new_data = new.internal_handle_ref().data(); - let new_data_raw = Arc::as_ptr(new_data) as _; + let guess_ptr = Arc::as_ptr(current.internal_handle_ref().data()); + let new_ptr = Arc::as_ptr(new.internal_handle_ref().data()); - let swap_result; { + // Need the collection blocker as we are mutating the graph let _collection_blocker = COLLECTOR.get_collection_blocker_spinlock(); - // Safe to manipulate this ptr only because we have the `_collection_blocker` - // (And we know this `Arc` still has a pointer in the collector data structures, - // otherwise someone would be accessing an `AtomicGc` pointing to freed data--which - // is impossible in safe code.) - swap_result = + + let exchange_res = self.atomic_ptr - .compare_exchange(guess_data_raw, new_data_raw, success, failure); + .compare_exchange(guess_ptr as _, new_ptr as _, success, failure); + + match exchange_res { + Ok(old) => { + // Get the old value + let old_data = unsafe { Self::arc_ptr_to_new_arc(old) }; + let old_ptr = old_data.underlying_allocation.scan_ptr.cast(); + + // We absorb the reference counts of the data we're storing + new.drop_preserving_reference_counts(); + // Our current reference counts aer being inhereted by the new data + let internal_handle = + InternalGcRef::new(old_data, RefCountPolicy::InheritExistingCounts); + + Ok(Gc::new_raw(internal_handle, old_ptr)) + } + Err(current) => { + let current = unsafe { Self::arc_ptr_to_new_arc(current) }; + let current_ptr = current.underlying_allocation.scan_ptr.cast(); + + let internal_handle = + InternalGcRef::new(current, RefCountPolicy::FromExistingHandle); + + let current = Gc::new_raw(internal_handle, current_ptr); + + Err(CompareExchangeError { current, new }) + } + } } - - swap_result.is_ok() } +} - // TODO: Compare and swap/compare and exchange that return the current value +/// If a `compare_exchange` operation fails, this error is returned +/// +/// It contains the actual value that was in the `AtomicGc`, as well as the `new` value that was +/// passed in to the `compare_exchange` operation +pub struct CompareExchangeError { + /// The value that was in the `AtomicGc` at the time of the `compare_exchange` operation + pub current: Gc, + /// The value that was in the `new` parameter when you called `compare_exchange` + pub new: Gc, } unsafe impl Scan for AtomicGc { fn scan(&self, scanner: &mut Scanner<'_>) { - scanner.add_internal_handle(self.internal_handle()); + // This is safe for the same reasons as `AtomicPtr::load` + let gc_data_ptr = self.atomic_ptr.load(Ordering::SeqCst); + let gc_data = unsafe { Self::arc_ptr_to_new_arc(gc_data_ptr) }; + + let internal_handle = InternalGcRef::new(gc_data, 
RefCountPolicy::TransientHandle); + + scanner.add_internal_handle(&internal_handle); } } unsafe impl GcSafe for AtomicGc {} // unsafe impl !GcDrop for AtomicGc {} +// This is valid, as `AtomicGc` does its own sychronization with the collector unsafe impl GcDeref for AtomicGc {} unsafe impl Finalize for AtomicGc { @@ -243,7 +234,9 @@ unsafe impl Finalize for AtomicGc { impl Drop for AtomicGc { fn drop(&mut self) { - // Manually cleanup the backing handle... - self.backing_handle.invalidate(); + // This is safe, since `Finalize` and `GcDrop` rules prevent reviving an `AtomicGc` + // (and the background dropper always preserves the `Arc` until all drop/finalize are run) + let x = *self.atomic_ptr.get_mut(); + COLLECTOR.decrement_reference_count(unsafe { &*x }); } } diff --git a/src/collector/alloc.rs b/src/collector/alloc.rs index 9e74ccc..10e428c 100644 --- a/src/collector/alloc.rs +++ b/src/collector/alloc.rs @@ -203,7 +203,7 @@ impl GcAllocation { } } - pub fn scan(&self, callback: F) { + pub fn scan(&self, callback: F) { unsafe { let mut scanner = Scanner::new(callback); let to_scan = &*self.scan_ptr; diff --git a/src/collector/collect_impl.rs b/src/collector/collect_impl.rs index f4eb08d..71cb8f6 100644 --- a/src/collector/collect_impl.rs +++ b/src/collector/collect_impl.rs @@ -1,146 +1,123 @@ -use std::sync::atomic::Ordering; - -use crossbeam::deque::Injector; use crossbeam::queue::SegQueue; use dynqueue::IntoDynQueue; -use parking_lot::{MutexGuard, RwLock}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use std::sync::atomic::Ordering; use crate::collector::dropper::DropMessage; -use crate::collector::{Collector, GcExclusiveWarrant}; +use crate::collector::Collector; use crate::concurrency::lockout::Lockout; +use parking_lot::MutexGuard; + impl Collector { pub(super) fn do_collect(&self, gc_guard: MutexGuard<'_, ()>) { - // Be careful modifying this method. The tracked data and tracked handles can change underneath us + // TODO: Improve this comment + // Be careful modifying this method. The tracked data, reference counts, and to some extent + // the graph, can change underneath us. + // // Currently the state is this, as far as I can tell: - // - New handles are conservatively seen as roots if seen at all while we are touching handles - // (there is nowhere a new "secret root" can be created and then the old root stashed and seen as non-rooted) - // - New data is treated as a special case, and only deallocated if it existed at the start of collection - // - Deleted handles cannot make the graph "more connected" if the deletion was not observed + // - New data is always seen as rooted as long is it is allocated after the graph freezing step + // - After graph freezing (where we take all the Lockouts we can) there is no way to + // smuggle items in or out of the graph + // - The reference count preperation is conservative (if concurrently modified, the graph will simply look more connected) trace!("Beginning collection"); let _atomic_spinlock_guard = self.atomic_spinlock.lock_exclusive(); - let current_collection = self - .tracked_data - .current_collection_number - .load(Ordering::SeqCst); - // Here we synchronize destructors: this ensures that handles in objects in the background thread are dropped - // Otherwise we'd see those handles as rooted and keep them around. + // Otherwise we'd see those handles as rooted and keep them around. (This would not lead to incorrectness, but + // this improves consistency and determinism.) 
+ // // This makes a lot of sense in the background thread (since it's totally async), // but may slow direct calls to `collect`. self.synchronize_destructors(); - // The warrant system prevents us from scanning in-use data - let warrants: Injector = Injector::new(); - // eprintln!("tracked data {:?}", tracked_data); // eprintln!("tracked handles {:?}", tracked_handles); - // In this step we calculate what's not rooted by marking all data definitively in a Gc - self.tracked_data.data.par_iter(|data| { - // If data.last_marked == 0, then it is new data. Update that we've seen this data - // (this step helps synchronize what data is valid to be deallocated) - if data.last_marked.load(Ordering::SeqCst) == 0 { - data.last_marked - .store(current_collection - 1, Ordering::SeqCst); + // First, go through the data, resetting all the reference count trackers, + // and taking exclusive warrants where possible + self.tracked_data.par_iter(|data| { + unsafe { + // Safe as we are the collector + Lockout::try_take_exclusive_access_unsafe(&data); } + // This can be done concurrently with the `Lockout` managment, since the ref-count snapshot is conservative + // TODO: Double check this logic + data.ref_cnt.prepare_for_collection(); + }); - if let Some(warrant) = Lockout::get_exclusive_warrant(data.clone()) { - // Save that warrant so things can't shift around under us - warrants.push(warrant); - - // Now figure out what handles are not rooted + // Then adjust reference counts to figure out what is rooted + self.tracked_data.par_iter(|data| { + if Lockout::unsafe_exclusive_access_taken(&data) { data.underlying_allocation.scan(|h| { - h.handle_ref - .v - .last_non_rooted - .store(current_collection, Ordering::SeqCst); + h.data_ref.ref_cnt.found_once_internally(); }); } else { - // eprintln!("failed to get warrant!"); - // If we can't get the warrant, then this data must be in use, so we can mark it - data.last_marked.store(current_collection, Ordering::SeqCst); + // Someone else had this data during the collection, so it is clearly rooted + data.ref_cnt.override_mark_as_rooted(); } }); - // The handles that were not just marked need to be treated as roots + // Now we need to translate our set of roots into a queue + // TODO: This is the only allocation in the collector at this point, probably is removable or re-usable let roots = SegQueue::new(); - self.tracked_data.handles.par_iter(|handle| { - // If the `last_non_rooted` number was not now, then it is a root - if handle.last_non_rooted.load(Ordering::SeqCst) != current_collection { - roots.push(handle); + self.tracked_data.par_iter(|data| { + if data.ref_cnt.is_rooted() { + // We need to scan data that dynamically becomes rooted, so we use the `override_mark_as_rooted` + // flag to track what we've enqued to scan already + data.ref_cnt.override_mark_as_rooted(); + roots.push(data); } }); - // eprintln!("roots {:?}", roots); - - // This step is dfs through the object graph (starting with the roots) - // We mark each object we find let dfs_stack = roots.into_dyn_queue(); - dfs_stack - .into_par_iter() - .for_each(|(queue, handle)| unsafe { - handle.underlying_data.with_data(|data| { - // If this data is new, we don't want to `Scan` it, since we may not have its Lockout - // Any handles inside this could not of been seen in step 1, so they'll be rooted anyway - if data.last_marked.load(Ordering::SeqCst) != 0 { - // Essential note! 
All non-new non-warranted data is automatically marked - // Thus we will never accidentally scan non-warranted data here - let previous_mark = - data.last_marked.swap(current_collection, Ordering::SeqCst); - - // Since we've done an atomic swap, we know we've already scanned this iff it was marked - // (excluding data marked because we couldn't get its warrant, who's handles would be seen as roots) - // This stops us for scanning data more than once and, crucially, concurrently scanning the same data - if previous_mark != current_collection { - data.last_marked.store(current_collection, Ordering::SeqCst); - - data.underlying_allocation.scan(|h| { - let mut should_enque = false; - h.handle_ref.v.underlying_data.with_data(|scanned_data| { - if scanned_data.last_marked.load(Ordering::SeqCst) - != current_collection - { - should_enque = true; - } - }); - if should_enque { - queue.enqueue(h.handle_ref.v); - } - }); - } + dfs_stack.into_par_iter().for_each(|(queue, data)| { + debug_assert!(!data.deallocated.load(Ordering::SeqCst)); + + if Lockout::unsafe_exclusive_access_taken(&data) { + data.underlying_allocation.scan(|h| { + let ref_cnt = &h.data_ref.ref_cnt; + // We need to scan data that dynamically becomes rooted, so we use the `override_mark_as_rooted` + // flag to track what we've enqued to scan already. (So we can't just use `is_rooted` here.) + if !ref_cnt.was_overriden_as_rooted() { + // This is technically racy, since we check the rooting status, THEN mark as rooted/enqueue + // But that doesn't matter since the worse that can happen is that we enqueue the data twice + ref_cnt.override_mark_as_rooted(); + queue.enqueue(h.data_ref.clone()); } - }) - }); - // We're done scanning things, and have established what is marked. Release the warrants - drop(warrants); + }); + } else { + // Someone else had this data during the collection, so it is clearly rooted + data.ref_cnt.override_mark_as_rooted(); + } + }); + + // We are done scanning, so release any warrants + self.tracked_data.par_iter(|data| unsafe { + Lockout::try_release_exclusive_access_unsafe(&data); + }); + + // Since new refcnts are created as rooted, and new data is created with new refcnts, we + // can safely treat the refcnt data as definitive // Now cleanup by removing all the data that is done for - let to_drop = RwLock::new(Vec::new()); + let to_drop = self.dropper.get_buffer(); - self.tracked_data.data.par_retain(|data| { - // Mark the new data as in use for now - // This stops us deallocating data that was allocated during collection - if data.last_marked.load(Ordering::SeqCst) == 0 { - data.last_marked.store(current_collection, Ordering::SeqCst); + self.tracked_data.par_retain(|data| { + let is_marked = data.ref_cnt.is_rooted(); + if is_marked { + // this is marked so retain it + return true; } - // If this is true, we just marked this data - if data.last_marked.load(Ordering::SeqCst) == current_collection { - // so retain it - true - } else { - // Otherwise we didn't mark it and it should be deallocated - // eprintln!("deallocating {:?}", data_ptr); - // Send it to the drop thread to be dropped - to_drop.write().push(data.clone()); + // Otherwise we didn't mark it and it should be deallocated + // eprintln!("deallocating {:?}", data_ptr); + // Send it to the drop thread to be dropped + to_drop.push(data.clone()); - // Don't retain this data - false - } + // Don't retain this data + false }); // Send off the data to be dropped in the background @@ -153,11 +130,6 @@ impl Collector { self.trigger 
.set_data_count_after_collection(self.tracked_data_count()); - // update collection number - self.tracked_data - .current_collection_number - .fetch_add(1, Ordering::SeqCst); - drop(gc_guard); trace!("Collection finished"); diff --git a/src/collector/data.rs b/src/collector/data.rs index 1202236..dee85dc 100644 --- a/src/collector/data.rs +++ b/src/collector/data.rs @@ -1,7 +1,8 @@ -use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use crate::collector::alloc::GcAllocation; +use crate::collector::ref_cnt::GcRefCount; use crate::concurrency::lockout::{Lockout, LockoutProvider}; use crate::Scan; @@ -12,9 +13,8 @@ pub struct GcData { pub(crate) lockout: Lockout, /// have we started deallocating this piece of data yet? pub(crate) deallocated: AtomicBool, - // During what collection was this last marked? - // 0 if this is a new piece of data - pub(crate) last_marked: AtomicU64, + // reference count + pub(crate) ref_cnt: GcRefCount, /// a wrapper to manage (ie deallocate) the underlying allocation pub(crate) underlying_allocation: GcAllocation, } @@ -26,38 +26,7 @@ impl LockoutProvider for Arc { } impl GcData { - pub fn scan_ptr(&self) -> *const dyn Scan { + pub(crate) fn scan_ptr(&self) -> *const dyn Scan { self.underlying_allocation.scan_ptr } } - -/// There is one `GcHandle` per `Gc`. We need this metadata for collection -#[derive(Debug)] -pub struct GcHandle { - /// what data is backing this handle - pub(crate) underlying_data: UnderlyingData, - // During what collection was this last found in a piece of GcData? - // 0 if this is a new piece of data - pub(crate) last_non_rooted: AtomicU64, -} - -#[derive(Debug)] -pub enum UnderlyingData { - Fixed(Arc), - DynamicForAtomic(Arc>), -} - -impl UnderlyingData { - // Safe only if called when the data is known to be live, and you know atomics can't be modified - // (Basically only okay to call in the collector itself) - #[inline] - pub unsafe fn with_data(&self, f: F) { - match self { - Self::Fixed(data) => f(&*data), - Self::DynamicForAtomic(ptr) => { - let arc_ptr = ptr.load(Ordering::Relaxed); - f(&*arc_ptr) - } - } - } -} diff --git a/src/collector/dropper.rs b/src/collector/dropper.rs index 594aa99..6b04eb3 100644 --- a/src/collector/dropper.rs +++ b/src/collector/dropper.rs @@ -3,44 +3,47 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::thread::spawn; -use crossbeam::channel::{self, SendError, Sender}; -use parking_lot::RwLock; -use rayon::iter::IntoParallelRefIterator; -use rayon::iter::ParallelIterator; +use crossbeam::channel::{self, Receiver, SendError, Sender}; use crate::collector::GcData; +use crate::concurrency::cross_thread_buffer::CrossThreadBuffer; + +type DropBuffer = CrossThreadBuffer>; pub(crate) struct BackgroundDropper { - sender: Sender, + // TODO: This would probably be marginally more efficient with non-channel based synchronization + drop_message_sender: Sender, + buffer_recycler: Receiver, } pub(crate) enum DropMessage { /// Signals the `BackgroundDropper` to deallocate the following data (possibly running some destructor) - DataToDrop(RwLock>>), + DataToDrop(DropBuffer), /// Indicates to the `BackgroundDropper` that it should sync up with the calling code SyncUp(Sender<()>), } impl BackgroundDropper { + const RECYCLING_CHANNEL_SIZE: usize = 1; + pub fn new() -> Self { - let (sender, receiver) = channel::unbounded(); + let (drop_message_sender, drop_message_retriever) = channel::unbounded(); + let (recycling_sender, 
recycling_receiver) = channel::bounded(Self::RECYCLING_CHANNEL_SIZE); // The drop thread deals with doing all the Drops this collector needs to do spawn(move || { // An Err value means the stream will never recover - while let Ok(drop_msg) = receiver.recv() { + while let Ok(drop_msg) = drop_message_retriever.recv() { match drop_msg { - DropMessage::DataToDrop(to_drop) => { - let to_drop = to_drop.read(); - + DropMessage::DataToDrop(mut to_drop) => { // NOTE: It's important that all data is correctly marked as deallocated before we start - to_drop.par_iter().for_each(|data| { + to_drop.par_for_each(|data| { // Mark this data as in the process of being deallocated and unsafe to access data.deallocated.store(true, Ordering::SeqCst); }); // Then run the drops if needed - to_drop.par_iter().for_each(|data| { + to_drop.par_for_each(|data| { let underlying_allocation = data.underlying_allocation; let res = catch_unwind(move || unsafe { underlying_allocation.deallocate(); @@ -49,20 +52,41 @@ impl BackgroundDropper { eprintln!("Gc background drop failed: {:?}", e); } }); + + // Then clear and recycle the buffer + to_drop.clear(); + // ignore recycling failures + let recycling_error = recycling_sender.try_send(to_drop); + if let Err(e) = recycling_error { + error!("Error recycling drop buffer {:?}", e); + } } DropMessage::SyncUp(responder) => { if let Err(e) = responder.send(()) { - eprintln!("Gc background syncup failed: {:?}", e); + error!("Gc background syncup failed: {:?}", e); } } } } }); - Self { sender } + Self { + drop_message_sender, + buffer_recycler: recycling_receiver, + } } pub fn send_msg(&self, msg: DropMessage) -> Result<(), SendError> { - self.sender.send(msg) + self.drop_message_sender.send(msg) + } + + pub fn get_buffer(&self) -> DropBuffer { + self.buffer_recycler.try_recv().unwrap_or_default() + } +} + +impl Default for BackgroundDropper { + fn default() -> Self { + Self::new() } } diff --git a/src/collector/mod.rs b/src/collector/mod.rs index 0af2477..076f56e 100644 --- a/src/collector/mod.rs +++ b/src/collector/mod.rs @@ -2,10 +2,11 @@ mod alloc; mod collect_impl; mod data; mod dropper; +mod ref_cnt; mod trigger; use std::ptr; -use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::thread::spawn; @@ -17,35 +18,82 @@ use crate::collector::alloc::GcAllocation; use crate::collector::dropper::{BackgroundDropper, DropMessage}; use crate::collector::trigger::GcTrigger; use crate::concurrency::atomic_protection::{APSInclusiveGuard, AtomicProtectingSpinlock}; -use crate::concurrency::chunked_ll::{CLLItem, ChunkedLinkedList}; -use crate::concurrency::lockout::{ExclusiveWarrant, Lockout, Warrant}; +use crate::concurrency::chunked_ll::ChunkedLinkedList; +use crate::concurrency::lockout::{Lockout, Warrant}; use crate::marker::GcDrop; use crate::{Finalize, Scan, ToScan}; -pub use crate::collector::data::{GcData, GcHandle, UnderlyingData}; +pub use crate::collector::data::GcData; +use crate::collector::ref_cnt::GcRefCount; -/// Intermediate struct. `Gc` holds a `InternalGcRef`, which references a `GcHandle` -/// There should be one `GcHandle` per `Gc` -#[derive(Clone, Debug)] +/// Intermediate struct. `Gc` holds a `InternalGcRef`, which owns incrementing and decrementing +/// the reference count of the stored data. 
+#[derive(Debug)] pub struct InternalGcRef { - handle_ref: CLLItem, + data_ref: Arc, + invalidated: AtomicBool, +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub(crate) enum RefCountPolicy { + // A transient handle doesn't increment or decrement the reference count + TransientHandle, + // On initial creation, we assume the reference count is exactly one, and only increment the handle count + // Then on deallocation, we decrement both the data and handle count + InitialCreation, + // If a handle already exists, we manage both the data reference and handle counts + FromExistingHandle, + // The reference counts are being inherited from another source (used mostly by `AtomicGc`) + InheritExistingCounts, } impl InternalGcRef { - pub(crate) fn new(handle_ref: CLLItem) -> Self { - Self { handle_ref } + #[inline] + #[allow(clippy::match_same_arms)] + pub(crate) fn new(data_ref: Arc, ref_cnt_policy: RefCountPolicy) -> Self { + match &ref_cnt_policy { + RefCountPolicy::TransientHandle => { + // No action: this is a transient handle + } + RefCountPolicy::InheritExistingCounts => { + // No action: we are inheriting the reference counts from another source + } + RefCountPolicy::InitialCreation => { + debug_assert_eq!(data_ref.ref_cnt.snapshot_ref_count(), 1); + // Increment handle count only + COLLECTOR.increment_handle_count(); + } + RefCountPolicy::FromExistingHandle => { + COLLECTOR.increment_reference_count(&data_ref); + } + } + + let pre_invalidated = matches!(ref_cnt_policy, RefCountPolicy::TransientHandle); + + Self { + data_ref, + invalidated: AtomicBool::new(pre_invalidated), + } } + #[inline] pub(crate) fn invalidate(&self) { COLLECTOR.drop_handle(self); } + #[inline] + pub(crate) fn is_invalidated(&self) -> bool { + self.invalidated.load(Ordering::Relaxed) + } + + #[inline] + pub(crate) fn invalidate_without_touching_reference_counts(&self) { + self.invalidated.store(true, Ordering::Relaxed); + } + + #[inline] pub(crate) fn data(&self) -> &Arc { - if let UnderlyingData::Fixed(data) = &self.handle_ref.v.underlying_data { - data - } else { - panic!("Only fixed data has a usable `data` method") - } + &self.data_ref } } @@ -55,7 +103,6 @@ pub struct GcGuardWarrant { /// stores the internal warrant. 
only the drop being run is relevant _warrant: Warrant>, } -type GcExclusiveWarrant = ExclusiveWarrant>; pub struct Collector { /// shredder only allows one collection to proceed at a time @@ -70,25 +117,10 @@ pub struct Collector { /// sending to this channel indicates that thread should check the trigger, then collect if the /// trigger indicates it should async_gc_notifier: Sender<()>, - /// all the data we are managing plus metadata about what `Gc`s exist - tracked_data: TrackedData, -} - -/// Stores metadata about each piece of tracked data, plus metadata about each handle -#[derive(Debug)] -struct TrackedData { - /// we increment this whenever we collect - current_collection_number: AtomicU64, /// a set storing metadata on the live data the collector is managing - data: ChunkedLinkedList, - /// a set storing metadata on each live handle (`Gc`) the collector is managing - handles: ChunkedLinkedList, -} - -#[derive(Debug)] -#[must_use] -struct TrackingSetupToken { - data_to_track: Arc, + tracked_data: ChunkedLinkedList, + /// a count of how many handles are live + live_handle_count: AtomicUsize, } // TODO(issue): https://github.com/Others/shredder/issues/7 @@ -103,17 +135,8 @@ impl Collector { trigger: GcTrigger::default(), dropper: BackgroundDropper::new(), async_gc_notifier, - tracked_data: TrackedData { - // This is janky, but we subtract one from the collection number - // to get a previous collection number in `do_collect` - // - // We also use 0 as a sentinel value for newly allocated data - // - // Together that implies we need to start the collection number sequence at 2, not 1 - current_collection_number: AtomicU64::new(2), - data: ChunkedLinkedList::new(), - handles: ChunkedLinkedList::new(), - }, + tracked_data: ChunkedLinkedList::new(), + live_handle_count: AtomicUsize::new(0), }); // The async Gc thread deals with background Gc'ing @@ -175,15 +198,8 @@ impl Collector { T: Scan + GcDrop, F: FnOnce(InternalGcRef, *const T) -> T, { - let (gc_data_ptr, uninit_ptr) = GcAllocation::allocate_uninitialized_with_drop(); - let (token, reference) = self.setup_gc_reference(gc_data_ptr); - - let t = init_function(self.clone_handle(&reference), uninit_ptr); - ptr::write(uninit_ptr as *mut T, t); - let init_ptr = uninit_ptr; - - self.track_from_token(token); - (reference, init_ptr) + let (gc_allocation, uninit_ptr) = GcAllocation::allocate_uninitialized_with_drop(); + self.initialize_and_track(init_function, gc_allocation, uninit_ptr) } pub unsafe fn track_with_initializer_and_finalize( @@ -194,125 +210,115 @@ impl Collector { T: Finalize + Scan, F: FnOnce(InternalGcRef, *const T) -> T, { - let (gc_data_ptr, uninit_ptr) = GcAllocation::allocate_uninitialized_with_finalization(); - let (token, reference) = self.setup_gc_reference(gc_data_ptr); - - let t = init_function(self.clone_handle(&reference), uninit_ptr); - ptr::write(uninit_ptr as *mut T, t); - let init_ptr = uninit_ptr; - - self.track_from_token(token); - (reference, init_ptr) + let (gc_allocation, uninit_ptr) = GcAllocation::allocate_uninitialized_with_finalization(); + self.initialize_and_track(init_function, gc_allocation, uninit_ptr) } - fn setup_gc_reference(&self, gc_data_ptr: GcAllocation) -> (TrackingSetupToken, InternalGcRef) { - let new_data_arc = Arc::new(GcData { - underlying_allocation: gc_data_ptr, + unsafe fn initialize_and_track( + &self, + init_function: F, + gc_allocation: GcAllocation, + uninit_ptr: *const T, + ) -> (InternalGcRef, *const T) + where + T: Scan, + F: FnOnce(InternalGcRef, *const T) -> T, + { + let 
gc_data = Arc::new(GcData { + underlying_allocation: gc_allocation, lockout: Lockout::new(), deallocated: AtomicBool::new(false), - last_marked: AtomicU64::new(0), + // Must start count at 1 to avoid a race condition between inserting the data and creating the handle + ref_cnt: GcRefCount::new(1), }); - let new_handle_arc = Arc::new(GcHandle { - underlying_data: UnderlyingData::Fixed(new_data_arc.clone()), - last_non_rooted: AtomicU64::new(0), - }); + // Take a warrant to prevent the collector from accessing the data while we're initializing it + let warrant = Lockout::try_take_exclusive_warrant(gc_data.clone()) + .expect("lockout just created, so should be avaliable for locking"); - // Insert handle before data -- don't want the data to be observable before there is a relevant handle - let new_handle = self.tracked_data.handles.insert(new_handle_arc); + // Setup our data structures to handle this data + let gc_handle = self.tracked_data.insert(gc_data); + let reference = InternalGcRef::new(gc_handle.v, RefCountPolicy::InitialCreation); - ( - TrackingSetupToken { - data_to_track: new_data_arc, - }, - InternalGcRef::new(new_handle), - ) - } + // Initialize + let t = init_function(self.clone_handle(&reference), uninit_ptr); + ptr::write(uninit_ptr as *mut T, t); - fn track_from_token(&self, token: TrackingSetupToken) { - self.tracked_data.data.insert(token.data_to_track); + // We've written the data, so drop the warrant + drop(warrant); // When we allocate, the heuristic for whether we need to GC might change self.notify_async_gc_thread(); + + (reference, uninit_ptr) } fn track(&self, gc_data_ptr: GcAllocation) -> InternalGcRef { - let (tracking_token, reference) = self.setup_gc_reference(gc_data_ptr); - self.track_from_token(tracking_token); - reference + let item = self.tracked_data.insert(Arc::new(GcData { + underlying_allocation: gc_data_ptr, + lockout: Lockout::new(), + deallocated: AtomicBool::new(false), + // Start the reference count at 1 to avoid a race condition between insertion and handle creation + ref_cnt: GcRefCount::new(1), + })); + let res = InternalGcRef::new(item.v, RefCountPolicy::InitialCreation); + + // When we allocate, the heuristic for whether we need to GC might change + self.notify_async_gc_thread(); + + res } pub fn drop_handle(&self, handle: &InternalGcRef) { - self.tracked_data.handles.remove(&handle.handle_ref); + let was_invalidated = handle.invalidated.swap(true, Ordering::Relaxed); + if !was_invalidated { + self.decrement_reference_count(&handle.data_ref); + } // NOTE: This is worth experimenting with // self.notify_async_gc_thread(); } + #[allow(clippy::unused_self)] pub fn clone_handle(&self, handle: &InternalGcRef) -> InternalGcRef { - let new_handle_arc = Arc::new(GcHandle { - underlying_data: UnderlyingData::Fixed(handle.data().clone()), - last_non_rooted: AtomicU64::new(0), - }); - - let new_handle = self.tracked_data.handles.insert(new_handle_arc); - - InternalGcRef { - handle_ref: new_handle, - } + InternalGcRef::new(handle.data_ref.clone(), RefCountPolicy::FromExistingHandle) } - pub fn handle_from_data(&self, underlying_data: Arc) -> InternalGcRef { - let new_handle_arc = Arc::new(GcHandle { - underlying_data: UnderlyingData::Fixed(underlying_data), - last_non_rooted: AtomicU64::new(0), - }); - - let new_handle = self.tracked_data.handles.insert(new_handle_arc); - - InternalGcRef { - handle_ref: new_handle, - } + pub fn increment_handle_count(&self) { + self.live_handle_count.fetch_add(1, Ordering::Relaxed); } - pub fn new_handle_for_atomic(&self, 
atomic_ptr: Arc>) -> InternalGcRef { - let new_handle_arc = Arc::new(GcHandle { - underlying_data: UnderlyingData::DynamicForAtomic(atomic_ptr), - last_non_rooted: AtomicU64::new(0), - }); - - let new_handle = self.tracked_data.handles.insert(new_handle_arc); + pub fn increment_reference_count(&self, data: &GcData) { + data.ref_cnt.inc_count(); + self.live_handle_count.fetch_add(1, Ordering::Relaxed); + } - InternalGcRef { - handle_ref: new_handle, - } + pub fn decrement_reference_count(&self, data: &GcData) { + data.ref_cnt.dec_count(); + // NOTE: This will wrap around on overflow + self.live_handle_count.fetch_sub(1, Ordering::Relaxed); } + // TODO: Fix the abstraction layer between `InternalGcRef` and `Collector` #[allow(clippy::unused_self)] pub fn get_data_warrant(&self, handle: &InternalGcRef) -> GcGuardWarrant { - // This check is only necessary in the destructors - // The destructor thread will always set the `deallocated` flag before deallocating data - if let UnderlyingData::Fixed(fixed) = &handle.handle_ref.v.underlying_data { - let data_deallocated = fixed.deallocated.load(Ordering::SeqCst); + let data_deallocated = handle.data_ref.deallocated.load(Ordering::SeqCst); - if data_deallocated { - panic!("Tried to access into a Gc, but the internal state was corrupted (perhaps you're manipulating Gc in a destructor?)"); - } + if data_deallocated { + panic!("Tried to access into a Gc, but the internal state was corrupted (perhaps you're manipulating Gc in a destructor?)"); + } - GcGuardWarrant { - _warrant: Lockout::get_warrant(fixed.clone()), - } - } else { - panic!("Cannot get data warrant for atomic data!") + GcGuardWarrant { + _warrant: Lockout::take_warrant(handle.data_ref.clone()), } } pub fn tracked_data_count(&self) -> usize { - self.tracked_data.data.estimate_len() + self.tracked_data.estimate_len() } pub fn handle_count(&self) -> usize { - self.tracked_data.handles.estimate_len() + self.live_handle_count.load(Ordering::Relaxed) } pub fn set_gc_trigger_percent(&self, new_trigger_percent: f32) { @@ -348,8 +354,8 @@ impl Collector { pub fn check_then_collect(&self) -> bool { let gc_guard = self.gc_lock.lock(); - let current_data_count = self.tracked_data.data.estimate_len(); - let current_handle_count = self.tracked_data.handles.estimate_len(); + let current_data_count = self.tracked_data.estimate_len(); + let current_handle_count = self.live_handle_count.load(Ordering::Relaxed); if self .trigger .should_collect(current_data_count, current_handle_count) @@ -382,19 +388,12 @@ pub(crate) fn get_mock_handle() -> InternalGcRef { let mock_scannable: Box = Box::new(MockAllocation); - // This leaks some memory... - let mock_master_list = ChunkedLinkedList::new(); - - // Note: Here we assume a random u64 is unique. 
That's hacky, but is fine for testing :) - let handle_arc = Arc::new(GcHandle { - underlying_data: UnderlyingData::Fixed(Arc::new(GcData { - underlying_allocation: unsafe { GcAllocation::raw(Box::into_raw(mock_scannable)) }, - lockout: Lockout::new(), - deallocated: AtomicBool::new(false), - last_marked: AtomicU64::new(0), - })), - last_non_rooted: AtomicU64::new(0), + let data_arc = Arc::new(GcData { + underlying_allocation: unsafe { GcAllocation::raw(&*mock_scannable) }, + lockout: Lockout::new(), + deallocated: AtomicBool::new(false), + ref_cnt: GcRefCount::new(1), }); - InternalGcRef::new(mock_master_list.insert(handle_arc)) + InternalGcRef::new(data_arc, RefCountPolicy::InitialCreation) } diff --git a/src/collector/ref_cnt.rs b/src/collector/ref_cnt.rs new file mode 100644 index 0000000..30cd2a3 --- /dev/null +++ b/src/collector/ref_cnt.rs @@ -0,0 +1,92 @@ +use std::sync::atomic::{AtomicI64, Ordering}; + +#[derive(Debug)] +pub struct GcRefCount { + // `total_handles` = count_positive + count_negative + // `count_positive` is always >= the actual count >= 0 + // At the beginning of every collection we recalculate `count_positive` + count_positive: AtomicI64, + // `count_negative` is always <= 0 + count_negative: AtomicI64, + + // Handles found in the current collection + found_internally: AtomicI64, +} + +impl GcRefCount { + pub fn new(starting_count: i64) -> Self { + let s = Self { + count_positive: AtomicI64::new(starting_count), + count_negative: AtomicI64::new(0), + found_internally: AtomicI64::new(0), + }; + s.override_mark_as_rooted(); + + s + } + + pub fn prepare_for_collection(&self) { + // Ordering = relaxed, as this is protected by the collection mutex + self.found_internally.store(0, Ordering::Relaxed); + + // `Ordering::Acquire` to sequence with the `Release` in `dec_count` + let negative = self.count_negative.swap(0, Ordering::Acquire); + // `Ordering::Acquire` to sequence with the `Release` in `inc_count` + let fixed_positive = self.count_positive.fetch_add(negative, Ordering::Acquire); + + // If enabled, double check that we're adhering to the invariant + debug_assert!(fixed_positive >= 0); + } + + pub fn found_once_internally(&self) { + // Ordering = relaxed, as this is protected by the collection mutex + self.found_internally.fetch_add(1, Ordering::Relaxed); + } + + pub fn is_rooted(&self) -> bool { + // Due to the count invariant, this can never be less than the actual count + // Ordering = Acquire for the same reason as `prepare_for_collection` + let actual_count_plus_n = self.count_positive.load(Ordering::Acquire); + // Ordering = Relaxed, as this is protected by the collection mutex + let found_internally = self.found_internally.load(Ordering::Relaxed); + + if actual_count_plus_n > found_internally { + // If n = 0, then `actual_count > found_internally` so we know we're rooted + // If n > 0, then we may or may not be rooted, but it's safe to assume we are + return true; + } + // In this case `actual_count + n <= found_internally` + // This implies `actual_count <= found_internally` + // So we found at least as many handles as actually exist, so this data must not be rooted + false + } + + const ROOT_OVERRIDE_VALUE: i64 = -(1 << 60); + + pub fn override_mark_as_rooted(&self) { + // Ordering = relaxed, as this is protected by the collection mutex + self.found_internally + .store(Self::ROOT_OVERRIDE_VALUE, Ordering::Relaxed); + } + + pub fn was_overriden_as_rooted(&self) -> bool { + self.found_internally.load(Ordering::Relaxed) == Self::ROOT_OVERRIDE_VALUE + } + + pub 
fn inc_count(&self) { + // `Ordering::Release` to sequence with the `Acquire` in `prepare_for_collection` + self.count_positive.fetch_add(1, Ordering::Release); + } + + pub fn dec_count(&self) { + // `Ordering::Release` to sequence with the `Acquire` in `prepare_for_collection` + self.count_negative.fetch_sub(1, Ordering::Release); + } + + pub fn snapshot_ref_count(&self) -> i64 { + let positive = self.count_positive.load(Ordering::Acquire); + let negative = self.count_negative.load(Ordering::Acquire); + + positive + negative + } +} diff --git a/src/concurrency/chunked_ll.rs b/src/concurrency/chunked_ll.rs index f4222ed..63ad1ea 100644 --- a/src/concurrency/chunked_ll.rs +++ b/src/concurrency/chunked_ll.rs @@ -48,14 +48,9 @@ impl Chunk { if self.next.is_null() { self.iter_this(f); } else { - rayon::join( - || self.iter_this(f), - || { - let next = unsafe { &*self.next }; + let next = unsafe { &*self.next }; - next.par_iter_rest(f) - }, - ); + rayon::join(|| self.iter_this(f), || next.par_iter_rest(f)); } } @@ -197,6 +192,7 @@ impl ChunkedLinkedList { } } + #[allow(dead_code)] pub fn remove(&self, cll_item: &CLLItem) { let chunk = unsafe { &*cll_item.from }; diff --git a/src/concurrency/cross_thread_buffer.rs b/src/concurrency/cross_thread_buffer.rs new file mode 100644 index 0000000..f9090f5 --- /dev/null +++ b/src/concurrency/cross_thread_buffer.rs @@ -0,0 +1,38 @@ +use rayon::iter::{IntoParallelRefMutIterator, ParallelBridge, ParallelIterator}; +use std::cell::RefCell; +use thread_local::ThreadLocal; + +pub(crate) struct CrossThreadBuffer { + buffers: ThreadLocal>>, +} + +impl CrossThreadBuffer { + pub fn new() -> Self { + Self { + buffers: ThreadLocal::with_capacity(num_cpus::get()), + } + } + + pub fn push(&self, item: T) { + let tlb = self.buffers.get_or_default(); + tlb.borrow_mut().push(item); + } + + pub fn clear(&mut self) { + for v in self.buffers.iter_mut() { + v.get_mut().clear(); + } + } + + pub fn par_for_each(&mut self, f: F) { + self.buffers.iter_mut().par_bridge().for_each(|vec| { + vec.borrow_mut().par_iter_mut().for_each(|mut x| f(&mut x)); + }) + } +} + +impl Default for CrossThreadBuffer { + fn default() -> Self { + Self::new() + } +} diff --git a/src/concurrency/lockout.rs b/src/concurrency/lockout.rs index eb80d40..20259ac 100644 --- a/src/concurrency/lockout.rs +++ b/src/concurrency/lockout.rs @@ -4,7 +4,8 @@ use std::sync::Arc; use parking_lot::Condvar; use parking_lot::Mutex; -const EXCLUSIVE_SIGNPOST: u64 = !0; +const UNSAFE_EXCLUSIVE_SIGNPOST: u64 = !0; +const EXCLUSIVE_SIGNPOST: u64 = UNSAFE_EXCLUSIVE_SIGNPOST - 1; /// The Lockout mechanism is used internally. It's basically just a `RwLock` that doesn't support /// blocking on reads. It also has a `LockoutProvider` interface that eases sharing the guards @@ -25,13 +26,13 @@ impl Lockout { } } - pub fn get_warrant(provider: P) -> Warrant

{
+    pub fn take_warrant<P: LockoutProvider>(provider: P) -> Warrant<P>
{ let lockout = provider.provide(); let starting_count = lockout.count.load(Ordering::SeqCst); // Fast path, where the count is not SIGNPOSTED - if starting_count != EXCLUSIVE_SIGNPOST { + if starting_count < EXCLUSIVE_SIGNPOST { let swap_result = lockout.count.compare_exchange( starting_count, starting_count + 1, @@ -48,7 +49,7 @@ impl Lockout { loop { let value = lockout.count.load(Ordering::SeqCst); - if value == EXCLUSIVE_SIGNPOST { + if value >= EXCLUSIVE_SIGNPOST { lockout.lockout_condvar.wait(&mut guard); } else { let swap_result = lockout.count.compare_exchange( @@ -67,7 +68,9 @@ impl Lockout { } } - pub fn get_exclusive_warrant(provider: P) -> Option> { + pub fn try_take_exclusive_warrant( + provider: P, + ) -> Option> { let lockout = provider.provide(); let swap_result = lockout.count.compare_exchange( @@ -77,12 +80,50 @@ impl Lockout { Ordering::SeqCst, ); - if swap_result.is_ok() { - Some(ExclusiveWarrant { provider }) - } else { - None + match swap_result { + Ok(_) => Some(ExclusiveWarrant { provider }), + Err(_) => None, } } + + // Unsafe: only safe if paired with `try_release_exclusive_access_unsafe` + pub unsafe fn try_take_exclusive_access_unsafe(provider: &P) -> bool { + let lockout = provider.provide(); + + let swap_result = lockout.count.compare_exchange( + 0, + UNSAFE_EXCLUSIVE_SIGNPOST, + Ordering::SeqCst, + Ordering::SeqCst, + ); + + swap_result.is_ok() + } + + // Unsafe: you must guarantee that this is either paired with your `try_take_exclusive_access_unsafe` + // call. Otherwise, you may be releasing someone else's exclusive access + // + // in shredder only the collector uses this method for this reason + pub unsafe fn try_release_exclusive_access_unsafe(provider: &P) { + let lockout = provider.provide(); + + let _guard = lockout.lockout_mutex.lock(); + + // It's okay if this fails, since we only are trying to relase if it is taken + let _ = lockout.count.compare_exchange( + UNSAFE_EXCLUSIVE_SIGNPOST, + 0, + Ordering::SeqCst, + Ordering::SeqCst, + ); + + lockout.lockout_condvar.notify_all(); + } + + pub fn unsafe_exclusive_access_taken(provider: &P) -> bool { + let lockout = provider.provide(); + lockout.count.load(Ordering::SeqCst) == UNSAFE_EXCLUSIVE_SIGNPOST + } } #[derive(Debug)] @@ -92,21 +133,9 @@ pub struct Warrant { impl Drop for Warrant

{ fn drop(&mut self) { - loop { - let lockout = self.provider.provide(); - - let count = lockout.count.load(Ordering::SeqCst); - assert!(count > 0 && count != EXCLUSIVE_SIGNPOST); - let swap_result = lockout.count.compare_exchange( - count, - count - 1, - Ordering::SeqCst, - Ordering::SeqCst, - ); - if swap_result.is_ok() { - return; - } - } + let lockout = self.provider.provide(); + // Safe to assume we can subtract, because the warrant promises we incremented once + lockout.count.fetch_sub(1, Ordering::SeqCst); } } @@ -120,13 +149,16 @@ impl Drop for ExclusiveWarrant

{ let lockout = self.provider.provide(); let _guard = lockout.lockout_mutex.lock(); - let swap_result = lockout.count.compare_exchange( + + let res = lockout.count.compare_exchange( EXCLUSIVE_SIGNPOST, 0, Ordering::SeqCst, Ordering::SeqCst, ); - assert!(swap_result.is_ok()); + + debug_assert!(res.is_ok()); + lockout.lockout_condvar.notify_all(); } } @@ -151,22 +183,22 @@ mod test { #[test] fn warrant_prevents_exclusive_warrant() { let lockout = Arc::new(Lockout::new()); - let _warrant = Lockout::get_warrant(lockout.clone()); - let exclusive_warrant_option = Lockout::get_exclusive_warrant(lockout); + let _warrant = Lockout::take_warrant(lockout.clone()); + let exclusive_warrant_option = Lockout::try_take_exclusive_warrant(lockout); assert!(exclusive_warrant_option.is_none()); } #[test] fn exclusive_warrant_works_by_itself() { let lockout = Arc::new(Lockout::new()); - let exclusive_warrant_option = Lockout::get_exclusive_warrant(lockout); + let exclusive_warrant_option = Lockout::try_take_exclusive_warrant(lockout); assert!(exclusive_warrant_option.is_some()); } #[test] fn multiple_warrants() { let lockout = Arc::new(Lockout::new()); - let _warrant_1 = Lockout::get_warrant(lockout.clone()); - let _warrant_2 = Lockout::get_warrant(lockout); + let _warrant_1 = Lockout::take_warrant(lockout.clone()); + let _warrant_2 = Lockout::take_warrant(lockout); } } diff --git a/src/concurrency/mod.rs b/src/concurrency/mod.rs index 5ee1475..3c97674 100644 --- a/src/concurrency/mod.rs +++ b/src/concurrency/mod.rs @@ -1,3 +1,4 @@ pub mod atomic_protection; pub mod chunked_ll; +pub mod cross_thread_buffer; pub mod lockout; diff --git a/src/lib.rs b/src/lib.rs index 51b2a14..385db92 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,6 @@ //! - guarded access: accessing `Gc` data requires acquiring a guard (although you can use `DerefGc` in many cases to avoid this) //! - multiple collectors: only a single global collector is supported //! - can't handle `Rc`/`Arc`: requires all `Gc` objects have straightforward ownership semantics -//! - collection optimized for speed, not memory use: `Gc` and internal metadata is small, but there is bloat during collection (will fix!) //! - no no-std support: The collector requires threading and other `std` features (will fix!) #![cfg_attr(feature = "nightly-features", feature(unsize, coerce_unsized))] @@ -34,6 +33,7 @@ clippy::redundant_clone, clippy::use_self, rust_2018_idioms + // TODO: Enable `unreachable_pub` )] // But I don't care about these ones #![allow( @@ -42,7 +42,7 @@ clippy::explicit_deref_methods, // Sometimes calling `deref` directly is clearer clippy::module_name_repetitions, // Sometimes clear naming calls for repetition clippy::multiple_crate_versions, // There is no way to easily fix this without modifying our dependencies - proc_macro_back_compat // Hide this error until we have a path forward. FIXME: issue + proc_macro_back_compat // Hide this error until we have a path forward. TODO: issue )] #[macro_use] diff --git a/src/marker/gc_drop.rs b/src/marker/gc_drop.rs index c8785b1..6f866d9 100644 --- a/src/marker/gc_drop.rs +++ b/src/marker/gc_drop.rs @@ -1,8 +1,8 @@ /// A marker trait that the destructor of this data can be safely run in the background thread /// /// Basically it asserts three things -/// 1) Any thread can drop this data (It is `Send`, or `!Send` purely because it contains a `Gc`, or -/// it is `!Send` but you know that any thread dropping this data is safe.) 
diff --git a/src/plumbing.rs b/src/plumbing.rs
index ba6452b..106c26f 100644
--- a/src/plumbing.rs
+++ b/src/plumbing.rs
@@ -15,7 +15,7 @@ pub fn check_gc_drop(_: &T) {}
 #[inline(always)]
 pub fn check_gc_safe(_: &T) {}
 
-// FIXME: This macro can be removed once we have overlapping marker traits
+// TODO: This macro can be removed once we have overlapping marker traits
 // (https://github.com/rust-lang/rust/issues/29864)
 /// A `Send + 'static` type can be safely marked as `GcSafe`, and this macro eases that
 /// implementation
diff --git a/src/r.rs b/src/r.rs
index 98504e6..2e57ceb 100644
--- a/src/r.rs
+++ b/src/r.rs
@@ -73,7 +73,7 @@ unsafe impl<'a, T: ?Sized> GcDeref for R<'a, T> where T: GcDeref {}
 unsafe impl<'a, T: ?Sized> GcSafe for RMut<'a, T> {}
 // unsafe impl<'a, T: ?Sized> !GcDrop for RMut<'a, T> {}
 
-// FIXME: Kinda a subtle impl, not sure if it's correct
+// This is counterintuitive, but safe (because you can't get a mutable reference from a `&RMut`)
 unsafe impl<'a, T: ?Sized> GcDeref for RMut<'a, T> where T: GcDeref {}
 
 unsafe impl<'a, T: ?Sized> Scan for R<'a, T> {
@@ -161,8 +161,6 @@ where
     }
 }
 
-// TODO: Ord, PartialOrd
-
 impl<'a, T: ?Sized> PartialEq for R<'a, T>
 where
     T: PartialEq,
diff --git a/src/scan.rs b/src/scan.rs
index 0944623..d9f8ce1 100644
--- a/src/scan.rs
+++ b/src/scan.rs
@@ -111,13 +111,12 @@ unsafe impl ToScan for T {
 /// Scanner is a struct used to manage the scanning of data, sort of analogous to `Hasher`
 /// Usually you will only care about this while implementing `Scan`
 pub struct Scanner<'a> {
-    pub(crate) scan_callback: Box<dyn FnMut(InternalGcRef) + 'a>,
+    pub(crate) scan_callback: Box<dyn FnMut(&InternalGcRef) + 'a>,
 }
-#[allow(clippy::unused_self)]
 impl<'a> Scanner<'a> {
     #[must_use]
-    pub(crate) fn new<F: FnMut(InternalGcRef) + 'a>(callback: F) -> Self {
+    pub(crate) fn new<F: FnMut(&InternalGcRef) + 'a>(callback: F) -> Self {
         Self {
             scan_callback: Box::new(callback),
         }
     }
@@ -130,7 +129,7 @@ impl<'a> Scanner<'a> {
     }
 
     #[inline]
-    pub(crate) fn add_internal_handle(&mut self, gc_ref: InternalGcRef) {
+    pub(crate) fn add_internal_handle(&mut self, gc_ref: &InternalGcRef) {
         (self.scan_callback)(gc_ref);
     }
 }
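(Aside, not part of the patch: with the `Scanner` callback now taking `&InternalGcRef`, a hand-written `Scan` impl still just forwards each interior `Gc` to the scanner, in the same shape as the `MockGc` test impl in the `std_impls` hunk further down. The `Pair` type is hypothetical and the import paths are assumed.)

use shredder::{Gc, GcSafe, Scan, Scanner};

// Hypothetical type with two interior `Gc` fields -- illustrative only
struct Pair {
    left: Gc<u32>,
    right: Gc<u32>,
}

unsafe impl GcSafe for Pair {}

unsafe impl Scan for Pair {
    fn scan(&self, scanner: &mut Scanner<'_>) {
        // Delegate to each field's `Scan` impl, which registers its internal handle
        self.left.scan(scanner);
        self.right.scan(scanner);
    }
}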
diff --git a/src/smart_ptr/deref_gc.rs b/src/smart_ptr/deref_gc.rs
index 1cab5f4..2f0a74b 100644
--- a/src/smart_ptr/deref_gc.rs
+++ b/src/smart_ptr/deref_gc.rs
@@ -145,7 +145,7 @@ unsafe impl Scan for DerefGc {
     #[allow(clippy::inline_always)]
     #[inline(always)]
     fn scan(&self, scanner: &mut Scanner<'_>) {
-        scanner.add_internal_handle(self.backing_handle.clone());
+        scanner.add_internal_handle(&self.backing_handle);
     }
 }
 
@@ -291,11 +291,11 @@ where
 }
 
 #[cfg(test)]
+#[allow(clippy::eq_op)]
 mod test {
     use crate::DerefGc;
 
     #[test]
-    #[allow(clippy::eq_op)]
     fn test_eq() {
         let a = DerefGc::new(1);
         let b = DerefGc::new(1);
diff --git a/src/smart_ptr/gc.rs b/src/smart_ptr/gc.rs
index 5e2dff3..7ed6992 100644
--- a/src/smart_ptr/gc.rs
+++ b/src/smart_ptr/gc.rs
@@ -132,7 +132,7 @@ impl Gc {
 
         // Create a Gc
         let gc = Self {
-            backing_handle: gc_ref.clone(),
+            backing_handle: COLLECTOR.clone_handle(&gc_ref),
             direct_ptr: uninit_ptr,
         };
 
@@ -160,7 +160,7 @@ impl Gc {
     /// See `new_cyclic` and `new_with_finalizer`
     pub fn new_cyclic_with_finalizer<F>(f: F) -> Self
     where
-        T: Sized + Finalize, // FIXME: Add a `GcDrop` variant
+        T: Sized + Finalize,
         F: FnOnce(Self) -> T,
     {
         let (handle, ptr) = unsafe {
@@ -173,7 +173,7 @@ impl Gc {
 
         // Create a Gc
         let gc = Self {
-            backing_handle: gc_ref.clone(),
+            backing_handle: COLLECTOR.clone_handle(&gc_ref),
             direct_ptr: uninit_ptr,
         };
 
@@ -227,13 +227,21 @@ impl Gc {
     }
 
     pub(crate) fn assert_live(&self) {
-        let ordering = atomic::Ordering::Relaxed;
-        let is_deallocated = self.backing_handle.data().deallocated.load(ordering);
-        assert!(!is_deallocated);
+        let is_deallocated = self
+            .backing_handle
+            .data()
+            .deallocated
+            .load(atomic::Ordering::Relaxed);
+        let is_invalidated = self.backing_handle.is_invalidated();
+        assert!(!is_deallocated && !is_invalidated);
     }
 
-    pub(crate) fn internal_handle(&self) -> InternalGcRef {
-        self.backing_handle.clone()
+    #[inline]
+    pub(crate) fn drop_preserving_reference_counts(self) {
+        // Ensure the `drop` implementation doesn't touch the reference count
+        self.backing_handle
+            .invalidate_without_touching_reference_counts();
+        drop(self);
     }
 
     pub(crate) fn internal_handle_ref(&self) -> &InternalGcRef {
@@ -286,7 +294,7 @@ unsafe impl Scan for Gc {
     #[allow(clippy::inline_always)]
     #[inline(always)]
     fn scan(&self, scanner: &mut Scanner<'_>) {
-        scanner.add_internal_handle(self.internal_handle());
+        scanner.add_internal_handle(&self.backing_handle);
     }
 }
 
@@ -317,7 +325,7 @@ impl Drop for Gc {
 
 unsafe impl Finalize for Gc {
     unsafe fn finalize(&mut self) {
-        self.internal_handle().invalidate();
+        self.backing_handle.invalidate();
     }
 }
 
diff --git a/src/std_impls/mod.rs b/src/std_impls/mod.rs
index 7302b29..6066bea 100644
--- a/src/std_impls/mod.rs
+++ b/src/std_impls/mod.rs
@@ -19,7 +19,7 @@ mod test {
     unsafe impl GcSafe for MockGc {}
     unsafe impl Scan for MockGc {
         fn scan(&self, scanner: &mut Scanner<'_>) {
-            (scanner.scan_callback)(self.handle.clone());
+            (scanner.scan_callback)(&self.handle);
         }
     }
 
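(Aside, not part of the patch: a minimal usage sketch of the reworked by-value `AtomicGc` API that the integration tests below exercise. Import paths and the exact return types are assumptions inferred from the calls visible in this diff.)

use std::sync::atomic::Ordering;

use shredder::{AtomicGc, Gc}; // assumed re-exports

fn atomic_gc_sketch() {
    // `new` and `store` now take the `Gc` by value
    let slot = AtomicGc::new(Gc::new(1));
    slot.store(Gc::new(2), Ordering::Relaxed);

    // `compare_exchange` takes the expected `Gc` by reference, the replacement by value,
    // and reports the outcome through a `Result` (checked here only via `is_ok`)
    let expected = slot.load(Ordering::Relaxed);
    assert!(slot
        .compare_exchange(&expected, Gc::new(3), Ordering::Relaxed, Ordering::Relaxed)
        .is_ok());

    // `swap` hands back the `Gc` that was previously stored
    let old = slot.swap(Gc::new(4), Ordering::Relaxed);
    assert_eq!(*old.get(), 3);
}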
diff --git a/tests/integration.rs b/tests/integration.rs
index 2e995e8..8d0c672 100644
--- a/tests/integration.rs
+++ b/tests/integration.rs
@@ -130,7 +130,7 @@ fn scan_skip_problem() {
     let root_con = Gc::new(sync::Mutex::new(Connection::default()));
     eprintln!("root {:?}", root_con);
 
-    // FIXME: Use shortcut methods here
+    // TODO: Use shortcut methods here
     let hidden = Gc::new(sync::Mutex::new(Connection::default()));
     eprintln!("hidden {:?}", hidden);
     let hider = Gc::new(sync::Mutex::new(Connection::default()));
@@ -233,16 +233,14 @@ fn simple_atomic_cleanup() {
     run_with_gc_cleanup(|| {
         let value = Gc::new(17);
 
-        let atomic = AtomicGc::new(&value);
-        drop(value);
+        let atomic = AtomicGc::new(value);
 
         let fr = atomic.load(Ordering::Relaxed);
         assert_eq!(*fr.get(), 17);
         drop(fr);
 
         let new_value = Gc::new(20);
-        atomic.store(&new_value, Ordering::Relaxed);
-        drop(new_value);
+        atomic.store(new_value, Ordering::Relaxed);
 
         let sr = atomic.load(Ordering::Relaxed);
         assert_eq!(*sr.get(), 20);
@@ -262,10 +260,8 @@ fn atomic_cycle() {
 
         let b = Gc::new(sync::Mutex::new(Connection { connect: None }));
 
-        let a_atomic = AtomicGc::new(&a);
-        let b_atomic = AtomicGc::new(&b);
-        drop(a);
-        drop(b);
+        let a_atomic = AtomicGc::new(a);
+        let b_atomic = AtomicGc::new(b);
 
         let a_read = a_atomic.load(Ordering::Relaxed);
         let b_read = b_atomic.load(Ordering::Relaxed);
@@ -281,47 +277,51 @@ fn atomic_cycle() {
 }
 
 #[test]
-fn atomic_compare_and_swap_test() {
+fn atomic_compare_and_exchange_test() {
     let _guard = TEST_MUTEX.lock();
 
     run_with_gc_cleanup(|| {
         let v1 = Gc::new(123);
         let v2 = Gc::new(1776);
         let v1_alt = Gc::new(123);
 
-        let atomic = AtomicGc::new(&v1);
+        let atomic = AtomicGc::new(v1.clone());
         assert_eq!(*atomic.load(Ordering::Relaxed).get(), 123);
 
-        let res = atomic.compare_and_swap(&v1, &v2, Ordering::Relaxed);
-        assert!(res);
+        let res = atomic.compare_exchange(&v1, v2.clone(), Ordering::Relaxed, Ordering::Relaxed);
+        assert!(res.is_ok());
         assert_eq!(*atomic.load(Ordering::Relaxed).get(), 1776);
 
-        atomic.store(&v1, Ordering::Relaxed);
-        let res = atomic.compare_and_swap(&v1_alt, &v2, Ordering::Relaxed);
-        assert!(!res);
+        atomic.store(v1, Ordering::Relaxed);
+        let res = atomic.compare_exchange(&v1_alt, v2, Ordering::Relaxed, Ordering::Relaxed);
+        assert!(res.is_err());
         assert_eq!(*atomic.load(Ordering::Relaxed).get(), 123);
     });
 
     assert_eq!(number_of_tracked_allocations(), 0);
+    assert_eq!(number_of_active_handles(), 0);
 }
 
 #[test]
-fn atomic_compare_and_exchange_test() {
+fn atomic_swap_test() {
     let _guard = TEST_MUTEX.lock();
 
     run_with_gc_cleanup(|| {
-        let v1 = Gc::new(123);
-        let v2 = Gc::new(1776);
-        let v1_alt = Gc::new(123);
+        let x1 = Gc::new(76);
+        let x2 = Gc::new(667);
 
-        let atomic = AtomicGc::new(&v1);
-        assert_eq!(*atomic.load(Ordering::Relaxed).get(), 123);
+        let atomic = AtomicGc::new(x1.clone());
+        let s1 = atomic.swap(x2.clone(), Ordering::Relaxed);
 
-        let res = atomic.compare_exchange(&v1, &v2, Ordering::Relaxed, Ordering::Relaxed);
-        assert!(res);
-        assert_eq!(*atomic.load(Ordering::Relaxed).get(), 1776);
+        assert_eq!(x1, s1);
+        drop(x1);
+        drop(s1);
+        collect();
+        assert_eq!(number_of_tracked_allocations(), 1);
 
-        atomic.store(&v1, Ordering::Relaxed);
-        let res = atomic.compare_exchange(&v1_alt, &v2, Ordering::Relaxed, Ordering::Relaxed);
-        assert!(!res);
-        assert_eq!(*atomic.load(Ordering::Relaxed).get(), 123);
+        let s2 = atomic.load(Ordering::Relaxed);
+        assert_eq!(s2, x2);
+        drop(x2);
+
+        collect();
+        assert_eq!(number_of_tracked_allocations(), 1);
     });
 
     assert_eq!(number_of_tracked_allocations(), 0);
     assert_eq!(number_of_active_handles(), 0);