1use bytes::Buf;
2use futures_core::{ready, stream::Stream};
3use http_body::{Body, Frame};
4use pin_project_lite::pin_project;
5use std::{
6 pin::Pin,
7 task::{Context, Poll},
8};
9
pin_project! {
    /// A [`Body`] backed by a [`Stream`] of frames.
    ///
    /// When the wrapped stream yields `Result<Frame<D>, E>` items, this type
    /// implements [`Body`] by handing those items out as the body's frames.
    /// It also implements [`Stream`] itself, passing the wrapped stream's
    /// items through unchanged.
    #[derive(Clone, Copy, Debug)]
    pub struct StreamBody<S> {
        // Structurally pinned so the wrapped stream can be polled through
        // `Pin<&mut Self>`.
        #[pin]
        stream: S,
    }
}
18
19impl<S> StreamBody<S> {
20 pub fn new(stream: S) -> Self {
22 Self { stream }
23 }
24}
25
26impl<S, D, E> Body for StreamBody<S>
27where
28 S: Stream<Item = Result<Frame<D>, E>>,
29 D: Buf,
30{
31 type Data = D;
32 type Error = E;
33
34 fn poll_frame(
35 self: Pin<&mut Self>,
36 cx: &mut Context<'_>,
37 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
38 match self.project().stream.poll_next(cx) {
39 Poll::Ready(Some(result)) => Poll::Ready(Some(result)),
40 Poll::Ready(None) => Poll::Ready(None),
41 Poll::Pending => Poll::Pending,
42 }
43 }
44}
45
46impl<S: Stream> Stream for StreamBody<S> {
47 type Item = S::Item;
48
49 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 self.project().stream.poll_next(cx)
51 }
52
53 fn size_hint(&self) -> (usize, Option<usize>) {
54 self.stream.size_hint()
55 }
56}
57
pin_project! {
    /// A [`Stream`] of frames read from a wrapped [`Body`].
    ///
    /// Each item is a `Result<Frame<B::Data>, B::Error>` obtained by polling
    /// the wrapped body for its next frame. The wrapper also implements
    /// [`Body`] by delegating to the wrapped body.
    #[derive(Clone, Copy, Debug)]
    pub struct BodyStream<B> {
        // Structurally pinned so the wrapped body can be polled through
        // `Pin<&mut Self>`.
        #[pin]
        body: B,
    }
}
66
67impl<B> BodyStream<B> {
68 pub fn new(body: B) -> Self {
70 Self { body }
71 }
72}
73
74impl<B> Body for BodyStream<B>
75where
76 B: Body,
77{
78 type Data = B::Data;
79 type Error = B::Error;
80
81 fn poll_frame(
82 self: Pin<&mut Self>,
83 cx: &mut Context<'_>,
84 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
85 self.project().body.poll_frame(cx)
86 }
87}
88
89impl<B> Stream for BodyStream<B>
90where
91 B: Body,
92{
93 type Item = Result<Frame<B::Data>, B::Error>;
94
95 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96 match self.project().body.poll_frame(cx) {
97 Poll::Ready(Some(frame)) => Poll::Ready(Some(frame)),
98 Poll::Ready(None) => Poll::Ready(None),
99 Poll::Pending => Poll::Pending,
100 }
101 }
102}
103
pin_project! {
    /// A [`Stream`] over the data chunks of a wrapped [`Body`].
    ///
    /// Each item is a `Result<B::Data, B::Error>`. Frames that cannot be
    /// converted with `Frame::into_data` are dropped rather than yielded.
    #[derive(Clone, Copy, Debug)]
    pub struct BodyDataStream<B> {
        // Structurally pinned so the wrapped body can be polled through
        // `Pin<&mut Self>`.
        #[pin]
        body: B,
    }
}
112
113impl<B> BodyDataStream<B> {
114 pub fn new(body: B) -> Self {
116 Self { body }
117 }
118}
119
120impl<B> Stream for BodyDataStream<B>
121where
122 B: Body,
123{
124 type Item = Result<B::Data, B::Error>;
125
126 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127 loop {
128 return match ready!(self.as_mut().project().body.poll_frame(cx)) {
129 Some(Ok(frame)) => match frame.into_data() {
130 Ok(bytes) => Poll::Ready(Some(Ok(bytes))),
131 Err(_) => continue,
132 },
133 Some(Err(err)) => Poll::Ready(Some(Err(err))),
134 None => Poll::Ready(None),
135 };
136 }
137 }
138}
139
#[cfg(test)]
mod tests {
    use crate::{BodyExt, BodyStream, StreamBody};
    use bytes::Bytes;
    use futures_util::StreamExt;
    use http_body::Frame;
    use std::convert::Infallible;

    /// Builds the fixture shared by both tests: three data frames carrying
    /// the single bytes `1`, `2` and `3`, in that order.
    fn sample_frames() -> Vec<Result<Frame<Bytes>, Infallible>> {
        (1u8..=3)
            .map(|byte| Ok(Frame::data(Bytes::from(vec![byte]))))
            .collect()
    }

    /// A `StreamBody` yields the stream's frames in order and then ends.
    #[tokio::test]
    async fn body_from_stream() {
        let mut body = StreamBody::new(futures_util::stream::iter(sample_frames()));

        for expected in 1u8..=3 {
            let frame = body.frame().await.unwrap().unwrap();
            assert_eq!(frame.into_data().unwrap().as_ref(), [expected]);
        }

        assert!(body.frame().await.is_none());
    }

    /// A `BodyStream` wrapped around a `StreamBody` yields the same frames in
    /// order and then ends.
    #[tokio::test]
    async fn stream_from_body() {
        let body = StreamBody::new(futures_util::stream::iter(sample_frames()));
        let mut stream = BodyStream::new(body);

        for expected in 1u8..=3 {
            let frame = stream.next().await.unwrap().unwrap();
            assert_eq!(frame.into_data().unwrap().as_ref(), [expected]);
        }

        assert!(stream.next().await.is_none());
    }
}