wasm_timer/timer/
delay.rs

1//! Support for creating futures that represent timeouts.
2//!
3//! This module contains the `Delay` type which is a future that will resolve
4//! at a particular point in the future.
5
6use std::fmt;
7use std::future::Future;
8use std::io;
9use std::pin::Pin;
10use std::sync::atomic::AtomicUsize;
11use std::sync::atomic::Ordering::SeqCst;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll};
14use std::time::Duration;
15
16use futures::task::AtomicWaker;
17
18use crate::Instant;
19use crate::timer::arc_list::Node;
20use crate::timer::{ScheduledTimer, TimerHandle};
21
22/// A future representing the notification that an elapsed duration has
23/// occurred.
24///
25/// This is created through the `Delay::new` or `Delay::new_at` methods
26/// indicating when the future should fire at.  Note that these futures are not
27/// intended for high resolution timers, but rather they will likely fire some
28/// granularity after the exact instant that they're otherwise indicated to
29/// fire at.
30pub struct Delay {
31    state: Option<Arc<Node<ScheduledTimer>>>,
32    when: Instant,
33}
34
35impl Delay {
36    /// Creates a new future which will fire at `dur` time into the future.
37    ///
38    /// The returned object will be bound to the default timer for this thread.
39    /// The default timer will be spun up in a helper thread on first use.
40    #[inline]
41    pub fn new(dur: Duration) -> Delay {
42        Delay::new_at(Instant::now() + dur)
43    }
44
45    /// Creates a new future which will fire at the time specified by `at`.
46    ///
47    /// The returned object will be bound to the default timer for this thread.
48    /// The default timer will be spun up in a helper thread on first use.
49    #[inline]
50    pub fn new_at(at: Instant) -> Delay {
51        Delay::new_handle(at, Default::default())
52    }
53
54    /// Creates a new future which will fire at the time specified by `at`.
55    ///
56    /// The returned instance of `Delay` will be bound to the timer specified by
57    /// the `handle` argument.
58    pub fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
59        let inner = match handle.inner.upgrade() {
60            Some(i) => i,
61            None => {
62                return Delay {
63                    state: None,
64                    when: at,
65                }
66            }
67        };
68        let state = Arc::new(Node::new(ScheduledTimer {
69            at: Mutex::new(Some(at)),
70            state: AtomicUsize::new(0),
71            waker: AtomicWaker::new(),
72            inner: handle.inner,
73            slot: Mutex::new(None),
74        }));
75
76        // If we fail to actually push our node then we've become an inert
77        // timer, meaning that we'll want to immediately return an error from
78        // `poll`.
79        if inner.list.push(&state).is_err() {
80            return Delay {
81                state: None,
82                when: at,
83            };
84        }
85
86        inner.waker.wake();
87        Delay {
88            state: Some(state),
89            when: at,
90        }
91    }
92
93    /// Resets this timeout to an new timeout which will fire at the time
94    /// specified by `dur`.
95    ///
96    /// This is equivalent to calling `reset_at` with `Instant::now() + dur`
97    #[inline]
98    pub fn reset(&mut self, dur: Duration) {
99        self.reset_at(Instant::now() + dur)
100    }
101
102    /// Resets this timeout to an new timeout which will fire at the time
103    /// specified by `at`.
104    ///
105    /// This method is usable even of this instance of `Delay` has "already
106    /// fired". That is, if this future has resovled, calling this method means
107    /// that the future will still re-resolve at the specified instant.
108    ///
109    /// If `at` is in the past then this future will immediately be resolved
110    /// (when `poll` is called).
111    ///
112    /// Note that if any task is currently blocked on this future then that task
113    /// will be dropped. It is required to call `poll` again after this method
114    /// has been called to ensure tha ta task is blocked on this future.
115    #[inline]
116    pub fn reset_at(&mut self, at: Instant) {
117        self.when = at;
118        if self._reset(at).is_err() {
119            self.state = None
120        }
121    }
122
123    fn _reset(&mut self, at: Instant) -> Result<(), ()> {
124        let state = match self.state {
125            Some(ref state) => state,
126            None => return Err(()),
127        };
128        if let Some(timeouts) = state.inner.upgrade() {
129            let mut bits = state.state.load(SeqCst);
130            loop {
131                // If we've been invalidated, cancel this reset
132                if bits & 0b10 != 0 {
133                    return Err(());
134                }
135                let new = bits.wrapping_add(0b100) & !0b11;
136                match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
137                    Ok(_) => break,
138                    Err(s) => bits = s,
139                }
140            }
141            *state.at.lock().unwrap() = Some(at);
142            // If we fail to push our node then we've become an inert timer, so
143            // we'll want to clear our `state` field accordingly
144            timeouts.list.push(state)?;
145            timeouts.waker.wake();
146        }
147
148        Ok(())
149    }
150}
151
152#[inline]
153pub fn fires_at(timeout: &Delay) -> Instant {
154    timeout.when
155}
156
157impl Future for Delay {
158    type Output = io::Result<()>;
159
160    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161        let state = match self.state {
162            Some(ref state) => state,
163            None => {
164                let err = Err(io::Error::new(io::ErrorKind::Other, "timer has gone away"));
165                return Poll::Ready(err);
166            }
167        };
168
169        if state.state.load(SeqCst) & 1 != 0 {
170            return Poll::Ready(Ok(()));
171        }
172
173        state.waker.register(&cx.waker());
174
175        // Now that we've registered, do the full check of our own internal
176        // state. If we've fired the first bit is set, and if we've been
177        // invalidated the second bit is set.
178        match state.state.load(SeqCst) {
179            n if n & 0b01 != 0 => Poll::Ready(Ok(())),
180            n if n & 0b10 != 0 => Poll::Ready(Err(io::Error::new(
181                io::ErrorKind::Other,
182                "timer has gone away",
183            ))),
184            _ => Poll::Pending,
185        }
186    }
187}
188
189impl Drop for Delay {
190    fn drop(&mut self) {
191        let state = match self.state {
192            Some(ref s) => s,
193            None => return,
194        };
195        if let Some(timeouts) = state.inner.upgrade() {
196            *state.at.lock().unwrap() = None;
197            if timeouts.list.push(state).is_ok() {
198                timeouts.waker.wake();
199            }
200        }
201    }
202}
203
204impl fmt::Debug for Delay {
205    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
206        f.debug_struct("Delay").field("when", &self.when).finish()
207    }
208}