async_std/stream/stream/
flatten.rs1use core::fmt;
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::stream::{IntoStream, Stream};
7use crate::task::{Context, Poll};
8
9pin_project! {
10 pub struct Flatten<S>
19 where
20 S: Stream,
21 S::Item: IntoStream,
22 {
23 #[pin]
24 stream: S,
25 #[pin]
26 inner_stream: Option<<S::Item as IntoStream>::IntoStream>,
27 }
28}
29
30impl<S> Flatten<S>
31where
32 S: Stream,
33 S::Item: IntoStream,
34{
35 pub(super) fn new(stream: S) -> Self {
36 Self {
37 stream,
38 inner_stream: None,
39 }
40 }
41}
42
43impl<S, U> Stream for Flatten<S>
44where
45 S: Stream,
46 S::Item: IntoStream<IntoStream = U, Item = U::Item>,
47 U: Stream,
48{
49 type Item = U::Item;
50
51 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52 let mut this = self.project();
53 loop {
54 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
55 match futures_core::ready!(inner.poll_next(cx)) {
56 item @ Some(_) => return Poll::Ready(item),
57 None => this.inner_stream.set(None),
58 }
59 }
60
61 match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
62 inner @ Some(_) => this.inner_stream.set(inner.map(IntoStream::into_stream)),
63 None => return Poll::Ready(None),
64 }
65 }
66 }
67}
68
69impl<S, U> fmt::Debug for Flatten<S>
70where
71 S: fmt::Debug + Stream,
72 S::Item: IntoStream<IntoStream = U, Item = U::Item>,
73 U: fmt::Debug + Stream,
74{
75 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76 f.debug_struct("Flatten")
77 .field("inner", &self.stream)
78 .finish()
79 }
80}