futures_time/stream/
sample.rs1use pin_project_lite::pin_project;
2
3use futures_core::stream::Stream;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7pin_project! {
8 #[derive(Debug)]
19 #[must_use = "streams do nothing unless polled or .awaited"]
20 pub struct Sample<S: Stream, I> {
21 #[pin]
22 stream: S,
23 #[pin]
24 interval: I,
25 state: State,
26 slot: Option<S::Item>,
27 }
28}
29
30impl<S: Stream, I> Sample<S, I> {
31 pub(crate) fn new(stream: S, interval: I) -> Self {
32 Self {
33 state: State::Streaming,
34 stream,
35 interval,
36 slot: None,
37 }
38 }
39}
40
41#[derive(Debug)]
42enum State {
43 Streaming,
45 StreamDone,
47 AllDone,
49}
50
51impl<S: Stream, I: Stream> Stream for Sample<S, I> {
52 type Item = S::Item;
53
54 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55 let mut this = self.project();
56
57 match this.state {
58 State::Streaming => {
60 loop {
62 match this.stream.as_mut().poll_next(cx) {
63 Poll::Ready(Some(value)) => {
64 let _ = this.slot.insert(value);
65 }
66 Poll::Ready(None) => {
67 *this.state = State::StreamDone;
68 break;
69 }
70 Poll::Pending => break,
71 }
72 }
73
74 match this.interval.as_mut().poll_next(cx) {
76 Poll::Ready(_) => {
77 if let State::StreamDone = this.state {
78 cx.waker().wake_by_ref();
79 }
80 match this.slot.take() {
81 Some(item) => Poll::Ready(Some(item)),
82 None => Poll::Pending,
83 }
84 }
85 Poll::Pending => Poll::Pending,
86 }
87 }
88
89 State::StreamDone => {
91 *this.state = State::AllDone;
92 Poll::Ready(None)
93 }
94
95 State::AllDone => panic!("stream polled after completion"),
97 }
98 }
99}
100
101#[cfg(test)]
102mod test {
103 use crate::prelude::*;
104 use crate::time::Duration;
105 use futures_lite::prelude::*;
106
107 #[test]
108 fn smoke() {
109 async_io::block_on(async {
110 let interval = Duration::from_millis(100);
111 let throttle = Duration::from_millis(200);
112
113 let take = 4;
114 let expected = 2;
115
116 let mut counter = 0;
117 crate::stream::interval(interval)
118 .take(take)
119 .sample(throttle)
120 .for_each(|_| counter += 1)
121 .await;
122
123 assert_eq!(counter, expected);
124 })
125 }
126}