Skip to content

Commit 9730317

Browse files
bdonlanBryan Donlan
andauthored
time: move DelayQueue to tokio-util (#2897)
This change is intended to do the minimum to unblock 0.3; as such, for now, we duplicate the internal `time::wheel` structures in tokio-util, rather than trying to refactor things at this stage. Co-authored-by: Bryan Donlan <[email protected]>
1 parent 02311dc commit 9730317

File tree

10 files changed

+674
-28
lines changed

10 files changed

+674
-28
lines changed

tokio-util/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@ publish = false
2525
default = []
2626

2727
# Shorthand for enabling everything
28-
full = ["codec", "compat", "io"]
28+
full = ["codec", "compat", "io", "time"]
2929

3030
compat = ["futures-io",]
3131
codec = ["tokio/stream"]
32+
time = ["tokio/time","slab"]
3233
io = []
3334

3435
[dependencies]
@@ -40,6 +41,7 @@ futures-sink = "0.3.0"
4041
futures-io = { version = "0.3.0", optional = true }
4142
log = "0.4"
4243
pin-project-lite = "0.1.4"
44+
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
4345

4446
[dev-dependencies]
4547
tokio = { version = "0.3.0", path = "../tokio", features = ["full"] }

tokio-util/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ pub mod sync;
5353

5454
pub mod either;
5555

56+
#[cfg(feature = "time")]
57+
pub mod time;
58+
5659
#[cfg(any(feature = "io", feature = "codec"))]
5760
mod util {
5861
use tokio::io::{AsyncRead, ReadBuf};

tokio/src/time/delay_queue.rs renamed to tokio-util/src/time/delay_queue.rs

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
//! [`DelayQueue`]: struct@DelayQueue
66
77
use crate::time::wheel::{self, Wheel};
8-
use crate::time::{sleep_until, Delay, Duration, Error, Instant};
8+
9+
use futures_core::ready;
10+
use tokio::time::{sleep_until, Delay, Duration, Error, Instant};
911

1012
use slab::Slab;
1113
use std::cmp;
@@ -50,8 +52,8 @@ use std::task::{self, Poll};
5052
///
5153
/// # Implementation
5254
///
53-
/// The [`DelayQueue`] is backed by a separate instance of the same timer wheel used internally by
54-
/// Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same
55+
/// The [`DelayQueue`] is backed by a separate instance of a timer wheel similar to that used internally
56+
/// by Tokio's standalone timer utilities such as [`sleep`]. Because of this, it offers the same
5557
/// performance and scalability benefits.
5658
///
5759
/// State associated with each entry is stored in a [`slab`]. This amortizes the cost of allocation,
@@ -65,7 +67,8 @@ use std::task::{self, Poll};
6567
/// Using `DelayQueue` to manage cache entries.
6668
///
6769
/// ```rust,no_run
68-
/// use tokio::time::{delay_queue, DelayQueue, Error};
70+
/// use tokio::time::Error;
71+
/// use tokio_util::time::{DelayQueue, delay_queue};
6972
///
7073
/// use futures::ready;
7174
/// use std::collections::HashMap;
@@ -118,7 +121,7 @@ use std::task::{self, Poll};
118121
/// [`poll_expired`]: method@Self::poll_expired
119122
/// [`Stream::poll_expired`]: method@Self::poll_expired
120123
/// [`DelayQueue`]: struct@DelayQueue
121-
/// [`sleep`]: fn@super::sleep
124+
/// [`sleep`]: fn@tokio::time::sleep
122125
/// [`slab`]: slab
123126
/// [`capacity`]: method@Self::capacity
124127
/// [`reserve`]: method@Self::reserve
@@ -210,7 +213,7 @@ impl<T> DelayQueue<T> {
210213
/// # Examples
211214
///
212215
/// ```rust
213-
/// # use tokio::time::DelayQueue;
216+
/// # use tokio_util::time::DelayQueue;
214217
/// let delay_queue: DelayQueue<u32> = DelayQueue::new();
215218
/// ```
216219
pub fn new() -> DelayQueue<T> {
@@ -226,7 +229,7 @@ impl<T> DelayQueue<T> {
226229
/// # Examples
227230
///
228231
/// ```rust
229-
/// # use tokio::time::DelayQueue;
232+
/// # use tokio_util::time::DelayQueue;
230233
/// # use std::time::Duration;
231234
///
232235
/// # #[tokio::main]
@@ -281,7 +284,8 @@ impl<T> DelayQueue<T> {
281284
/// Basic usage
282285
///
283286
/// ```rust
284-
/// use tokio::time::{DelayQueue, Duration, Instant};
287+
/// use tokio::time::{Duration, Instant};
288+
/// use tokio_util::time::DelayQueue;
285289
///
286290
/// # #[tokio::main]
287291
/// # async fn main() {
@@ -391,7 +395,7 @@ impl<T> DelayQueue<T> {
391395
/// Basic usage
392396
///
393397
/// ```rust
394-
/// use tokio::time::DelayQueue;
398+
/// use tokio_util::time::DelayQueue;
395399
/// use std::time::Duration;
396400
///
397401
/// # #[tokio::main]
@@ -460,7 +464,7 @@ impl<T> DelayQueue<T> {
460464
/// Basic usage
461465
///
462466
/// ```rust
463-
/// use tokio::time::DelayQueue;
467+
/// use tokio_util::time::DelayQueue;
464468
/// use std::time::Duration;
465469
///
466470
/// # #[tokio::main]
@@ -503,7 +507,8 @@ impl<T> DelayQueue<T> {
503507
/// Basic usage
504508
///
505509
/// ```rust
506-
/// use tokio::time::{DelayQueue, Duration, Instant};
510+
/// use tokio::time::{Duration, Instant};
511+
/// use tokio_util::time::DelayQueue;
507512
///
508513
/// # #[tokio::main]
509514
/// # async fn main() {
@@ -559,7 +564,7 @@ impl<T> DelayQueue<T> {
559564
/// Basic usage
560565
///
561566
/// ```rust
562-
/// use tokio::time::DelayQueue;
567+
/// use tokio_util::time::DelayQueue;
563568
/// use std::time::Duration;
564569
///
565570
/// # #[tokio::main]
@@ -589,7 +594,7 @@ impl<T> DelayQueue<T> {
589594
/// # Examples
590595
///
591596
/// ```rust
592-
/// use tokio::time::DelayQueue;
597+
/// use tokio_util::time::DelayQueue;
593598
/// use std::time::Duration;
594599
///
595600
/// # #[tokio::main]
@@ -617,7 +622,7 @@ impl<T> DelayQueue<T> {
617622
/// # Examples
618623
///
619624
/// ```rust
620-
/// use tokio::time::DelayQueue;
625+
/// use tokio_util::time::DelayQueue;
621626
///
622627
/// let delay_queue: DelayQueue<i32> = DelayQueue::with_capacity(10);
623628
/// assert_eq!(delay_queue.capacity(), 10);
@@ -631,7 +636,7 @@ impl<T> DelayQueue<T> {
631636
/// # Examples
632637
///
633638
/// ```rust
634-
/// use tokio::time::DelayQueue;
639+
/// use tokio_util::time::DelayQueue;
635640
/// use std::time::Duration;
636641
///
637642
/// # #[tokio::main]
@@ -666,7 +671,7 @@ impl<T> DelayQueue<T> {
666671
/// # Examples
667672
///
668673
/// ```
669-
/// use tokio::time::DelayQueue;
674+
/// use tokio_util::time::DelayQueue;
670675
/// use std::time::Duration;
671676
///
672677
/// # #[tokio::main]
@@ -691,7 +696,7 @@ impl<T> DelayQueue<T> {
691696
/// # Examples
692697
///
693698
/// ```
694-
/// use tokio::time::DelayQueue;
699+
/// use tokio_util::time::DelayQueue;
695700
/// use std::time::Duration;
696701
///
697702
/// # #[tokio::main]

tokio-util/src/time/mod.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
//! Additional utilities for tracking time.
2+
//!
3+
//! This module provides additional utilities for executing code after a set period
4+
//! of time. Currently there is only one:
5+
//!
6+
//! * `DelayQueue`: A queue where items are returned once the requested delay
7+
//! has expired.
8+
//!
9+
//! This type must be used from within the context of the `Runtime`.
10+
11+
use std::time::Duration;
12+
13+
mod wheel;
14+
15+
#[doc(inline)]
16+
pub mod delay_queue;
17+
18+
pub use delay_queue::DelayQueue;
19+
20+
// ===== Internal utils =====
21+
22+
enum Round {
23+
Up,
24+
Down,
25+
}
26+
27+
/// Convert a `Duration` to milliseconds, rounding up and saturating at
28+
/// `u64::MAX`.
29+
///
30+
/// The saturating is fine because `u64::MAX` milliseconds are still many
31+
/// million years.
32+
#[inline]
33+
fn ms(duration: Duration, round: Round) -> u64 {
34+
const NANOS_PER_MILLI: u32 = 1_000_000;
35+
const MILLIS_PER_SEC: u64 = 1_000;
36+
37+
// Round up.
38+
let millis = match round {
39+
Round::Up => (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI,
40+
Round::Down => duration.subsec_millis(),
41+
};
42+
43+
duration
44+
.as_secs()
45+
.saturating_mul(MILLIS_PER_SEC)
46+
.saturating_add(u64::from(millis))
47+
}

0 commit comments

Comments
 (0)