Skip to content

Commit 34cdcc7

Browse files
authored
macros: docs about select! alternatives (#7110)
1 parent 8e13417 commit 34cdcc7

File tree

2 files changed

+148
-0
lines changed

2 files changed

+148
-0
lines changed

tokio/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ tokio-stream = { version = "0.1", path = "../tokio-stream" }
133133
futures = { version = "0.3.0", features = ["async-await"] }
134134
mockall = "0.11.1"
135135
async-stream = "0.3"
136+
futures-concurrency = "7.6.3"
136137

137138
[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
138139
socket2 = "0.5.5"

tokio/src/macros/select.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,153 @@ macro_rules! doc {
398398
/// }
399399
/// }
400400
/// ```
401+
/// # Alternatives from the Ecosystem
402+
///
403+
/// The `select!` macro is a powerful tool for managing multiple asynchronous
404+
/// branches, enabling tasks to run concurrently within the same thread. However,
405+
/// its use can introduce challenges, particularly around cancellation safety, which
406+
/// can lead to subtle and hard-to-debug errors. For many use cases, ecosystem
407+
/// alternatives may be preferable as they mitigate these concerns by offering
408+
/// clearer syntax, more predictable control flow, and reducing the need to manually
409+
/// handle issues like fuse semantics or cancellation safety.
410+
///
411+
/// ## Merging Streams
412+
///
413+
/// For cases where `loop { select! { ... } }` is used to poll multiple tasks,
414+
/// stream merging offers a concise alternative, inherently handle cancellation-safe
415+
/// processing, removing the risk of data loss. Libraries such as [`tokio_stream`],
416+
/// [`futures::stream`] and [`futures_concurrency`] provide tools for merging
417+
/// streams and handling their outputs sequentially.
418+
///
419+
/// [`tokio_stream`]: https://docs.rs/tokio-stream/latest/tokio_stream/
420+
/// [`futures::stream`]: https://docs.rs/futures/latest/futures/stream/
421+
/// [`futures_concurrency`]: https://docs.rs/futures-concurrency/latest/futures_concurrency/
422+
///
423+
/// ### Example with `select!`
424+
///
425+
/// ```
426+
/// struct File;
427+
/// struct Channel;
428+
/// struct Socket;
429+
///
430+
/// impl Socket {
431+
/// async fn read_packet(&mut self) -> Vec<u8> {
432+
/// vec![]
433+
/// }
434+
/// }
435+
///
436+
/// async fn read_send(_file: &mut File, _channel: &mut Channel) {
437+
/// // do work that is not cancel safe
438+
/// }
439+
///
440+
/// #[tokio::main]
441+
/// async fn main() {
442+
/// // open our IO types
443+
/// let mut file = File;
444+
/// let mut channel = Channel;
445+
/// let mut socket = Socket;
446+
///
447+
/// loop {
448+
/// tokio::select! {
449+
/// _ = read_send(&mut file, &mut channel) => { /* ... */ },
450+
/// _data = socket.read_packet() => { /* ... */ }
451+
/// _ = futures::future::ready(()) => break
452+
/// }
453+
/// }
454+
/// }
455+
///
456+
/// ```
457+
///
458+
/// ### Moving to `merge`
459+
///
460+
/// By using merge, you can unify multiple asynchronous tasks into a single stream,
461+
/// eliminating the need to manage tasks manually and reducing the risk of
462+
/// unintended behavior like data loss.
463+
///
464+
/// ```
465+
/// use std::pin::pin;
466+
///
467+
/// use futures::stream::unfold;
468+
/// use tokio_stream::StreamExt;
469+
///
470+
/// struct File;
471+
/// struct Channel;
472+
/// struct Socket;
473+
///
474+
/// impl Socket {
475+
/// async fn read_packet(&mut self) -> Vec<u8> {
476+
/// vec![]
477+
/// }
478+
/// }
479+
///
480+
/// async fn read_send(_file: &mut File, _channel: &mut Channel) {
481+
/// // do work that is not cancel safe
482+
/// }
483+
///
484+
/// enum Message {
485+
/// Stop,
486+
/// Sent,
487+
/// Data(Vec<u8>),
488+
/// }
489+
///
490+
/// #[tokio::main]
491+
/// async fn main() {
492+
/// // open our IO types
493+
/// let file = File;
494+
/// let channel = Channel;
495+
/// let socket = Socket;
496+
///
497+
/// let a = unfold((file, channel), |(mut file, mut channel)| async {
498+
/// read_send(&mut file, &mut channel).await;
499+
/// Some((Message::Sent, (file, channel)))
500+
/// });
501+
/// let b = unfold(socket, |mut socket| async {
502+
/// let data = socket.read_packet().await;
503+
/// Some((Message::Data(data), socket))
504+
/// });
505+
/// let c = tokio_stream::iter([Message::Stop]);
506+
///
507+
/// let mut s = pin!(a.merge(b).merge(c));
508+
/// while let Some(msg) = s.next().await {
509+
/// match msg {
510+
/// Message::Data(_data) => { /* ... */ }
511+
/// Message::Sent => continue,
512+
/// Message::Stop => break,
513+
/// }
514+
/// }
515+
/// }
516+
/// ```
517+
///
518+
/// ## Racing Futures
519+
///
520+
/// If you need to wait for the first completion among several asynchronous tasks,
521+
/// ecosystem utilities such as
522+
/// [`futures`](https://docs.rs/futures/latest/futures/),
523+
/// [`futures-lite`](https://docs.rs/futures-lite/latest/futures_lite/) or
524+
/// [`futures-concurrency`](https://docs.rs/futures-concurrency/latest/futures_concurrency/)
525+
/// provide streamlined syntax for racing futures:
526+
///
527+
/// - [`futures_concurrency::future::Race`](https://docs.rs/futures-concurrency/latest/futures_concurrency/future/trait.Race.html)
528+
/// - [`futures::select`](https://docs.rs/futures/latest/futures/macro.select.html)
529+
/// - [`futures::stream::select_all`](https://docs.rs/futures/latest/futures/stream/select_all/index.html) (for streams)
530+
/// - [`futures_lite::future::or`](https://docs.rs/futures-lite/latest/futures_lite/future/fn.or.html)
531+
/// - [`futures_lite::future::race`](https://docs.rs/futures-lite/latest/futures_lite/future/fn.race.html)
532+
///
533+
/// ```
534+
/// use futures_concurrency::future::Race;
535+
///
536+
/// #[tokio::main]
537+
/// async fn main() {
538+
/// let task_a = async { Ok("ok") };
539+
/// let task_b = async { Err("error") };
540+
/// let result = (task_a, task_b).race().await;
541+
///
542+
/// match result {
543+
/// Ok(output) => println!("First task completed with: {output}"),
544+
/// Err(err) => eprintln!("Error occurred: {err}"),
545+
/// }
546+
/// }
547+
/// ```
401548
#[macro_export]
402549
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
403550
$select

0 commit comments

Comments
 (0)