futures_time/stream/
sample.rs

1use pin_project_lite::pin_project;
2
3use futures_core::stream::Stream;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7pin_project! {
8    /// Yield the last value received, if any, at each interval.
9    ///
10    /// If no value was emitted during the last interval, no value is emitted
11    /// and we skip to the next interval.
12    ///
13    /// This `struct` is created by the [`sample`] method on [`StreamExt`]. See its
14    /// documentation for more.
15    ///
16    /// [`sample`]: crate::stream::StreamExt::sample
17    /// [`StreamExt`]: crate::stream::StreamExt
18    #[derive(Debug)]
19    #[must_use = "streams do nothing unless polled or .awaited"]
20    pub struct Sample<S: Stream, I> {
21        #[pin]
22        stream: S,
23        #[pin]
24        interval: I,
25        state: State,
26        slot: Option<S::Item>,
27    }
28}
29
30impl<S: Stream, I> Sample<S, I> {
31    pub(crate) fn new(stream: S, interval: I) -> Self {
32        Self {
33            state: State::Streaming,
34            stream,
35            interval,
36            slot: None,
37        }
38    }
39}
40
41#[derive(Debug)]
42enum State {
43    /// The underlying stream is yielding items.
44    Streaming,
45    /// All timers have completed and all data has been yielded.
46    StreamDone,
47    /// The closing `Ready(None)` has been yielded.
48    AllDone,
49}
50
51impl<S: Stream, I: Stream> Stream for Sample<S, I> {
52    type Item = S::Item;
53
54    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55        let mut this = self.project();
56
57        match this.state {
58            // The underlying stream is yielding items.
59            State::Streaming => {
60                // Poll the underlying stream until we get to `Poll::Pending`.
61                loop {
62                    match this.stream.as_mut().poll_next(cx) {
63                        Poll::Ready(Some(value)) => {
64                            let _ = this.slot.insert(value);
65                        }
66                        Poll::Ready(None) => {
67                            *this.state = State::StreamDone;
68                            break;
69                        }
70                        Poll::Pending => break,
71                    }
72                }
73
74                // After the stream, always poll the interval timer.
75                match this.interval.as_mut().poll_next(cx) {
76                    Poll::Ready(_) => {
77                        if let State::StreamDone = this.state {
78                            cx.waker().wake_by_ref();
79                        }
80                        match this.slot.take() {
81                            Some(item) => Poll::Ready(Some(item)),
82                            None => Poll::Pending,
83                        }
84                    }
85                    Poll::Pending => Poll::Pending,
86                }
87            }
88
89            // All streams have completed and all data has been yielded.
90            State::StreamDone => {
91                *this.state = State::AllDone;
92                Poll::Ready(None)
93            }
94
95            // The closing `Ready(None)` has been yielded.
96            State::AllDone => panic!("stream polled after completion"),
97        }
98    }
99}
100
101#[cfg(test)]
102mod test {
103    use crate::prelude::*;
104    use crate::time::Duration;
105    use futures_lite::prelude::*;
106
107    #[test]
108    fn smoke() {
109        async_io::block_on(async {
110            let interval = Duration::from_millis(100);
111            let throttle = Duration::from_millis(200);
112
113            let take = 4;
114            let expected = 2;
115
116            let mut counter = 0;
117            crate::stream::interval(interval)
118                .take(take)
119                .sample(throttle)
120                .for_each(|_| counter += 1)
121                .await;
122
123            assert_eq!(counter, expected);
124        })
125    }
126}