actix_multipart/
field.rs

1use std::{
2    cell::RefCell,
3    cmp, fmt,
4    future::poll_fn,
5    mem,
6    pin::Pin,
7    rc::Rc,
8    task::{ready, Context, Poll},
9};
10
11use actix_web::{
12    error::PayloadError,
13    http::header::{self, ContentDisposition, HeaderMap},
14    web::{Bytes, BytesMut},
15};
16use derive_more::{Display, Error};
17use futures_core::Stream;
18use mime::Mime;
19
20use crate::{
21    error::Error,
22    payload::{PayloadBuffer, PayloadRef},
23    safety::Safety,
24};
25
/// Error type returned from [`Field::bytes()`] when field data is larger than limit.
///
/// This is a marker type without any payload: it only signals that appending the next chunk
/// would have pushed the collected data past the limit supplied by the caller.
#[derive(Debug, Display, Error)]
#[display(fmt = "size limit exceeded while collecting field data")]
#[non_exhaustive]
pub struct LimitExceeded;
31
/// A single field in a multipart stream.
///
/// The field's body is read by polling it as a [`Stream`] or by collecting it with
/// [`Field::bytes()`].
pub struct Field {
    /// Field's Content-Type.
    content_type: Option<Mime>,

    /// Field's Content-Disposition.
    content_disposition: Option<ContentDisposition>,

    /// Form field name.
    ///
    /// A non-optional storage for form field names to avoid unwraps in `form` module. Will be an
    /// empty string in non-form contexts.
    ///
    // INVARIANT: always non-empty when request content-type is multipart/form-data.
    pub(crate) form_field_name: String,

    /// Field's header map.
    headers: HeaderMap,

    /// Handle presented to the payload whenever its buffer is borrowed.
    ///
    /// When the buffer cannot be borrowed and this handle is not "clean", polling the field
    /// yields `Error::NotConsumed`.
    safety: Safety,

