Refactor the collector to use reference counting #74
base: master
```diff
@@ -203,7 +203,7 @@ impl GcAllocation {
         }
     }

-    pub fn scan<F: FnMut(InternalGcRef)>(&self, callback: F) {
+    pub fn scan<F: FnMut(&InternalGcRef)>(&self, callback: F) {
```
Reviewer: Why did this change?

Owner (Author): The implementation of …
```diff
         unsafe {
             let mut scanner = Scanner::new(callback);
             let to_scan = &*self.scan_ptr;
```
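As an aside, the motivation for the `&InternalGcRef` signature can be shown in a standalone sketch. The `Scanner` and `InternalGcRef` definitions below are invented stand-ins for illustration, not this crate's types: a by-reference callback lets the scanner report refs it does not own, and the callback clones only when it actually keeps one.

```rust
// Hypothetical stand-ins for illustration only.
#[derive(Clone, Debug)]
struct InternalGcRef(u64);

struct Scanner<F: FnMut(&InternalGcRef)> {
    callback: F,
}

impl<F: FnMut(&InternalGcRef)> Scanner<F> {
    fn new(callback: F) -> Self {
        Scanner { callback }
    }

    // The scanner only borrows each ref it visits...
    fn visit(&mut self, r: &InternalGcRef) {
        (self.callback)(r);
    }
}

fn main() {
    let found = vec![InternalGcRef(1), InternalGcRef(2)];
    let mut kept = Vec::new();
    // ...so the callback decides whether to clone. With `FnMut(InternalGcRef)`
    // the scanner would be forced to manufacture an owned ref for every visit.
    let mut scanner = Scanner::new(|r: &InternalGcRef| kept.push(r.clone()));
    for r in &found {
        scanner.visit(r);
    }
    assert_eq!(kept.len(), 2);
}
```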
```diff
@@ -1,146 +1,123 @@
-use std::sync::atomic::Ordering;
-
-use crossbeam::deque::Injector;
 use crossbeam::queue::SegQueue;
 use dynqueue::IntoDynQueue;
-use parking_lot::{MutexGuard, RwLock};
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use std::sync::atomic::Ordering;

 use crate::collector::dropper::DropMessage;
-use crate::collector::{Collector, GcExclusiveWarrant};
+use crate::collector::Collector;
 use crate::concurrency::lockout::Lockout;

+use parking_lot::MutexGuard;
+
 impl Collector {
     pub(super) fn do_collect(&self, gc_guard: MutexGuard<'_, ()>) {
```
Reviewer: I'm a little confused about what this whole function is doing

Owner (Author): It's the part that does the garbage collection :)
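For orientation, the refactored pass reads roughly as the following phases in the hunks below. This outline is a paraphrase, not code from the crate, and the root test in step 4 is an inference from the call sites:

```rust
// Paraphrased outline of the collection pass in this diff (not crate code).
fn do_collect_outline() {
    // 1. Synchronize destructors, so handles held by the background dropper
    //    are gone and don't get counted as roots.
    // 2. Freeze the graph: take exclusive Lockout access on every allocation
    //    we can, and snapshot each ref count (`prepare_for_collection`).
    // 3. Scan every locked allocation, noting each reference that lives
    //    inside other tracked data (`found_once_internally`).
    // 4. Allocations with more handles than internal references are rooted
    //    (`is_rooted`); push them onto a work queue.
    // 5. Trace from the roots in parallel, marking and enqueueing anything
    //    reachable that isn't already marked.
    // 6. Release the Lockouts, retain only the rooted/marked data, and hand
    //    the rest to the background dropper.
}
```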
```diff
-        // Be careful modifying this method. The tracked data and tracked handles can change underneath us
+        // TODO: Improve this comment
+        // Be careful modifying this method. The tracked data, reference counts, and to some extent
+        // the graph, can change underneath us.
         //
         // Currently the state is this, as far as I can tell:
-        // - New handles are conservatively seen as roots if seen at all while we are touching handles
-        //   (there is nowhere a new "secret root" can be created and then the old root stashed and seen as non-rooted)
-        // - New data is treated as a special case, and only deallocated if it existed at the start of collection
-        // - Deleted handles cannot make the graph "more connected" if the deletion was not observed
+        // - New data is always seen as rooted as long as it is allocated after the graph freezing step
+        // - After graph freezing (where we take all the Lockouts we can) there is no way to
+        //   smuggle items in or out of the graph
+        // - The reference count preparation is conservative (if concurrently modified, the graph will simply look more connected)

         trace!("Beginning collection");
         let _atomic_spinlock_guard = self.atomic_spinlock.lock_exclusive();
```
```diff
-        let current_collection = self
-            .tracked_data
-            .current_collection_number
-            .load(Ordering::SeqCst);
-
         // Here we synchronize destructors: this ensures that handles in objects in the background thread are dropped
-        // Otherwise we'd see those handles as rooted and keep them around.
+        // Otherwise we'd see those handles as rooted and keep them around. (This would not lead to incorrectness, but
+        // it improves consistency and determinism.)
         //
         // This makes a lot of sense in the background thread (since it's totally async),
         // but may slow direct calls to `collect`.
         self.synchronize_destructors();

-        // The warrant system prevents us from scanning in-use data
-        let warrants: Injector<GcExclusiveWarrant> = Injector::new();
-
-        // eprintln!("tracked data {:?}", tracked_data);
-        // eprintln!("tracked handles {:?}", tracked_handles);
```
```diff
-        // In this step we calculate what's not rooted by marking all data definitively in a Gc
-        self.tracked_data.data.par_iter(|data| {
-            // If data.last_marked == 0, then it is new data. Update that we've seen this data
-            // (this step helps synchronize what data is valid to be deallocated)
-            if data.last_marked.load(Ordering::SeqCst) == 0 {
-                data.last_marked
-                    .store(current_collection - 1, Ordering::SeqCst);
+        // First, go through the data, resetting all the reference count trackers,
+        // and taking exclusive warrants where possible
+        self.tracked_data.par_iter(|data| {
```
Reviewer: Can you elaborate on what this step is doing?

Owner (Author): It (a) tries to lock all the data so we can scan it using …
```diff
+            unsafe {
+                // Safe as we are the collector
+                Lockout::try_take_exclusive_access_unsafe(&data);
+            }
+            // This can be done concurrently with the `Lockout` management, since the ref-count snapshot is conservative
+            // TODO: Double check this logic
+            data.ref_cnt.prepare_for_collection();
+        });

-            if let Some(warrant) = Lockout::get_exclusive_warrant(data.clone()) {
-                // Save that warrant so things can't shift around under us
-                warrants.push(warrant);
```
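The `ref_cnt` methods called in this hunk (`prepare_for_collection`, `found_once_internally`, `is_rooted`, `override_mark_as_rooted`) are defined elsewhere in the PR. A plausible sketch of their semantics, inferred from the call sites and very much an assumption rather than the crate's actual implementation, might look like this:

```rust
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};

// Assumed semantics, inferred from how the collector calls these methods.
struct RefCount {
    // Live count of handles to this data, maintained on clone/drop.
    count: AtomicIsize,
    // Collector-local snapshot: starts at `count`, decremented once per
    // reference found *inside* other tracked data.
    snapshot: AtomicIsize,
    rooted_override: AtomicBool,
}

impl RefCount {
    fn prepare_for_collection(&self) {
        // Conservative: a concurrent increment only makes data look *more* rooted.
        self.snapshot
            .store(self.count.load(Ordering::SeqCst), Ordering::SeqCst);
        self.rooted_override.store(false, Ordering::SeqCst);
    }

    fn found_once_internally(&self) {
        // One handle accounted for by another allocation, not by a root.
        self.snapshot.fetch_sub(1, Ordering::SeqCst);
    }

    fn is_rooted(&self) -> bool {
        // Handles left over after subtracting internal ones must live on
        // stacks or in non-tracked data, i.e. they are roots.
        self.snapshot.load(Ordering::SeqCst) > 0 || self.was_overriden_as_rooted()
    }

    fn override_mark_as_rooted(&self) {
        self.rooted_override.store(true, Ordering::SeqCst);
    }

    fn was_overriden_as_rooted(&self) -> bool {
        self.rooted_override.load(Ordering::SeqCst)
    }
}
```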
```diff
-        // Now figure out what handles are not rooted
+        // Then adjust reference counts to figure out what is rooted
+        self.tracked_data.par_iter(|data| {
+            if Lockout::unsafe_exclusive_access_taken(&data) {
                 data.underlying_allocation.scan(|h| {
-                    h.handle_ref
-                        .v
-                        .last_non_rooted
-                        .store(current_collection, Ordering::SeqCst);
+                    h.data_ref.ref_cnt.found_once_internally();
                 });
             } else {
-                // eprintln!("failed to get warrant!");
-                // If we can't get the warrant, then this data must be in use, so we can mark it
-                data.last_marked.store(current_collection, Ordering::SeqCst);
+                // Someone else had this data during the collection, so it is clearly rooted
+                data.ref_cnt.override_mark_as_rooted();
             }
         });
```
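Continuing the assumed `RefCount` sketch from above, the arithmetic of this step on a single allocation would be:

```rust
// Worked example with the sketched RefCount: an allocation with three live
// handles, two of which live inside other tracked allocations.
let rc = RefCount {
    count: AtomicIsize::new(3),
    snapshot: AtomicIsize::new(0),
    rooted_override: AtomicBool::new(false),
};
rc.prepare_for_collection(); // snapshot = 3
rc.found_once_internally();  // snapshot = 2 (found inside another allocation)
rc.found_once_internally();  // snapshot = 1
assert!(rc.is_rooted());     // one handle unaccounted for: it must be a root
```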
```diff
-        // The handles that were not just marked need to be treated as roots
+        // Now we need to translate our set of roots into a queue
+        // TODO: This is the only allocation in the collector at this point, probably is removable or re-usable
         let roots = SegQueue::new();
-        self.tracked_data.handles.par_iter(|handle| {
-            // If the `last_non_rooted` number was not now, then it is a root
-            if handle.last_non_rooted.load(Ordering::SeqCst) != current_collection {
-                roots.push(handle);
+        self.tracked_data.par_iter(|data| {
+            if data.ref_cnt.is_rooted() {
+                // We need to scan data that dynamically becomes rooted, so we use the `override_mark_as_rooted`
+                // flag to track what we've enqueued to scan already
+                data.ref_cnt.override_mark_as_rooted();
+                roots.push(data);
             }
         });

         // eprintln!("roots {:?}", roots);
```
```diff
         // This step is a dfs through the object graph (starting with the roots)
-        // We mark each object we find
         let dfs_stack = roots.into_dyn_queue();
-        dfs_stack
-            .into_par_iter()
-            .for_each(|(queue, handle)| unsafe {
-                handle.underlying_data.with_data(|data| {
-                    // If this data is new, we don't want to `Scan` it, since we may not have its Lockout
-                    // Any handles inside this could not have been seen in step 1, so they'll be rooted anyway
-                    if data.last_marked.load(Ordering::SeqCst) != 0 {
-                        // Essential note! All non-new non-warranted data is automatically marked
-                        // Thus we will never accidentally scan non-warranted data here
-                        let previous_mark =
-                            data.last_marked.swap(current_collection, Ordering::SeqCst);
-
-                        // Since we've done an atomic swap, we know we've already scanned this iff it was marked
-                        // (excluding data marked because we couldn't get its warrant, whose handles would be seen as roots)
-                        // This stops us from scanning data more than once and, crucially, concurrently scanning the same data
-                        if previous_mark != current_collection {
-                            data.last_marked.store(current_collection, Ordering::SeqCst);
-
-                            data.underlying_allocation.scan(|h| {
-                                let mut should_enque = false;
-                                h.handle_ref.v.underlying_data.with_data(|scanned_data| {
-                                    if scanned_data.last_marked.load(Ordering::SeqCst)
-                                        != current_collection
-                                    {
-                                        should_enque = true;
-                                    }
-                                });
-                                if should_enque {
-                                    queue.enqueue(h.handle_ref.v);
-                                }
-                            });
-                        }
-                    }
-                });
-            });
-
-        // We're done scanning things, and have established what is marked. Release the warrants
-        drop(warrants);
+        dfs_stack.into_par_iter().for_each(|(queue, data)| {
+            debug_assert!(!data.deallocated.load(Ordering::SeqCst));
+
+            if Lockout::unsafe_exclusive_access_taken(&data) {
+                data.underlying_allocation.scan(|h| {
+                    let ref_cnt = &h.data_ref.ref_cnt;
+                    // We need to scan data that dynamically becomes rooted, so we use the `override_mark_as_rooted`
+                    // flag to track what we've enqueued to scan already. (So we can't just use `is_rooted` here.)
+                    if !ref_cnt.was_overriden_as_rooted() {
+                        // This is technically racy, since we check the rooting status, THEN mark as rooted/enqueue
+                        // But that doesn't matter since the worst that can happen is that we enqueue the data twice
+                        ref_cnt.override_mark_as_rooted();
+                        queue.enqueue(h.data_ref.clone());
+                    }
+                })
+            } else {
+                // Someone else had this data during the collection, so it is clearly rooted
+                data.ref_cnt.override_mark_as_rooted();
+            }
+        });
```
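The `(queue, data)` destructuring comes from the dynqueue crate imported above: its parallel iterator hands each worker a handle that can push more work while iteration is in flight, which is exactly what a parallel DFS needs. A standalone toy traversal in the same pattern (the graph is made up, and `Vec` support in dynqueue is an assumption based on its documentation):

```rust
use dynqueue::IntoDynQueue;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    // Tiny object graph: node -> nodes it references.
    let edges: Vec<Vec<usize>> = vec![vec![1, 2], vec![3], vec![3], vec![]];
    let marked: Vec<AtomicBool> =
        (0..edges.len()).map(|_| AtomicBool::new(false)).collect();

    // Start from root 0; workers may enqueue more work mid-iteration.
    marked[0].store(true, Ordering::SeqCst);
    vec![0usize]
        .into_dyn_queue()
        .into_par_iter()
        .for_each(|(queue, node)| {
            for &next in &edges[node] {
                // `swap` returns the previous value, so each node is enqueued
                // at most once even when workers race on it.
                if !marked[next].swap(true, Ordering::SeqCst) {
                    queue.enqueue(next);
                }
            }
        });

    assert!(marked.iter().all(|m| m.load(Ordering::SeqCst)));
}
```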
```diff
+        // We are done scanning, so release any warrants
+        self.tracked_data.par_iter(|data| unsafe {
+            Lockout::try_release_exclusive_access_unsafe(&data);
+        });
+
+        // Since new refcnts are created as rooted, and new data is created with new refcnts, we
+        // can safely treat the refcnt data as definitive
```
```diff
         // Now cleanup by removing all the data that is done for
-        let to_drop = RwLock::new(Vec::new());
+        let to_drop = self.dropper.get_buffer();

-        self.tracked_data.data.par_retain(|data| {
-            // Mark the new data as in use for now
-            // This stops us deallocating data that was allocated during collection
-            if data.last_marked.load(Ordering::SeqCst) == 0 {
-                data.last_marked.store(current_collection, Ordering::SeqCst);
-            }
-
-            // If this is true, we just marked this data
-            if data.last_marked.load(Ordering::SeqCst) == current_collection {
-                // so retain it
-                true
-            } else {
-                // Otherwise we didn't mark it and it should be deallocated
-                // eprintln!("deallocating {:?}", data_ptr);
-                // Send it to the drop thread to be dropped
-                to_drop.write().push(data.clone());
-
-                // Don't retain this data
-                false
-            }
+        self.tracked_data.par_retain(|data| {
+            let is_marked = data.ref_cnt.is_rooted();
+            if is_marked {
+                // this is marked so retain it
+                return true;
+            }
+
+            // Otherwise we didn't mark it and it should be deallocated
+            // eprintln!("deallocating {:?}", data_ptr);
+            // Send it to the drop thread to be dropped
+            to_drop.push(data.clone());
+
+            // Don't retain this data
+            false
         });

         // Send off the data to be dropped in the background
```
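The dropper's buffer and the `DropMessage` type are defined outside this hunk. A minimal channel-based sketch of the pattern (the variant names and all details here are guesses for illustration, not the crate's actual dropper):

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Invented stand-in for the tracked allocations the collector retires.
struct TrackedData(u32);

enum DropMessage {
    // A batch of data whose destructors should run off the collecting thread.
    DataToDrop(Vec<TrackedData>),
    // Lets the collector wait until all previously sent drops have finished.
    SyncUp(Sender<()>),
}

fn spawn_dropper() -> Sender<DropMessage> {
    let (tx, rx) = channel::<DropMessage>();
    thread::spawn(move || {
        for msg in rx {
            match msg {
                DropMessage::DataToDrop(batch) => drop(batch), // destructors run here
                DropMessage::SyncUp(done) => {
                    let _ = done.send(()); // everything sent earlier is now dropped
                }
            }
        }
    });
    tx
}
```

If this matches the real dropper's shape, the `synchronize_destructors` call near the top of `do_collect` would be the `SyncUp` round trip.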
```diff
@@ -153,11 +130,6 @@ impl Collector {
         self.trigger
             .set_data_count_after_collection(self.tracked_data_count());

-        // update collection number
-        self.tracked_data
-            .current_collection_number
-            .fetch_add(1, Ordering::SeqCst);
-
         drop(gc_guard);

         trace!("Collection finished");
```
Reviewer: This needs to be commented back out