tokio_stream/stream_ext/
skip_while.rs

1use crate::Stream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{ready, Context, Poll};
6use pin_project_lite::pin_project;
7
8pin_project! {
9    /// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
10    #[must_use = "streams do nothing unless polled"]
11    pub struct SkipWhile<St, F> {
12        #[pin]
13        stream: St,
14        predicate: Option<F>,
15    }
16}
17
18impl<St, F> fmt::Debug for SkipWhile<St, F>
19where
20    St: fmt::Debug,
21{
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        f.debug_struct("SkipWhile")
24            .field("stream", &self.stream)
25            .finish()
26    }
27}
28
29impl<St, F> SkipWhile<St, F> {
30    pub(super) fn new(stream: St, predicate: F) -> Self {
31        Self {
32            stream,
33            predicate: Some(predicate),
34        }
35    }
36}
37
38impl<St, F> Stream for SkipWhile<St, F>
39where
40    St: Stream,
41    F: FnMut(&St::Item) -> bool,
42{
43    type Item = St::Item;
44
45    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46        let mut this = self.project();
47        if let Some(predicate) = this.predicate {
48            loop {
49                match ready!(this.stream.as_mut().poll_next(cx)) {
50                    Some(item) => {
51                        if !(predicate)(&item) {
52                            *this.predicate = None;
53                            return Poll::Ready(Some(item));
54                        }
55                    }
56                    None => return Poll::Ready(None),
57                }
58            }
59        } else {
60            this.stream.poll_next(cx)
61        }
62    }
63
64    fn size_hint(&self) -> (usize, Option<usize>) {
65        let (lower, upper) = self.stream.size_hint();
66
67        if self.predicate.is_some() {
68            return (0, upper);
69        }
70
71        (lower, upper)
72    }
73}