tokio_stream/stream_ext/
then.rs

1use crate::Stream;
2
3use core::fmt;
4use core::future::Future;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use pin_project_lite::pin_project;
8
9pin_project! {
10    /// Stream for the [`then`](super::StreamExt::then) method.
11    #[must_use = "streams do nothing unless polled"]
12    pub struct Then<St, Fut, F> {
13        #[pin]
14        stream: St,
15        #[pin]
16        future: Option<Fut>,
17        f: F,
18    }
19}
20
21impl<St, Fut, F> fmt::Debug for Then<St, Fut, F>
22where
23    St: fmt::Debug,
24{
25    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26        f.debug_struct("Then")
27            .field("stream", &self.stream)
28            .finish()
29    }
30}
31
32impl<St, Fut, F> Then<St, Fut, F> {
33    pub(super) fn new(stream: St, f: F) -> Self {
34        Then {
35            stream,
36            future: None,
37            f,
38        }
39    }
40}
41
42impl<St, F, Fut> Stream for Then<St, Fut, F>
43where
44    St: Stream,
45    Fut: Future,
46    F: FnMut(St::Item) -> Fut,
47{
48    type Item = Fut::Output;
49
50    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> {
51        let mut me = self.project();
52
53        loop {
54            if let Some(future) = me.future.as_mut().as_pin_mut() {
55                match future.poll(cx) {
56                    Poll::Ready(item) => {
57                        me.future.set(None);
58                        return Poll::Ready(Some(item));
59                    }
60                    Poll::Pending => return Poll::Pending,
61                }
62            }
63
64            match me.stream.as_mut().poll_next(cx) {
65                Poll::Ready(Some(item)) => {
66                    me.future.set(Some((me.f)(item)));
67                }
68                Poll::Ready(None) => return Poll::Ready(None),
69                Poll::Pending => return Poll::Pending,
70            }
71        }
72    }
73
74    fn size_hint(&self) -> (usize, Option<usize>) {
75        let future_len = usize::from(self.future.is_some());
76        let (lower, upper) = self.stream.size_hint();
77
78        let lower = lower.saturating_add(future_len);
79        let upper = upper.and_then(|upper| upper.checked_add(future_len));
80
81        (lower, upper)
82    }
83}