tokio_stream/
stream_close.rs

1use crate::Stream;
2use pin_project_lite::pin_project;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6pin_project! {
7    /// A `Stream` that wraps the values in an `Option`.
8    ///
9    /// Whenever the wrapped stream yields an item, this stream yields that item
10    /// wrapped in `Some`. When the inner stream ends, then this stream first
11    /// yields a `None` item, and then this stream will also end.
12    ///
13    /// # Example
14    ///
15    /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
16    ///
17    /// ```
18    /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
19    ///
20    /// #[tokio::main]
21    /// async fn main() {
22    ///     let mut map = StreamMap::new();
23    ///     let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
24    ///     let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
25    ///     map.insert(0, stream);
26    ///     map.insert(1, stream2);
27    ///     while let Some((key, val)) = map.next().await {
28    ///         match val {
29    ///             Some(val) => println!("got {val:?} from stream {key:?}"),
30    ///             None => println!("stream {key:?} closed"),
31    ///         }
32    ///     }
33    /// }
34    /// ```
35    #[must_use = "streams do nothing unless polled"]
36    pub struct StreamNotifyClose<S> {
37        #[pin]
38        inner: Option<S>,
39    }
40}
41
42impl<S> StreamNotifyClose<S> {
43    /// Create a new `StreamNotifyClose`.
44    pub fn new(stream: S) -> Self {
45        Self {
46            inner: Some(stream),
47        }
48    }
49
50    /// Get back the inner `Stream`.
51    ///
52    /// Returns `None` if the stream has reached its end.
53    pub fn into_inner(self) -> Option<S> {
54        self.inner
55    }
56}
57
58impl<S> Stream for StreamNotifyClose<S>
59where
60    S: Stream,
61{
62    type Item = Option<S::Item>;
63
64    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65        // We can't invoke poll_next after it ended, so we unset the inner stream as a marker.
66        match self
67            .as_mut()
68            .project()
69            .inner
70            .as_pin_mut()
71            .map(|stream| S::poll_next(stream, cx))
72        {
73            Some(Poll::Ready(Some(item))) => Poll::Ready(Some(Some(item))),
74            Some(Poll::Ready(None)) => {
75                self.project().inner.set(None);
76                Poll::Ready(Some(None))
77            }
78            Some(Poll::Pending) => Poll::Pending,
79            None => Poll::Ready(None),
80        }
81    }
82
83    #[inline]
84    fn size_hint(&self) -> (usize, Option<usize>) {
85        if let Some(inner) = &self.inner {
86            // We always return +1 because when there's stream there's atleast one more item.
87            let (l, u) = inner.size_hint();
88            (l.saturating_add(1), u.and_then(|u| u.checked_add(1)))
89        } else {
90            (0, Some(0))
91        }
92    }
93}