Skip to content

Commit 6aeeead

Browse files
committed
chore(cubestore): Set metastore deletion batch size based on upload_concurrency option
1 parent 71c1022 commit 6aeeead

File tree

1 file changed

+14
-10
lines changed

1 file changed

+14
-10
lines changed

rust/cubestore/cubestore/src/metastore/rocks_fs.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ pub struct BaseRocksStoreFs {
5555
name: &'static str,
5656
minimum_snapshots_count: u64,
5757
snapshots_lifetime: u64,
58-
remote_files_cleanup_batch_size: u64,
58+
// A copy of the upload-concurrency config -- we multiply this for our deletes.
59+
deletion_batch_size: u64,
5960
}
6061

6162
impl BaseRocksStoreFs {
@@ -65,13 +66,13 @@ impl BaseRocksStoreFs {
6566
) -> Arc<Self> {
6667
let minimum_snapshots_count = config.minimum_metastore_snapshots_count();
6768
let snapshots_lifetime = config.metastore_snapshots_lifetime();
68-
let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size();
69+
let deletion_batch_size = Self::deletion_batch_size(config.as_ref());
6970
Arc::new(Self {
7071
remote_fs,
7172
name: "metastore",
7273
minimum_snapshots_count,
7374
snapshots_lifetime,
74-
remote_files_cleanup_batch_size,
75+
deletion_batch_size,
7576
})
7677
}
7778
pub fn new_for_cachestore(
@@ -80,16 +81,23 @@ impl BaseRocksStoreFs {
8081
) -> Arc<Self> {
8182
let minimum_snapshots_count = config.minimum_cachestore_snapshots_count();
8283
let snapshots_lifetime = config.cachestore_snapshots_lifetime();
83-
let remote_files_cleanup_batch_size = config.remote_files_cleanup_batch_size();
84+
let deletion_batch_size = Self::deletion_batch_size(config.as_ref());
8485
Arc::new(Self {
8586
remote_fs,
8687
name: "cachestore",
8788
minimum_snapshots_count,
8889
snapshots_lifetime,
89-
remote_files_cleanup_batch_size,
90+
deletion_batch_size,
9091
})
9192
}
9293

94+
fn deletion_batch_size(config: &dyn ConfigObj) -> u64 {
95+
// Pick a large enough batch size that batching is not a significant slowdown, given the
96+
// upload_concurrency parameter.
97+
const MULTIPLIER: u64 = 25;
98+
config.upload_concurrency().saturating_mul(MULTIPLIER)
99+
}
100+
93101
pub fn get_name(&self) -> &'static str {
94102
&self.name
95103
}
@@ -214,11 +222,7 @@ impl BaseRocksStoreFs {
214222
);
215223
}
216224

217-
for batch in to_delete.chunks(
218-
self.remote_files_cleanup_batch_size
219-
.try_into()
220-
.unwrap_or(usize::MAX),
221-
) {
225+
for batch in to_delete.chunks(self.deletion_batch_size.try_into().unwrap_or(usize::MAX)) {
222226
for v in join_all(
223227
batch
224228
.iter()

0 commit comments

Comments
 (0)