1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4 time::Duration,
5};
6
7use futures::{ready, Future, Stream};
8use pin_project::pin_project;
9
10use crate::{
11 time::{Sleep, TimersHandle, GLOBAL_TIMER},
12 MissedTickBehavior,
13};
14
15use super::Instant;
16
17pub fn interval(period: Duration) -> Interval {
18 Interval::new(&GLOBAL_TIMER, Instant::now(), period, "unknown interval")
19}
20
21pub fn interval_at(start: Instant, period: Duration) -> Interval {
22 Interval::new(&GLOBAL_TIMER, start, period, "unknown interval")
23}
24
25#[pin_project]
27#[derive(Debug)]
28pub struct Interval {
29 sleep: Pin<Box<Sleep>>,
30 period: Duration,
31 missed_tick_behavior: MissedTickBehavior,
32}
33
34impl Interval {
35 pub fn new(
37 handle: &TimersHandle,
38 start: Instant,
39 period: Duration,
40 label: &'static str,
41 ) -> Self {
42 Self {
43 sleep: Box::pin(Sleep::new(handle, start, label)),
44 period,
45 missed_tick_behavior: MissedTickBehavior::Burst,
46 }
47 }
48
49 pub async fn tick(&mut self) -> Instant {
50 futures::future::poll_fn(move |cx| self.poll_tick(cx)).await
51 }
52
53 pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
54 let deadline = self.sleep.deadline();
55
56 ready!(self.sleep.as_mut().poll(cx));
58
59 let now = Instant::now();
60
61 let new_deadline = self
64 .missed_tick_behavior
65 .next_timeout(deadline, now, self.period);
66
67 self.sleep.as_mut().reset(new_deadline);
70
71 Poll::Ready(deadline)
72 }
73}
74
75impl Stream for Interval {
76 type Item = Instant;
77
78 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79 self.poll_tick(cx).map(Some)
80 }
81}
82
83#[cfg(test)]
84mod test {
85
86 use futures::StreamExt;
87
88 use crate::time::{assert_dur, setup_timers};
89
90 use super::*;
91
92 fn assert_interval(
93 start: Instant,
94 stream: impl Stream<Item = Instant> + Unpin,
95 expected: impl IntoIterator<Item = (Duration, Duration)>,
96 ) {
97 let mut expected_deadline = start;
98 let mut last = start;
99
100 for (i, (deadline, (expected_fixed, expected_wall))) in
101 futures::executor::block_on_stream(stream)
102 .zip(expected)
103 .enumerate()
104 {
105 let elapsed = last.elapsed();
106 last = Instant::now();
107
108 eprintln!("[{i}] Took: {elapsed:?}");
109
110 expected_deadline += expected_fixed;
111
112 #[cfg(not(miri))]
115 assert_dur(
116 deadline.duration_since(start),
117 expected_deadline.duration_since(start),
118 "next returned deadline",
119 );
120
121 #[cfg(not(miri))]
122 assert_dur(elapsed, expected_wall, "elapsed wall time");
123 }
124 }
125
126 #[test]
127 fn interval() {
128 let (handle, j) = setup_timers();
129
130 let now = Instant::now();
131
132 let expected = [
133 (Duration::ZERO, Duration::ZERO),
135 (Duration::from_millis(100), Duration::from_millis(100)),
136 (Duration::from_millis(100), Duration::from_millis(100)),
137 (Duration::from_millis(100), Duration::from_millis(100)),
138 (Duration::from_millis(100), Duration::from_millis(100)),
139 (Duration::from_millis(100), Duration::from_millis(100)),
140 (Duration::from_millis(100), Duration::from_millis(100)),
141 (Duration::from_millis(100), Duration::from_millis(100)),
142 ];
143
144 let interval = Interval::new(&handle, now, Duration::from_millis(100), "a");
145
146 #[cfg(not(miri))]
147 assert_interval(now, interval, expected);
148
149 drop(handle);
150
151 j.join().unwrap();
152 }
153
154 #[test]
155 fn interval_burst() {
156 let (handle, j) = setup_timers();
157
158 let now = Instant::now();
159
160 let delays = futures::stream::iter([
161 Duration::ZERO,
162 Duration::ZERO,
163 Duration::from_millis(150),
164 Duration::ZERO,
166 Duration::from_millis(50),
167 Duration::ZERO,
168 Duration::from_millis(350),
169 Duration::ZERO,
170 Duration::ZERO,
171 Duration::ZERO,
172 Duration::ZERO,
173 Duration::ZERO,
174 Duration::ZERO,
175 Duration::ZERO,
176 ])
177 .then(|d| Sleep::new(&handle, Instant::now() + d, "a"));
178
179 let expected = [
180 (Duration::ZERO, Duration::ZERO),
181 (Duration::from_millis(100), Duration::from_millis(100)),
183 (Duration::from_millis(100), Duration::from_millis(150)),
184 (Duration::from_millis(100), Duration::from_millis(50)),
186 (Duration::from_millis(100), Duration::from_millis(100)),
188 (Duration::from_millis(100), Duration::from_millis(100)),
189 (Duration::from_millis(100), Duration::from_millis(350)),
190 (Duration::from_millis(100), Duration::ZERO),
192 (Duration::from_millis(100), Duration::ZERO),
194 (Duration::from_millis(100), Duration::from_millis(50)),
196 (Duration::from_millis(100), Duration::from_millis(100)),
197 (Duration::from_millis(100), Duration::from_millis(100)),
198 (Duration::from_millis(100), Duration::from_millis(100)),
199 ];
200
201 let interval = Interval::new(&handle, now, Duration::from_millis(100), "b")
202 .zip(delays)
203 .map(|v| v.0);
204
205 #[cfg(not(miri))]
206 assert_interval(now, interval, expected);
207
208 drop(handle);
209 j.join().unwrap();
210 }
211}