http_body_util/
stream.rs

1use bytes::Buf;
2use futures_core::{ready, stream::Stream};
3use http_body::{Body, Frame};
4use pin_project_lite::pin_project;
5use std::{
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10pin_project! {
11    /// A body created from a [`Stream`].
12    #[derive(Clone, Copy, Debug)]
13    pub struct StreamBody<S> {
14        #[pin]
15        stream: S,
16    }
17}
18
19impl<S> StreamBody<S> {
20    /// Create a new `StreamBody`.
21    pub fn new(stream: S) -> Self {
22        Self { stream }
23    }
24}
25
26impl<S, D, E> Body for StreamBody<S>
27where
28    S: Stream<Item = Result<Frame<D>, E>>,
29    D: Buf,
30{
31    type Data = D;
32    type Error = E;
33
34    fn poll_frame(
35        self: Pin<&mut Self>,
36        cx: &mut Context<'_>,
37    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
38        match self.project().stream.poll_next(cx) {
39            Poll::Ready(Some(result)) => Poll::Ready(Some(result)),
40            Poll::Ready(None) => Poll::Ready(None),
41            Poll::Pending => Poll::Pending,
42        }
43    }
44}
45
46impl<S: Stream> Stream for StreamBody<S> {
47    type Item = S::Item;
48
49    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50        self.project().stream.poll_next(cx)
51    }
52
53    fn size_hint(&self) -> (usize, Option<usize>) {
54        self.stream.size_hint()
55    }
56}
57
58pin_project! {
59    /// A stream created from a [`Body`].
60    #[derive(Clone, Copy, Debug)]
61    pub struct BodyStream<B> {
62        #[pin]
63        body: B,
64    }
65}
66
67impl<B> BodyStream<B> {
68    /// Create a new `BodyStream`.
69    pub fn new(body: B) -> Self {
70        Self { body }
71    }
72}
73
74impl<B> Body for BodyStream<B>
75where
76    B: Body,
77{
78    type Data = B::Data;
79    type Error = B::Error;
80
81    fn poll_frame(
82        self: Pin<&mut Self>,
83        cx: &mut Context<'_>,
84    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
85        self.project().body.poll_frame(cx)
86    }
87}
88
89impl<B> Stream for BodyStream<B>
90where
91    B: Body,
92{
93    type Item = Result<Frame<B::Data>, B::Error>;
94
95    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96        match self.project().body.poll_frame(cx) {
97            Poll::Ready(Some(frame)) => Poll::Ready(Some(frame)),
98            Poll::Ready(None) => Poll::Ready(None),
99            Poll::Pending => Poll::Pending,
100        }
101    }
102}
103
104pin_project! {
105    /// A data stream created from a [`Body`].
106    #[derive(Clone, Copy, Debug)]
107    pub struct BodyDataStream<B> {
108        #[pin]
109        body: B,
110    }
111}
112
113impl<B> BodyDataStream<B> {
114    /// Create a new `BodyDataStream`
115    pub fn new(body: B) -> Self {
116        Self { body }
117    }
118}
119
120impl<B> Stream for BodyDataStream<B>
121where
122    B: Body,
123{
124    type Item = Result<B::Data, B::Error>;
125
126    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127        loop {
128            return match ready!(self.as_mut().project().body.poll_frame(cx)) {
129                Some(Ok(frame)) => match frame.into_data() {
130                    Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
131                    Err(_) => continue,
132                },
133                Some(Err(err)) => Poll::Ready(Some(Err(err))),
134                None => Poll::Ready(None),
135            };
136        }
137    }
138}
139
#[cfg(test)]
mod tests {
    use crate::{BodyExt, BodyStream, StreamBody};
    use bytes::Bytes;
    use futures_util::StreamExt;
    use http_body::Frame;
    use std::convert::Infallible;

    /// Builds three single-byte data frames carrying `1`, `2` and `3`.
    fn sample_frames() -> Vec<Result<Frame<Bytes>, Infallible>> {
        (1..=3u8)
            .map(|byte| Ok(Frame::data(Bytes::from(vec![byte]))))
            .collect()
    }

    /// A `StreamBody` must hand out the stream's frames in order, then end.
    #[tokio::test]
    async fn body_from_stream() {
        let mut body = StreamBody::new(futures_util::stream::iter(sample_frames()));

        for expected in 1..=3u8 {
            let data = body.frame().await.unwrap().unwrap().into_data().unwrap();
            assert_eq!(data.as_ref(), [expected]);
        }

        assert!(body.frame().await.is_none());
    }

    /// A `BodyStream` must yield the body's frames in order, then end.
    #[tokio::test]
    async fn stream_from_body() {
        let body = StreamBody::new(futures_util::stream::iter(sample_frames()));
        let mut stream = BodyStream::new(body);

        for expected in 1..=3u8 {
            let data = stream.next().await.unwrap().unwrap().into_data().unwrap();
            assert_eq!(data.as_ref(), [expected]);
        }

        assert!(stream.next().await.is_none());
    }
}
240}