Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions src/collector/sort_key/sort_key_computer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::cmp::Ordering;

use crate::collector::ComparableDoc;
use crate::collector::sort_key::ReverseOrder;
use crate::collector::top_score_collector::push_assuming_capacity;
use crate::{DocId, Order, Result, Score, SegmentReader};

/// A `SegmentSortKeyComputer` makes it possible to modify the default score
Expand All @@ -23,7 +25,8 @@ pub trait SegmentSortKeyComputer: 'static {
/// Returns true if the `SegmentSortKeyComputer` is a good candidate for the lazy evaluation
/// optimization. See [`SegmentSortKeyComputer::accept_score_lazy`].
fn is_lazy() -> bool {
    // Conservative default: lazy evaluation only pays off when the sort key is cheap to
    // compute and compare against a threshold, so implementations must opt in explicitly.
    // Do NOT flip this default to `true` just to exercise the lazy code path in tests —
    // that changes behavior for every implementer of this trait. Add a dedicated test
    // computer that overrides `is_lazy()` instead.
    false
}

/// Implementing this method makes it possible to avoid computing
Expand All @@ -43,9 +46,9 @@ pub trait SegmentSortKeyComputer: 'static {
threshold: &Self::SegmentSortKey,
) -> Option<(std::cmp::Ordering, Self::SegmentSortKey)> {
let excluded_ordering = if REVERSE_ORDER {
Ordering::Greater
} else {
Ordering::Less
} else {
Ordering::Greater
};
let sort_key = self.sort_key(doc_id, score);
let cmp = sort_key.partial_cmp(threshold).unwrap_or(excluded_ordering);
Expand All @@ -56,6 +59,36 @@ pub trait SegmentSortKeyComputer: 'static {
}
}

/// Similar to `accept_sort_key_lazy`, but evaluates a whole block of documents and pushes the
/// surviving matches directly into `output`.
///
/// `output` must have spare capacity for at least `docs.len()` additional elements, or this
/// method will panic (via `push_assuming_capacity`).
fn accept_sort_key_block_lazy<const REVERSE_ORDER: bool>(
    &mut self,
    docs: &[DocId],
    threshold: &Self::SegmentSortKey,
    output: &mut Vec<ComparableDoc<Self::SegmentSortKey, DocId, REVERSE_ORDER>>,
) {
    // A key comparing on this side of the threshold is filtered out. Incomparable keys
    // (`partial_cmp` returning `None`, e.g. NaN) are treated as excluded as well.
    let excluded = if REVERSE_ORDER {
        Ordering::Less
    } else {
        Ordering::Greater
    };
    for &doc in docs {
        // Block collection carries no per-document score, hence the 0.0 placeholder.
        let sort_key = self.sort_key(doc, 0.0);
        if sort_key.partial_cmp(threshold).unwrap_or(excluded) != excluded {
            push_assuming_capacity(ComparableDoc { sort_key, doc }, output);
        }
    }
}

/// Convert a segment level sort key into the global sort key.
fn convert_segment_sort_key(&self, sort_key: Self::SegmentSortKey) -> Self::SortKey;
}
Expand Down
5 changes: 5 additions & 0 deletions src/collector/sort_key_top_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ where TSegmentSortKeyComputer: 'static + SegmentSortKeyComputer
.collect_lazy(doc, score, &mut self.segment_sort_key_computer);
}

/// Collects a whole block of matching documents at once.
///
/// Unlike `collect`, no per-document score is available here; downstream the sort-key
/// computer is invoked with a placeholder score of 0.0.
fn collect_block(&mut self, docs: &[DocId]) {
self.segment_collector
.collect_block_lazy(docs, &mut self.segment_sort_key_computer);
}

