futures_time/stream/
interval.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use async_io::Timer;
6use futures_core::stream::Stream;
7
8use crate::time::{Duration, Instant};
9
10/// Creates a new stream that yields at a set interval.
11///
12/// The stream first yields after `dur`, and continues to yield every
13/// `dur` after that. The stream accounts for time elapsed between calls, and
14/// will adjust accordingly to prevent time skews.
15///
16/// Each interval may be slightly longer than the specified duration, but never
17/// less.
18///
19/// Note that intervals are not intended for high resolution timers, but rather
20/// they will likely fire some granularity after the exact instant that they're
21/// otherwise indicated to fire at.
22pub fn interval(dur: Duration) -> Interval {
23    Interval {
24        timer: Timer::after(dur.into()),
25        interval: dur,
26    }
27}
28
29/// A stream representing notifications at fixed interval
30///
31/// This stream is created by the [`interval`] function. See its
32/// documentation for more.
33///
34/// [`interval`]: fn.interval.html
35#[must_use = "streams do nothing unless polled or .awaited"]
36#[derive(Debug)]
37pub struct Interval {
38    timer: Timer,
39    interval: Duration,
40}
41
42impl Stream for Interval {
43    type Item = Instant;
44
45    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46        let instant = match Pin::new(&mut self.timer).poll(cx) {
47            Poll::Ready(instant) => instant,
48            Poll::Pending => return Poll::Pending,
49        };
50        let interval = self.interval;
51        let _ = std::mem::replace(&mut self.timer, Timer::after(interval.into()));
52        Poll::Ready(Some(instant.into()))
53    }
54}