futures_util/stream/
concat.rs1use core::mem;
2use core::fmt::{Debug, Formatter, Result as FmtResult};
3use core::default::Default;
4
5use futures_core::{Async, Future, Poll, Stream};
6use futures_core::task;
7
8#[must_use = "streams do nothing unless polled"]
13pub struct Concat<S>
14 where S: Stream,
15{
16 inner: ConcatSafe<S>
17}
18
19impl<S: Debug> Debug for Concat<S> where S: Stream, S::Item: Debug {
20 fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
21 fmt.debug_struct("Concat")
22 .field("inner", &self.inner)
23 .finish()
24 }
25}
26
27pub fn new<S>(s: S) -> Concat<S>
28 where S: Stream,
29 S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
30{
31 Concat {
32 inner: new_safe(s)
33 }
34}
35
36impl<S> Future for Concat<S>
37 where S: Stream,
38 S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
39
40{
41 type Item = S::Item;
42 type Error = S::Error;
43
44 fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
45 self.inner.poll(cx).map(|a| {
46 match a {
47 Async::Pending => Async::Pending,
48 Async::Ready(None) => Async::Ready(Default::default()),
49 Async::Ready(Some(e)) => Async::Ready(e)
50 }
51 })
52 }
53}
54
55
56#[derive(Debug)]
57struct ConcatSafe<S>
58 where S: Stream,
59{
60 stream: S,
61 extend: Inner<S::Item>,
62}
63
64fn new_safe<S>(s: S) -> ConcatSafe<S>
65 where S: Stream,
66 S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator,
67{
68 ConcatSafe {
69 stream: s,
70 extend: Inner::First,
71 }
72}
73
74impl<S> Future for ConcatSafe<S>
75 where S: Stream,
76 S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator,
77
78{
79 type Item = Option<S::Item>;
80 type Error = S::Error;
81
82 fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
83 loop {
84 match self.stream.poll_next(cx) {
85 Ok(Async::Ready(Some(i))) => {
86 match self.extend {
87 Inner::First => {
88 self.extend = Inner::Extending(i);
89 },
90 Inner::Extending(ref mut e) => {
91 e.extend(i);
92 },
93 Inner::Done => unreachable!(),
94 }
95 },
96 Ok(Async::Ready(None)) => {
97 match mem::replace(&mut self.extend, Inner::Done) {
98 Inner::First => return Ok(Async::Ready(None)),
99 Inner::Extending(e) => return Ok(Async::Ready(Some(e))),
100 Inner::Done => panic!("cannot poll Concat again")
101 }
102 },
103 Ok(Async::Pending) => return Ok(Async::Pending),
104 Err(e) => {
105 self.extend = Inner::Done;
106 return Err(e)
107 }
108 }
109 }
110 }
111}
112
113
/// Accumulation state used by `ConcatSafe`.
#[derive(Debug)]
enum Inner<E> {
    /// No item has been received from the stream yet.
    First,

    /// At least one item was received; the payload is the accumulator.
    Extending(E),

    /// The future has resolved or failed and must not be polled again.
    Done,
}