futures_timer/native/
delay.rs

1//! Support for creating futures that represent timeouts.
2//!
3//! This module contains the `Delay` type which is a future that will resolve
4//! at a particular point in the future.
5
6use std::fmt;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::atomic::AtomicUsize;
10use std::sync::atomic::Ordering::SeqCst;
11use std::sync::{Arc, Mutex};
12use std::task::{Context, Poll};
13use std::time::{Duration, Instant};
14
15use super::arc_list::Node;
16use super::AtomicWaker;
17use super::{ScheduledTimer, TimerHandle};
18
/// A future representing the notification that an elapsed duration has
/// occurred.
///
/// This is created through the `Delay::new` method indicating when the future should fire.
/// Note that these futures are not intended for high resolution timers, but rather they will
/// likely fire some granularity after the exact instant that they're otherwise indicated to fire
/// at.
pub struct Delay {
    /// Node handed to the owning timer's list when the delay is created.
    ///
    /// `None` marks an inert delay: the timer handle could not be upgraded,
    /// the node could not be pushed onto the timer's list, or a later `reset`
    /// failed. Polling an inert delay panics.
    state: Option<Arc<Node<ScheduledTimer>>>,
}
29
30impl Delay {
31    /// Creates a new future which will fire at `dur` time into the future.
32    ///
33    /// The returned object will be bound to the default timer for this thread.
34    /// The default timer will be spun up in a helper thread on first use.
35    #[inline]
36    pub fn new(dur: Duration) -> Delay {
37        Delay::new_handle(Instant::now() + dur, Default::default())
38    }
39
40    /// Creates a new future which will fire at the time specified by `at`.
41    ///
42    /// The returned instance of `Delay` will be bound to the timer specified by
43    /// the `handle` argument.
44    pub(crate) fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
45        let inner = match handle.inner.upgrade() {
46            Some(i) => i,
47            None => return Delay { state: None },
48        };
49        let state = Arc::new(Node::new(ScheduledTimer {
50            at: Mutex::new(Some(at)),
51            state: AtomicUsize::new(0),
52            waker: AtomicWaker::new(),
53            inner: handle.inner,
54            slot: Mutex::new(None),
55        }));
56
57        // If we fail to actually push our node then we've become an inert
58        // timer, meaning that we'll want to immediately return an error from
59        // `poll`.
60        if inner.list.push(&state).is_err() {
61            return Delay { state: None };
62        }
63
64        inner.waker.wake();
65        Delay { state: Some(state) }
66    }
67
68    /// Resets this timeout to an new timeout which will fire at the time
69    /// specified by `at`.
70    #[inline]
71    pub fn reset(&mut self, dur: Duration) {
72        if self._reset(dur).is_err() {
73            self.state = None
74        }
75    }
76
77    fn _reset(&mut self, dur: Duration) -> Result<(), ()> {
78        let state = match self.state {
79            Some(ref state) => state,
80            None => return Err(()),
81        };
82        if let Some(timeouts) = state.inner.upgrade() {
83            let mut bits = state.state.load(SeqCst);
84            loop {
85                // If we've been invalidated, cancel this reset
86                if bits & 0b10 != 0 {
87                    return Err(());
88                }
89                let new = bits.wrapping_add(0b100) & !0b11;
90                match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
91                    Ok(_) => break,
92                    Err(s) => bits = s,
93                }
94            }
95            *state.at.lock().unwrap() = Some(Instant::now() + dur);
96            // If we fail to push our node then we've become an inert timer, so
97            // we'll want to clear our `state` field accordingly
98            timeouts.list.push(state)?;
99            timeouts.waker.wake();
100        }
101
102        Ok(())
103    }
104}
105
106impl Future for Delay {
107    type Output = ();
108
109    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
110        let state = match self.state {
111            Some(ref state) => state,
112            None => panic!("timer has gone away"),
113        };
114
115        if state.state.load(SeqCst) & 1 != 0 {
116            return Poll::Ready(());
117        }
118
119        state.waker.register(cx.waker());
120
121        // Now that we've registered, do the full check of our own internal
122        // state. If we've fired the first bit is set, and if we've been
123        // invalidated the second bit is set.
124        match state.state.load(SeqCst) {
125            n if n & 0b01 != 0 => Poll::Ready(()),
126            n if n & 0b10 != 0 => panic!("timer has gone away"),
127            _ => Poll::Pending,
128        }
129    }
130}
131
132impl Drop for Delay {
133    fn drop(&mut self) {
134        let state = match self.state {
135            Some(ref s) => s,
136            None => return,
137        };
138        if let Some(timeouts) = state.inner.upgrade() {
139            *state.at.lock().unwrap() = None;
140            if timeouts.list.push(state).is_ok() {
141                timeouts.waker.wake();
142            }
143        }
144    }
145}
146
147impl fmt::Debug for Delay {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
149        f.debug_struct("Delay").finish()
150    }
151}