1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
use bytes::{Buf, Bytes};
use futures::task::{Context, Poll};
use futures::{AsyncRead, Stream, StreamExt};
use std::io::{Error, ErrorKind, Result};
use std::pin::Pin;
pub struct StreamBody<S> {
s: S,
remain_bytes: Option<Bytes>,
}
impl<S> StreamBody<S> {
#[allow(missing_docs)]
pub fn new(s: S) -> Self {
Self {
s,
remain_bytes: None,
}
}
}
impl<S, E, D> AsyncRead for StreamBody<S>
where
D: Buf,
S: Stream<Item = std::result::Result<D, E>> + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
loop {
if let Some(bytes) = &mut self.remain_bytes {
let data = bytes.split_to(buf.len().min(bytes.len()));
buf[..data.len()].copy_from_slice(&data);
if !bytes.has_remaining() {
self.remain_bytes = None;
}
return Poll::Ready(Ok(data.len()));
} else {
match self.s.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(mut bytes))) => {
self.remain_bytes = Some(bytes.to_bytes());
}
Poll::Ready(Some(Err(_))) => {
return Poll::Ready(Err(Error::from(ErrorKind::InvalidData)))
}
Poll::Ready(None) => return Poll::Ready(Ok(0)),
Poll::Pending => return Poll::Pending,
}
}
}
}
}