broker_tokio/time/
delay.rs

1use crate::time::driver::Registration;
2use crate::time::{Duration, Instant};
3
4use std::future::Future;
5use std::pin::Pin;
6use std::task::{self, Poll};
7
8/// Wait until `deadline` is reached.
9///
10/// No work is performed while awaiting on the delay to complete. The delay
11/// operates at millisecond granularity and should not be used for tasks that
12/// require high-resolution timers.
13///
14/// # Cancellation
15///
16/// Canceling a delay is done by dropping the returned future. No additional
17/// cleanup work is required.
18pub fn delay_until(deadline: Instant) -> Delay {
19    let registration = Registration::new(deadline, Duration::from_millis(0));
20    Delay { registration }
21}
22
23/// Wait until `duration` has elapsed.
24///
25/// Equivalent to `delay_until(Instant::now() + duration)`. An asynchronous
26/// analog to `std::thread::sleep`.
27///
28/// No work is performed while awaiting on the delay to complete. The delay
29/// operates at millisecond granularity and should not be used for tasks that
30/// require high-resolution timers.
31///
32/// # Cancellation
33///
34/// Canceling a delay is done by dropping the returned future. No additional
35/// cleanup work is required.
36pub fn delay_for(duration: Duration) -> Delay {
37    delay_until(Instant::now() + duration)
38}
39
40/// Future returned by [`delay_until`](delay_until) and
41/// [`delay_for`](delay_for).
42#[derive(Debug)]
43#[must_use = "futures do nothing unless you `.await` or poll them"]
44pub struct Delay {
45    /// The link between the `Delay` instance and the timer that drives it.
46    ///
47    /// This also stores the `deadline` value.
48    registration: Registration,
49}
50
51impl Delay {
52    pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Delay {
53        let registration = Registration::new(deadline, duration);
54        Delay { registration }
55    }
56
57    /// Returns the instant at which the future will complete.
58    pub fn deadline(&self) -> Instant {
59        self.registration.deadline()
60    }
61
62    /// Returns true if the `Delay` has elapsed
63    ///
64    /// A `Delay` is elapsed when the requested duration has elapsed.
65    pub fn is_elapsed(&self) -> bool {
66        self.registration.is_elapsed()
67    }
68
69    /// Reset the `Delay` instance to a new deadline.
70    ///
71    /// Calling this function allows changing the instant at which the `Delay`
72    /// future completes without having to create new associated state.
73    ///
74    /// This function can be called both before and after the future has
75    /// completed.
76    pub fn reset(&mut self, deadline: Instant) {
77        self.registration.reset(deadline);
78    }
79}
80
81impl Future for Delay {
82    type Output = ();
83
84    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
85        // `poll_elapsed` can return an error in two cases:
86        //
87        // - AtCapacity: this is a pathlogical case where far too many
88        //   delays have been scheduled.
89        // - Shutdown: No timer has been setup, which is a mis-use error.
90        //
91        // Both cases are extremely rare, and pretty accurately fit into
92        // "logic errors", so we just panic in this case. A user couldn't
93        // really do much better if we passed the error onwards.
94        match ready!(self.registration.poll_elapsed(cx)) {
95            Ok(()) => Poll::Ready(()),
96            Err(e) => panic!("timer error: {}", e),
97        }
98    }
99}