futures_timer/native/
delay.rs1use std::fmt;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::atomic::AtomicUsize;
10use std::sync::atomic::Ordering::SeqCst;
11use std::sync::{Arc, Mutex};
12use std::task::{Context, Poll};
13use std::time::{Duration, Instant};
14
15use super::arc_list::Node;
16use super::AtomicWaker;
17use super::{ScheduledTimer, TimerHandle};
18
19pub struct Delay {
27 state: Option<Arc<Node<ScheduledTimer>>>,
28}
29
30impl Delay {
31 #[inline]
36 pub fn new(dur: Duration) -> Delay {
37 Delay::new_handle(Instant::now() + dur, Default::default())
38 }
39
40 pub(crate) fn new_handle(at: Instant, handle: TimerHandle) -> Delay {
45 let inner = match handle.inner.upgrade() {
46 Some(i) => i,
47 None => return Delay { state: None },
48 };
49 let state = Arc::new(Node::new(ScheduledTimer {
50 at: Mutex::new(Some(at)),
51 state: AtomicUsize::new(0),
52 waker: AtomicWaker::new(),
53 inner: handle.inner,
54 slot: Mutex::new(None),
55 }));
56
57 if inner.list.push(&state).is_err() {
61 return Delay { state: None };
62 }
63
64 inner.waker.wake();
65 Delay { state: Some(state) }
66 }
67
68 #[inline]
71 pub fn reset(&mut self, dur: Duration) {
72 if self._reset(dur).is_err() {
73 self.state = None
74 }
75 }
76
77 fn _reset(&mut self, dur: Duration) -> Result<(), ()> {
78 let state = match self.state {
79 Some(ref state) => state,
80 None => return Err(()),
81 };
82 if let Some(timeouts) = state.inner.upgrade() {
83 let mut bits = state.state.load(SeqCst);
84 loop {
85 if bits & 0b10 != 0 {
87 return Err(());
88 }
89 let new = bits.wrapping_add(0b100) & !0b11;
90 match state.state.compare_exchange(bits, new, SeqCst, SeqCst) {
91 Ok(_) => break,
92 Err(s) => bits = s,
93 }
94 }
95 *state.at.lock().unwrap() = Some(Instant::now() + dur);
96 timeouts.list.push(state)?;
99 timeouts.waker.wake();
100 }
101
102 Ok(())
103 }
104}
105
106impl Future for Delay {
107 type Output = ();
108
109 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
110 let state = match self.state {
111 Some(ref state) => state,
112 None => panic!("timer has gone away"),
113 };
114
115 if state.state.load(SeqCst) & 1 != 0 {
116 return Poll::Ready(());
117 }
118
119 state.waker.register(cx.waker());
120
121 match state.state.load(SeqCst) {
125 n if n & 0b01 != 0 => Poll::Ready(()),
126 n if n & 0b10 != 0 => panic!("timer has gone away"),
127 _ => Poll::Pending,
128 }
129 }
130}
131
132impl Drop for Delay {
133 fn drop(&mut self) {
134 let state = match self.state {
135 Some(ref s) => s,
136 None => return,
137 };
138 if let Some(timeouts) = state.inner.upgrade() {
139 *state.at.lock().unwrap() = None;
140 if timeouts.list.push(state).is_ok() {
141 timeouts.waker.wake();
142 }
143 }
144 }
145}
146
147impl fmt::Debug for Delay {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
149 f.debug_struct("Delay").finish()
150 }
151}