1use std::fmt;
2use std::fs::File;
3use std::future::Future;
4#[cfg(feature = "multipart")]
5use std::io::Cursor;
6use std::io::{self, Read};
7use std::mem::{self, MaybeUninit};
8use std::ptr;
9
10use bytes::Bytes;
11use futures_channel::mpsc;
12
13use crate::async_impl;
14
15#[derive(Debug)]
23pub struct Body {
24 kind: Kind,
25}
26
27impl Body {
28 pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
61 Body {
62 kind: Kind::Reader(Box::from(reader), None),
63 }
64 }
65
66 pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body {
81 Body {
82 kind: Kind::Reader(Box::from(reader), Some(len)),
83 }
84 }
85
86 pub fn as_bytes(&self) -> Option<&[u8]> {
89 match self.kind {
90 Kind::Reader(_, _) => None,
91 Kind::Bytes(ref bytes) => Some(bytes.as_ref()),
92 }
93 }
94
95 pub fn buffer(&mut self) -> Result<&[u8], crate::Error> {
102 match self.kind {
103 Kind::Reader(ref mut reader, maybe_len) => {
104 let mut bytes = if let Some(len) = maybe_len {
105 Vec::with_capacity(len as usize)
106 } else {
107 Vec::new()
108 };
109 io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
110 self.kind = Kind::Bytes(bytes.into());
111 self.buffer()
112 }
113 Kind::Bytes(ref bytes) => Ok(bytes.as_ref()),
114 }
115 }
116
117 #[cfg(feature = "multipart")]
118 pub(crate) fn len(&self) -> Option<u64> {
119 match self.kind {
120 Kind::Reader(_, len) => len,
121 Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
122 }
123 }
124
125 #[cfg(feature = "multipart")]
126 pub(crate) fn into_reader(self) -> Reader {
127 match self.kind {
128 Kind::Reader(r, _) => Reader::Reader(r),
129 Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
130 }
131 }
132
133 pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
134 match self.kind {
135 Kind::Reader(read, len) => {
136 let (tx, rx) = mpsc::channel(0);
137 let tx = Sender {
138 body: (read, len),
139 tx,
140 };
141 (Some(tx), async_impl::Body::stream(rx), len)
142 }
143 Kind::Bytes(chunk) => {
144 let len = chunk.len() as u64;
145 (None, async_impl::Body::reusable(chunk), Some(len))
146 }
147 }
148 }
149
150 pub(crate) fn try_clone(&self) -> Option<Body> {
151 self.kind.try_clone().map(|kind| Body { kind })
152 }
153}
154
155enum Kind {
156 Reader(Box<dyn Read + Send>, Option<u64>),
157 Bytes(Bytes),
158}
159
160impl Kind {
161 fn try_clone(&self) -> Option<Kind> {
162 match self {
163 Kind::Reader(..) => None,
164 Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
165 }
166 }
167}
168
169impl From<Vec<u8>> for Body {
170 #[inline]
171 fn from(v: Vec<u8>) -> Body {
172 Body {
173 kind: Kind::Bytes(v.into()),
174 }
175 }
176}
177
178impl From<String> for Body {
179 #[inline]
180 fn from(s: String) -> Body {
181 s.into_bytes().into()
182 }
183}
184
185impl From<&'static [u8]> for Body {
186 #[inline]
187 fn from(s: &'static [u8]) -> Body {
188 Body {
189 kind: Kind::Bytes(Bytes::from_static(s)),
190 }
191 }
192}
193
194impl From<&'static str> for Body {
195 #[inline]
196 fn from(s: &'static str) -> Body {
197 s.as_bytes().into()
198 }
199}
200
201impl From<File> for Body {
202 #[inline]
203 fn from(f: File) -> Body {
204 let len = f.metadata().map(|m| m.len()).ok();
205 Body {
206 kind: Kind::Reader(Box::new(f), len),
207 }
208 }
209}
210impl From<Bytes> for Body {
211 #[inline]
212 fn from(b: Bytes) -> Body {
213 Body {
214 kind: Kind::Bytes(b),
215 }
216 }
217}
218
219impl fmt::Debug for Kind {
220 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221 match *self {
222 Kind::Reader(_, ref v) => f
223 .debug_struct("Reader")
224 .field("length", &DebugLength(v))
225 .finish(),
226 Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
227 }
228 }
229}
230
231struct DebugLength<'a>(&'a Option<u64>);
232
233impl<'a> fmt::Debug for DebugLength<'a> {
234 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
235 match *self.0 {
236 Some(ref len) => fmt::Debug::fmt(len, f),
237 None => f.write_str("Unknown"),
238 }
239 }
240}
241
242#[cfg(feature = "multipart")]
243pub(crate) enum Reader {
244 Reader(Box<dyn Read + Send>),
245 Bytes(Cursor<Bytes>),
246}
247
248#[cfg(feature = "multipart")]
249impl Read for Reader {
250 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
251 match *self {
252 Reader::Reader(ref mut rdr) => rdr.read(buf),
253 Reader::Bytes(ref mut rdr) => rdr.read(buf),
254 }
255 }
256}
257
258pub(crate) struct Sender {
259 body: (Box<dyn Read + Send>, Option<u64>),
260 tx: mpsc::Sender<Result<Bytes, Abort>>,
261}
262
263#[derive(Debug)]
264struct Abort;
265
266impl fmt::Display for Abort {
267 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268 f.write_str("abort request body")
269 }
270}
271
272impl std::error::Error for Abort {}
273
274async fn send_future(sender: Sender) -> Result<(), crate::Error> {
275 use bytes::{BufMut, BytesMut};
276 use futures_util::SinkExt;
277 use std::cmp;
278
279 let con_len = sender.body.1;
280 let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
281 let mut written = 0;
282 let mut buf = BytesMut::zeroed(cap as usize);
283 buf.clear();
284 let mut body = sender.body.0;
285 let mut tx = Some(sender.tx);
287
288 loop {
289 if Some(written) == con_len {
290 return Ok(());
292 }
293
294 if buf.is_empty() {
308 if buf.capacity() == buf.len() {
309 buf.reserve(8192);
310 let uninit = buf.spare_capacity_mut();
312 let uninit_len = uninit.len();
313 unsafe {
314 ptr::write_bytes(uninit.as_mut_ptr().cast::<u8>(), 0, uninit_len);
315 }
316 }
317
318 let bytes = unsafe {
319 mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.spare_capacity_mut())
320 };
321 match body.read(bytes) {
322 Ok(0) => {
323 return Ok(());
326 }
327 Ok(n) => unsafe {
328 buf.advance_mut(n);
329 },
330 Err(e) => {
331 let _ = tx
332 .take()
333 .expect("tx only taken on error")
334 .clone()
335 .try_send(Err(Abort));
336 return Err(crate::error::body(e));
337 }
338 }
339 }
340
341 let buf_len = buf.len() as u64;
345 tx.as_mut()
346 .expect("tx only taken on error")
347 .send(Ok(buf.split().freeze()))
348 .await
349 .map_err(crate::error::body)?;
350
351 written += buf_len;
352 }
353}
354
355impl Sender {
356 pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> {
359 send_future(self)
360 }
361}
362
363#[cfg(test)]
365pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
366 let mut s = String::new();
367 match body.kind {
368 Kind::Reader(ref mut reader, _) => reader.read_to_string(&mut s),
369 Kind::Bytes(ref mut bytes) => (&**bytes).read_to_string(&mut s),
370 }
371 .map(|_| s)
372}