futures_time/stream/
debounce.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures_core::ready;
5use futures_core::stream::Stream;
6use pin_project_lite::pin_project;
7
8use crate::future::Timer;
9
10pin_project! {
11    /// Debounce the stream.
12    ///
13    /// This `struct` is created by the [`debounce`] method on [`StreamExt`]. See its
14    /// documentation for more.
15    ///
16    /// [`debounce`]: crate::stream::StreamExt::debounce
17    /// [`StreamExt`]: crate::stream::StreamExt
18    #[derive(Debug)]
19    #[must_use = "streams do nothing unless polled or .awaited"]
20    pub struct Debounce<S: Stream, D> {
21        #[pin]
22        stream: S,
23        #[pin]
24        deadline: D,
25        slot: Option<S::Item>,
26        state: State,
27    }
28}
29
/// Lifecycle of a `Debounce` stream.
#[derive(Debug)]
enum State {
    /// The inner stream is still live; an item may be waiting to be emitted.
    Streaming,
    /// The inner stream is exhausted while an item is still held back: that
    /// item must go out as `Ready(Some(Item))`, followed by `Ready(None)`.
    FinalItem,
    /// The inner stream is exhausted and only the closing `Ready(None)`
    /// remains to be sent.
    SendingNone,
    /// The closing `Ready(None)` has been sent; the stream is done.
    Finished,
}
43
44impl<S: Stream, D> Debounce<S, D> {
45    pub(crate) fn new(stream: S, deadline: D) -> Self {
46        Self {
47            stream,
48            deadline,
49            slot: None,
50            state: State::Streaming,
51        }
52    }
53}
54
55impl<S, D> Stream for Debounce<S, D>
56where
57    S: Stream,
58    D: Timer,
59{
60    type Item = S::Item;
61
62    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63        let mut this = self.project();
64
65        // See if we need to get more data from the stream.
66        if let State::Streaming = this.state {
67            match this.stream.poll_next(cx) {
68                Poll::Ready(Some(item)) => {
69                    *this.slot = Some(item);
70                    this.deadline.as_mut().reset_timer();
71                }
72                Poll::Ready(None) => match *this.slot {
73                    Some(_) => *this.state = State::FinalItem,
74                    None => *this.state = State::SendingNone,
75                },
76                _ => {}
77            };
78        }
79
80        // Handle the timer.
81        match this.state {
82            State::Streaming => match this.slot.is_some() {
83                true => {
84                    ready!(this.deadline.as_mut().poll(cx));
85                    Poll::Ready(this.slot.take())
86                }
87                false => Poll::Pending,
88            },
89
90            State::FinalItem => {
91                let _ = futures_core::ready!(this.deadline.as_mut().poll(cx));
92                *this.state = State::SendingNone;
93                cx.waker().wake_by_ref();
94                Poll::Ready(this.slot.take())
95            }
96
97            State::SendingNone => {
98                *this.state = State::Finished;
99                Poll::Ready(None)
100            }
101            State::Finished => panic!("stream polled after completion"),
102        }
103    }
104}
105
#[cfg(test)]
mod test {
    use crate::prelude::*;
    use crate::time::Duration;
    use futures_lite::prelude::*;

    /// Runs ten ticks of an `interval`-spaced stream through `debounce` and
    /// returns how many items come out the other end.
    fn debounced_count(interval: Duration, debounce: Duration) -> i32 {
        async_io::block_on(async {
            let mut emitted = 0;
            crate::stream::interval(interval)
                .take(10)
                .debounce(debounce)
                .for_each(|_| emitted += 1)
                .await;
            emitted
        })
    }

    /// Ticks every 10ms with a 20ms debounce: expects a single item.
    #[test]
    fn all_values_debounce() {
        let emitted = debounced_count(Duration::from_millis(10), Duration::from_millis(20));
        assert_eq!(emitted, 1);
    }

    /// Ticks every 40ms with a 10ms debounce: expects all ten items.
    #[test]
    fn no_debounces_hit() {
        let emitted = debounced_count(Duration::from_millis(40), Duration::from_millis(10));
        assert_eq!(emitted, 10);
    }
}