futures_time/stream/
delay.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_core::stream::Stream;
6use pin_project_lite::pin_project;
7
pin_project! {
    /// Delay execution of a stream once for the specified duration.
    ///
    /// The deadline future is driven to completion first; only after it has
    /// resolved are items pulled from the wrapped stream. The delay is applied
    /// a single time, before the first poll of the stream, not between items.
    ///
    /// This `struct` is created by the [`delay`] method on [`StreamExt`]. See its
    /// documentation for more.
    ///
    /// [`delay`]: crate::stream::StreamExt::delay
    /// [`StreamExt`]: crate::stream::StreamExt
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled or .awaited"]
    pub struct Delay<S, D> {
        // The wrapped stream; it is not polled until `deadline` has completed.
        #[pin]
        stream: S,
        // Future gating the stream; its output value is discarded on completion.
        #[pin]
        deadline: D,
        // Records whether the deadline is still pending or has already completed.
        state: State,
    }
}
26
/// Phase of a [`Delay`] stream's lifecycle.
#[derive(Debug)]
enum State {
    /// The deadline has not completed yet; the wrapped stream is not polled.
    Timer,
    /// The deadline has completed; every poll goes straight to the wrapped stream.
    Streaming,
}
32
33impl<S, D> Delay<S, D> {
34    pub(super) fn new(stream: S, deadline: D) -> Self {
35        Delay {
36            stream,
37            deadline,
38            state: State::Timer,
39        }
40    }
41}
42
43impl<S, D> Stream for Delay<S, D>
44where
45    S: Stream,
46    D: Future,
47{
48    type Item = S::Item;
49
50    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        let this = self.project();
52
53        match this.state {
54            State::Timer => match this.deadline.poll(cx) {
55                Poll::Pending => return Poll::Pending,
56                Poll::Ready(_) => {
57                    *this.state = State::Streaming;
58                    this.stream.poll_next(cx)
59                }
60            },
61            State::Streaming => this.stream.poll_next(cx),
62        }
63    }
64}