tokio_stream/stream_ext/
map_while.rs

1use crate::Stream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use pin_project_lite::pin_project;
7
8pin_project! {
9    /// Stream for the [`map_while`](super::StreamExt::map_while) method.
10    #[must_use = "streams do nothing unless polled"]
11    pub struct MapWhile<St, F> {
12        #[pin]
13        stream: St,
14        f: F,
15    }
16}
17
18impl<St, F> fmt::Debug for MapWhile<St, F>
19where
20    St: fmt::Debug,
21{
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        f.debug_struct("MapWhile")
24            .field("stream", &self.stream)
25            .finish()
26    }
27}
28
29impl<St, F> MapWhile<St, F> {
30    pub(super) fn new(stream: St, f: F) -> Self {
31        MapWhile { stream, f }
32    }
33}
34
35impl<St, F, T> Stream for MapWhile<St, F>
36where
37    St: Stream,
38    F: FnMut(St::Item) -> Option<T>,
39{
40    type Item = T;
41
42    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
43        let me = self.project();
44        let f = me.f;
45        me.stream.poll_next(cx).map(|opt| opt.and_then(f))
46    }
47
48    fn size_hint(&self) -> (usize, Option<usize>) {
49        let (_, upper) = self.stream.size_hint();
50        (0, upper)
51    }
52}