futures_time/stream/
buffer.rs1use std::mem;
2use std::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use core::task::{Context, Poll};
7use futures_core::stream::Stream;
8
9pin_project! {
10 #[derive(Debug)]
18 #[must_use = "streams do nothing unless polled or .awaited"]
19 pub struct Buffer<S: Stream, I> {
20 #[pin]
21 stream: S,
22 #[pin]
23 interval: I,
24 slot: Vec<S::Item>,
25 state: State,
26 }
27}
28
29impl<S: Stream, I> Buffer<S, I> {
30 pub(crate) fn new(stream: S, interval: I) -> Self {
31 Self {
32 stream,
33 interval,
34 slot: vec![],
35 state: State::Streaming,
36 }
37 }
38}
39
/// Lifecycle of a `Buffer` as driven by its `poll_next` implementation.
#[derive(Debug)]
enum State {
    /// The inner stream may still produce items; keep collecting them.
    Streaming,
    /// The inner stream returned `None`; the remaining batch is emitted on
    /// the next interval tick.
    StreamDone,
    /// The final batch has been emitted; the next poll yields `None`.
    TimerDone,
    /// `None` has been returned; polling again panics.
    AllDone,
}
51
52impl<S: Stream, I: Stream> Stream for Buffer<S, I> {
53 type Item = Vec<S::Item>;
54
55 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
56 let mut this = self.project();
57
58 match this.state {
59 State::Streaming => {
61 loop {
63 match this.stream.as_mut().poll_next(cx) {
64 Poll::Ready(Some(value)) => this.slot.push(value),
65 Poll::Ready(None) => {
66 *this.state = State::StreamDone;
67 break;
68 }
69 Poll::Pending => break,
70 }
71 }
72
73 this.interval.as_mut().poll_next(cx).map(move |_| {
75 if let State::StreamDone = this.state {
76 *this.state = State::TimerDone;
77 cx.waker().wake_by_ref();
78 }
79 Some(mem::take(&mut *this.slot))
80 })
81 }
82
83 State::StreamDone => this.interval.as_mut().poll_next(cx).map(|_| {
85 cx.waker().wake_by_ref();
86 *this.state = State::TimerDone;
87 Some(mem::take(&mut *this.slot))
88 }),
89
90 State::TimerDone => {
92 *this.state = State::AllDone;
93 Poll::Ready(None)
94 }
95
96 State::AllDone => panic!("stream polled after completion"),
98 }
99 }
100}
101
#[cfg(test)]
mod test {
    use crate::prelude::*;
    use crate::time::Duration;
    use futures_lite::prelude::*;

    /// Runs ten ticks of an interval stream spaced `tick` apart through
    /// `.buffer(window)` and returns how many items arrived across all
    /// emitted batches.
    fn total_buffered(tick: Duration, window: Duration) -> usize {
        async_io::block_on(async {
            let mut total = 0;
            crate::stream::interval(tick)
                .take(10)
                .buffer(window)
                .for_each(|batch| total += batch.len())
                .await;
            total
        })
    }

    /// Buffer window longer than the source interval: no item may be lost.
    #[test]
    fn buffer_all_values() {
        let counter = total_buffered(Duration::from_millis(5), Duration::from_millis(20));
        assert_eq!(counter, 10);
    }

    /// Buffer window shorter than the source interval: every item must still
    /// be delivered.
    #[test]
    fn no_debounces_hit() {
        let counter = total_buffered(Duration::from_millis(20), Duration::from_millis(10));
        assert_eq!(counter, 10);
    }
}