    /// Reader state for this field's body (payload handle, boundary, progress flags), held
    /// behind `Rc<RefCell<_>>`.
    // NOTE(review): presumably also referenced by the parent multipart stream - confirm.
    inner: Rc<RefCell<InnerField>>,
}
54
55impl Field {
56    pub(crate) fn new(
57        content_type: Option<Mime>,
58        content_disposition: Option<ContentDisposition>,
59        form_field_name: Option<String>,
60        headers: HeaderMap,
61        safety: Safety,
62        inner: Rc<RefCell<InnerField>>,
63    ) -> Self {
64        Field {
65            content_type,
66            content_disposition,
67            form_field_name: form_field_name.unwrap_or_default(),
68            headers,
69            inner,
70            safety,
71        }
72    }
73
74    /// Returns a reference to the field's header map.
75    pub fn headers(&self) -> &HeaderMap {
76        &self.headers
77    }
78
79    /// Returns a reference to the field's content (mime) type, if it is supplied by the client.
80    ///
81    /// According to [RFC 7578](https://www.rfc-editor.org/rfc/rfc7578#section-4.4), if it is not
82    /// present, it should default to "text/plain". Note it is the responsibility of the client to
83    /// provide the appropriate content type, there is no attempt to validate this by the server.
84    pub fn content_type(&self) -> Option<&Mime> {
85        self.content_type.as_ref()
86    }
87
88    /// Returns this field's parsed Content-Disposition header, if set.
89    ///
90    /// # Validation
91    ///
92    /// Per [RFC 7578 §4.2], the parts of a multipart/form-data payload MUST contain a
93    /// Content-Disposition header field where the disposition type is `form-data` and MUST also
94    /// contain an additional parameter of `name` with its value being the original field name from
95    /// the form. This requirement is enforced during extraction for multipart/form-data requests,
96    /// but not other kinds of multipart requests (such as multipart/related).
97    ///
98    /// As such, it is safe to `.unwrap()` calls `.content_disposition()` if you've verified.
99    ///
100    /// The [`name()`](Self::name) method is also provided as a convenience for obtaining the
101    /// aforementioned name parameter.
102    ///
103    /// [RFC 7578 §4.2]: https://datatracker.ietf.org/doc/html/rfc7578#section-4.2
104    pub fn content_disposition(&self) -> Option<&ContentDisposition> {
105        self.content_disposition.as_ref()
106    }
107
108    /// Returns the field's name, if set.
109    ///
110    /// See [`content_disposition()`](Self::content_disposition) regarding guarantees on presence of
111    /// the "name" field.
112    pub fn name(&self) -> Option<&str> {
113        self.content_disposition()?.get_name()
114    }
115
116    /// Collects the raw field data, up to `limit` bytes.
117    ///
118    /// # Errors
119    ///
120    /// Any errors produced by the data stream are returned as `Ok(Err(Error))` immediately.
121    ///
122    /// If the buffered data size would exceed `limit`, an `Err(LimitExceeded)` is returned. Note
123    /// that, in this case, the full data stream is exhausted before returning the error so that
124    /// subsequent fields can still be read. To better defend against malicious/infinite requests,
125    /// it is advisable to also put a timeout on this call.
126    pub async fn bytes(&mut self, limit: usize) -> Result<Result<Bytes, Error>, LimitExceeded> {
127        /// Sensible default (2kB) for initial, bounded allocation when collecting body bytes.
128        const INITIAL_ALLOC_BYTES: usize = 2 * 1024;
129
130        let mut exceeded_limit = false;
131        let mut buf = BytesMut::with_capacity(INITIAL_ALLOC_BYTES);
132
133        let mut field = Pin::new(self);
134
135        match poll_fn(|cx| loop {
136            match ready!(field.as_mut().poll_next(cx)) {
137                // if already over limit, discard chunk to advance multipart request
138                Some(Ok(_chunk)) if exceeded_limit => {}
139
140                // if limit is exceeded set flag to true and continue
141                Some(Ok(chunk)) if buf.len() + chunk.len() > limit => {
142                    exceeded_limit = true;
143                    // eagerly de-allocate field data buffer
144                    let _ = mem::take(&mut buf);
145                }
146
147                Some(Ok(chunk)) => buf.extend_from_slice(&chunk),
148
149                None => return Poll::Ready(Ok(())),
150                Some(Err(err)) => return Poll::Ready(Err(err)),
151            }
152        })
153        .await
154        {
155            // propagate error returned from body poll
156            Err(err) => Ok(Err(err)),
157
158            // limit was exceeded while reading body
159            Ok(()) if exceeded_limit => Err(LimitExceeded),
160
161            // otherwise return body buffer
162            Ok(()) => Ok(Ok(buf.freeze())),
163        }
164    }
165}
166
167impl Stream for Field {
168    type Item = Result<Bytes, Error>;
169
170    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
171        let this = self.get_mut();
172        let mut inner = this.inner.borrow_mut();
173
174        if let Some(mut buffer) = inner
175            .payload
176            .as_ref()
177            .expect("Field should not be polled after completion")
178            .get_mut(&this.safety)
179        {
180            // check safety and poll read payload to buffer.
181            buffer.poll_stream(cx)?;
182        } else if !this.safety.is_clean() {
183            // safety violation
184            return Poll::Ready(Some(Err(Error::NotConsumed)));
185        } else {
186            return Poll::Pending;
187        }
188
189        inner.poll(&this.safety)
190    }
191}
192
193impl fmt::Debug for Field {
194    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195        if let Some(ct) = &self.content_type {
196            writeln!(f, "\nField: {}", ct)?;
197        } else {
198            writeln!(f, "\nField:")?;
199        }
200        writeln!(f, "  boundary: {}", self.inner.borrow().boundary)?;
201        writeln!(f, "  headers:")?;
202        for (key, val) in self.headers.iter() {
203            writeln!(f, "    {:?}: {:?}", key, val)?;
204        }
205        Ok(())
206    }
207}
208
/// Reader state for the body of a single multipart field.
pub(crate) struct InnerField {
    /// Payload is initialized as Some and is `take`n when the field stream finishes.
    payload: Option<PayloadRef>,

    /// Boundary string that is compared against the payload buffer to detect the end of the
    /// field body when no length is known.
    boundary: String,

    /// Set once the body reader reported the end of the field body; after that only the line
    /// following the body is still read.
    eof: bool,

