Skip to content

Commit ecd7cb1

Browse files
authored
Rollup merge of #79023 - yoshuawuyts:stream, r=KodrAus
Add `core::stream::Stream` [[Tracking issue: #79024](#79024)] This patch adds the `core::stream` submodule and implements `core::stream::Stream` in accordance with [RFC2996](rust-lang/rfcs#2996). The RFC hasn't been merged yet, but as requested by the libs team in rust-lang/rfcs#2996 (comment) I'm filing this PR to get the ball rolling. ## Documentatation The docs in this PR have been adapted from [`std::iter`](https://doc.rust-lang.org/std/iter/index.html), [`async_std::stream`](https://docs.rs/async-std/1.7.0/async_std/stream/index.html), and [`futures::stream::Stream`](https://docs.rs/futures/0.3.8/futures/stream/trait.Stream.html). Once this PR lands my plan is to follow this up with PRs to add helper methods such as `stream::repeat` which can be used to document more of the concepts that are currently missing. That will allow us to cover concepts such as "infinite streams" and "laziness" in more depth. ## Feature gate The feature gate for `Stream` is `stream_trait`. This matches the `#[lang = "future_trait"]` attribute name. The intention is that only the APIs defined in RFC2996 will use this feature gate, with future additions such as `stream::repeat` using their own feature gates. This is so we can ensure a smooth path towards stabilizing the `Stream` trait without needing to stabilize all the APIs in `core::stream` at once. But also don't start expanding the API until _after_ stabilization, as was the case with `std::future`. __edit:__ the feature gate has been changed to `async_stream` to match the feature gate proposed in the RFC. ## Conclusion This PR introduces `core::stream::{Stream, Next}` and re-exports it from `std` as `std::stream::{Stream, Next}`. Landing `Stream` in the stdlib has been a mult-year process; and it's incredibly exciting for this to finally happen! --- r? `````@KodrAus````` cc/ `````@rust-lang/wg-async-foundations````` `````@rust-lang/libs`````
2 parents 0248c6f + a1b1132 commit ecd7cb1

File tree

7 files changed

+271
-0
lines changed

7 files changed

+271
-0
lines changed

library/alloc/src/boxed.rs

+14
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ use core::ops::{
149149
};
150150
use core::pin::Pin;
151151
use core::ptr::{self, Unique};
152+
use core::stream::Stream;
152153
use core::task::{Context, Poll};
153154

154155
use crate::alloc::{handle_alloc_error, AllocError, Allocator, Global, Layout, WriteCloneIntoRaw};
@@ -1621,3 +1622,16 @@ where
16211622
F::poll(Pin::new(&mut *self), cx)
16221623
}
16231624
}
1625+
1626+
#[unstable(feature = "async_stream", issue = "79024")]
1627+
impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
1628+
type Item = S::Item;
1629+
1630+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1631+
Pin::new(&mut **self).poll_next(cx)
1632+
}
1633+
1634+
fn size_hint(&self) -> (usize, Option<usize>) {
1635+
(**self).size_hint()
1636+
}
1637+
}

library/alloc/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
#![feature(array_windows)]
8383
#![feature(allow_internal_unstable)]
8484
#![feature(arbitrary_self_types)]
85+
#![feature(async_stream)]
8586
#![feature(box_patterns)]
8687
#![feature(box_syntax)]
8788
#![feature(cfg_sanitize)]

library/core/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ pub mod panicking;
254254
pub mod pin;
255255
pub mod raw;
256256
pub mod result;
257+
#[unstable(feature = "async_stream", issue = "79024")]
258+
pub mod stream;
257259
pub mod sync;
258260

259261
pub mod fmt;

library/core/src/stream/mod.rs

