tokio_buf/util/
stream.rs

1use bytes::Buf;
2use futures::{Async, Poll, Stream};
3use BufStream;
4
5/// Converts a `Stream` of `Buf` types into a `BufStream`.
6///
7/// While `Stream` and `BufStream` are very similar, they are not identical. The
8/// `stream` function returns a `BufStream` that is backed by the provided
9/// `Stream` type.
10pub fn stream<T>(stream: T) -> FromStream<T>
11where
12    T: Stream,
13    T::Item: Buf,
14{
15    FromStream { stream }
16}
17
18/// `BufStream` returned by the [`stream`] function.
19#[derive(Debug)]
20pub struct FromStream<T> {
21    stream: T,
22}
23
24impl<T> BufStream for FromStream<T>
25where
26    T: Stream,
27    T::Item: Buf,
28{
29    type Item = T::Item;
30    type Error = T::Error;
31
32    fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
33        self.stream.poll()
34    }
35}
36
37/// Converts a `BufStream` into a `Stream`.
38#[derive(Debug)]
39pub struct IntoStream<T> {
40    buf: T,
41}
42
43impl<T> IntoStream<T> {
44    /// Create a new `Stream` from the provided `BufStream`.
45    pub fn new(buf: T) -> Self {
46        IntoStream { buf }
47    }
48
49    /// Get a reference to the inner `BufStream`.
50    pub fn get_ref(&self) -> &T {
51        &self.buf
52    }
53
54    /// Get a mutable reference to the inner `BufStream`
55    pub fn get_mut(&mut self) -> &mut T {
56        &mut self.buf
57    }
58
59    /// Get the inner `BufStream`.
60    pub fn into_inner(self) -> T {
61        self.buf
62    }
63}
64
65impl<T: BufStream> Stream for IntoStream<T> {
66    type Item = T::Item;
67    type Error = T::Error;
68
69    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
70        match self.buf.poll_buf()? {
71            Async::Ready(Some(buf)) => Ok(Async::Ready(Some(buf))),
72            Async::Ready(None) => Ok(Async::Ready(None)),
73            Async::NotReady => Ok(Async::NotReady),
74        }
75    }
76}