Skip to content

Commit 68457b0

Browse files
committed
Add ParallelIterator::take_any_while and skip_any_while
These are like `Iterator::take_while` and `skip_while`, but the `any` names emphasize that they're not a straightforward parallelization, as they does not honor the original iterator order.
1 parent a0d0a50 commit 68457b0

File tree

5 files changed

+408
-0
lines changed

5 files changed

+408
-0
lines changed

src/iter/mod.rs

+72
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,13 @@ mod repeat;
142142
mod rev;
143143
mod skip;
144144
mod skip_any;
145+
mod skip_any_while;
145146
mod splitter;
146147
mod step_by;
147148
mod sum;
148149
mod take;
149150
mod take_any;
151+
mod take_any_while;
150152
mod try_fold;
151153
mod try_reduce;
152154
mod try_reduce_with;
@@ -188,10 +190,12 @@ pub use self::{
188190
rev::Rev,
189191
skip::Skip,
190192
skip_any::SkipAny,
193+
skip_any_while::SkipAnyWhile,
191194
splitter::{split, Split},
192195
step_by::StepBy,
193196
take::Take,
194197
take_any::TakeAny,
198+
take_any_while::TakeAnyWhile,
195199
try_fold::{TryFold, TryFoldWith},
196200
update::Update,
197201
while_some::WhileSome,
@@ -2248,6 +2252,74 @@ pub trait ParallelIterator: Sized + Send {
22482252
SkipAny::new(self, n)
22492253
}
22502254

2255+
/// Creates an iterator that takes elements from *anywhere* in the original iterator
2256+
/// until the given `predicate` returns `false`.
2257+
///
2258+
/// The `predicate` may be anything -- e.g. it could be checking a fact about the item, a
2259+
/// global condition unrelated to the item itself, or some combination thereof.
2260+
///
2261+
/// If parallel calls to the `predicate` race and give different results, then the
2262+
/// `true` results will still take those particular items, while respecting the `false`
2263+
/// result from elsewhere to skip any further items.
2264+
///
2265+
/// This is similar to [`Iterator::take_while`] without being constrained to the original
2266+
/// iterator order. The taken items will still maintain their relative order where that is
2267+
/// visible in `collect`, `reduce`, and similar outputs.
2268+
///
2269+
/// # Examples
2270+
///
2271+
/// ```
2272+
/// use rayon::prelude::*;
2273+
///
2274+
/// let result: Vec<_> = (0..100)
2275+
/// .into_par_iter()
2276+
/// .take_any_while(|x| *x < 50)
2277+
/// .collect();
2278+
///
2279+
/// assert!(result.len() <= 50);
2280+
/// assert!(result.windows(2).all(|w| w[0] < w[1]));
2281+
/// ```
2282+
fn take_any_while<P>(self, predicate: P) -> TakeAnyWhile<Self, P>
2283+
where
2284+
P: Fn(&Self::Item) -> bool + Sync + Send,
2285+
{
2286+
TakeAnyWhile::new(self, predicate)
2287+
}
2288+
2289+
/// Creates an iterator that skips elements from *anywhere* in the original iterator
2290+
/// until the given `predicate` returns `false`.
2291+
///
2292+
/// The `predicate` may be anything -- e.g. it could be checking a fact about the item, a
2293+
/// global condition unrelated to the item itself, or some combination thereof.
2294+
///
2295+
/// If parallel calls to the `predicate` race and give different results, then the
2296+
/// `true` results will still skip those particular items, while respecting the `false`
2297+
/// result from elsewhere to skip any further items.
2298+
///
2299+
/// This is similar to [`Iterator::skip_while`] without being constrained to the original
2300+
/// iterator order. The remaining items will still maintain their relative order where that is
2301+
/// visible in `collect`, `reduce`, and similar outputs.
2302+
///
2303+
/// # Examples
2304+
///
2305+
/// ```
2306+
/// use rayon::prelude::*;
2307+
///
2308+
/// let result: Vec<_> = (0..100)
2309+
/// .into_par_iter()
2310+
/// .skip_any_while(|x| *x < 50)
2311+
/// .collect();
2312+
///
2313+
/// assert!(result.len() >= 50);
2314+
/// assert!(result.windows(2).all(|w| w[0] < w[1]));
2315+
/// ```
2316+
fn skip_any_while<P>(self, predicate: P) -> SkipAnyWhile<Self, P>
2317+
where
2318+
P: Fn(&Self::Item) -> bool + Sync + Send,
2319+
{
2320+
SkipAnyWhile::new(self, predicate)
2321+
}
2322+
22512323
/// Internal method used to define the behavior of this parallel
22522324
/// iterator. You should not need to call this directly.
22532325
///

src/iter/skip_any_while.rs

+166
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
use super::plumbing::*;
2+
use super::*;
3+
use std::fmt;
4+
use std::sync::atomic::{AtomicBool, Ordering};
5+
6+
/// `SkipAnyWhile` is an iterator that skips over elements from anywhere in `I`
7+
/// until the callback returns `false`.
8+
/// This struct is created by the [`skip_any_while()`] method on [`ParallelIterator`]
9+
///
10+
/// [`skip_any_while()`]: trait.ParallelIterator.html#method.skip_any_while
11+
/// [`ParallelIterator`]: trait.ParallelIterator.html
12+
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
13+
#[derive(Clone)]
14+
pub struct SkipAnyWhile<I: ParallelIterator, P> {
15+
base: I,
16+
predicate: P,
17+
}
18+
19+
impl<I: ParallelIterator + fmt::Debug, P> fmt::Debug for SkipAnyWhile<I, P> {
20+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21+
f.debug_struct("SkipAnyWhile")
22+
.field("base", &self.base)
23+
.finish()
24+
}
25+
}
26+
27+
impl<I, P> SkipAnyWhile<I, P>
28+
where
29+
I: ParallelIterator,
30+
{
31+
/// Creates a new `SkipAnyWhile` iterator.
32+
pub(super) fn new(base: I, predicate: P) -> Self {
33+
SkipAnyWhile { base, predicate }
34+
}
35+
}
36+
37+
impl<I, P> ParallelIterator for SkipAnyWhile<I, P>
38+
where
39+
I: ParallelIterator,
40+
P: Fn(&I::Item) -> bool + Sync + Send,
41+
{
42+
type Item = I::Item;
43+
44+
fn drive_unindexed<C>(self, consumer: C) -> C::Result
45+
where
46+
C: UnindexedConsumer<Self::Item>,
47+
{
48+
let consumer1 = SkipAnyWhileConsumer {
49+
base: consumer,
50+
predicate: &self.predicate,
51+
skipping: &AtomicBool::new(true),
52+
};
53+
self.base.drive_unindexed(consumer1)
54+
}
55+
}
56+
57+
/// ////////////////////////////////////////////////////////////////////////
58+
/// Consumer implementation
59+
60+
struct SkipAnyWhileConsumer<'p, C, P> {
61+
base: C,
62+
predicate: &'p P,
63+
skipping: &'p AtomicBool,
64+
}
65+
66+
impl<'p, T, C, P> Consumer<T> for SkipAnyWhileConsumer<'p, C, P>
67+
where
68+
C: Consumer<T>,
69+
P: Fn(&T) -> bool + Sync,
70+
{
71+
type Folder = SkipAnyWhileFolder<'p, C::Folder, P>;
72+
type Reducer = C::Reducer;
73+
type Result = C::Result;
74+
75+
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
76+
let (left, right, reducer) = self.base.split_at(index);
77+
(
78+
SkipAnyWhileConsumer { base: left, ..self },
79+
SkipAnyWhileConsumer {
80+
base: right,
81+
..self
82+
},
83+
reducer,
84+
)
85+
}
86+
87+
fn into_folder(self) -> Self::Folder {
88+
SkipAnyWhileFolder {
89+
base: self.base.into_folder(),
90+
predicate: self.predicate,
91+
skipping: self.skipping,
92+
}
93+
}
94+
95+
fn full(&self) -> bool {
96+
self.base.full()
97+
}
98+
}
99+
100+
impl<'p, T, C, P> UnindexedConsumer<T> for SkipAnyWhileConsumer<'p, C, P>
101+
where
102+
C: UnindexedConsumer<T>,
103+
P: Fn(&T) -> bool + Sync,
104+
{
105+
fn split_off_left(&self) -> Self {
106+
SkipAnyWhileConsumer {
107+
base: self.base.split_off_left(),
108+
..*self
109+
}
110+
}
111+
112+
fn to_reducer(&self) -> Self::Reducer {
113+
self.base.to_reducer()
114+
}
115+
}
116+
117+
struct SkipAnyWhileFolder<'p, C, P> {
118+
base: C,
119+
predicate: &'p P,
120+
skipping: &'p AtomicBool,
121+
}
122+
123+
fn skip<T>(item: &T, skipping: &AtomicBool, predicate: &impl Fn(&T) -> bool) -> bool {
124+
if !skipping.load(Ordering::Relaxed) {
125+
return false;
126+
}
127+
if predicate(item) {
128+
return true;
129+
}
130+
skipping.store(false, Ordering::Relaxed);
131+
false
132+
}
133+
134+
impl<'p, T, C, P> Folder<T> for SkipAnyWhileFolder<'p, C, P>
135+
where
136+
C: Folder<T>,
137+
P: Fn(&T) -> bool + 'p,
138+
{
139+
type Result = C::Result;
140+
141+
fn consume(mut self, item: T) -> Self {
142+
if !skip(&item, self.skipping, self.predicate) {
143+
self.base = self.base.consume(item);
144+
}
145+
self
146+
}
147+
148+
fn consume_iter<I>(mut self, iter: I) -> Self
149+
where
150+
I: IntoIterator<Item = T>,
151+
{
152+
self.base = self.base.consume_iter(
153+
iter.into_iter()
154+
.skip_while(move |x| skip(x, self.skipping, self.predicate)),
155+
);
156+
self
157+
}
158+
159+
fn complete(self) -> C::Result {
160+
self.base.complete()
161+
}
162+
163+
fn full(&self) -> bool {
164+
self.base.full()
165+
}
166+
}

0 commit comments

Comments
 (0)