broker_tokio/time/mod.rs
1//! Utilities for tracking time.
2//!
3//! This module provides a number of types for executing code after a set period
4//! of time.
5//!
6//! * `Delay` is a future that does no work and completes at a specific `Instant`
7//! in time.
8//!
9//! * `Interval` is a stream yielding a value at a fixed period. It is
10//! initialized with a `Duration` and repeatedly yields each time the duration
11//! elapses.
12//!
13//! * `Timeout`: Wraps a future or stream, setting an upper bound to the amount
14//! of time it is allowed to execute. If the future or stream does not
15//! complete in time, then it is canceled and an error is returned.
16//!
17//! * `DelayQueue`: A queue where items are returned once the requested delay
18//! has expired.
19//!
20//! These types are sufficient for handling a large number of scenarios
21//! involving time.
22//!
23//! These types must be used from within the context of the `Runtime`.
24//!
25//! # Examples
26//!
//! Wait 100ms and print "100 ms have elapsed"
28//!
29//! ```
30//! use tokio::time::delay_for;
31//!
32//! use std::time::Duration;
33//!
34//!
35//! #[tokio::main]
36//! async fn main() {
37//! delay_for(Duration::from_millis(100)).await;
38//! println!("100 ms have elapsed");
39//! }
40//! ```
41//!
//! Require that an operation takes no more than 1s. Note that this uses the
//! free `timeout` function exported by this module.
45//!
46//! ```
47//! use tokio::time::{timeout, Duration};
48//!
49//! async fn long_future() {
50//! // do work here
51//! }
52//!
53//! # async fn dox() {
54//! let res = timeout(Duration::from_secs(1), long_future()).await;
55//!
56//! if res.is_err() {
57//! println!("operation timed out");
58//! }
59//! # }
60//! ```
61
62mod clock;
63pub(crate) use self::clock::Clock;
64#[cfg(feature = "test-util")]
65pub use clock::{advance, pause, resume};
66
67pub mod delay_queue;
68#[doc(inline)]
69pub use delay_queue::DelayQueue;
70
71mod delay;
72pub use delay::{delay_for, delay_until, Delay};
73
74pub(crate) mod driver;
75
76mod error;
77pub use error::Error;
78
79mod instant;
80pub use self::instant::Instant;
81
82mod interval;
83pub use interval::{interval, interval_at, Interval};
84
85mod timeout;
86#[doc(inline)]
87pub use timeout::{timeout, timeout_at, Elapsed, Timeout};
88
89cfg_stream! {
90 mod throttle;
91 pub use throttle::{throttle, Throttle};
92}
93
94mod wheel;
95
96#[cfg(test)]
97#[cfg(not(loom))]
98mod tests;
99
100// Re-export for convenience
101pub use std::time::Duration;
102
103// ===== Internal utils =====
104
/// Direction in which `ms` rounds a `Duration` that is not a whole number of
/// milliseconds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Round {
    /// Count any leftover sub-millisecond part as one additional millisecond.
    Up,
    /// Discard any leftover sub-millisecond part.
    Down,
}
109
110/// Convert a `Duration` to milliseconds, rounding up and saturating at
111/// `u64::MAX`.
112///
113/// The saturating is fine because `u64::MAX` milliseconds are still many
114/// million years.
115#[inline]
116fn ms(duration: Duration, round: Round) -> u64 {
117 const NANOS_PER_MILLI: u32 = 1_000_000;
118 const MILLIS_PER_SEC: u64 = 1_000;
119
120 // Round up.
121 let millis = match round {
122 Round::Up => (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI,
123 Round::Down => duration.subsec_millis(),
124 };
125
126 duration
127 .as_secs()
128 .saturating_mul(MILLIS_PER_SEC)
129 .saturating_add(u64::from(millis))
130}