futures_time/stream/
park.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::channel::Parker;
5
6use futures_core::{ready, Stream};
7use pin_project_lite::pin_project;
8
9pin_project! {
10    /// Suspend or resume execution of a stream.
11    ///
12    /// This `struct` is created by the [`park`] method on [`StreamExt`]. See its
13    /// documentation for more.
14    ///
15    /// [`park`]: crate::future::FutureExt::park
16    /// [`StreamExt`]: crate::future::StreamExt
17    #[must_use = "futures do nothing unless polled or .awaited"]
18    pub struct Park<S, I>
19    where
20        S: Stream,
21        I: Stream<Item = Parker>,
22    {
23        #[pin]
24        stream: S,
25        #[pin]
26        interval: I,
27        state: State,
28    }
29}
30
31/// The internal state
32#[derive(Debug)]
33enum State {
34    /// Actively polling the future.
35    Active,
36    /// The future has been paused, so we wait for a signal from the channel.
37    Suspended,
38    /// The channel has been dropped, no more need to check it!
39    NoChannel,
40    /// The future has completed.
41    Completed,
42}
43
44impl<S, I> Park<S, I>
45where
46    S: Stream,
47    I: Stream<Item = Parker>,
48{
49    pub(super) fn new(stream: S, interval: I) -> Self {
50        Self {
51            stream,
52            interval,
53            state: State::Suspended,
54        }
55    }
56}
57
58impl<S, I> Stream for Park<S, I>
59where
60    S: Stream,
61    I: Stream<Item = Parker>,
62{
63    type Item = S::Item;
64
65    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66        let mut this = self.project();
67        loop {
68            match this.state {
69                State::Suspended => match ready!(this.interval.as_mut().poll_next(cx)) {
70                    Some(Parker::Park) => return Poll::Pending,
71                    Some(Parker::Unpark) => *this.state = State::Active,
72                    None => *this.state = State::NoChannel,
73                },
74                State::Active => {
75                    if let Poll::Ready(Some(Parker::Park)) = this.interval.as_mut().poll_next(cx) {
76                        *this.state = State::Suspended;
77                        return Poll::Pending;
78                    }
79                    match ready!(this.stream.as_mut().poll_next(cx)) {
80                        Some(value) => return Poll::Ready(Some(value)),
81                        None => {
82                            *this.state = State::Completed;
83                            return Poll::Ready(None);
84                        }
85                    }
86                }
87                State::NoChannel => match ready!(this.stream.as_mut().poll_next(cx)) {
88                    Some(value) => return Poll::Ready(Some(value)),
89                    None => {
90                        *this.state = State::Completed;
91                        return Poll::Ready(None);
92                    }
93                },
94                State::Completed => panic!("future polled after completing"),
95            }
96        }
97    }
98}
99
100// NOTE(yosh): we should probably test this, but I'm too tired today lol.