+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
//! Composable asynchronous iteration.
2+
//!
3+
//! If futures are asynchronous values, then streams are asynchronous
4+
//! iterators. If you've found yourself with an asynchronous collection of some kind,
5+
//! and needed to perform an operation on the elements of said collection,
6+
//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic
7+
//! asynchronous Rust code, so it's worth becoming familiar with them.
8+
//!
9+
//! Before explaining more, let's talk about how this module is structured:
10+
//!
11+
//! # Organization
12+
//!
13+
//! This module is largely organized by type:
14+
//!
15+
//! * [Traits] are the core portion: these traits define what kind of streams
16+
//! exist and what you can do with them. The methods of these traits are worth
17+
//! putting some extra study time into.
18+
//! * Functions provide some helpful ways to create some basic streams.
19+
//! * Structs are often the return types of the various methods on this
20+
//! module's traits. You'll usually want to look at the method that creates
21+
//! the `struct`, rather than the `struct` itself. For more detail about why,
22+
//! see '[Implementing Stream](#implementing-stream)'.
23+
//!
24+
//! [Traits]: #traits
25+
//!
26+
//! That's it! Let's dig into streams.
27+
//!
28+
//! # Stream
29+
//!
30+
//! The heart and soul of this module is the [`Stream`] trait. The core of
31+
//! [`Stream`] looks like this:
32+
//!
33+
//! ```
34+
//! # use core::task::{Context, Poll};
35+
//! # use core::pin::Pin;
36+
//! trait Stream {
37+
//! type Item;
38+
//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
39+
//! }
40+
//! ```
41+
//!
42+
//! Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`]
43+
//! method which is used when implementing a `Stream`, and a (to-be-implemented)
44+
//! `next` method which is used when consuming a stream. Consumers of `Stream`
45+
//! only need to consider `next`, which when called, returns a future which
46+
//! yields `Option<Stream::Item>`.
47+
//!
48+
//! The future returned by `next` will yield `Some(Item)` as long as there are
49+
//! elements, and once they've all been exhausted, will yield `None` to indicate
50+
//! that iteration is finished. If we're waiting on something asynchronous to
51+
//! resolve, the future will wait until the stream is ready to yield again.
52+
//!
53+
//! Individual streams may choose to resume iteration, and so calling `next`
54+
//! again may or may not eventually yield `Some(Item)` again at some point.
55+
//!
56+
//! [`Stream`]'s full definition includes a number of other methods as well,
57+
//! but they are default methods, built on top of [`poll_next`], and so you get
58+
//! them for free.
59+
//!
60+
//! [`Poll`]: super::task::Poll
61+
//! [`poll_next`]: Stream::poll_next
62+
//!
63+
//! # Implementing Stream
64+
//!
65+
//! Creating a stream of your own involves two steps: creating a `struct` to
66+
//! hold the stream's state, and then implementing [`Stream`] for that
67+
//! `struct`.
68+
//!
69+
//! Let's make a stream named `Counter` which counts from `1` to `5`:
70+
//!
71+
//! ```no_run
72+
//! #![feature(async_stream)]
73+
//! # use core::stream::Stream;
74+
//! # use core::task::{Context, Poll};
75+
//! # use core::pin::Pin;
76+
//!
77+
//! // First, the struct:
78+
//!
79+
//! /// A stream which counts from one to five
80+
//! struct Counter {
81+
//! count: usize,
82+
//! }
83+
//!
84+
//! // we want our count to start at one, so let's add a new() method to help.
85+
//! // This isn't strictly necessary, but is convenient. Note that we start
86+
//! // `count` at zero, we'll see why in `poll_next()`'s implementation below.
87+
//! impl Counter {
88+
//! fn new() -> Counter {
89+
//! Counter { count: 0 }
90+
//! }
91+
//! }
92+
//!
93+
//! // Then, we implement `Stream` for our `Counter`:
94+
//!
95+
//! impl Stream for Counter {
96+
//! // we will be counting with usize
97+
//! type Item = usize;
98+
//!
99+
//! // poll_next() is the only required method
100+
//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101+
//! // Increment our count. This is why we started at zero.
102+
//! self.count += 1;
103+
//!
104+
//! // Check to see if we've finished counting or not.
105+
//! if self.count < 6 {
106+
//! Poll::Ready(Some(self.count))
107+
//! } else {
108+
//! Poll::Ready(None)
109+
//! }
110+
//! }
111+
//! }
112+
//! ```
113+
//!
114+
//! # Laziness
115+
//!
116+
//! Streams are *lazy*. This means that just creating a stream doesn't _do_ a
117+
//! whole lot. Nothing really happens until you call `next`. This is sometimes a
118+
//! source of confusion when creating a stream solely for its side effects. The
119+
//! compiler will warn us about this kind of behavior:
120+
//!
121+
//! ```text
122+
//! warning: unused result that must be used: streams do nothing unless polled
123+
//! ```
124+
125+
mod stream;
126+
127+
pub use stream::Stream;

library/core/src/stream/stream/mod.rs

