tokio_stream/stream_ext/
skip.rs

1use crate::Stream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{ready, Context, Poll};
6use pin_project_lite::pin_project;
7
8pin_project! {
9    /// Stream for the [`skip`](super::StreamExt::skip) method.
10    #[must_use = "streams do nothing unless polled"]
11    pub struct Skip<St> {
12        #[pin]
13        stream: St,
14        remaining: usize,
15    }
16}
17
18impl<St> fmt::Debug for Skip<St>
19where
20    St: fmt::Debug,
21{
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        f.debug_struct("Skip")
24            .field("stream", &self.stream)
25            .finish()
26    }
27}
28
29impl<St> Skip<St> {
30    pub(super) fn new(stream: St, remaining: usize) -> Self {
31        Self { stream, remaining }
32    }
33}
34
35impl<St> Stream for Skip<St>
36where
37    St: Stream,
38{
39    type Item = St::Item;
40
41    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42        loop {
43            match ready!(self.as_mut().project().stream.poll_next(cx)) {
44                Some(e) => {
45                    if self.remaining == 0 {
46                        return Poll::Ready(Some(e));
47                    }
48                    *self.as_mut().project().remaining -= 1;
49                }
50                None => return Poll::Ready(None),
51            }
52        }
53    }
54
55    fn size_hint(&self) -> (usize, Option<usize>) {
56        let (lower, upper) = self.stream.size_hint();
57
58        let lower = lower.saturating_sub(self.remaining);
59        let upper = upper.map(|x| x.saturating_sub(self.remaining));
60
61        (lower, upper)
62    }
63}