tokio_stream/stream_ext/
peekable.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use futures_core::Stream;
5use pin_project_lite::pin_project;
6
7use crate::stream_ext::Fuse;
8use crate::StreamExt;
9
10pin_project! {
11    /// Stream returned by the [`peekable`](super::StreamExt::peekable) method.
12    pub struct Peekable<T: Stream> {
13        peek: Option<T::Item>,
14        #[pin]
15        stream: Fuse<T>,
16    }
17}
18
19impl<T: Stream> Peekable<T> {
20    pub(crate) fn new(stream: T) -> Self {
21        let stream = stream.fuse();
22        Self { peek: None, stream }
23    }
24
25    /// Peek at the next item in the stream.
26    pub async fn peek(&mut self) -> Option<&T::Item>
27    where
28        T: Unpin,
29    {
30        if let Some(ref it) = self.peek {
31            Some(it)
32        } else {
33            self.peek = self.next().await;
34            self.peek.as_ref()
35        }
36    }
37}
38
39impl<T: Stream> Stream for Peekable<T> {
40    type Item = T::Item;
41
42    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43        let this = self.project();
44        if let Some(it) = this.peek.take() {
45            Poll::Ready(Some(it))
46        } else {
47            this.stream.poll_next(cx)
48        }
49    }
50}