futures_time/future/
park.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use crate::channel::Parker;
6
7use futures_core::{ready, Stream};
8use pin_project_lite::pin_project;
9
10pin_project! {
11    /// Suspend or resume execution of a future.
12    ///
13    /// This `struct` is created by the [`park`] method on [`FutureExt`]. See its
14    /// documentation for more.
15    ///
16    /// [`park`]: crate::future::FutureExt::park
17    /// [`FutureExt`]: crate::future::FutureExt
18    #[must_use = "futures do nothing unless polled or .awaited"]
19    pub struct Park<F, I>
20    where
21        F: Future,
22        I: Stream<Item = Parker>
23    {
24        #[pin]
25        future: F,
26        #[pin]
27        interval: I,
28        state: State,
29    }
30}
31
/// Progress marker for the [`Park`] state machine.
#[derive(Debug)]
enum State {
    /// The wrapped future is being polled; the control stream is still
    /// checked for a request to park.
    Active,
    /// The wrapped future is parked; only the control stream is polled until
    /// it yields an unpark message or ends.
    Suspended,
    /// The control stream has ended, so only the wrapped future is polled
    /// from here on.
    NoChannel,
    /// The wrapped future has produced its output; polling again panics.
    Completed,
}
44
45impl<F, I> Park<F, I>
46where
47    F: Future,
48    I: Stream<Item = Parker>,
49{
50    pub(super) fn new(future: F, interval: I) -> Self {
51        Self {
52            future,
53            interval,
54            state: State::Suspended,
55        }
56    }
57}
58
59impl<F, I> Future for Park<F, I>
60where
61    F: Future,
62    I: Stream<Item = Parker>,
63{
64    type Output = F::Output;
65
66    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
67        let mut this = self.project();
68        loop {
69            match this.state {
70                State::Suspended => match ready!(this.interval.as_mut().poll_next(cx)) {
71                    Some(Parker::Park) => return Poll::Pending,
72                    Some(Parker::Unpark) => *this.state = State::Active,
73                    None => *this.state = State::NoChannel,
74                },
75                State::Active => {
76                    if let Poll::Ready(Some(Parker::Park)) = this.interval.as_mut().poll_next(cx) {
77                        *this.state = State::Suspended;
78                        return Poll::Pending;
79                    }
80                    let value = ready!(this.future.as_mut().poll(cx));
81                    *this.state = State::Completed;
82                    return Poll::Ready(value);
83                }
84                State::NoChannel => {
85                    let value = ready!(this.future.as_mut().poll(cx));
86                    *this.state = State::Completed;
87                    return Poll::Ready(value);
88                }
89                State::Completed => panic!("future polled after completing"),
90            }
91        }
92    }
93}
94
95// NOTE(yosh): we should probably test this, but I'm too tired today lol.