-
-
Notifications
You must be signed in to change notification settings - Fork 2.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move stream items into tokio-stream
#3277
Conversation
This change removes all references to `Stream` from within the `tokio` crate and moves them into a new `tokio-stream` crate. Most types have had their `impl Stream` removed as well in-favor of their inherent methods. Closes #2870
rx: Rx, | ||
rx: Pin<Box<dyn Stream<Item = String> + Send>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This raises a question. The Rx
type is not used in a way that expects a Stream
, rather just in a way that expects something with a poll_next
method. Currently the impl Stream
is just removed, and the private poll_recv
method is not made public.
What approach should we be taking here? I kinda feel like we should be exposing the poll_recv
method, as the type supports it perfectly fine (there is only one receiver, so there are no intrusiveness concerns), but then we have both poll_next
and poll_recv
when we add back the Stream
impl in ≥ half a year.
(edit for future readers: a poll_recv
method has since been added)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this example need to use poll_*
APIs at all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think we should rewrite it.
You're missing the |
/// // Convert the channels to a `Stream`. | ||
/// let rx1 = Box::pin(async_stream::stream! { | ||
/// while let Some(item) = rx1.recv().await { | ||
/// yield item; | ||
/// } | ||
/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan to submit a proposal for how to improve this later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would you change?
// TODO: could probably avoid this, but why not. | ||
let mut rx = Box::pin(rx); | ||
|
||
rt.spawn(async move { while rx.next().await.is_some() {} }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid it, you could use tokio::pin!
, but it would need to be inside the spawned task. Not that it matters much as it is a test.
The main thing missing before I approve this is a review of the generated rustdoc, but I can't do this yet as it does not currently compile. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good. Nothing major stands out. Left a few comments inline. @Darksonn also covered stuff 👍
@@ -381,10 +381,6 @@ cfg_signal_internal! { | |||
pub(crate) mod signal; | |||
} | |||
|
|||
cfg_stream! { | |||
pub mod stream; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Should we keep an empty module w/ documentation explaining the stream functionality will return once Stream
stabilizes in 1.0 and tokio-stream
is the current work around? This way, searching for stream
in the API docs will find something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds like a good idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can follow up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might want to coordinate between the text in the tokio::stream
module and in tokio-stream/src/lib.rs
. Doesn't have to be this PR though.
//! [`tokio::io`]: crate::io | ||
//! [`AsyncRead`]: crate::io::AsyncRead | ||
//! [`AsyncWrite`]: crate::io::AsyncWrite | ||
//! [tokio-util]: https://docs.rs/tokio-util/0.4/tokio_util/codec/index.html |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The link is broken.
//! [tokio-util]: https://docs.rs/tokio-util/0.4/tokio_util/codec/index.html | |
//! [`tokio-util`]: https://docs.rs/tokio-util/0.4/tokio_util/codec/index.html |
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) | ||
))] | ||
#![cfg_attr(docsrs, feature(doc_cfg))] | ||
|
||
//! Stream utilities for Tokio. | ||
//! | ||
//! A `Stream` is an asynchronous sequence of values. It can be thought of as |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Below this line (can't select further down), the crate calls itself a module.
/// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html | ||
/// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should work as intra-doc links, no? The tokio-stream crates does depend on tokio.
@Darksonn had some additional comments, but they can be addressed later (even post 1.0). As this PR is blocking others, I am going to merge it 👍. Thanks! |
I think @Darksonn's comment from #3277 (comment) was lost -- we're missing an implementation for |
Nah, it's there: https://github.com/tokio-rs/tokio/blob/master/tokio-stream/src/wrappers/read_dir.rs The docs on docs.rs are just generated wrong in the current version. |
This change removes all references to
Stream
fromwithin the
tokio
crate and moves them into a newtokio-stream
crate. Most types have had theirimpl Stream
removed as well in-favor of theirinherent methods.
Closes #2870