async_std/stream/stream/
flatten.rs

1use core::fmt;
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::stream::{IntoStream, Stream};
7use crate::task::{Context, Poll};
8
pin_project! {
    /// A stream that flattens one level of nesting in a stream of things that can be turned into
    /// streams.
    ///
    /// The adaptor owns the outer stream `S` and, at most, one stream obtained from an outer
    /// item through [`IntoStream`].
    ///
    /// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
    /// documentation for more.
    ///
    /// [`flatten`]: trait.Stream.html#method.flatten
    /// [`Stream`]: trait.Stream.html
    pub struct Flatten<S>
    where
        S: Stream,
        S::Item: IntoStream,
    {
        // Outer stream; each of its items is convertible into a stream.
        #[pin]
        stream: S,
        // Stream built from an outer item, or `None` when no such stream is held.
        #[pin]
        inner_stream: Option<<S::Item as IntoStream>::IntoStream>,
    }
}
29
30impl<S> Flatten<S>
31where
32    S: Stream,
33    S::Item: IntoStream,
34{
35    pub(super) fn new(stream: S) -> Self {
36        Self {
37            stream,
38            inner_stream: None,
39        }
40    }
41}
42
43impl<S, U> Stream for Flatten<S>
44where
45    S: Stream,
46    S::Item: IntoStream<IntoStream = U, Item = U::Item>,
47    U: Stream,
48{
49    type Item = U::Item;
50
51    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52        let mut this = self.project();
53        loop {
54            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
55                match futures_core::ready!(inner.poll_next(cx)) {
56                    item @ Some(_) => return Poll::Ready(item),
57                    None => this.inner_stream.set(None),
58                }
59            }
60
61            match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
62                inner @ Some(_) => this.inner_stream.set(inner.map(IntoStream::into_stream)),
63                None => return Poll::Ready(None),
64            }
65        }
66    }
67}
68
69impl<S, U> fmt::Debug for Flatten<S>
70where
71    S: fmt::Debug + Stream,
72    S::Item: IntoStream<IntoStream = U, Item = U::Item>,
73    U: fmt::Debug + Stream,
74{
75    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76        f.debug_struct("Flatten")
77            .field("inner", &self.stream)
78            .finish()
79    }
80}