async_std/stream/stream/
chain.rs

1use core::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use super::fuse::Fuse;
6use crate::stream::stream::StreamExt;
7use crate::stream::Stream;
8use crate::task::{Context, Poll};
9
10pin_project! {
11    /// A stream that chains two streams one after another.
12    ///
13    /// This `struct` is created by the [`chain`] method on [`Stream`]. See its
14    /// documentation for more.
15    ///
16    /// [`chain`]: trait.Stream.html#method.chain
17    /// [`Stream`]: trait.Stream.html
18    #[derive(Debug)]
19    pub struct Chain<S, U> {
20        #[pin]
21        first: Fuse<S>,
22        #[pin]
23        second: Fuse<U>,
24    }
25}
26
27impl<S: Stream, U: Stream> Chain<S, U> {
28    pub(super) fn new(first: S, second: U) -> Self {
29        Self {
30            first: first.fuse(),
31            second: second.fuse(),
32        }
33    }
34}
35
36impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
37    type Item = S::Item;
38
39    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
40        let mut this = self.project();
41        if !this.first.done {
42            let next = futures_core::ready!(this.first.as_mut().poll_next(cx));
43            if let Some(next) = next {
44                return Poll::Ready(Some(next));
45            }
46        }
47
48        if !this.second.done {
49            let next = futures_core::ready!(this.second.as_mut().poll_next(cx));
50            if let Some(next) = next {
51                return Poll::Ready(Some(next));
52            }
53        }
54
55        if this.first.done && this.second.done {
56            return Poll::Ready(None);
57        }
58
59        Poll::Pending
60    }
61}