tokio_stream/stream_ext/
map.rs

1use crate::Stream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use pin_project_lite::pin_project;
7
8pin_project! {
9    /// Stream for the [`map`](super::StreamExt::map) method.
10    #[must_use = "streams do nothing unless polled"]
11    pub struct Map<St, F> {
12        #[pin]
13        stream: St,
14        f: F,
15    }
16}
17
18impl<St, F> fmt::Debug for Map<St, F>
19where
20    St: fmt::Debug,
21{
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        f.debug_struct("Map").field("stream", &self.stream).finish()
24    }
25}
26
27impl<St, F> Map<St, F> {
28    pub(super) fn new(stream: St, f: F) -> Self {
29        Map { stream, f }
30    }
31}
32
33impl<St, F, T> Stream for Map<St, F>
34where
35    St: Stream,
36    F: FnMut(St::Item) -> T,
37{
38    type Item = T;
39
40    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
41        self.as_mut()
42            .project()
43            .stream
44            .poll_next(cx)
45            .map(|opt| opt.map(|x| (self.as_mut().project().f)(x)))
46    }
47
48    fn size_hint(&self) -> (usize, Option<usize>) {
49        self.stream.size_hint()
50    }
51}