reqwest/blocking/
body.rs

1use std::fmt;
2use std::fs::File;
3use std::future::Future;
4#[cfg(feature = "multipart")]
5use std::io::Cursor;
6use std::io::{self, Read};
7use std::mem::{self, MaybeUninit};
8use std::ptr;
9
10use bytes::Bytes;
11use futures_channel::mpsc;
12
13use crate::async_impl;
14
15/// The body of a `Request`.
16///
17/// In most cases, this is not needed directly, as the
18/// [`RequestBuilder.body`][builder] method uses `Into<Body>`, which allows
19/// passing many things (like a string or vector of bytes).
20///
21/// [builder]: ./struct.RequestBuilder.html#method.body
22#[derive(Debug)]
23pub struct Body {
24    kind: Kind,
25}
26
27impl Body {
28    /// Instantiate a `Body` from a reader.
29    ///
30    /// # Note
31    ///
32    /// While allowing for many types to be used, these bodies do not have
33    /// a way to reset to the beginning and be reused. This means that when
34    /// encountering a 307 or 308 status code, instead of repeating the
35    /// request at the new location, the `Response` will be returned with
36    /// the redirect status code set.
37    ///
38    /// ```rust
39    /// # use std::fs::File;
40    /// # use reqwest::blocking::Body;
41    /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
42    /// let file = File::open("national_secrets.txt")?;
43    /// let body = Body::new(file);
44    /// # Ok(())
45    /// # }
46    /// ```
47    ///
48    /// If you have a set of bytes, like `String` or `Vec<u8>`, using the
49    /// `From` implementations for `Body` will store the data in a manner
50    /// it can be reused.
51    ///
52    /// ```rust
53    /// # use reqwest::blocking::Body;
54    /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
55    /// let s = "A stringy body";
56    /// let body = Body::from(s);
57    /// # Ok(())
58    /// # }
59    /// ```
60    pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
61        Body {
62            kind: Kind::Reader(Box::from(reader), None),
63        }
64    }
65
66    /// Create a `Body` from a `Read` where the size is known in advance
67    /// but the data should not be fully loaded into memory. This will
68    /// set the `Content-Length` header and stream from the `Read`.
69    ///
70    /// ```rust
71    /// # use std::fs::File;
72    /// # use reqwest::blocking::Body;
73    /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
74    /// let file = File::open("a_large_file.txt")?;
75    /// let file_size = file.metadata()?.len();
76    /// let body = Body::sized(file, file_size);
77    /// # Ok(())
78    /// # }
79    /// ```
80    pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body {
81        Body {
82            kind: Kind::Reader(Box::from(reader), Some(len)),
83        }
84    }
85
86    /// Returns the body as a byte slice if the body is already buffered in
87    /// memory. For streamed requests this method returns `None`.
88    pub fn as_bytes(&self) -> Option<&[u8]> {
89        match self.kind {
90            Kind::Reader(_, _) => None,
91            Kind::Bytes(ref bytes) => Some(bytes.as_ref()),
92        }
93    }
94
95    /// Converts streamed requests to their buffered equivalent and
96    /// returns a reference to the buffer. If the request is already
97    /// buffered, this has no effect.
98    ///
99    /// Be aware that for large requests this method is expensive
100    /// and may cause your program to run out of memory.
101    pub fn buffer(&mut self) -> Result<&[u8], crate::Error> {
102        match self.kind {
103            Kind::Reader(ref mut reader, maybe_len) => {
104                let mut bytes = if let Some(len) = maybe_len {
105                    Vec::with_capacity(len as usize)
106                } else {
107                    Vec::new()
108                };
109                io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
110                self.kind = Kind::Bytes(bytes.into());
111                self.buffer()
112            }
113            Kind::Bytes(ref bytes) => Ok(bytes.as_ref()),
114        }
115    }
116
117    #[cfg(feature = "multipart")]
118    pub(crate) fn len(&self) -> Option<u64> {
119        match self.kind {
120            Kind::Reader(_, len) => len,
121            Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
122        }
123    }
124
125    #[cfg(feature = "multipart")]
126    pub(crate) fn into_reader(self) -> Reader {
127        match self.kind {
128            Kind::Reader(r, _) => Reader::Reader(r),
129            Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
130        }
131    }
132
133    pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
134        match self.kind {
135            Kind::Reader(read, len) => {
136                let (tx, rx) = mpsc::channel(0);
137                let tx = Sender {
138                    body: (read, len),
139                    tx,
140                };
141                (Some(tx), async_impl::Body::stream(rx), len)
142            }
143            Kind::Bytes(chunk) => {
144                let len = chunk.len() as u64;
145                (None, async_impl::Body::reusable(chunk), Some(len))
146            }
147        }
148    }
149
150    pub(crate) fn try_clone(&self) -> Option<Body> {
151        self.kind.try_clone().map(|kind| Body { kind })
152    }
153}
154
155enum Kind {
156    Reader(Box<dyn Read + Send>, Option<u64>),
157    Bytes(Bytes),
158}
159
160impl Kind {
161    fn try_clone(&self) -> Option<Kind> {
162        match self {
163            Kind::Reader(..) => None,
164            Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
165        }
166    }
167}
168
169impl From<Vec<u8>> for Body {
170    #[inline]
171    fn from(v: Vec<u8>) -> Body {
172        Body {
173            kind: Kind::Bytes(v.into()),
174        }
175    }
176}
177
178impl From<String> for Body {
179    #[inline]
180    fn from(s: String) -> Body {
181        s.into_bytes().into()
182    }
183}
184
185impl From<&'static [u8]> for Body {
186    #[inline]
187    fn from(s: &'static [u8]) -> Body {
188        Body {
189            kind: Kind::Bytes(Bytes::from_static(s)),
190        }
191    }
192}
193
194impl From<&'static str> for Body {
195    #[inline]
196    fn from(s: &'static str) -> Body {
197        s.as_bytes().into()
198    }
199}
200
201impl From<File> for Body {
202    #[inline]
203    fn from(f: File) -> Body {
204        let len = f.metadata().map(|m| m.len()).ok();
205        Body {
206            kind: Kind::Reader(Box::new(f), len),
207        }
208    }
209}
210impl From<Bytes> for Body {
211    #[inline]
212    fn from(b: Bytes) -> Body {
213        Body {
214            kind: Kind::Bytes(b),
215        }
216    }
217}
218
219impl fmt::Debug for Kind {
220    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221        match *self {
222            Kind::Reader(_, ref v) => f
223                .debug_struct("Reader")
224                .field("length", &DebugLength(v))
225                .finish(),
226            Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
227        }
228    }
229}
230
231struct DebugLength<'a>(&'a Option<u64>);
232
233impl<'a> fmt::Debug for DebugLength<'a> {
234    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
235        match *self.0 {
236            Some(ref len) => fmt::Debug::fmt(len, f),
237            None => f.write_str("Unknown"),
238        }
239    }
240}
241
242#[cfg(feature = "multipart")]
243pub(crate) enum Reader {
244    Reader(Box<dyn Read + Send>),
245    Bytes(Cursor<Bytes>),
246}
247
248#[cfg(feature = "multipart")]
249impl Read for Reader {
250    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
251        match *self {
252            Reader::Reader(ref mut rdr) => rdr.read(buf),
253            Reader::Bytes(ref mut rdr) => rdr.read(buf),
254        }
255    }
256}
257
258pub(crate) struct Sender {
259    body: (Box<dyn Read + Send>, Option<u64>),
260    tx: mpsc::Sender<Result<Bytes, Abort>>,
261}
262
263#[derive(Debug)]
264struct Abort;
265
266impl fmt::Display for Abort {
267    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268        f.write_str("abort request body")
269    }
270}
271
272impl std::error::Error for Abort {}
273
274async fn send_future(sender: Sender) -> Result<(), crate::Error> {
275    use bytes::{BufMut, BytesMut};
276    use futures_util::SinkExt;
277    use std::cmp;
278
279    let con_len = sender.body.1;
280    let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
281    let mut written = 0;
282    let mut buf = BytesMut::zeroed(cap as usize);
283    buf.clear();
284    let mut body = sender.body.0;
285    // Put in an option so that it can be consumed on error to call abort()
286    let mut tx = Some(sender.tx);
287
288    loop {
289        if Some(written) == con_len {
290            // Written up to content-length, so stop.
291            return Ok(());
292        }
293
294        // The input stream is read only if the buffer is empty so
295        // that there is only one read in the buffer at any time.
296        //
297        // We need to know whether there is any data to send before
298        // we check the transmission channel (with poll_ready below)
299        // because sometimes the receiver disappears as soon as it
300        // considers the data is completely transmitted, which may
301        // be true.
302        //
303        // The use case is a web server that closes its
304        // input stream as soon as the data received is valid JSON.
305        // This behaviour is questionable, but it exists and the
306        // fact is that there is actually no remaining data to read.
307        if buf.is_empty() {
308            if buf.capacity() == buf.len() {
309                buf.reserve(8192);
310                // zero out the reserved memory
311                let uninit = buf.spare_capacity_mut();
312                let uninit_len = uninit.len();
313                unsafe {
314                    ptr::write_bytes(uninit.as_mut_ptr().cast::<u8>(), 0, uninit_len);
315                }
316            }
317
318            let bytes = unsafe {
319                mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.spare_capacity_mut())
320            };
321            match body.read(bytes) {
322                Ok(0) => {
323                    // The buffer was empty and nothing's left to
324                    // read. Return.
325                    return Ok(());
326                }
327                Ok(n) => unsafe {
328                    buf.advance_mut(n);
329                },
330                Err(e) => {
331                    let _ = tx
332                        .take()
333                        .expect("tx only taken on error")
334                        .clone()
335                        .try_send(Err(Abort));
336                    return Err(crate::error::body(e));
337                }
338            }
339        }
340
341        // The only way to get here is when the buffer is not empty.
342        // We can check the transmission channel
343
344        let buf_len = buf.len() as u64;
345        tx.as_mut()
346            .expect("tx only taken on error")
347            .send(Ok(buf.split().freeze()))
348            .await
349            .map_err(crate::error::body)?;
350
351        written += buf_len;
352    }
353}
354
355impl Sender {
356    // A `Future` that may do blocking read calls.
357    // As a `Future`, this integrates easily with `wait::timeout`.
358    pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> {
359        send_future(self)
360    }
361}
362
363// useful for tests, but not publicly exposed
364#[cfg(test)]
365pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
366    let mut s = String::new();
367    match body.kind {
368        Kind::Reader(ref mut reader, _) => reader.read_to_string(&mut s),
369        Kind::Bytes(ref mut bytes) => (&**bytes).read_to_string(&mut s),
370    }
371    .map(|_| s)
372}