diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 2a86432c10d9b..192cd39af4ada 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -947,8 +947,8 @@ impl DefaultSettings { range: Some(SettingRange::Numeric(0..=u64::MAX)), }), ("compact_max_block_selection", DefaultSettingValue { - value: UserSettingValue::UInt64(10000), - desc: "Limits the maximum number of blocks that can be selected during a compact operation.", + value: UserSettingValue::UInt64(1000), + desc: "Limits the maximum number of imperfect blocks that can be selected during a compact operation.", mode: SettingMode::Both, scope: SettingScope::Both, range: Some(SettingRange::Numeric(2..=u64::MAX)), diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs index 7381afa4ca691..037384237d3a0 100644 --- a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -207,14 +207,10 @@ impl SnapshotGenerator for AppendGenerator { // If imperfect_count is larger, SLIGHTLY increase the number of blocks // eligible for auto-compaction, this adjustment is intended to help reduce // fragmentation over time. - // - // To prevent the off-by-one mistake, we need to add 1 to it; - // this way, the potentially previously left non-compacted segment will - // also be included. let compact_num_block_hint = std::cmp::min( imperfect_count, (auto_compaction_imperfect_blocks_threshold as f64 * 1.5).ceil() as u64, - ) + 1; + ); info!("set compact_num_block_hint to {compact_num_block_hint }"); self.ctx .set_compaction_num_block_hint(table_info.name.as_str(), compact_num_block_hint); diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs index 769140540786a..ab029f2ed6208 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs @@ -118,6 +118,7 @@ impl BlockCompactMutator { let mut segment_idx = 0; let mut is_end = false; + let mut stop_after_next = false; let mut parts = Vec::new(); let chunk_size = max_threads * 4; for chunk in segment_locations.chunks(chunk_size) { @@ -154,10 +155,25 @@ impl BlockCompactMutator { checker.generate_part(segments, &mut parts); } - if checker.is_limit_reached(num_segment_limit, num_block_limit) { + if stop_after_next { is_end = true; break; } + + match checker.is_limit_reached(num_segment_limit, num_block_limit) { + CompactLimitState::Continue => {} + CompactLimitState::ReachedBlockLimit => { + // When the block limit is reached, we allow one more iteration + // to include the next segment in compaction. + // This "+1" behavior ensures that previously un-compacted segments + // near the boundary are not skipped due to strict block counting. + stop_after_next = true; + } + CompactLimitState::ReachedSegmentLimit => { + is_end = true; + break; + } + } } // Status. @@ -303,6 +319,16 @@ impl BlockCompactMutator { } } +// CompactLimitState indicates the current compaction progress state. +pub enum CompactLimitState { + /// Continue collecting more segments and blocks. + Continue, + /// Hit the block threshold — take one more segment before stopping. + ReachedBlockLimit, + /// Hit the segment threshold — stop immediately. + ReachedSegmentLimit, +} + pub struct SegmentCompactChecker { thresholds: BlockThresholds, segments: Vec<(SegmentIndex, Arc)>, @@ -310,7 +336,7 @@ pub struct SegmentCompactChecker { cluster_key_id: Option, compacted_segment_cnt: usize, - compacted_block_cnt: u64, + compacted_imperfect_block_cnt: u64, } impl SegmentCompactChecker { @@ -320,8 +346,8 @@ impl SegmentCompactChecker { total_block_count: 0, thresholds, cluster_key_id, - compacted_block_cnt: 0, compacted_segment_cnt: 0, + compacted_imperfect_block_cnt: 0, } } @@ -360,9 +386,9 @@ impl SegmentCompactChecker { } self.compacted_segment_cnt += segments.len(); - self.compacted_block_cnt += segments + self.compacted_imperfect_block_cnt += segments .iter() - .map(|(_, info)| info.summary.block_count) + .map(|(_, info)| info.summary.block_count - info.summary.perfect_block_count) .sum::(); true } @@ -415,15 +441,31 @@ impl SegmentCompactChecker { self.generate_part(final_segments, parts); } - pub fn is_limit_reached(&self, num_segment_limit: usize, num_block_limit: usize) -> bool { - let residual_segment_cnt = self.segments.len(); - let residual_block_cnt: u64 = self - .segments - .iter() - .map(|(_, info)| info.summary.block_count) - .sum(); - self.compacted_segment_cnt + residual_segment_cnt >= num_segment_limit - || self.compacted_block_cnt + residual_block_cnt >= num_block_limit as u64 + /// Check if compaction limit is reached. + pub fn is_limit_reached( + &self, + num_segment_limit: usize, + num_block_limit: usize, + ) -> CompactLimitState { + // Stop immediately if the number of compacted segments reaches limit + if self.compacted_segment_cnt + self.segments.len() >= num_segment_limit { + return CompactLimitState::ReachedSegmentLimit; + } + + // Count the total number of imperfect blocks (those that still need compaction). + let compacted_imperfect_block_cnt = + self.segments + .iter() + .fold(self.compacted_imperfect_block_cnt, |mut acc, (_, info)| { + acc += info.summary.block_count - info.summary.perfect_block_count; + acc + }); + // If the imperfect block count exceeds the limit, signal "take one more". + if compacted_imperfect_block_cnt >= num_block_limit as u64 { + CompactLimitState::ReachedBlockLimit + } else { + CompactLimitState::Continue + } } } diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs index 31683958c8b44..4422a29bb2d6f 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs @@ -52,6 +52,7 @@ use opendal::Operator; use crate::io::MetaReaders; use crate::operations::common::BlockMetaIndex as BlockIndex; +use crate::operations::mutation::mutator::block_compact_mutator::CompactLimitState; use crate::operations::mutation::SegmentCompactChecker; use crate::operations::BlockCompactMutator; use crate::operations::CompactLazyPartInfo; @@ -413,7 +414,7 @@ impl ReclusterMutator { let mut parts = Vec::new(); let mut checker = SegmentCompactChecker::new(self.block_thresholds, Some(self.cluster_key_id)); - + let mut stop_after_next = false; for (loc, compact_segment) in compact_segments.into_iter() { recluster_blocks_count += compact_segment.summary.block_count; let segments_vec = checker.add(loc.segment_idx, compact_segment); @@ -421,9 +422,19 @@ impl ReclusterMutator { checker.generate_part(segments, &mut parts); } - if checker.is_limit_reached(num_segment_limit, num_block_limit) { + if stop_after_next { break; } + + match checker.is_limit_reached(num_segment_limit, num_block_limit) { + CompactLimitState::Continue => {} + CompactLimitState::ReachedBlockLimit => { + stop_after_next = true; + } + CompactLimitState::ReachedSegmentLimit => { + break; + } + } } // finalize the compaction. checker.finalize(&mut parts); diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test index 51dfb9e866df6..7c9404369b6e0 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test @@ -108,3 +108,35 @@ select info:average_depth from clustering_information('i15760', 't1') statement ok drop table t1 all; + +#ISSUE 18859 +statement ok +create table t2(a int) row_per_block=5; + +statement ok +insert into t2 select number from numbers(2); + +statement ok +insert into t2 select number from numbers(12); + +statement ok +insert into t2 select number from numbers(2); + +query T +select block_count, row_count from fuse_segment('i15760', 't2'); +---- +1 2 +2 12 +1 2 + +statement ok +insert into t2 select number from numbers(2); + +# after auto compaction +query T +select block_count, row_count from fuse_segment('i15760', 't2'); +---- +3 18 + +statement ok +drop table t2 all;