tokio_stream/stream_ext/
take_while.rs

1use crate::Stream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use pin_project_lite::pin_project;
7
8pin_project! {
9    /// Stream for the [`take_while`](super::StreamExt::take_while) method.
10    #[must_use = "streams do nothing unless polled"]
11    pub struct TakeWhile<St, F> {
12        #[pin]
13        stream: St,
14        predicate: F,
15        done: bool,
16    }
17}
18
19impl<St, F> fmt::Debug for TakeWhile<St, F>
20where
21    St: fmt::Debug,
22{
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        f.debug_struct("TakeWhile")
25            .field("stream", &self.stream)
26            .field("done", &self.done)
27            .finish()
28    }
29}
30
31impl<St, F> TakeWhile<St, F> {
32    pub(super) fn new(stream: St, predicate: F) -> Self {
33        Self {
34            stream,
35            predicate,
36            done: false,
37        }
38    }
39}
40
41impl<St, F> Stream for TakeWhile<St, F>
42where
43    St: Stream,
44    F: FnMut(&St::Item) -> bool,
45{
46    type Item = St::Item;
47
48    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49        if !*self.as_mut().project().done {
50            self.as_mut().project().stream.poll_next(cx).map(|ready| {
51                let ready = ready.and_then(|item| {
52                    if !(self.as_mut().project().predicate)(&item) {
53                        None
54                    } else {
55                        Some(item)
56                    }
57                });
58
59                if ready.is_none() {
60                    *self.as_mut().project().done = true;
61                }
62
63                ready
64            })
65        } else {
66            Poll::Ready(None)
67        }
68    }
69
70    fn size_hint(&self) -> (usize, Option<usize>) {
71        if self.done {
72            return (0, Some(0));
73        }
74
75        let (_, upper) = self.stream.size_hint();
76
77        (0, upper)
78    }
79}