futures_time/stream/
park.rs1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::channel::Parker;
5
6use futures_core::{ready, Stream};
7use pin_project_lite::pin_project;
8
9pin_project! {
10 #[must_use = "futures do nothing unless polled or .awaited"]
18 pub struct Park<S, I>
19 where
20 S: Stream,
21 I: Stream<Item = Parker>,
22 {
23 #[pin]
24 stream: S,
25 #[pin]
26 interval: I,
27 state: State,
28 }
29}
30
/// Position of a `Park` stream in its park/unpark state machine.
#[derive(Debug)]
enum State {
    /// The wrapped stream is being polled for items.
    Active,
    /// The wrapped stream is not polled; only the controlling stream is.
    Suspended,
    /// The controlling stream has ended, so the wrapped stream is polled
    /// without consulting it.
    NoChannel,
    /// The wrapped stream has ended; polling again panics.
    Completed,
}
43
44impl<S, I> Park<S, I>
45where
46 S: Stream,
47 I: Stream<Item = Parker>,
48{
49 pub(super) fn new(stream: S, interval: I) -> Self {
50 Self {
51 stream,
52 interval,
53 state: State::Suspended,
54 }
55 }
56}
57
58impl<S, I> Stream for Park<S, I>
59where
60 S: Stream,
61 I: Stream<Item = Parker>,
62{
63 type Item = S::Item;
64
65 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66 let mut this = self.project();
67 loop {
68 match this.state {
69 State::Suspended => match ready!(this.interval.as_mut().poll_next(cx)) {
70 Some(Parker::Park) => return Poll::Pending,
71 Some(Parker::Unpark) => *this.state = State::Active,
72 None => *this.state = State::NoChannel,
73 },
74 State::Active => {
75 if let Poll::Ready(Some(Parker::Park)) = this.interval.as_mut().poll_next(cx) {
76 *this.state = State::Suspended;
77 return Poll::Pending;
78 }
79 match ready!(this.stream.as_mut().poll_next(cx)) {
80 Some(value) => return Poll::Ready(Some(value)),
81 None => {
82 *this.state = State::Completed;
83 return Poll::Ready(None);
84 }
85 }
86 }
87 State::NoChannel => match ready!(this.stream.as_mut().poll_next(cx)) {
88 Some(value) => return Poll::Ready(Some(value)),
89 None => {
90 *this.state = State::Completed;
91 return Poll::Ready(None);
92 }
93 },
94 State::Completed => panic!("future polled after completing"),
95 }
96 }
97 }
98}
99
100