Skip to content

feat(tokio/macros): add biased mode to join! and try_join! #7307

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 95 additions & 18 deletions tokio/src/macros/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ macro_rules! doc {
/// The supplied futures are stored inline and do not require allocating a
/// `Vec`.
///
/// ### Runtime characteristics
/// ## Runtime characteristics
///
/// By running all async expressions on the current task, the expressions are
/// able to run **concurrently** but not in **parallel**. This means all
Expand All @@ -32,6 +32,25 @@ macro_rules! doc {
///
/// [`tokio::spawn`]: crate::spawn
///
/// ## Fairness
///
/// By default, `join!`'s generated future rotates which contained
/// future is polled first whenever it is woken.
///
/// This behavior can be overridden by adding `biased;` to the beginning of the
/// macro usage. See the examples for details. This will cause `join` to poll
/// the futures in the order they appear from top to bottom.
///
/// You may want this if your futures may interact in a way where known polling order is significant.
///
/// But there is an important caveat to this mode. It becomes your responsibility
/// to ensure that the polling order of your futures is fair. If for example you
/// are joining a stream and a shutdown future, and the stream has a
/// huge volume of messages that takes a long time to finish processing per poll, you should
/// place the shutdown future earlier in the `join!` list to ensure that it is
/// always polled, and will not be delayed due to the stream future taking a long time to return
/// `Poll::Pending`.
Comment on lines +46 to +52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that a shutdown future is an amazing example here. That makes sense for select!, but you wouldn't really have a shutdown future in join!. Not sure what a better example would be, though.

Copy link
Member Author

@jlizen jlizen May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a real-world example usage involving a shutdown signal: https://github.com/awslabs/aws-lambda-rust-runtime/blob/main/lambda-runtime/src/lib.rs#L227

The distinction here is that the other task is a server future, not a stream future, meaning it will generally keep running and generating new request futures. So we need to continue driving it. Even if the shutdown future will only ever come up once and then resolve, we still need to check it first every time.

I guess technically the server future will generally be dispatching work to read from sockets rather than processing the messages directly (ie not long polls as a concern), but anyway you want to shut down before dispatching that work.

Anyway I'll tweak wording from stream -> server and finesse it a bit more.

Copy link
Member Author

@jlizen jlizen May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also renaming the section Fairness -> Poll Ordering, it's not really about fairness anymore in the example given.

Copy link
Member Author

@jlizen jlizen May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I changed it back. My initial rework was basically restating that deterministic ordering is useful. But it's important for readers to consider the case that one future might take a long time to poll and starve the other, that was the point of the fairness discussion.

Based on the above lambda runtime link, I do think there are cases where you would be joining a shutdown future, so it's probably still relevant? Anyway I think it does a good job of illustrating the fairness risk even if the specific use case isn't the most universal?

Glad to keep thinking about it though if not sold and try to come up with another example.

///
/// # Examples
///
/// Basic join with two branches
Expand All @@ -54,6 +73,30 @@ macro_rules! doc {
/// // do something with the values
/// }
/// ```
///
/// Using the `biased;` mode to control polling order.
///
/// ```
/// async fn do_stuff_async() {
/// // async work
/// }
///
/// async fn more_async_work() {
/// // more here
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let (first, second) = tokio::join!(
/// biased;
/// do_stuff_async(),
/// more_async_work()
/// );
///
/// // do something with the values
/// }
/// ```

#[macro_export]
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
$join
Expand All @@ -62,12 +105,16 @@ macro_rules! doc {

#[cfg(doc)]
doc! {macro_rules! join {
($($future:expr),*) => { unimplemented!() }
($(biased;)? $($future:expr),*) => { unimplemented!() }
}}

#[cfg(not(doc))]
doc! {macro_rules! join {
(@ {
// Type of rotator that controls which inner future to start with
// when polling our output future.
rotator=$rotator:ty;

// One `_` for each branch in the `join!` macro. This is not used once
// normalization is complete.
( $($count:tt)* )
Expand Down Expand Up @@ -96,25 +143,19 @@ doc! {macro_rules! join {
// <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
let mut futures = &mut futures;

// Each time the future created by poll_fn is polled, a different future will be polled first
// to ensure every future passed to join! gets a chance to make progress even if
// one of the futures consumes the whole budget.
//
// This is number of futures that will be skipped in the first loop
// iteration the next time.
let mut skip_next_time: u32 = 0;
const COUNT: u32 = $($total)*;

poll_fn(move |cx| {
const COUNT: u32 = $($total)*;
// Each time the future created by poll_fn is polled, if not using biased mode,
// a different future is polled first to ensure every future passed to join!
// can make progress even if one of the futures consumes the whole budget.
let mut rotator = <$rotator>::default();

poll_fn(move |cx| {
let mut is_pending = false;

let mut to_run = COUNT;

// The number of futures that will be skipped in the first loop iteration.
let mut skip = skip_next_time;

skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };
let mut skip = rotator.num_skip();

// This loop runs twice and the first `skip` futures
// are not polled in the first iteration.
Expand Down Expand Up @@ -164,15 +205,51 @@ doc! {macro_rules! join {

// ===== Normalize =====

(@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
$crate::join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
(@ { rotator=$rotator:ty; ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
$crate::join!(@{ rotator=$rotator; ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
};

// ===== Entry point =====
( biased; $($e:expr),+ $(,)?) => {
$crate::join!(@{ rotator=$crate::macros::support::BiasedRotator; () (0) } $($e,)*)
};

( $($e:expr),+ $(,)?) => {
$crate::join!(@{ () (0) } $($e,)*)
$crate::join!(@{ rotator=$crate::macros::support::Rotator<COUNT>; () (0) } $($e,)*)
};

(biased;) => { async {}.await };

() => { async {}.await }
}}

/// Rotates by one each [`Self::num_skip`] call up to COUNT - 1.
#[derive(Default, Debug)]
pub struct Rotator<const COUNT: u32> {
next: u32,
}

impl<const COUNT: u32> Rotator<COUNT> {
/// Rotates by one each [`Self::num_skip`] call up to COUNT - 1
#[inline]
pub fn num_skip(&mut self) -> u32 {
let num_skip = self.next;
self.next += 1;
if self.next == COUNT {
self.next = 0;
}
num_skip
}
}

/// [`Self::num_skip`] always returns 0.
#[derive(Default, Debug)]
pub struct BiasedRotator {}

impl BiasedRotator {
/// Always returns 0.
#[inline]
pub fn num_skip(&mut self) -> u32 {
0
}
}
2 changes: 2 additions & 0 deletions tokio/src/macros/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ cfg_macros! {

pub use std::future::poll_fn;

pub use crate::macros::join::{BiasedRotator, Rotator};

#[doc(hidden)]
pub fn thread_rng_n(n: u32) -> u32 {
crate::runtime::context::thread_rng_n(n)
Expand Down
91 changes: 72 additions & 19 deletions tokio/src/macros/try_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ macro_rules! doc {
/// The supplied futures are stored inline and do not require allocating a
/// `Vec`.
///
/// ### Runtime characteristics
/// ## Runtime characteristics
///
/// By running all async expressions on the current task, the expressions are
/// able to run **concurrently** but not in **parallel**. This means all
Expand All @@ -30,6 +30,25 @@ macro_rules! doc {
///
/// [`tokio::spawn`]: crate::spawn
///
/// ## Fairness
///
/// By default, `try_join!`'s generated future rotates which
/// contained future is polled first whenever it is woken.
///
/// This behavior can be overridden by adding `biased;` to the beginning of the
/// macro usage. See the examples for details. This will cause `try_join` to poll
/// the futures in the order they appear from top to bottom.
///
/// You may want this if your futures may interact in a way where known polling order is significant.
///
/// But there is an important caveat to this mode. It becomes your responsibility
/// to ensure that the polling order of your futures is fair. If for example you
/// are joining a stream and a shutdown future, and the stream has a
/// huge volume of messages that takes a long time to finish processing per poll, you should
/// place the shutdown future earlier in the `try_join!` list to ensure that it is
/// always polled, and will not be delayed due to the stream future taking a long time to return
/// `Poll::Pending`.
///
/// # Examples
///
/// Basic `try_join` with two branches.
Expand Down Expand Up @@ -100,6 +119,37 @@ macro_rules! doc {
/// }
/// }
/// ```
/// Using the `biased;` mode to control polling order.
///
/// ```
/// async fn do_stuff_async() -> Result<(), &'static str> {
/// // async work
/// # Ok(())
/// }
///
/// async fn more_async_work() -> Result<(), &'static str> {
/// // more here
/// # Ok(())
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let res = tokio::try_join!(
/// biased;
/// do_stuff_async(),
/// more_async_work()
/// );
///
/// match res {
/// Ok((first, second)) => {
/// // do something with the values
/// }
/// Err(err) => {
/// println!("processing failed; error = {}", err);
/// }
/// }
/// }
/// ```
#[macro_export]
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
$try_join
Expand All @@ -108,12 +158,16 @@ macro_rules! doc {

#[cfg(doc)]
doc! {macro_rules! try_join {
($($future:expr),*) => { unimplemented!() }
($(biased;)? $($future:expr),*) => { unimplemented!() }
}}

#[cfg(not(doc))]
doc! {macro_rules! try_join {
(@ {
// Type of rotator that controls which inner future to start with
// when polling our output future.
rotator=$rotator:ty;

// One `_` for each branch in the `try_join!` macro. This is not used once
// normalization is complete.
( $($count:tt)* )
Expand Down Expand Up @@ -142,25 +196,19 @@ doc! {macro_rules! try_join {
// <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
let mut futures = &mut futures;

// Each time the future created by poll_fn is polled, a different future will be polled first
// to ensure every future passed to join! gets a chance to make progress even if
// one of the futures consumes the whole budget.
//
// This is number of futures that will be skipped in the first loop
// iteration the next time.
let mut skip_next_time: u32 = 0;
const COUNT: u32 = $($total)*;

poll_fn(move |cx| {
const COUNT: u32 = $($total)*;
// Each time the future created by poll_fn is polled, if not using biased mode,
// a different future is polled first to ensure every future passed to try_join!
// can make progress even if one of the futures consumes the whole budget.
let mut rotator = <$rotator>::default();

poll_fn(move |cx| {
let mut is_pending = false;

let mut to_run = COUNT;

// The number of futures that will be skipped in the first loop iteration
let mut skip = skip_next_time;

skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };
// The number of futures that will be skipped in the first loop iteration.
let mut skip = rotator.num_skip();

// This loop runs twice and the first `skip` futures
// are not polled in the first iteration.
Expand Down Expand Up @@ -216,15 +264,20 @@ doc! {macro_rules! try_join {

// ===== Normalize =====

(@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
$crate::try_join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
(@ { rotator=$rotator:ty; ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
$crate::try_join!(@{ rotator=$rotator; ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
};

// ===== Entry point =====
( biased; $($e:expr),+ $(,)?) => {
$crate::try_join!(@{ rotator=$crate::macros::support::BiasedRotator; () (0) } $($e,)*)
};

( $($e:expr),+ $(,)?) => {
$crate::try_join!(@{ () (0) } $($e,)*)
$crate::try_join!(@{ rotator=$crate::macros::support::Rotator<COUNT>; () (0) } $($e,)*)
};

(biased;) => { async { Ok(()) }.await };

() => { async { Ok(()) }.await }
}}
Loading