futures_time/future/
park.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use crate::channel::Parker;
6
7use futures_core::{ready, Stream};
8use pin_project_lite::pin_project;
9
10pin_project! {
11 #[must_use = "futures do nothing unless polled or .awaited"]
19 pub struct Park<F, I>
20 where
21 F: Future,
22 I: Stream<Item = Parker>
23 {
24 #[pin]
25 future: F,
26 #[pin]
27 interval: I,
28 state: State,
29 }
30}
31
32#[derive(Debug)]
34enum State {
35 Active,
37 Suspended,
39 NoChannel,
41 Completed,
43}
44
45impl<F, I> Park<F, I>
46where
47 F: Future,
48 I: Stream<Item = Parker>,
49{
50 pub(super) fn new(future: F, interval: I) -> Self {
51 Self {
52 future,
53 interval,
54 state: State::Suspended,
55 }
56 }
57}
58
59impl<F, I> Future for Park<F, I>
60where
61 F: Future,
62 I: Stream<Item = Parker>,
63{
64 type Output = F::Output;
65
66 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
67 let mut this = self.project();
68 loop {
69 match this.state {
70 State::Suspended => match ready!(this.interval.as_mut().poll_next(cx)) {
71 Some(Parker::Park) => return Poll::Pending,
72 Some(Parker::Unpark) => *this.state = State::Active,
73 None => *this.state = State::NoChannel,
74 },
75 State::Active => {
76 if let Poll::Ready(Some(Parker::Park)) = this.interval.as_mut().poll_next(cx) {
77 *this.state = State::Suspended;
78 return Poll::Pending;
79 }
80 let value = ready!(this.future.as_mut().poll(cx));
81 *this.state = State::Completed;
82 return Poll::Ready(value);
83 }
84 State::NoChannel => {
85 let value = ready!(this.future.as_mut().poll(cx));
86 *this.state = State::Completed;
87 return Poll::Ready(value);
88 }
89 State::Completed => panic!("future polled after completing"),
90 }
91 }
92 }
93}
94
95