1use bytes::Buf;
2use futures::{Async, Poll, Stream};
3use BufStream;
4
5pub fn stream<T>(stream: T) -> FromStream<T>
11where
12 T: Stream,
13 T::Item: Buf,
14{
15 FromStream { stream }
16}
17
18#[derive(Debug)]
20pub struct FromStream<T> {
21 stream: T,
22}
23
24impl<T> BufStream for FromStream<T>
25where
26 T: Stream,
27 T::Item: Buf,
28{
29 type Item = T::Item;
30 type Error = T::Error;
31
32 fn poll_buf(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
33 self.stream.poll()
34 }
35}
36
37#[derive(Debug)]
39pub struct IntoStream<T> {
40 buf: T,
41}
42
43impl<T> IntoStream<T> {
44 pub fn new(buf: T) -> Self {
46 IntoStream { buf }
47 }
48
49 pub fn get_ref(&self) -> &T {
51 &self.buf
52 }
53
54 pub fn get_mut(&mut self) -> &mut T {
56 &mut self.buf
57 }
58
59 pub fn into_inner(self) -> T {
61 self.buf
62 }
63}
64
65impl<T: BufStream> Stream for IntoStream<T> {
66 type Item = T::Item;
67 type Error = T::Error;
68
69 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
70 match self.buf.poll_buf()? {
71 Async::Ready(Some(buf)) => Ok(Async::Ready(Some(buf))),
72 Async::Ready(None) => Ok(Async::Ready(None)),
73 Async::NotReady => Ok(Async::NotReady),
74 }
75 }
76}