    /// Value of the part's `Content-Length` header, if one was present. While the body is being
    /// read this holds the number of body bytes that are still outstanding.
    length: Option<u64>,
}
216
217impl InnerField {
218    pub(crate) fn new_in_rc(
219        payload: PayloadRef,
220        boundary: String,
221        headers: &HeaderMap,
222    ) -> Result<Rc<RefCell<InnerField>>, PayloadError> {
223        Self::new(payload, boundary, headers).map(|this| Rc::new(RefCell::new(this)))
224    }
225
226    pub(crate) fn new(
227        payload: PayloadRef,
228        boundary: String,
229        headers: &HeaderMap,
230    ) -> Result<InnerField, PayloadError> {
231        let len = if let Some(len) = headers.get(&header::CONTENT_LENGTH) {
232            match len.to_str().ok().and_then(|len| len.parse::<u64>().ok()) {
233                Some(len) => Some(len),
234                None => return Err(PayloadError::Incomplete(None)),
235            }
236        } else {
237            None
238        };
239
240        Ok(InnerField {
241            boundary,
242            payload: Some(payload),
243            eof: false,
244            length: len,
245        })
246    }
247
248    /// Reads body part content chunk of the specified size.
249    ///
250    /// The body part must has `Content-Length` header with proper value.
251    pub(crate) fn read_len(
252        payload: &mut PayloadBuffer,
253        size: &mut u64,
254    ) -> Poll<Option<Result<Bytes, Error>>> {
255        if *size == 0 {
256            Poll::Ready(None)
257        } else {
258            match payload.read_max(*size)? {
259                Some(mut chunk) => {
260                    let len = cmp::min(chunk.len() as u64, *size);
261                    *size -= len;
262                    let ch = chunk.split_to(len as usize);
263                    if !chunk.is_empty() {
264                        payload.unprocessed(chunk);
265                    }
266                    Poll::Ready(Some(Ok(ch)))
267                }
268                None => {
269                    if payload.eof && (*size != 0) {
270                        Poll::Ready(Some(Err(Error::Incomplete)))
271                    } else {
272                        Poll::Pending
273                    }
274                }
275            }
276        }
277    }
278
279    /// Reads content chunk of body part with unknown length.
280    ///
281    /// The `Content-Length` header for body part is not necessary.
282    pub(crate) fn read_stream(
283        payload: &mut PayloadBuffer,
284        boundary: &str,
285    ) -> Poll<Option<Result<Bytes, Error>>> {
286        let mut pos = 0;
287
288        let len = payload.buf.len();
289        if len == 0 {
290            return if payload.eof {
291                Poll::Ready(Some(Err(Error::Incomplete)))
292            } else {
293                Poll::Pending
294            };
295        }
296
297        // check boundary
298        if len > 4 && payload.buf[0] == b'\r' {
299            let b_len = if &payload.buf[..2] == b"\r\n" && &payload.buf[2..4] == b"--" {
300                Some(4)
301            } else if &payload.buf[1..3] == b"--" {
302                Some(3)
303            } else {
304                None
305            };
306
307            if let Some(b_len) = b_len {
308                let b_size = boundary.len() + b_len;
309                if len < b_size {
310                    return Poll::Pending;
311                } else if &payload.buf[b_len..b_size] == boundary.as_bytes() {
312                    // found boundary
313                    return Poll::Ready(None);
314                }
315            }
316        }
317
318        loop {
319            return if let Some(idx) = memchr::memmem::find(&payload.buf[pos..], b"\r") {
320                let cur = pos + idx;
321
322                // check if we have enough data for boundary detection
323                if cur + 4 > len {
324                    if cur > 0 {
325                        Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze())))
326                    } else {
327                        Poll::Pending
328                    }
329                } else {
330                    // check boundary
331                    if (&payload.buf[cur..cur + 2] == b"\r\n"
332                        && &payload.buf[cur + 2..cur + 4] == b"--")
333                        || (&payload.buf[cur..=cur] == b"\r"
334                            && &payload.buf[cur + 1..cur + 3] == b"--")
335                    {
336                        if cur != 0 {
337                            // return buffer
338                            Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze())))
339                        } else {
340                            pos = cur + 1;
341                            continue;
342                        }
343                    } else {
344                        // not boundary
345                        pos = cur + 1;
346                        continue;
347                    }
348                }
349            } else {
350                Poll::Ready(Some(Ok(payload.buf.split().freeze())))
351            };
352        }
353    }
354
355    pub(crate) fn poll(&mut self, safety: &Safety) -> Poll<Option<Result<Bytes, Error>>> {
356        if self.payload.is_none() {
357            return Poll::Ready(None);
358        }
359
360        let result = if let Some(mut payload) = self
361            .payload
362            .as_ref()
363            .expect("Field should not be polled after completion")
364            .get_mut(safety)
365        {
366            if !self.eof {
367                let res = if let Some(ref mut len) = self.length {
368                    InnerField::read_len(&mut payload, len)
369                } else {
370                    InnerField::read_stream(&mut payload, &self.boundary)
371                };
372
373                match res {
374                    Poll::Pending => return Poll::Pending,
375                    Poll::Ready(Some(Ok(bytes))) => return Poll::Ready(Some(Ok(bytes))),
376                    Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
377                    Poll::Ready(None) => self.eof = true,
378                }
379            }
380
381            match payload.readline() {
382                Ok(None) => Poll::Pending,
383                Ok(Some(line)) => {
384                    if line.as_ref() != b"\r\n" {
385                        log::warn!("multipart field did not read all the data or it is malformed");
386                    }
387                    Poll::Ready(None)
388                }
389                Err(err) => Poll::Ready(Some(Err(err))),
390            }
391        } else {
392            Poll::Pending
393        };
394
395        if let Poll::Ready(None) = result {
396            // drop payload buffer and make future un-poll-able
397            let _ = self.payload.take();
398        }
399
400        result
401    }
402}
403
#[cfg(test)]
mod tests {
    use futures_util::{stream, StreamExt as _};

