tokio_stream/stream_ext/
take.rs

1use crate::Stream;
2
3use core::cmp;
4use core::fmt;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use pin_project_lite::pin_project;
8
pin_project! {
    /// Stream for the [`take`](super::StreamExt::take) method.
    ///
    /// Yields at most `remaining` items from the wrapped stream. Once that
    /// budget is used up, or the wrapped stream ends, this stream reports
    /// end-of-stream without polling the wrapped stream again.
    #[must_use = "streams do nothing unless polled"]
    pub struct Take<St> {
        // The wrapped stream; marked `#[pin]` so the projection hands it out
        // pinned, as `poll_next` requires.
        #[pin]
        stream: St,
        // How many more items may still be yielded; `0` means finished.
        remaining: usize,
    }
}
18
19impl<St> fmt::Debug for Take<St>
20where
21    St: fmt::Debug,
22{
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        f.debug_struct("Take")
25            .field("stream", &self.stream)
26            .finish()
27    }
28}
29
30impl<St> Take<St> {
31    pub(super) fn new(stream: St, remaining: usize) -> Self {
32        Self { stream, remaining }
33    }
34}
35
36impl<St> Stream for Take<St>
37where
38    St: Stream,
39{
40    type Item = St::Item;
41
42    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43        if *self.as_mut().project().remaining > 0 {
44            self.as_mut().project().stream.poll_next(cx).map(|ready| {
45                match &ready {
46                    Some(_) => {
47                        *self.as_mut().project().remaining -= 1;
48                    }
49                    None => {
50                        *self.as_mut().project().remaining = 0;
51                    }
52                }
53                ready
54            })
55        } else {
56            Poll::Ready(None)
57        }
58    }
59
60    fn size_hint(&self) -> (usize, Option<usize>) {
61        if self.remaining == 0 {
62            return (0, Some(0));
63        }
64
65        let (lower, upper) = self.stream.size_hint();
66
67        let lower = cmp::min(lower, self.remaining);
68
69        let upper = match upper {
70            Some(x) if x < self.remaining => Some(x),
71            _ => Some(self.remaining),
72        };
73
74        (lower, upper)
75    }
76}