|
1 | | -use std::collections::HashSet; |
| 1 | +use std::{collections::HashSet, pin::Pin, sync::Arc}; |
2 | 2 |
|
3 | 3 | use bao_tree::ChunkRanges; |
4 | 4 | use genawaiter::sync::{Co, Gen}; |
5 | 5 | use n0_future::{Stream, StreamExt}; |
6 | | -use tracing::{debug, error, warn}; |
| 6 | +use tracing::{debug, error, info, warn}; |
7 | 7 |
|
8 | 8 | use crate::{api::Store, Hash, HashAndFormat}; |
9 | 9 |
|
@@ -130,14 +130,31 @@ fn gc_sweep<'a>( |
130 | 130 | }) |
131 | 131 | } |
132 | 132 |
|
133 | | -#[derive(Debug, Clone)] |
| 133 | +#[derive(derive_more::Debug, Clone)] |
134 | 134 | pub struct GcConfig { |
135 | 135 | pub interval: std::time::Duration, |
| 136 | + #[debug("ProtectCallback")] |
| 137 | + pub add_protected: Option<ProtectCb>, |
136 | 138 | } |
137 | 139 |
|
| 140 | +#[derive(Debug)] |
| 141 | +pub enum ProtectOutcome { |
| 142 | + Continue, |
| 143 | + Skip, |
| 144 | +} |
| 145 | + |
| 146 | +pub type ProtectCb = Arc< |
| 147 | + dyn for<'a> Fn( |
| 148 | + &'a mut HashSet<Hash>, |
| 149 | + ) |
| 150 | + -> Pin<Box<dyn std::future::Future<Output = ProtectOutcome> + Send + Sync + 'a>> |
| 151 | + + Send |
| 152 | + + Sync |
| 153 | + + 'static, |
| 154 | +>; |
| 155 | + |
138 | 156 | pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api::Result<()> { |
139 | 157 | { |
140 | | - live.clear(); |
141 | 158 | store.clear_protected().await?; |
142 | 159 | let mut stream = gc_mark(store, live); |
143 | 160 | while let Some(ev) = stream.next().await { |
@@ -179,7 +196,17 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api: |
179 | 196 | pub async fn run_gc(store: Store, config: GcConfig) { |
180 | 197 | let mut live = HashSet::new(); |
181 | 198 | loop { |
| 199 | + live.clear(); |
182 | 200 | tokio::time::sleep(config.interval).await; |
| 201 | + if let Some(ref cb) = config.add_protected { |
| 202 | + match (cb)(&mut live).await { |
| 203 | + ProtectOutcome::Continue => {} |
| 204 | + ProtectOutcome::Skip => { |
| 205 | + info!("Skip gc run: protect callback indicated skip"); |
| 206 | + continue; |
| 207 | + } |
| 208 | + } |
| 209 | + } |
183 | 210 | if let Err(e) = gc_run_once(&store, &mut live).await { |
184 | 211 | error!("error during gc run: {e}"); |
185 | 212 | break; |
@@ -288,6 +315,7 @@ mod tests { |
288 | 315 | assert!(!data_path.exists()); |
289 | 316 | assert!(!outboard_path.exists()); |
290 | 317 | } |
| 318 | + live.clear(); |
291 | 319 | // create a large partial file and check that the data and outboard file as well as |
292 | 320 | // the sizes and bitfield files are deleted by gc |
293 | 321 | { |
|
0 commit comments