tokio_stream/stream_ext/
filter.rs

1use crate::Stream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{ready, Context, Poll};
6use pin_project_lite::pin_project;
7
8pin_project! {
9    /// Stream returned by the [`filter`](super::StreamExt::filter) method.
10    #[must_use = "streams do nothing unless polled"]
11    pub struct Filter<St, F> {
12        #[pin]
13        stream: St,
14        f: F,
15    }
16}
17
18impl<St, F> fmt::Debug for Filter<St, F>
19where
20    St: fmt::Debug,
21{
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        f.debug_struct("Filter")
24            .field("stream", &self.stream)
25            .finish()
26    }
27}
28
29impl<St, F> Filter<St, F> {
30    pub(super) fn new(stream: St, f: F) -> Self {
31        Self { stream, f }
32    }
33}
34
35impl<St, F> Stream for Filter<St, F>
36where
37    St: Stream,
38    F: FnMut(&St::Item) -> bool,
39{
40    type Item = St::Item;
41
42    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
43        loop {
44            match ready!(self.as_mut().project().stream.poll_next(cx)) {
45                Some(e) => {
46                    if (self.as_mut().project().f)(&e) {
47                        return Poll::Ready(Some(e));
48                    }
49                }
50                None => return Poll::Ready(None),
51            }
52        }
53    }
54
55    fn size_hint(&self) -> (usize, Option<usize>) {
56        (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate
57    }
58}