fn harvest(self) -> Self::Fruit {
let segment_hits: Vec<(TSegmentSortKeyComputer::SegmentSortKey, DocAddress)> =
self.segment_collector.harvest();
Expand Down
9 changes: 9 additions & 0 deletions src/collector/top_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,15 @@ impl<T: PartialOrd + Clone> TopSegmentCollector<T> {
self.topn_computer.push(feature, doc);
}

/// Collects a block of documents, forwarding to `TopNComputer::push_block_lazy`.
///
/// `segment_scorer` computes each document's sort key; whether keys are evaluated lazily
/// against the current threshold depends on `SegmentSortKeyComputer::is_lazy()`.
#[inline]
pub fn collect_block_lazy(
&mut self,
docs: &[DocId],
segment_scorer: &mut impl SegmentSortKeyComputer<SegmentSortKey = T>,
) {
self.topn_computer.push_block_lazy(docs, segment_scorer);
}

#[inline]
pub fn collect_lazy(
&mut self,
Expand Down
67 changes: 62 additions & 5 deletions src/collector/top_score_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,9 @@ where
/// Create a new `TopNComputer`.
/// Internally it will allocate a buffer of size `2 * top_n`.
pub fn new(top_n: usize) -> Self {
let vec_cap = top_n.max(1) * 2;
// We ensure that there is always enough space to include an entire block in the buffer if
// need be, so that `push_block_lazy` can avoid checking capacity inside its loop.
let vec_cap = (top_n.max(1) * 2) + crate::COLLECT_BLOCK_BUFFER_LEN;
TopNComputer {
buffer: Vec::with_capacity(vec_cap),
top_n,
Expand Down Expand Up @@ -775,6 +777,12 @@ where TScore: PartialOrd + Clone
else {
return;
};

if self.buffer.len() == self.buffer.capacity() {
let median = self.truncate_top_n();
self.threshold = Some(median);
}

push_assuming_capacity(
ComparableDoc {
sort_key: feature,
Expand All @@ -789,13 +797,62 @@ where TScore: PartialOrd + Clone
self.push(feature, doc);
return;
}

/// Pushes a whole block of documents into the top-n buffer, computing each document's
/// sort key via `score_tweaker` with a placeholder score of 0.0.
///
/// When a threshold is already established, documents whose key falls on the excluded
/// side of it are dropped without being buffered. If the computer reports
/// `is_lazy() == true`, filtering is delegated to `accept_sort_key_block_lazy`.
//
// assumes docs.len() <= COLLECT_BLOCK_BUFFER_LEN, so that one truncation always frees
// enough room for the whole block — TODO confirm caller guarantee.
#[inline(always)]
pub(crate) fn push_block_lazy<
TSegmentSortKeyComputer: SegmentSortKeyComputer<SegmentSortKey = TScore>,
>(
&mut self,
docs: &[DocId],
score_tweaker: &mut TSegmentSortKeyComputer,
) {
// If the addition of this block might push us over capacity, start by truncating: our
// capacity is at least 2*n + COLLECT_BLOCK_BUFFER_LEN, so this always makes enough room
// for the entire block (although some of the block might be eliminated).
if self.buffer.len() + docs.len() > self.buffer.capacity() {
let median = self.truncate_top_n();
self.threshold = Some(median);
}

if let Some(last_median) = self.threshold.clone() {
if TSegmentSortKeyComputer::is_lazy() {
// We validated at the top of the method that we have capacity.
score_tweaker.accept_sort_key_block_lazy::<REVERSE_ORDER>(docs, &last_median, &mut self.buffer);
return;
}

// Eagerly push, with a threshold to compare to.
// NOTE(review): incomparable keys (e.g. NaN) are KEPT here, because `>`/`<` are
// false for them, whereas the lazy path above EXCLUDES them via
// `partial_cmp(...).unwrap_or(excluded)` — confirm this asymmetry is intended.
for &doc in docs {
let sort_key = score_tweaker.sort_key(doc, 0.0);

if !REVERSE_ORDER && sort_key > last_median {
continue;
}
if REVERSE_ORDER && sort_key < last_median {
continue;
}

// We validated at the top of the method that we have capacity.
let comparable_doc = ComparableDoc { doc, sort_key };
push_assuming_capacity(comparable_doc, &mut self.buffer);
}
} else {
// Eagerly push, without a threshold to compare to.
for &doc in docs {
let sort_key = score_tweaker.sort_key(doc, 0.0);
// We validated at the top of the method that we have capacity.
let comparable_doc = ComparableDoc { doc, sort_key };
push_assuming_capacity(comparable_doc, &mut self.buffer);
}
}
}
}

// Push an element provided there is enough capacity to do so.
//
// Panics if there is not enough capacity to add an element.
#[inline(always)]
fn push_assuming_capacity<T>(el: T, buf: &mut Vec<T>) {
pub fn push_assuming_capacity<T>(el: T, buf: &mut Vec<T>) {
let prev_len = buf.len();
assert!(prev_len < buf.capacity());
// This is mimicking the current (non-stabilized) implementation in std.
Expand Down Expand Up @@ -1509,11 +1566,11 @@ mod tests {
#[test]
fn test_top_field_collect_string_prop(
order in prop_oneof!(Just(Order::Desc), Just(Order::Asc)),
limit in 1..256_usize,
offset in 0..256_usize,
limit in 1..32_usize,
offset in 0..32_usize,
segments_terms in
proptest::collection::vec(
proptest::collection::vec(0..32_u8, 1..32_usize),
proptest::collection::vec(0..64_u8, 1..256_usize),
0..8_usize,
)
) {
Expand Down