Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coop: expose coop as a public module #7116

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
14 changes: 14 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,20 @@ macro_rules! cfg_not_coop {
}
}

macro_rules! cfg_pub_if_rt {
($($(#[$meta:meta])* fn $($inner:tt)*)*) => {
$(
$(#[$meta])*
#[cfg(feature = "rt")]
pub fn $($inner)*

$(#[$meta])*
#[cfg(not(feature = "rt"))]
pub(crate) fn $($inner)*
)*
}
}

macro_rules! cfg_has_atomic_u64 {
($($item:item)*) => {
$(
Expand Down
103 changes: 94 additions & 9 deletions tokio/src/runtime/coop.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#![cfg_attr(not(feature = "full"), allow(dead_code))]

//! Yield points for improved cooperative scheduling.
//! Utilities for improved cooperative scheduling.
//!
//! Documentation for this can be found in the [`tokio::task`] module.
//!
//! [`tokio::task`]: crate::task.
//! See the "Cooperative scheduling" section in the [task](crate::task#cooperative-scheduling) module.
maminrayej marked this conversation as resolved.
Show resolved Hide resolved

// ```ignore
// # use tokio_stream::{Stream, StreamExt};
Expand Down Expand Up @@ -107,11 +105,98 @@ fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
f()
}

#[inline(always)]
pub(crate) fn has_budget_remaining() -> bool {
// If the current budget cannot be accessed due to the thread-local being
// shutdown, then we assume there is budget remaining.
context::budget(|cell| cell.get().has_remaining()).unwrap_or(true)
cfg_pub_if_rt! {
/// Returns `true` if there is still budget left on the task.
///
/// Futures created by the tokio library functions are budget-aware and yield when there is no more
/// budget left, but not all futures will be budget-aware. Consider future `A` that polls two inner
/// futures `B` and `C`, and returns `Poll::Ready` when one of them is ready. If both inner futures
/// were budget-aware, at some point the budget would be depleted which would cause both futures to
/// return `Poll::Pending` resulting in `A` returning `Poll::Pending` as well. Yielding all the way
/// back to the runtime, the budget would be reset, and `B` and `C` could make progress again.
/// Now let's consider `B` is budget-aware, but `C` is not and is always ready. The budget will be
/// depleted as before, but now since `C` always returns `Poll::Ready`, `A` will always return
/// `Poll::Ready` as well. This way, `A` will not yield back to the runtime which will keep the budget
/// depleted and `B` not making progress.
maminrayej marked this conversation as resolved.
Show resolved Hide resolved
///
/// In these scenarios you could use [`has_budget_remaining`] to check whether the budget has been depleted
/// or not and act accordingly:
/// ```
/// # use std::future::{poll_fn, Future};
/// # use std::task::{Context, Poll};
/// # use std::pin::{pin, Pin};
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use tokio::task::consume_budget;
/// # use tokio::runtime::coop::has_budget_remaining;
/// #
/// # #[tokio::main]
/// # async fn main() {
/// struct Greedy;
/// struct Aware;
/// struct Combined {
/// greedy: Greedy,
/// aware: Aware,
/// }
///
/// impl Future for Greedy {
/// type Output = ();
///
/// fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
/// Poll::Ready(())
/// }
/// }
///
/// impl Future for Aware {
/// type Output = ();
///
/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// pin!(consume_budget()).poll(cx)
/// }
/// }
///
/// impl Future for Combined {
/// type Output = ();
///
/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// let this = Pin::into_inner(self);
///
/// if !has_budget_remaining() {
/// return Poll::Pending;
/// }
///
/// if Pin::new(&mut this.aware).poll(cx).is_ready() {
/// return Poll::Ready(());
/// } else {
/// return Pin::new(&mut this.greedy).poll(cx);
/// }
/// }
/// }
///
/// let did_yield = AtomicBool::new(false);
///
/// while !did_yield.load(Ordering::Relaxed) {
/// poll_fn(|cx| {
/// let combined = pin!(Combined {
/// greedy: Greedy,
/// aware: Aware,
/// });
///
/// if combined.poll(cx).is_pending() {
/// did_yield.store(true, Ordering::Relaxed);
/// }
///
/// Poll::Ready(())
/// })
/// .await;
/// }
/// # }
///```
#[inline(always)]
fn has_budget_remaining() -> bool {
// If the current budget cannot be accessed due to the thread-local being
// shutdown, then we assume there is budget remaining.
context::budget(|cell| cell.get().has_remaining()).unwrap_or(true)
}
}

cfg_rt_multi_thread! {
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,12 @@ mod tests;

pub(crate) mod context;

pub(crate) mod coop;
cfg_rt! {
pub mod coop;
}
cfg_not_rt! {
pub(crate) mod coop;
}

pub(crate) mod park;

Expand Down
17 changes: 17 additions & 0 deletions tokio/tests/coop_budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::runtime::coop::has_budget_remaining;
use tokio::task::consume_budget;
maminrayej marked this conversation as resolved.
Show resolved Hide resolved

/// Ensure that UDP sockets have functional budgeting
///
Expand Down Expand Up @@ -76,3 +78,18 @@ async fn coop_budget_udp_send_recv() {

assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
}

#[tokio::test]
async fn test_has_budget_remaining() {
const BUDGET: usize = 128;
maminrayej marked this conversation as resolved.
Show resolved Hide resolved

// At the begining budget should be available
assert!(has_budget_remaining());

// Deplete the budget
for _ in 0..BUDGET {
consume_budget().await;
}

assert!(!has_budget_remaining());
}
Loading