+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use crate::ops::DerefMut;
2+
use crate::pin::Pin;
3+
use crate::task::{Context, Poll};
4+
5+
/// An interface for dealing with asynchronous iterators.
6+
///
7+
/// This is the main stream trait. For more about the concept of streams
8+
/// generally, please see the [module-level documentation]. In particular, you
9+
/// may want to know how to [implement `Stream`][impl].
10+
///
11+
/// [module-level documentation]: index.html
12+
/// [impl]: index.html#implementing-stream
13+
#[unstable(feature = "async_stream", issue = "79024")]
14+
#[must_use = "streams do nothing unless polled"]
15+
pub trait Stream {
16+
/// The type of items yielded by the stream.
17+
type Item;
18+
19+
/// Attempt to pull out the next value of this stream, registering the
20+
/// current task for wakeup if the value is not yet available, and returning
21+
/// `None` if the stream is exhausted.
22+
///
23+
/// # Return value
24+
///
25+
/// There are several possible return values, each indicating a distinct
26+
/// stream state:
27+
///
28+
/// - `Poll::Pending` means that this stream's next value is not ready
29+
/// yet. Implementations will ensure that the current task will be notified
30+
/// when the next value may be ready.
31+
///
32+
/// - `Poll::Ready(Some(val))` means that the stream has successfully
33+
/// produced a value, `val`, and may produce further values on subsequent
34+
/// `poll_next` calls.
35+
///
36+
/// - `Poll::Ready(None)` means that the stream has terminated, and
37+
/// `poll_next` should not be invoked again.
38+
///
39+
/// # Panics
40+
///
41+
/// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its
42+
/// `poll_next` method again may panic, block forever, or cause other kinds of
43+
/// problems; the `Stream` trait places no requirements on the effects of
44+
/// such a call. However, as the `poll_next` method is not marked `unsafe`,
45+
/// Rust's usual rules apply: calls must never cause undefined behavior
46+
/// (memory corruption, incorrect use of `unsafe` functions, or the like),
47+
/// regardless of the stream's state.
48+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
49+
50+
/// Returns the bounds on the remaining length of the stream.
51+
///
52+
/// Specifically, `size_hint()` returns a tuple where the first element
53+
/// is the lower bound, and the second element is the upper bound.
54+
///
55+
/// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
56+
/// A [`None`] here means that either there is no known upper bound, or the
57+
/// upper bound is larger than [`usize`].
58+
///
59+
/// # Implementation notes
60+
///
61+
/// It is not enforced that a stream implementation yields the declared
62+
/// number of elements. A buggy stream may yield less than the lower bound
63+
/// or more than the upper bound of elements.
64+
///
65+
/// `size_hint()` is primarily intended to be used for optimizations such as
66+
/// reserving space for the elements of the stream, but must not be
67+
/// trusted to e.g., omit bounds checks in unsafe code. An incorrect
68+
/// implementation of `size_hint()` should not lead to memory safety
69+
/// violations.
70+
///
71+
/// That said, the implementation should provide a correct estimation,
72+
/// because otherwise it would be a violation of the trait's protocol.
73+
///
74+
/// The default implementation returns `(0, `[`None`]`)` which is correct for any
75+
/// stream.
76+
#[inline]
77+
fn size_hint(&self) -> (usize, Option<usize>) {
78+
(0, None)
79+
}
80+
}
81+
82+
#[unstable(feature = "async_stream", issue = "79024")]
83+
impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
84+
type Item = S::Item;
85+
86+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87+
S::poll_next(Pin::new(&mut **self), cx)
88+
}
89+
90+
fn size_hint(&self) -> (usize, Option<usize>) {
91+
(**self).size_hint()
92+
}
93+
}
94+
95+
#[unstable(feature = "async_stream", issue = "79024")]
96+
impl<P> Stream for Pin<P>
97+
where
98+
P: DerefMut + Unpin,
99+
P::Target: Stream,
100+
{
101+
type Item = <P::Target as Stream>::Item;
102+
103+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104+
self.get_mut().as_mut().poll_next(cx)
105+
}
106+
107+
fn size_hint(&self) -> (usize, Option<usize>) {
108+
(**self).size_hint()
109+
}
110+
}

library/std/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@
224224
#![feature(allocator_internals)]
225225
#![feature(allow_internal_unsafe)]
226226
#![feature(allow_internal_unstable)]
227+
#![feature(async_stream)]
227228
#![feature(arbitrary_self_types)]
228229
#![feature(array_error_internals)]
229230
#![feature(asm)]
@@ -450,6 +451,8 @@ pub use core::ptr;
450451
pub use core::raw;
451452
#[stable(feature = "rust1", since = "1.0.0")]
452453
pub use core::result;
454+
#[unstable(feature = "async_stream", issue = "79024")]
455+
pub use core::stream;
453456
#[stable(feature = "i128", since = "1.26.0")]
454457
#[allow(deprecated, deprecated_in_future)]
455458
pub use core::u128;

library/std/src/panic.rs

+14
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::panicking;
1212
use crate::pin::Pin;
1313
use crate::ptr::{NonNull, Unique};
1414
use crate::rc::Rc;
15+
use crate::stream::Stream;
1516
use crate::sync::atomic;
1617
use crate::sync::{Arc, Mutex, RwLock};
1718
use crate::task::{Context, Poll};
@@ -340,6 +341,19 @@ impl<F: Future> Future for AssertUnwindSafe<F> {
340341
}
341342
}
342343

344+
#[unstable(feature = "async_stream", issue = "79024")]
345+
impl<S: Stream> Stream for AssertUnwindSafe<S> {
346+
type Item = S::Item;
347+
348+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
349+
unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
350+
}
351+
352+
fn size_hint(&self) -> (usize, Option<usize>) {
353+
self.0.size_hint()
354+
}
355+
}
356+
343357
/// Invokes a closure, capturing the cause of an unwinding panic if one occurs.
344358
///
345359
/// This function will return `Ok` with the closure's result if the closure

0 commit comments

Comments
 (0)