wasm_timer/timer/delay.rs
1//! Support for creating futures that represent timeouts.
2//!
3//! This module contains the `Delay` type which is a future that will resolve
4//! at a particular point in the future.
5
6use std::fmt;
7use std::future::Future;
8use std::io;
9use std::pin::Pin;
10use std::sync::atomic::AtomicUsize;
11use std::sync::atomic::Ordering::SeqCst;
12use std::sync::{Arc, Mutex};
13use std::task::{Context, Poll};
14use std::time::Duration;
15
16use futures::task::AtomicWaker;
17
18use crate::Instant;
19use crate::timer::arc_list::Node;
20use crate::timer::{ScheduledTimer, TimerHandle};
21
22/// A future representing the notification that an elapsed duration has
23/// occurred.
24///
25/// This is created through the `Delay::new` or `Delay::new_at` methods
26/// indicating when the future should fire at. Note that these futures are not
27/// intended for high resolution timers, but rather they will likely fire some
28/// granularity after the exact instant that they're otherwise indicated to
29/// fire at.
30pub struct Delay {
31 state: Option<Arc<Node<ScheduledTimer>>>,
32 when: Instant,
33}
34
35impl Delay {
36 /// Creates a new future which will fire at `dur` time into the future.
37 ///
38 /// The returned object will be bound to the default timer for this thread.
39 /// The default timer will be spun up in a helper thread on first use.
40 #[inline]
41 pub fn new(dur: Duration) -> Delay {
42 Delay::new_at(Instant::now() + dur)
43 }
44
45 /// Creates a new future which will fire at the time specified by `at`.
46 ///
47 /// The returned object will be bound to the default timer for this thread.
48 /// The default timer will be spun up in a helper thread on first use.
49 #[inline]
50 pub fn new_at(at: Instant) -> Delay {
51 Delay::new_handle(at, Default::default())
52 }
53
54 /// Creates a new future which will fire at the time specified by `at`.
55 ///
56 /// The returned instance of `Delay` will be bound to the timer specified by
57 /// the `handle` argument.
58 pub fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
59 let inner = match handle.inner.upgrade() {
60 Some(i) => i,
61 None => {
62 return Delay {
63 state: None,
64 when: at,
65 }
66 }
67 };
68 let state = Arc::new(Node::new(ScheduledTimer {
69 at: Mutex::new(Some(at)),
70 state: AtomicUsize::new(0),
71 waker: AtomicWaker::new(),
72 inner: handle.inner,
73 slot: Mutex::new(None),
74 }));
75
76 // If we fail to actually push our node then we've become an inert
77 // timer, meaning that we'll want to immediately return an error from
78 // `poll`.
79 if inner.list.push(&state).is_err() {
80 return Delay {
81 state: None,
82 when: at,
83 };
84 }
85
86 inner.waker.wake();
87 Delay {
88 state: Some(state),
89 when: at,
90 }
91 }
92
93 /// Resets this timeout to an new timeout which will fire at the time
94 /// specified by `dur`.
95 ///
96 /// This is equivalent to calling `reset_at` with `Instant::now() + dur`
97 #[inline]
98 pub fn reset(&mut self, dur: Duration) {
99 self.reset_at(Instant::now() + dur)
100 }
101
102 /// Resets this timeout to an new timeout which will fire at the time
103 /// specified by `at`.
104 ///
105 /// This method is usable even of this instance of `Delay` has "already
106 /// fired". That is, if this future has resovled, calling this method means
107 /// that the future will still re-resolve at the specified instant.
108 ///
109 /// If `at` is in the past then this future will immediately be resolved
110 /// (when `poll` is called).
111 ///
112 /// Note that if any task is currently blocked on this future then that task
113 /// will be dropped. It is required to call `poll` again after this method
114 /// has been called to ensure tha ta task is blocked on this future.
115 #[inline]
116 pub fn reset_at(&mut self, at: Instant) {
117 self.when = at;
118 if self._reset(at).is_err() {
119 self.state = None
120 }
121 }
122
123 fn _reset(&mut self, at: Instant) -> Result<(), ()> {
124 let state = match self.state {
125 Some(ref state) => state,
126 None => return Err(()),
127 };
128 if let Some(timeouts) = state.inner.upgrade() {
129 let mut bits = state.state.load(SeqCst);
130 loop {
131 // If we've been invalidated, cancel this reset
132 if bits & 0b10 != 0 {
133 return Err(());
134 }
135 let new = bits.wrapping_add(0b100) & !0b11;
136 match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
137 Ok(_) => break,
138 Err(s) => bits = s,
139 }
140 }
141 *state.at.lock().unwrap() = Some(at);
142 // If we fail to push our node then we've become an inert timer, so
143 // we'll want to clear our `state` field accordingly
144 timeouts.list.push(state)?;
145 timeouts.waker.wake();
146 }
147
148 Ok(())
149 }
150}
151
152#[inline]
153pub fn fires_at(timeout: &Delay) -> Instant {
154 timeout.when
155}
156
157impl Future for Delay {
158 type Output = io::Result<()>;
159
160 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161 let state = match self.state {
162 Some(ref state) => state,
163 None => {
164 let err = Err(io::Error::new(io::ErrorKind::Other, "timer has gone away"));
165 return Poll::Ready(err);
166 }
167 };
168
169 if state.state.load(SeqCst) & 1 != 0 {
170 return Poll::Ready(Ok(()));
171 }
172
173 state.waker.register(&cx.waker());
174
175 // Now that we've registered, do the full check of our own internal
176 // state. If we've fired the first bit is set, and if we've been
177 // invalidated the second bit is set.
178 match state.state.load(SeqCst) {
179 n if n & 0b01 != 0 => Poll::Ready(Ok(())),
180 n if n & 0b10 != 0 => Poll::Ready(Err(io::Error::new(
181 io::ErrorKind::Other,
182 "timer has gone away",
183 ))),
184 _ => Poll::Pending,
185 }
186 }
187}
188
189impl Drop for Delay {
190 fn drop(&mut self) {
191 let state = match self.state {
192 Some(ref s) => s,
193 None => return,
194 };
195 if let Some(timeouts) = state.inner.upgrade() {
196 *state.at.lock().unwrap() = None;
197 if timeouts.list.push(state).is_ok() {
198 timeouts.waker.wake();
199 }
200 }
201 }
202}
203
204impl fmt::Debug for Delay {
205 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
206 f.debug_struct("Delay").field("when", &self.when).finish()
207 }
208}