futures_util/stream/
concat.rs

1use core::mem;
2use core::fmt::{Debug, Formatter, Result as FmtResult};
3use core::default::Default;
4
5use futures_core::{Async, Future, Poll, Stream};
6use futures_core::task;
7
8/// A stream combinator to concatenate the results of a stream into the first
9/// yielded item.
10///
11/// This structure is produced by the `Stream::concat` method.
12#[must_use = "streams do nothing unless polled"]
13pub struct Concat<S>
14    where S: Stream,
15{
16    inner: ConcatSafe<S>
17}
18
19impl<S: Debug> Debug for Concat<S> where S: Stream, S::Item: Debug {
20    fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
21        fmt.debug_struct("Concat")
22            .field("inner", &self.inner)
23            .finish()
24    }
25}
26
27pub fn new<S>(s: S) -> Concat<S>
28    where S: Stream,
29          S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
30{
31    Concat {
32        inner: new_safe(s)
33    }
34}
35
36impl<S> Future for Concat<S>
37    where S: Stream,
38          S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
39
40{
41    type Item = S::Item;
42    type Error = S::Error;
43
44    fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
45        self.inner.poll(cx).map(|a| {
46            match a {
47                Async::Pending => Async::Pending,
48                Async::Ready(None) => Async::Ready(Default::default()),
49                Async::Ready(Some(e)) => Async::Ready(e)
50            }
51        })
52    }
53}
54
55
56#[derive(Debug)]
57struct ConcatSafe<S>
58    where S: Stream,
59{
60    stream: S,
61    extend: Inner<S::Item>,
62}
63
64fn new_safe<S>(s: S) -> ConcatSafe<S>
65    where S: Stream,
66          S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator,
67{
68    ConcatSafe {
69        stream: s,
70        extend: Inner::First,
71    }
72}
73
74impl<S> Future for ConcatSafe<S>
75    where S: Stream,
76          S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator,
77
78{
79    type Item = Option<S::Item>;
80    type Error = S::Error;
81
82    fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
83        loop {
84            match self.stream.poll_next(cx) {
85                Ok(Async::Ready(Some(i))) => {
86                    match self.extend {
87                        Inner::First => {
88                            self.extend = Inner::Extending(i);
89                        },
90                        Inner::Extending(ref mut e) => {
91                            e.extend(i);
92                        },
93                        Inner::Done => unreachable!(),
94                    }
95                },
96                Ok(Async::Ready(None)) => {
97                    match mem::replace(&mut self.extend, Inner::Done) {
98                        Inner::First => return Ok(Async::Ready(None)),
99                        Inner::Extending(e) => return Ok(Async::Ready(Some(e))),
100                        Inner::Done => panic!("cannot poll Concat again")
101                    }
102                },
103                Ok(Async::Pending) => return Ok(Async::Pending),
104                Err(e) => {
105                    self.extend = Inner::Done;
106                    return Err(e)
107                }
108            }
109        }
110    }
111}
112
113
/// Accumulation progress of a `ConcatSafe` future.
#[derive(Debug)]
enum Inner<T> {
    /// No item has been received from the stream yet.
    First,
    /// At least one item has arrived; holds the value accumulated so far.
    Extending(T),
    /// The result was handed out or the stream failed; nothing is held.
    Done,
}