tokio_buf/util/
collect.rs

1use super::FromBufStream;
2use BufStream;
3
4use futures::{Future, Poll};
5
6/// Consumes a buf stream, collecting the data into a single byte container.
7///
8/// `Collect` values are produced by `BufStream::collect`.
9#[derive(Debug)]
10pub struct Collect<T, U>
11where
12    T: BufStream,
13    U: FromBufStream<T::Item>,
14{
15    stream: T,
16    builder: Option<U::Builder>,
17}
18
19/// Errors returned from `Collect` future.
20#[derive(Debug)]
21pub struct CollectError<T, U> {
22    inner: Error<T, U>,
23}
24
25#[derive(Debug)]
26enum Error<T, U> {
27    Stream(T),
28    Collect(U),
29}
30
31impl<T, U> Collect<T, U>
32where
33    T: BufStream,
34    U: FromBufStream<T::Item>,
35{
36    pub(crate) fn new(stream: T) -> Collect<T, U> {
37        let builder = U::builder(&stream.size_hint());
38
39        Collect {
40            stream,
41            builder: Some(builder),
42        }
43    }
44}
45
46impl<T, U> Future for Collect<T, U>
47where
48    T: BufStream,
49    U: FromBufStream<T::Item>,
50{
51    type Item = U;
52    type Error = CollectError<T::Error, U::Error>;
53
54    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
55        loop {
56            let res = self.stream.poll_buf().map_err(|err| {
57                let inner = Error::Stream(err);
58                CollectError { inner }
59            });
60
61            match try_ready!(res) {
62                Some(mut buf) => {
63                    let builder = self.builder.as_mut().expect("cannot poll after done");
64
65                    U::extend(builder, &mut buf, &self.stream.size_hint()).map_err(|err| {
66                        let inner = Error::Collect(err);
67                        CollectError { inner }
68                    })?;
69                }
70                None => {
71                    let builder = self.builder.take().expect("cannot poll after done");
72                    let value = U::build(builder).map_err(|err| {
73                        let inner = Error::Collect(err);
74                        CollectError { inner }
75                    })?;
76                    return Ok(value.into());
77                }
78            }
79        }
80    }
81}
82
83// ===== impl CollectError =====
84
85impl<T, U> CollectError<T, U> {
86    /// Returns `true` if the error was caused by polling the stream.
87    pub fn is_stream_err(&self) -> bool {
88        match self.inner {
89            Error::Stream(_) => true,
90            _ => false,
91        }
92    }
93
94    /// Returns `true` if the error happened while collecting the data.
95    pub fn is_collect_err(&self) -> bool {
96        match self.inner {
97            Error::Collect(_) => true,
98            _ => false,
99        }
100    }
101}