    use super::*;
    use crate::Multipart;

    // TODO: use test utility when multi-file support is introduced
    /// Returns a multipart/mixed body holding two text fields ("one+one+one" and "two+two+two")
    /// together with request headers declaring the matching boundary.
    fn create_double_request_with_header() -> (Bytes, HeaderMap) {
        let bytes = Bytes::from(
            "testasdadsad\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
             Content-Type: text/plain; charset=utf-8\r\n\
             \r\n\
             one+one+one\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
             Content-Type: text/plain; charset=utf-8\r\n\
             \r\n\
             two+two+two\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n",
        );

        let content_type = header::HeaderValue::from_static(
            "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
        );
        let mut headers = HeaderMap::new();
        headers.insert(header::CONTENT_TYPE, content_type);

        (bytes, headers)
    }

    /// Pulls the next field out of `multipart`, panicking if there is none or it is malformed.
    async fn next_field(multipart: &mut Multipart) -> Field {
        multipart
            .next()
            .await
            .expect("multipart should have two fields")
            .expect("multipart body should be well formatted")
    }

    /// Both fields can be collected in full when no effective limit is set.
    #[actix_rt::test]
    async fn bytes_unlimited() {
        let (body, headers) = create_double_request_with_header();

        let mut multipart = Multipart::new(&headers, stream::iter([Ok(body)]));

        let mut first = next_field(&mut multipart).await;
        let data = first
            .bytes(usize::MAX)
            .await
            .expect("field data should not be size limited")
            .expect("reading field data should not error");
        assert_eq!(data, "one+one+one");

        let mut second = next_field(&mut multipart).await;
        let data = second
            .bytes(usize::MAX)
            .await
            .expect("field data should not be size limited")
            .expect("reading field data should not error");
        assert_eq!(data, "two+two+two");
    }

    /// Exceeding the limit on one field reports an error and leaves the next field readable.
    #[actix_rt::test]
    async fn bytes_limited() {
        let (body, headers) = create_double_request_with_header();

        let mut multipart = Multipart::new(&headers, stream::iter([Ok(body)]));

        let mut first = next_field(&mut multipart).await;
        first
            .bytes(8) // smaller than data size
            .await
            .expect_err("field data should be size limited");

        // next field still readable
        let mut second = next_field(&mut multipart).await;
        let data = second
            .bytes(usize::MAX)
            .await
            .expect("field data should not be size limited")
            .expect("reading field data should not error");
        assert_eq!(data, "two+two+two");
    }
}
493}