diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index c15d6d456b..5340c9cadf 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -1002,8 +1002,8 @@ mod tests {
         let index_writer: IndexWriter = index.writer_for_tests().unwrap();
         assert_eq!(
             format!("{:?}", index_writer.get_merge_policy()),
-            "LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, \
-             min_layer_size: 10000, level_log_size: 0.75, del_docs_ratio_before_merge: 1.0 }"
+            "LogMergePolicy { min_num_segments: 8, target_segment_size: 10000000, min_layer_size: \
+             10000, level_log_size: 0.75, del_docs_ratio_before_merge: 1.0 }"
         );
         let merge_policy = Box::<NoMergePolicy>::default();
         index_writer.set_merge_policy(merge_policy);
diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs
index 86a8fd3417..d02d3b9ccf 100644
--- a/src/indexer/log_merge_policy.rs
+++ b/src/indexer/log_merge_policy.rs
@@ -8,7 +8,7 @@ use crate::index::SegmentMeta;
 const DEFAULT_LEVEL_LOG_SIZE: f64 = 0.75;
 const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
 const DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE: usize = 8;
-const DEFAULT_MAX_DOCS_BEFORE_MERGE: usize = 10_000_000;
+const DEFAULT_TARGET_SEGMENT_SIZE: usize = 10_000_000;
 // The default value of 1 means that deletes are not taken in account when
 // identifying merge candidates. This is not a very sensible default: it was
 // set like that for backward compatibility and might change in the near future.
@@ -19,7 +19,7 @@ const DEFAULT_DEL_DOCS_RATIO_BEFORE_MERGE: f32 = 1.0f32;
 #[derive(Debug, Clone)]
 pub struct LogMergePolicy {
     min_num_segments: usize,
-    max_docs_before_merge: usize,
+    target_segment_size: usize,
     min_layer_size: u32,
     level_log_size: f64,
     del_docs_ratio_before_merge: f32,
@@ -35,11 +35,11 @@ impl LogMergePolicy {
         self.min_num_segments = min_num_segments;
     }
 
-    /// Set the maximum number docs in a segment for it to be considered for
-    /// merging. A segment can still reach more than max_docs, by merging many
-    /// smaller ones.
-    pub fn set_max_docs_before_merge(&mut self, max_docs_merge_size: usize) {
-        self.max_docs_before_merge = max_docs_merge_size;
+    /// Set the target number of documents per segment. A merged segment can end up with as many
+    /// as `(target_segment_size * 2) - 2` documents, but the policy will try to keep it as close
+    /// as possible to `target_segment_size`.
+    pub fn set_target_segment_size(&mut self, target_segment_size: usize) {
+        self.target_segment_size = target_segment_size;
     }
 
     /// Set the minimum segment size under which all segment belong
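
The `(target_segment_size * 2) - 2` ceiling follows from the greedy batching further down: every candidate segment holds fewer than `target_segment_size` docs, and a batch stops growing as soon as it reaches the target, so at worst a batch holding `target_segment_size - 1` docs receives one final segment of `target_segment_size - 1` docs. A minimal usage sketch of the renamed setter, assuming tantivy's public `Index`/`IndexWriter` API; the schema and the 1M target are arbitrary illustrations:

use tantivy::indexer::LogMergePolicy;
use tantivy::schema::{Schema, TEXT};
use tantivy::{Index, IndexWriter};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT);
    let index = Index::create_in_ram(schema_builder.build());
    let writer: IndexWriter = index.writer(50_000_000)?;

    let mut policy = LogMergePolicy::default();
    // Merges aim for segments of ~1M docs; the worst case produced by the
    // greedy batching is (1_000_000 * 2) - 2 docs in a single merged segment.
    policy.set_target_segment_size(1_000_000);
    writer.set_merge_policy(Box::new(policy));
    Ok(())
}
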
@@ -76,54 +76,89 @@ impl LogMergePolicy {
         self.del_docs_ratio_before_merge = del_docs_ratio_before_merge;
     }
 
-    fn has_segment_above_deletes_threshold(&self, level: &[&SegmentMeta]) -> bool {
-        level
-            .iter()
-            .any(|segment| deletes_ratio(segment) > self.del_docs_ratio_before_merge)
-    }
-}
-
-fn deletes_ratio(segment: &SegmentMeta) -> f32 {
-    if segment.max_doc() == 0 {
-        return 0f32;
+    fn segment_above_deletes_threshold(&self, segment: &SegmentMeta) -> bool {
+        match segment.max_doc() {
+            0 => false,
+            _ => {
+                (segment.num_deleted_docs() as f32 / segment.max_doc() as f32)
+                    > self.del_docs_ratio_before_merge
+            }
+        }
     }
-    segment.num_deleted_docs() as f32 / segment.max_doc() as f32
 }
 
 impl MergePolicy for LogMergePolicy {
     fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
-        let size_sorted_segments = segments
+        // Filter for segments that have fewer than the target number of docs, count the total
+        // number of unmerged docs, and sort in descending order.
+        let mut unmerged_docs = 0;
+        let mut levels = segments
             .iter()
-            .filter(|seg| seg.num_docs() <= (self.max_docs_before_merge as u32))
-            .sorted_by_key(|seg| std::cmp::Reverse(seg.max_doc()))
-            .collect::<Vec<&SegmentMeta>>();
-
-        if size_sorted_segments.is_empty() {
-            return vec![];
-        }
-
-        let mut current_max_log_size = f64::MAX;
-        let mut levels = vec![];
-        for (_, merge_group) in &size_sorted_segments.into_iter().chunk_by(|segment| {
-            let segment_log_size = f64::from(self.clip_min_size(segment.num_docs())).log2();
-            if segment_log_size < (current_max_log_size - self.level_log_size) {
-                // update current_max_log_size to create a new group
-                current_max_log_size = segment_log_size;
+            .map(|seg| (seg.num_docs() as usize, seg))
+            .filter(|(docs, _)| *docs < self.target_segment_size)
+            .inspect(|(docs, _)| unmerged_docs += docs)
+            .sorted_by(|(a, _), (b, _)| b.cmp(a))
+            .collect_vec();
+
+        // If there are enough unmerged documents to create a new segment of the target size,
+        // then create a merge candidate for them.
+        let mut candidates = Vec::new();
+        if unmerged_docs >= self.target_segment_size {
+            let mut batch_docs = 0;
+            let mut batch = Vec::new();
+            // Start with the smallest segments and add them to the batch until we reach the target
+            while let Some((docs, seg)) = levels.pop() {
+                batch_docs += docs;
+                batch.push(seg);
+
+                // If the current batch has enough documents to be merged, create a merge
+                // candidate and push it to candidates.
+                if batch_docs >= self.target_segment_size {
+                    unmerged_docs -= batch_docs;
+                    batch_docs = 0;
+                    candidates.push(MergeCandidate(
+                        // drain to reuse the buffer
+                        batch.drain(..).map(|seg| seg.id()).collect(),
+                    ));
+                    // If there aren't enough documents to create another segment of the target
+                    // size, then stop.
+                    if unmerged_docs <= self.target_segment_size {
+                        break;
+                    }
+                }
             }
-            // return current_max_log_size to be grouped to the current group
-            current_max_log_size
-        }) {
-            levels.push(merge_group.collect::<Vec<&SegmentMeta>>());
         }
 
+        let mut current_max_log_size = f64::MAX;
+        let mut batch = Vec::new();
         levels
             .iter()
-            .filter(|level| {
-                level.len() >= self.min_num_segments
-                    || self.has_segment_above_deletes_threshold(level)
+            .chunk_by(|(docs, _)| {
+                let segment_log_size = f64::from(self.clip_min_size(*docs as u32)).log2();
+                if segment_log_size < (current_max_log_size - self.level_log_size) {
+                    // update current_max_log_size to create a new group
+                    current_max_log_size = segment_log_size;
+                }
+                current_max_log_size
             })
-            .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect()))
-            .collect()
+            .into_iter()
+            .for_each(|(_, group)| {
+                let mut hit_delete_threshold = false;
+                group.for_each(|(_, seg)| {
+                    batch.push(seg.id());
+                    if !hit_delete_threshold && self.segment_above_deletes_threshold(seg) {
+                        hit_delete_threshold = true;
+                    }
+                });
+
+                if batch.len() >= self.min_num_segments || hit_delete_threshold {
+                    candidates.push(MergeCandidate(std::mem::take(&mut batch)));
+                } else {
+                    batch.clear();
+                }
+            });
+
+        candidates
    }
 }
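
The greedy pass above can be sanity-checked in isolation. Below is a standalone sketch of the same batching loop over plain doc counts (`greedy_batches` is a hypothetical helper, not part of this patch), run against the fixture used by `test_skip_merge_small_segments` further down:

// Standalone sketch of the greedy batching above, on plain doc counts instead
// of SegmentMeta. `greedy_batches` is a hypothetical helper for illustration.
fn greedy_batches(mut sizes: Vec<usize>, target: usize) -> Vec<Vec<usize>> {
    // Mirror the policy: drop segments already at/above target, sort descending.
    sizes.retain(|&docs| docs < target);
    sizes.sort_unstable_by(|a, b| b.cmp(a));
    let mut unmerged: usize = sizes.iter().sum();
    let mut batches = Vec::new();
    if unmerged < target {
        return batches; // not enough docs for a target-sized segment
    }
    let (mut batch, mut batch_docs) = (Vec::new(), 0);
    while let Some(docs) = sizes.pop() {
        // pop() takes the smallest remaining segment first
        batch_docs += docs;
        batch.push(docs);
        if batch_docs >= target {
            unmerged -= batch_docs;
            batch_docs = 0;
            batches.push(std::mem::take(&mut batch));
            if unmerged <= target {
                break; // leftovers fall through to the log-based grouping
            }
        }
    }
    batches
}

fn main() {
    // Five 5_000-doc segments plus one 75_000-doc segment reach exactly 100_000,
    // so they form a single candidate; the other 75_000-doc segment is left over.
    let batches = greedy_batches(
        vec![75_000, 75_000, 5_000, 5_000, 5_000, 5_000, 5_000],
        100_000,
    );
    assert_eq!(batches, vec![vec![5_000, 5_000, 5_000, 5_000, 5_000, 75_000]]);
}

Note that the loop stops once at most one target's worth of documents remains unbatched; those leftover segments are handed to the log-based level grouping instead.
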
@@ -131,7 +166,7 @@ impl Default for LogMergePolicy {
     fn default() -> LogMergePolicy {
         LogMergePolicy {
             min_num_segments: DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE,
-            max_docs_before_merge: DEFAULT_MAX_DOCS_BEFORE_MERGE,
+            target_segment_size: DEFAULT_TARGET_SEGMENT_SIZE,
             min_layer_size: DEFAULT_MIN_LAYER_SIZE,
             level_log_size: DEFAULT_LEVEL_LOG_SIZE,
             del_docs_ratio_before_merge: DEFAULT_DEL_DOCS_RATIO_BEFORE_MERGE,
@@ -163,7 +198,7 @@ mod tests {
         {
             let mut log_merge_policy = LogMergePolicy::default();
             log_merge_policy.set_min_num_segments(1);
-            log_merge_policy.set_max_docs_before_merge(1);
+            log_merge_policy.set_target_segment_size(1);
             log_merge_policy.set_min_layer_size(0);
 
             let mut index_writer = index.writer_for_tests()?;
@@ -214,7 +249,7 @@ mod tests {
     fn test_merge_policy() -> LogMergePolicy {
         let mut log_merge_policy = LogMergePolicy::default();
         log_merge_policy.set_min_num_segments(3);
-        log_merge_policy.set_max_docs_before_merge(100_000);
+        log_merge_policy.set_target_segment_size(100_000);
         log_merge_policy.set_min_layer_size(2);
         log_merge_policy
     }
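
Segments the greedy pass does not batch still go through the log-based level grouping, driven by `min_layer_size` and `level_log_size`. A small sketch of the `chunk_by` key computation from the patch (`level_key` is a hypothetical stand-in for the inline closure), using this test fixture's `min_layer_size` of 2 and the default `level_log_size` of 0.75:

// Sketch of the level key used by chunk_by: sizes are clipped to
// min_layer_size, and a segment opens a new level when its log2 size drops
// more than level_log_size below the current level's maximum.
fn level_key(docs: u32, min_layer_size: u32, level_log_size: f64, current_max: &mut f64) -> f64 {
    let clipped = docs.max(min_layer_size); // clip_min_size equivalent
    let log_size = f64::from(clipped).log2();
    if log_size < (*current_max - level_log_size) {
        *current_max = log_size; // open a new, smaller level
    }
    *current_max
}

fn main() {
    // Doc counts must arrive in descending order, as in compute_merge_candidates.
    let mut current_max = f64::MAX;
    let keys: Vec<f64> = [80_000, 75_000, 5_000]
        .iter()
        .map(|&docs| level_key(docs, 2, 0.75, &mut current_max))
        .collect();
    // 80_000 and 75_000 share a level (their log2 sizes differ by ~0.09 < 0.75),
    // while 5_000 falls more than 0.75 below and starts a new level.
    assert_eq!(keys[0], keys[1]);
    assert!(keys[2] < keys[0]);
}
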
@@ -318,14 +353,64 @@ mod tests {
             create_random_segment_meta(1_500_000),
         ];
         let result_list = test_merge_policy().compute_merge_candidates(&test_input);
-        // Do not include large segments
-        assert_eq!(result_list.len(), 1);
-        assert_eq!(result_list[0].0.len(), 3);
+        // All segments are at or above the target size, so we expect no candidates
+        assert!(result_list.is_empty());
+    }
 
-        // Making sure merge policy points to the correct index of the original input
-        assert_eq!(result_list[0].0[0], test_input[2].id());
-        assert_eq!(result_list[0].0[1], test_input[4].id());
-        assert_eq!(result_list[0].0[2], test_input[5].id());
+    #[test]
+    fn test_skip_merge_large_segments() {
+        // All of these should be merged into a single segment since 2 * 49_999 < 100_000
+        let test_input_merge_all = vec![
+            create_random_segment_meta(49_999),
+            create_random_segment_meta(49_999),
+            create_random_segment_meta(49_999),
+        ];
+
+        // Only two of these should be merged since 2 * 50_000 >= 100_000; the third is left over
+        let test_input_merge_two = vec![
+            create_random_segment_meta(50_000),
+            create_random_segment_meta(50_000),
+            create_random_segment_meta(50_000),
+        ];
+
+        let result_list_merge_all =
+            test_merge_policy().compute_merge_candidates(&test_input_merge_all);
+        let result_list_merge_two =
+            test_merge_policy().compute_merge_candidates(&test_input_merge_two);
+
+        assert_eq!(result_list_merge_all[0].0.len(), 3);
+        assert_eq!(result_list_merge_two[0].0.len(), 2);
+    }
+
+    #[test]
+    fn test_skip_merge_small_segments() {
+        // Test that we skip log merges if there are enough unmerged documents to reach the
+        // target size
+        let test_input = vec![
+            create_random_segment_meta(75_000),
+            create_random_segment_meta(75_000),
+            create_random_segment_meta(5_000),
+            create_random_segment_meta(5_000),
+            create_random_segment_meta(5_000),
+            create_random_segment_meta(5_000),
+            create_random_segment_meta(5_000),
+        ];
+
+        let result_list = test_merge_policy().compute_merge_candidates(&test_input);
+
+        // Should have a single merge with all of the small segments and only one of the large
+        // segments
+        assert_eq!(result_list.len(), 1);
+        assert_eq!(result_list[0].0.len(), 6);
+        assert!(result_list[0].0.contains(&test_input[2].id()));
+        assert!(result_list[0].0.contains(&test_input[3].id()));
+        assert!(result_list[0].0.contains(&test_input[4].id()));
+        assert!(result_list[0].0.contains(&test_input[5].id()));
+        assert!(result_list[0].0.contains(&test_input[6].id()));
+        assert!(
+            result_list[0].0.contains(&test_input[0].id())
+                || result_list[0].0.contains(&test_input[1].id())
+        );
     }
 
     #[test]