ambient_sys/time/
interval.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4    time::Duration,
5};
6
7use futures::{ready, Future, Stream};
8use pin_project::pin_project;
9
10use crate::{
11    time::{Sleep, TimersHandle, GLOBAL_TIMER},
12    MissedTickBehavior,
13};
14
15use super::Instant;
16
17pub fn interval(period: Duration) -> Interval {
18    Interval::new(&GLOBAL_TIMER, Instant::now(), period, "unknown interval")
19}
20
21pub fn interval_at(start: Instant, period: Duration) -> Interval {
22    Interval::new(&GLOBAL_TIMER, start, period, "unknown interval")
23}
24
25/// Ticks at a fixed interval.
26#[pin_project]
27#[derive(Debug)]
28pub struct Interval {
29    sleep: Pin<Box<Sleep>>,
30    period: Duration,
31    missed_tick_behavior: MissedTickBehavior,
32}
33
34impl Interval {
35    /// Creates a new interval which will fire at `start` and then every `period` duration.
36    pub fn new(
37        handle: &TimersHandle,
38        start: Instant,
39        period: Duration,
40        label: &'static str,
41    ) -> Self {
42        Self {
43            sleep: Box::pin(Sleep::new(handle, start, label)),
44            period,
45            missed_tick_behavior: MissedTickBehavior::Burst,
46        }
47    }
48
49    pub async fn tick(&mut self) -> Instant {
50        futures::future::poll_fn(move |cx| self.poll_tick(cx)).await
51    }
52
53    pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
54        let deadline = self.sleep.deadline();
55
56        // Wait until the next tick
57        ready!(self.sleep.as_mut().poll(cx));
58
59        let now = Instant::now();
60
61        // Calculate the next deadline
62        // let new_deadline = deadline + self.period;
63        let new_deadline = self
64            .missed_tick_behavior
65            .next_timeout(deadline, now, self.period);
66
67        // Reset the timer
68        // Note: will not be registered until the interval is polled again
69        self.sleep.as_mut().reset(new_deadline);
70
71        Poll::Ready(deadline)
72    }
73}
74
75impl Stream for Interval {
76    type Item = Instant;
77
78    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79        self.poll_tick(cx).map(Some)
80    }
81}
82
83#[cfg(test)]
84mod test {
85
86    use futures::StreamExt;
87
88    use crate::time::{assert_dur, setup_timers};
89
90    use super::*;
91
92    fn assert_interval(
93        start: Instant,
94        stream: impl Stream<Item = Instant> + Unpin,
95        expected: impl IntoIterator<Item = (Duration, Duration)>,
96    ) {
97        let mut expected_deadline = start;
98        let mut last = start;
99
100        for (i, (deadline, (expected_fixed, expected_wall))) in
101            futures::executor::block_on_stream(stream)
102                .zip(expected)
103                .enumerate()
104        {
105            let elapsed = last.elapsed();
106            last = Instant::now();
107
108            eprintln!("[{i}] Took: {elapsed:?}");
109
110            expected_deadline += expected_fixed;
111
112            // What the deadline should have been
113            // Compare the returned deadline to the expected one
114            #[cfg(not(miri))]
115            assert_dur(
116                deadline.duration_since(start),
117                expected_deadline.duration_since(start),
118                "next returned deadline",
119            );
120
121            #[cfg(not(miri))]
122            assert_dur(elapsed, expected_wall, "elapsed wall time");
123        }
124    }
125
126    #[test]
127    fn interval() {
128        let (handle, j) = setup_timers();
129
130        let now = Instant::now();
131
132        let expected = [
133            // First tick is immediate
134            (Duration::ZERO, Duration::ZERO),
135            (Duration::from_millis(100), Duration::from_millis(100)),
136            (Duration::from_millis(100), Duration::from_millis(100)),
137            (Duration::from_millis(100), Duration::from_millis(100)),
138            (Duration::from_millis(100), Duration::from_millis(100)),
139            (Duration::from_millis(100), Duration::from_millis(100)),
140            (Duration::from_millis(100), Duration::from_millis(100)),
141            (Duration::from_millis(100), Duration::from_millis(100)),
142        ];
143
144        let interval = Interval::new(&handle, now, Duration::from_millis(100), "a");
145
146        #[cfg(not(miri))]
147        assert_interval(now, interval, expected);
148
149        drop(handle);
150
151        j.join().unwrap();
152    }
153
154    #[test]
155    fn interval_burst() {
156        let (handle, j) = setup_timers();
157
158        let now = Instant::now();
159
160        let delays = futures::stream::iter([
161            Duration::ZERO,
162            Duration::ZERO,
163            Duration::from_millis(150),
164            // Duration::from_millis(50),
165            Duration::ZERO,
166            Duration::from_millis(50),
167            Duration::ZERO,
168            Duration::from_millis(350),
169            Duration::ZERO,
170            Duration::ZERO,
171            Duration::ZERO,
172            Duration::ZERO,
173            Duration::ZERO,
174            Duration::ZERO,
175            Duration::ZERO,
176        ])
177        .then(|d| Sleep::new(&handle, Instant::now() + d, "a"));
178
179        let expected = [
180            (Duration::ZERO, Duration::ZERO),
181            // Normal tick
182            (Duration::from_millis(100), Duration::from_millis(100)),
183            (Duration::from_millis(100), Duration::from_millis(150)),
184            // 50 ms behind
185            (Duration::from_millis(100), Duration::from_millis(50)),
186            // In phase
187            (Duration::from_millis(100), Duration::from_millis(100)),
188            (Duration::from_millis(100), Duration::from_millis(100)),
189            (Duration::from_millis(100), Duration::from_millis(350)),
190            // 250 ms behind
191            (Duration::from_millis(100), Duration::ZERO),
192            // 150 ms behind
193            (Duration::from_millis(100), Duration::ZERO),
194            // 50 ms behind
195            (Duration::from_millis(100), Duration::from_millis(50)),
196            (Duration::from_millis(100), Duration::from_millis(100)),
197            (Duration::from_millis(100), Duration::from_millis(100)),
198            (Duration::from_millis(100), Duration::from_millis(100)),
199        ];
200
201        let interval = Interval::new(&handle, now, Duration::from_millis(100), "b")
202            .zip(delays)
203            .map(|v| v.0);
204
205        #[cfg(not(miri))]
206        assert_interval(now, interval, expected);
207
208        drop(handle);
209        j.join().unwrap();
210    }
211}