futures_time/stream/
delay.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_core::stream::Stream;
6use pin_project_lite::pin_project;
7
8pin_project! {
9 #[derive(Debug)]
17 #[must_use = "streams do nothing unless polled or .awaited"]
18 pub struct Delay<S, D> {
19 #[pin]
20 stream: S,
21 #[pin]
22 deadline: D,
23 state: State,
24 }
25}
26
/// Internal phase of a `Delay` stream.
#[derive(Debug)]
enum State {
    /// The deadline future has not completed yet; the inner stream must not
    /// be polled.
    Timer,
    /// The deadline has completed; polls go directly to the inner stream.
    Streaming,
}
32
33impl<S, D> Delay<S, D> {
34 pub(super) fn new(stream: S, deadline: D) -> Self {
35 Delay {
36 stream,
37 deadline,
38 state: State::Timer,
39 }
40 }
41}
42
43impl<S, D> Stream for Delay<S, D>
44where
45 S: Stream,
46 D: Future,
47{
48 type Item = S::Item;
49
50 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 let this = self.project();
52
53 match this.state {
54 State::Timer => match this.deadline.poll(cx) {
55 Poll::Pending => return Poll::Pending,
56 Poll::Ready(_) => {
57 *this.state = State::Streaming;
58 this.stream.poll_next(cx)
59 }
60 },
61 State::Streaming => this.stream.poll_next(cx),
62 }
63 }
64}