futures_time/stream/
debounce.rs1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures_core::ready;
5use futures_core::stream::Stream;
6use pin_project_lite::pin_project;
7
8use crate::future::Timer;
9
10pin_project! {
11 #[derive(Debug)]
19 #[must_use = "streams do nothing unless polled or .awaited"]
20 pub struct Debounce<S: Stream, D> {
21 #[pin]
22 stream: S,
23 #[pin]
24 deadline: D,
25 slot: Option<S::Item>,
26 state: State,
27 }
28}
29
/// Lifecycle of a `Debounce` stream.
#[derive(Debug)]
enum State {
    /// The inner stream has not ended yet and is still being polled.
    Streaming,
    /// The inner stream ended while an item was still held; that item is
    /// yielded once the deadline completes.
    FinalItem,
    /// Nothing is left to yield; the next poll returns `None`.
    SendingNone,
    /// `None` has already been returned; polling again panics.
    Finished,
}
43
44impl<S: Stream, D> Debounce<S, D> {
45 pub(crate) fn new(stream: S, deadline: D) -> Self {
46 Self {
47 stream,
48 deadline,
49 slot: None,
50 state: State::Streaming,
51 }
52 }
53}
54
55impl<S, D> Stream for Debounce<S, D>
56where
57 S: Stream,
58 D: Timer,
59{
60 type Item = S::Item;
61
62 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63 let mut this = self.project();
64
65 if let State::Streaming = this.state {
67 match this.stream.poll_next(cx) {
68 Poll::Ready(Some(item)) => {
69 *this.slot = Some(item);
70 this.deadline.as_mut().reset_timer();
71 }
72 Poll::Ready(None) => match *this.slot {
73 Some(_) => *this.state = State::FinalItem,
74 None => *this.state = State::SendingNone,
75 },
76 _ => {}
77 };
78 }
79
80 match this.state {
82 State::Streaming => match this.slot.is_some() {
83 true => {
84 ready!(this.deadline.as_mut().poll(cx));
85 Poll::Ready(this.slot.take())
86 }
87 false => Poll::Pending,
88 },
89
90 State::FinalItem => {
91 let _ = futures_core::ready!(this.deadline.as_mut().poll(cx));
92 *this.state = State::SendingNone;
93 cx.waker().wake_by_ref();
94 Poll::Ready(this.slot.take())
95 }
96
97 State::SendingNone => {
98 *this.state = State::Finished;
99 Poll::Ready(None)
100 }
101 State::Finished => panic!("stream polled after completion"),
102 }
103 }
104}
105
#[cfg(test)]
mod test {
    use crate::prelude::*;
    use crate::time::Duration;
    use futures_lite::prelude::*;

    /// Runs ten ticks of an interval stream through `debounce` and returns
    /// how many items came out the other end.
    fn count_debounced(interval: Duration, debounce: Duration) -> i32 {
        async_io::block_on(async {
            let mut emitted = 0;
            crate::stream::interval(interval)
                .take(10)
                .debounce(debounce)
                .for_each(|_| emitted += 1)
                .await;
            emitted
        })
    }

    /// Ticks every 10ms with a 20ms debounce: exactly one item is expected.
    #[test]
    fn all_values_debounce() {
        let emitted = count_debounced(Duration::from_millis(10), Duration::from_millis(20));
        assert_eq!(emitted, 1);
    }

    /// Ticks every 40ms with a 10ms debounce: all ten items are expected.
    #[test]
    fn no_debounces_hit() {
        let emitted = count_debounced(Duration::from_millis(40), Duration::from_millis(10));
        assert_eq!(emitted, 10);
    }
}