actix_multipart/
multipart.rs

1//! Multipart response payload support.
2
3use std::{
4    cell::RefCell,
5    pin::Pin,
6    rc::Rc,
7    task::{Context, Poll},
8};
9
10use actix_web::{
11    dev,
12    error::{ParseError, PayloadError},
13    http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue},
14    web::Bytes,
15    HttpRequest,
16};
17use futures_core::stream::Stream;
18use mime::Mime;
19
20use crate::{
21    error::Error,
22    field::InnerField,
23    payload::{PayloadBuffer, PayloadRef},
24    safety::Safety,
25    Field,
26};
27
28const MAX_HEADERS: usize = 32;
29
30/// The server-side implementation of `multipart/form-data` requests.
31///
32/// This will parse the incoming stream into `MultipartItem` instances via its `Stream`
33/// implementation. `MultipartItem::Field` contains multipart field. `MultipartItem::Multipart` is
34/// used for nested multipart streams.
35pub struct Multipart {
36    flow: Flow,
37    safety: Safety,
38}
39
40enum Flow {
41    InFlight(Inner),
42
43    /// Error container is Some until an error is returned out of the flow.
44    Error(Option<Error>),
45}
46
47impl Multipart {
48    /// Creates multipart instance from parts.
49    pub fn new<S>(headers: &HeaderMap, stream: S) -> Self
50    where
51        S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
52    {
53        match Self::find_ct_and_boundary(headers) {
54            Ok((ct, boundary)) => Self::from_ct_and_boundary(ct, boundary, stream),
55            Err(err) => Self::from_error(err),
56        }
57    }
58
59    /// Creates multipart instance from parts.
60    pub(crate) fn from_req(req: &HttpRequest, payload: &mut dev::Payload) -> Self {
61        match Self::find_ct_and_boundary(req.headers()) {
62            Ok((ct, boundary)) => Self::from_ct_and_boundary(ct, boundary, payload.take()),
63            Err(err) => Self::from_error(err),
64        }
65    }
66
67    /// Extract Content-Type and boundary info from headers.
68    pub(crate) fn find_ct_and_boundary(headers: &HeaderMap) -> Result<(Mime, String), Error> {
69        let content_type = headers
70            .get(&header::CONTENT_TYPE)
71            .ok_or(Error::ContentTypeMissing)?
72            .to_str()
73            .ok()
74            .and_then(|content_type| content_type.parse::<Mime>().ok())
75            .ok_or(Error::ContentTypeParse)?;
76
77        if content_type.type_() != mime::MULTIPART {
78            return Err(Error::ContentTypeIncompatible);
79        }
80
81        let boundary = content_type
82            .get_param(mime::BOUNDARY)
83            .ok_or(Error::BoundaryMissing)?
84            .as_str()
85            .to_owned();
86
87        Ok((content_type, boundary))
88    }
89
90    /// Constructs a new multipart reader from given Content-Type, boundary, and stream.
91    pub(crate) fn from_ct_and_boundary<S>(ct: Mime, boundary: String, stream: S) -> Multipart
92    where
93        S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
94    {
95        Multipart {
96            safety: Safety::new(),
97            flow: Flow::InFlight(Inner {
98                payload: PayloadRef::new(PayloadBuffer::new(stream)),
99                content_type: ct,
100                boundary,
101                state: State::FirstBoundary,
102                item: Item::None,
103            }),
104        }
105    }
106
107    /// Constructs a new multipart reader from given `MultipartError`.
108    pub(crate) fn from_error(err: Error) -> Multipart {
109        Multipart {
110            flow: Flow::Error(Some(err)),
111            safety: Safety::new(),
112        }
113    }
114
115    /// Return requests parsed Content-Type or raise the stored error.
116    pub(crate) fn content_type_or_bail(&mut self) -> Result<mime::Mime, Error> {
117        match self.flow {
118            Flow::InFlight(ref inner) => Ok(inner.content_type.clone()),
119            Flow::Error(ref mut err) => Err(err
120                .take()
121                .expect("error should not be taken after it was returned")),
122        }
123    }
124}
125
126impl Stream for Multipart {
127    type Item = Result<Field, Error>;
128
129    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130        let this = self.get_mut();
131
132        match this.flow {
133            Flow::InFlight(ref mut inner) => {
134                if let Some(mut buffer) = inner.payload.get_mut(&this.safety) {
135                    // check safety and poll read payload to buffer.
136                    buffer.poll_stream(cx)?;
137                } else if !this.safety.is_clean() {
138                    // safety violation
139                    return Poll::Ready(Some(Err(Error::NotConsumed)));
140                } else {
141                    return Poll::Pending;
142                }
143
144                inner.poll(&this.safety, cx)
145            }
146
147            Flow::Error(ref mut err) => Poll::Ready(Some(Err(err
148                .take()
149                .expect("Multipart polled after finish")))),
150        }
151    }
152}
153
154#[derive(PartialEq, Debug)]
155enum State {
156    /// Skip data until first boundary.
157    FirstBoundary,
158
159    /// Reading boundary.
160    Boundary,
161
162    /// Reading Headers.
163    Headers,
164
165    /// Stream EOF.
166    Eof,
167}
168
169enum Item {
170    None,
171    Field(Rc<RefCell<InnerField>>),
172}
173
174struct Inner {
175    /// Request's payload stream & buffer.
176    payload: PayloadRef,
177
178    /// Request's Content-Type.
179    ///
180    /// Guaranteed to have "multipart" top-level media type, i.e., `multipart/*`.
181    content_type: Mime,
182
183    /// Field boundary.
184    boundary: String,
185
186    state: State,
187    item: Item,
188}
189
190impl Inner {
191    fn read_field_headers(payload: &mut PayloadBuffer) -> Result<Option<HeaderMap>, Error> {
192        match payload.read_until(b"\r\n\r\n")? {
193            None => {
194                if payload.eof {
195                    Err(Error::Incomplete)
196                } else {
197                    Ok(None)
198                }
199            }
200
201            Some(bytes) => {
202                let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS];
203
204                match httparse::parse_headers(&bytes, &mut hdrs).map_err(ParseError::from)? {
205                    httparse::Status::Complete((_, hdrs)) => {
206                        // convert headers
207                        let mut headers = HeaderMap::with_capacity(hdrs.len());
208
209                        for h in hdrs {
210                            let name =
211                                HeaderName::try_from(h.name).map_err(|_| ParseError::Header)?;
212                            let value =
213                                HeaderValue::try_from(h.value).map_err(|_| ParseError::Header)?;
214                            headers.append(name, value);
215                        }
216
217                        Ok(Some(headers))
218                    }
219
220                    httparse::Status::Partial => Err(ParseError::Header.into()),
221                }
222            }
223        }
224    }
225
226    /// Reads a field boundary from the payload buffer (and discards it).
227    ///
228    /// Reads "in-between" and "final" boundaries. E.g. for boundary = "foo":
229    ///
230    /// ```plain
231    /// --foo    <-- in-between fields
232    /// --foo--  <-- end of request body, should be followed by EOF
233    /// ```
234    ///
235    /// Returns:
236    ///
237    /// - `Ok(Some(true))` - final field boundary read (EOF)
238    /// - `Ok(Some(false))` - field boundary read
239    /// - `Ok(None)` - boundary not found, more data needs reading
240    /// - `Err(BoundaryMissing)` - multipart boundary is missing
241    fn read_boundary(payload: &mut PayloadBuffer, boundary: &str) -> Result<Option<bool>, Error> {
242        // TODO: need to read epilogue
243        let chunk = match payload.readline_or_eof()? {
244            // TODO: this might be okay as a let Some() else return Ok(None)
245            None => return Ok(payload.eof.then_some(true)),
246            Some(chunk) => chunk,
247        };
248
249        const BOUNDARY_MARKER: &[u8] = b"--";
250        const LINE_BREAK: &[u8] = b"\r\n";
251
252        let boundary_len = boundary.len();
253
254        if chunk.len() < boundary_len + 2 + 2
255            || !chunk.starts_with(BOUNDARY_MARKER)
256            || &chunk[2..boundary_len + 2] != boundary.as_bytes()
257        {
258            return Err(Error::BoundaryMissing);
259        }
260
261        // chunk facts:
262        // - long enough to contain boundary + 2 markers or 1 marker and line-break
263        // - starts with boundary marker
264        // - chunk contains correct boundary
265
266        if &chunk[boundary_len + 2..] == LINE_BREAK {
267            // boundary is followed by line-break, indicating more fields to come
268            return Ok(Some(false));
269        }
270
271        // boundary is followed by marker
272        if &chunk[boundary_len + 2..boundary_len + 4] == BOUNDARY_MARKER
273            && (
274                // chunk is exactly boundary len + 2 markers
275                chunk.len() == boundary_len + 2 + 2
276                // final boundary is allowed to end with a line-break
277                || &chunk[boundary_len + 4..] == LINE_BREAK
278            )
279        {
280            return Ok(Some(true));
281        }
282
283        Err(Error::BoundaryMissing)
284    }
285
286    fn skip_until_boundary(
287        payload: &mut PayloadBuffer,
288        boundary: &str,
289    ) -> Result<Option<bool>, Error> {
290        let mut eof = false;
291
292        loop {
293            match payload.readline()? {
294                Some(chunk) => {
295                    if chunk.is_empty() {
296                        return Err(Error::BoundaryMissing);
297                    }
298                    if chunk.len() < boundary.len() {
299                        continue;
300                    }
301                    if &chunk[..2] == b"--" && &chunk[2..chunk.len() - 2] == boundary.as_bytes() {
302                        break;
303                    } else {
304                        if chunk.len() < boundary.len() + 2 {
305                            continue;
306                        }
307                        let b: &[u8] = boundary.as_ref();
308                        if &chunk[..boundary.len()] == b
309                            && &chunk[boundary.len()..boundary.len() + 2] == b"--"
310                        {
311                            eof = true;
312                            break;
313                        }
314                    }
315                }
316                None => {
317                    return if payload.eof {
318                        Err(Error::Incomplete)
319                    } else {
320                        Ok(None)
321                    };
322                }
323            }
324        }
325        Ok(Some(eof))
326    }
327
328    fn poll(&mut self, safety: &Safety, cx: &Context<'_>) -> Poll<Option<Result<Field, Error>>> {
329        if self.state == State::Eof {
330            Poll::Ready(None)
331        } else {
332            // release field
333            loop {
334                // Nested multipart streams of fields has to be consumed
335                // before switching to next
336                if safety.current() {
337                    let stop = match self.item {
338                        Item::Field(ref mut field) => match field.borrow_mut().poll(safety) {
339                            Poll::Pending => return Poll::Pending,
340                            Poll::Ready(Some(Ok(_))) => continue,
341                            Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
342                            Poll::Ready(None) => true,
343                        },
344                        Item::None => false,
345                    };
346                    if stop {
347                        self.item = Item::None;
348                    }
349                    if let Item::None = self.item {
350                        break;
351                    }
352                }
353            }
354
355            let field_headers = if let Some(mut payload) = self.payload.get_mut(safety) {
356                match self.state {
357                    // read until first boundary
358                    State::FirstBoundary => {
359                        match Inner::skip_until_boundary(&mut payload, &self.boundary)? {
360                            None => return Poll::Pending,
361                            Some(eof) => {
362                                if eof {
363                                    self.state = State::Eof;
364                                    return Poll::Ready(None);
365                                } else {
366                                    self.state = State::Headers;
367                                }
368                            }
369                        }
370                    }
371
372                    // read boundary
373                    State::Boundary => match Inner::read_boundary(&mut payload, &self.boundary)? {
374                        None => return Poll::Pending,
375                        Some(eof) => {
376                            if eof {
377                                self.state = State::Eof;
378                                return Poll::Ready(None);
379                            } else {
380                                self.state = State::Headers;
381                            }
382                        }
383                    },
384
385                    _ => {}
386                }
387
388                // read field headers for next field
389                if self.state == State::Headers {
390                    if let Some(headers) = Inner::read_field_headers(&mut payload)? {
391                        self.state = State::Boundary;
392                        headers
393                    } else {
394                        return Poll::Pending;
395                    }
396                } else {
397                    unreachable!()
398                }
399            } else {
400                log::debug!("NotReady: field is in flight");
401                return Poll::Pending;
402            };
403
404            let field_content_disposition = field_headers
405                .get(&header::CONTENT_DISPOSITION)
406                .and_then(|cd| ContentDisposition::from_raw(cd).ok())
407                .filter(|content_disposition| {
408                    matches!(
409                        content_disposition.disposition,
410                        header::DispositionType::FormData,
411                    )
412                });
413
414            let form_field_name = if self.content_type.subtype() == mime::FORM_DATA {
415                // According to RFC 7578 ยง4.2, which relates to "multipart/form-data" requests
416                // specifically, fields must have a Content-Disposition header, its disposition
417                // type must be set as "form-data", and it must have a name parameter.
418
419                let Some(cd) = &field_content_disposition else {
420                    return Poll::Ready(Some(Err(Error::ContentDispositionMissing)));
421                };
422
423                let Some(field_name) = cd.get_name() else {
424                    return Poll::Ready(Some(Err(Error::ContentDispositionNameMissing)));
425                };
426
427                Some(field_name.to_owned())
428            } else {
429                None
430            };
431
432            // TODO: check out other multipart/* RFCs for specific requirements
433
434            let field_content_type: Option<Mime> = field_headers
435                .get(&header::CONTENT_TYPE)
436                .and_then(|ct| ct.to_str().ok())
437                .and_then(|ct| ct.parse().ok());
438
439            self.state = State::Boundary;
440
441            // nested multipart stream is not supported
442            if let Some(mime) = &field_content_type {
443                if mime.type_() == mime::MULTIPART {
444                    return Poll::Ready(Some(Err(Error::Nested)));
445                }
446            }
447
448            let field_inner =
449                InnerField::new_in_rc(self.payload.clone(), self.boundary.clone(), &field_headers)?;
450
451            self.item = Item::Field(Rc::clone(&field_inner));
452
453            Poll::Ready(Some(Ok(Field::new(
454                field_content_type,
455                field_content_disposition,
456                form_field_name,
457                field_headers,
458                safety.clone(cx),
459                field_inner,
460            ))))
461        }
462    }
463}
464
465impl Drop for Inner {
466    fn drop(&mut self) {
467        // InnerMultipartItem::Field has to be dropped first because of Safety.
468        self.item = Item::None;
469    }
470}
471
472#[cfg(test)]
473mod tests {
474    use std::time::Duration;
475
476    use actix_http::h1;
477    use actix_web::{
478        http::header::{DispositionParam, DispositionType},
479        rt,
480        test::TestRequest,
481        web::{BufMut as _, BytesMut},
482        FromRequest,
483    };
484    use assert_matches::assert_matches;
485    use futures_test::stream::StreamTestExt as _;
486    use futures_util::{future::lazy, stream, StreamExt as _};
487    use tokio::sync::mpsc;
488    use tokio_stream::wrappers::UnboundedReceiverStream;
489
490    use super::*;
491
492    const BOUNDARY: &str = "abbc761f78ff4d7cb7573b5a23f96ef0";
493
494    #[actix_rt::test]
495    async fn test_boundary() {
496        let headers = HeaderMap::new();
497        match Multipart::find_ct_and_boundary(&headers) {
498            Err(Error::ContentTypeMissing) => {}
499            _ => unreachable!("should not happen"),
500        }
501
502        let mut headers = HeaderMap::new();
503        headers.insert(
504            header::CONTENT_TYPE,
505            header::HeaderValue::from_static("test"),
506        );
507
508        match Multipart::find_ct_and_boundary(&headers) {
509            Err(Error::ContentTypeParse) => {}
510            _ => unreachable!("should not happen"),
511        }
512
513        let mut headers = HeaderMap::new();
514        headers.insert(
515            header::CONTENT_TYPE,
516            header::HeaderValue::from_static("multipart/mixed"),
517        );
518        match Multipart::find_ct_and_boundary(&headers) {
519            Err(Error::BoundaryMissing) => {}
520            _ => unreachable!("should not happen"),
521        }
522
523        let mut headers = HeaderMap::new();
524        headers.insert(
525            header::CONTENT_TYPE,
526            header::HeaderValue::from_static(
527                "multipart/mixed; boundary=\"5c02368e880e436dab70ed54e1c58209\"",
528            ),
529        );
530
531        assert_eq!(
532            Multipart::find_ct_and_boundary(&headers).unwrap().1,
533            "5c02368e880e436dab70ed54e1c58209",
534        );
535    }
536
537    fn create_stream() -> (
538        mpsc::UnboundedSender<Result<Bytes, PayloadError>>,
539        impl Stream<Item = Result<Bytes, PayloadError>>,
540    ) {
541        let (tx, rx) = mpsc::unbounded_channel();
542
543        (
544            tx,
545            UnboundedReceiverStream::new(rx).map(|res| res.map_err(|_| panic!())),
546        )
547    }
548
549    fn create_simple_request_with_header() -> (Bytes, HeaderMap) {
550        let (body, headers) = crate::test::create_form_data_payload_and_headers_with_boundary(
551            BOUNDARY,
552            "file",
553            Some("fn.txt".to_owned()),
554            Some(mime::TEXT_PLAIN_UTF_8),
555            Bytes::from_static(b"data"),
556        );
557
558        let mut buf = BytesMut::with_capacity(body.len() + 14);
559
560        // add junk before form to test pre-boundary data rejection
561        buf.put("testasdadsad\r\n".as_bytes());
562
563        buf.put(body);
564
565        (buf.freeze(), headers)
566    }
567
568    // TODO: use test utility when multi-file support is introduced
569    fn create_double_request_with_header() -> (Bytes, HeaderMap) {
570        let bytes = Bytes::from(
571            "testasdadsad\r\n\
572             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
573             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
574             Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
575             test\r\n\
576             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
577             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
578             Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
579             data\r\n\
580             --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n",
581        );
582        let mut headers = HeaderMap::new();
583        headers.insert(
584            header::CONTENT_TYPE,
585            header::HeaderValue::from_static(
586                "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
587            ),
588        );
589        (bytes, headers)
590    }
591
592    #[actix_rt::test]
593    async fn test_multipart_no_end_crlf() {
594        let (sender, payload) = create_stream();
595        let (mut bytes, headers) = create_double_request_with_header();
596        let bytes_stripped = bytes.split_to(bytes.len()); // strip crlf
597
598        sender.send(Ok(bytes_stripped)).unwrap();
599        drop(sender); // eof
600
601        let mut multipart = Multipart::new(&headers, payload);
602
603        match multipart.next().await.unwrap() {
604            Ok(_) => {}
605            _ => unreachable!(),
606        }
607
608        match multipart.next().await.unwrap() {
609            Ok(_) => {}
610            _ => unreachable!(),
611        }
612
613        match multipart.next().await {
614            None => {}
615            _ => unreachable!(),
616        }
617    }
618
619    #[actix_rt::test]
620    async fn test_multipart() {
621        let (sender, payload) = create_stream();
622        let (bytes, headers) = create_double_request_with_header();
623
624        sender.send(Ok(bytes)).unwrap();
625
626        let mut multipart = Multipart::new(&headers, payload);
627        match multipart.next().await {
628            Some(Ok(mut field)) => {
629                let cd = field.content_disposition().unwrap();
630                assert_eq!(cd.disposition, DispositionType::FormData);
631                assert_eq!(cd.parameters[0], DispositionParam::Name("file".into()));
632
633                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
634                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
635
636                match field.next().await.unwrap() {
637                    Ok(chunk) => assert_eq!(chunk, "test"),
638                    _ => unreachable!(),
639                }
640                match field.next().await {
641                    None => {}
642                    _ => unreachable!(),
643                }
644            }
645            _ => unreachable!(),
646        }
647
648        match multipart.next().await.unwrap() {
649            Ok(mut field) => {
650                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
651                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
652
653                match field.next().await {
654                    Some(Ok(chunk)) => assert_eq!(chunk, "data"),
655                    _ => unreachable!(),
656                }
657                match field.next().await {
658                    None => {}
659                    _ => unreachable!(),
660                }
661            }
662            _ => unreachable!(),
663        }
664
665        match multipart.next().await {
666            None => {}
667            _ => unreachable!(),
668        }
669    }
670
671    // Loops, collecting all bytes until end-of-field
672    async fn get_whole_field(field: &mut Field) -> BytesMut {
673        let mut b = BytesMut::new();
674        loop {
675            match field.next().await {
676                Some(Ok(chunk)) => b.extend_from_slice(&chunk),
677                None => return b,
678                _ => unreachable!(),
679            }
680        }
681    }
682
683    #[actix_rt::test]
684    async fn test_stream() {
685        let (bytes, headers) = create_double_request_with_header();
686        let payload = stream::iter(bytes)
687            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
688            .interleave_pending();
689
690        let mut multipart = Multipart::new(&headers, payload);
691        match multipart.next().await.unwrap() {
692            Ok(mut field) => {
693                let cd = field.content_disposition().unwrap();
694                assert_eq!(cd.disposition, DispositionType::FormData);
695                assert_eq!(cd.parameters[0], DispositionParam::Name("file".into()));
696
697                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
698                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
699
700                assert_eq!(get_whole_field(&mut field).await, "test");
701            }
702            _ => unreachable!(),
703        }
704
705        match multipart.next().await {
706            Some(Ok(mut field)) => {
707                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
708                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
709
710                assert_eq!(get_whole_field(&mut field).await, "data");
711            }
712            _ => unreachable!(),
713        }
714
715        match multipart.next().await {
716            None => {}
717            _ => unreachable!(),
718        }
719    }
720
721    #[actix_rt::test]
722    async fn test_basic() {
723        let (_, payload) = h1::Payload::create(false);
724        let mut payload = PayloadBuffer::new(payload);
725
726        assert_eq!(payload.buf.len(), 0);
727        lazy(|cx| payload.poll_stream(cx)).await.unwrap();
728        assert_eq!(None, payload.read_max(1).unwrap());
729    }
730
731    #[actix_rt::test]
732    async fn test_eof() {
733        let (mut sender, payload) = h1::Payload::create(false);
734        let mut payload = PayloadBuffer::new(payload);
735
736        assert_eq!(None, payload.read_max(4).unwrap());
737        sender.feed_data(Bytes::from("data"));
738        sender.feed_eof();
739        lazy(|cx| payload.poll_stream(cx)).await.unwrap();
740
741        assert_eq!(Some(Bytes::from("data")), payload.read_max(4).unwrap());
742        assert_eq!(payload.buf.len(), 0);
743        assert!(payload.read_max(1).is_err());
744        assert!(payload.eof);
745    }
746
747    #[actix_rt::test]
748    async fn test_err() {
749        let (mut sender, payload) = h1::Payload::create(false);
750        let mut payload = PayloadBuffer::new(payload);
751        assert_eq!(None, payload.read_max(1).unwrap());
752        sender.set_error(PayloadError::Incomplete(None));
753        lazy(|cx| payload.poll_stream(cx)).await.err().unwrap();
754    }
755
756    #[actix_rt::test]
757    async fn read_max() {
758        let (mut sender, payload) = h1::Payload::create(false);
759        let mut payload = PayloadBuffer::new(payload);
760
761        sender.feed_data(Bytes::from("line1"));
762        sender.feed_data(Bytes::from("line2"));
763        lazy(|cx| payload.poll_stream(cx)).await.unwrap();
764        assert_eq!(payload.buf.len(), 10);
765
766        assert_eq!(Some(Bytes::from("line1")), payload.read_max(5).unwrap());
767        assert_eq!(payload.buf.len(), 5);
768
769        assert_eq!(Some(Bytes::from("line2")), payload.read_max(5).unwrap());
770        assert_eq!(payload.buf.len(), 0);
771    }
772
773    #[actix_rt::test]
774    async fn read_exactly() {
775        let (mut sender, payload) = h1::Payload::create(false);
776        let mut payload = PayloadBuffer::new(payload);
777
778        assert_eq!(None, payload.read_exact(2));
779
780        sender.feed_data(Bytes::from("line1"));
781        sender.feed_data(Bytes::from("line2"));
782        lazy(|cx| payload.poll_stream(cx)).await.unwrap();
783
784        assert_eq!(Some(Bytes::from_static(b"li")), payload.read_exact(2));
785        assert_eq!(payload.buf.len(), 8);
786
787        assert_eq!(Some(Bytes::from_static(b"ne1l")), payload.read_exact(4));
788        assert_eq!(payload.buf.len(), 4);
789    }
790
791    #[actix_rt::test]
792    async fn read_until() {
793        let (mut sender, payload) = h1::Payload::create(false);
794        let mut payload = PayloadBuffer::new(payload);
795
796        assert_eq!(None, payload.read_until(b"ne").unwrap());
797
798        sender.feed_data(Bytes::from("line1"));
799        sender.feed_data(Bytes::from("line2"));
800        lazy(|cx| payload.poll_stream(cx)).await.unwrap();
801
802        assert_eq!(
803            Some(Bytes::from("line")),
804            payload.read_until(b"ne").unwrap()
805        );
806        assert_eq!(payload.buf.len(), 6);
807
808        assert_eq!(
809            Some(Bytes::from("1line2")),
810            payload.read_until(b"2").unwrap()
811        );
812        assert_eq!(payload.buf.len(), 0);
813    }
814
815    #[actix_rt::test]
816    async fn test_multipart_from_error() {
817        let err = Error::ContentTypeMissing;
818        let mut multipart = Multipart::from_error(err);
819        assert!(multipart.next().await.unwrap().is_err())
820    }
821
822    #[actix_rt::test]
823    async fn test_multipart_from_boundary() {
824        let (_, payload) = create_stream();
825        let (_, headers) = create_simple_request_with_header();
826        let (ct, boundary) = Multipart::find_ct_and_boundary(&headers).unwrap();
827        let _ = Multipart::from_ct_and_boundary(ct, boundary, payload);
828    }
829
830    #[actix_rt::test]
831    async fn test_multipart_payload_consumption() {
832        // with sample payload and HttpRequest with no headers
833        let (_, inner_payload) = h1::Payload::create(false);
834        let mut payload = actix_web::dev::Payload::from(inner_payload);
835        let req = TestRequest::default().to_http_request();
836
837        // multipart should generate an error
838        let mut mp = Multipart::from_request(&req, &mut payload).await.unwrap();
839        assert!(mp.next().await.unwrap().is_err());
840
841        // and should not consume the payload
842        match payload {
843            actix_web::dev::Payload::H1 { .. } => {} //expected
844            _ => unreachable!(),
845        }
846    }
847
848    #[actix_rt::test]
849    async fn no_content_disposition_form_data() {
850        let bytes = Bytes::from(
851            "testasdadsad\r\n\
852             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
853             Content-Type: text/plain; charset=utf-8\r\n\
854             Content-Length: 4\r\n\
855             \r\n\
856             test\r\n\
857             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
858        );
859        let mut headers = HeaderMap::new();
860        headers.insert(
861            header::CONTENT_TYPE,
862            header::HeaderValue::from_static(
863                "multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
864            ),
865        );
866        let payload = stream::iter(bytes)
867            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
868            .interleave_pending();
869
870        let mut multipart = Multipart::new(&headers, payload);
871        let res = multipart.next().await.unwrap();
872        assert_matches!(
873            res.expect_err(
874                "according to RFC 7578, form-data fields require a content-disposition header"
875            ),
876            Error::ContentDispositionMissing
877        );
878    }
879
880    #[actix_rt::test]
881    async fn no_content_disposition_non_form_data() {
882        let bytes = Bytes::from(
883            "testasdadsad\r\n\
884             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
885             Content-Type: text/plain; charset=utf-8\r\n\
886             Content-Length: 4\r\n\
887             \r\n\
888             test\r\n\
889             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
890        );
891        let mut headers = HeaderMap::new();
892        headers.insert(
893            header::CONTENT_TYPE,
894            header::HeaderValue::from_static(
895                "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
896            ),
897        );
898        let payload = stream::iter(bytes)
899            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
900            .interleave_pending();
901
902        let mut multipart = Multipart::new(&headers, payload);
903        let res = multipart.next().await.unwrap();
904        res.unwrap();
905    }
906
907    #[actix_rt::test]
908    async fn no_name_in_form_data_content_disposition() {
909        let bytes = Bytes::from(
910            "testasdadsad\r\n\
911             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
912             Content-Disposition: form-data; filename=\"fn.txt\"\r\n\
913             Content-Type: text/plain; charset=utf-8\r\n\
914             Content-Length: 4\r\n\
915             \r\n\
916             test\r\n\
917             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
918        );
919        let mut headers = HeaderMap::new();
920        headers.insert(
921            header::CONTENT_TYPE,
922            header::HeaderValue::from_static(
923                "multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
924            ),
925        );
926        let payload = stream::iter(bytes)
927            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
928            .interleave_pending();
929
930        let mut multipart = Multipart::new(&headers, payload);
931        let res = multipart.next().await.unwrap();
932        assert_matches!(
933            res.expect_err("according to RFC 7578, form-data fields require a name attribute"),
934            Error::ContentDispositionNameMissing
935        );
936    }
937
938    #[actix_rt::test]
939    async fn test_drop_multipart_dont_hang() {
940        let (sender, payload) = create_stream();
941        let (bytes, headers) = create_simple_request_with_header();
942        sender.send(Ok(bytes)).unwrap();
943        drop(sender); // eof
944
945        let mut multipart = Multipart::new(&headers, payload);
946        let mut field = multipart.next().await.unwrap().unwrap();
947
948        drop(multipart);
949
950        // should fail immediately
951        match field.next().await {
952            Some(Err(Error::NotConsumed)) => {}
953            _ => panic!(),
954        };
955    }
956
957    #[actix_rt::test]
958    async fn test_drop_field_awaken_multipart() {
959        let (sender, payload) = create_stream();
960        let (bytes, headers) = create_double_request_with_header();
961        sender.send(Ok(bytes)).unwrap();
962        drop(sender); // eof
963
964        let mut multipart = Multipart::new(&headers, payload);
965        let mut field = multipart.next().await.unwrap().unwrap();
966
967        let task = rt::spawn(async move {
968            rt::time::sleep(Duration::from_millis(500)).await;
969            assert_eq!(field.next().await.unwrap().unwrap(), "test");
970            drop(field);
971        });
972
973        // dropping field should awaken current task
974        let _ = multipart.next().await.unwrap().unwrap();
975        task.await.unwrap();
976    }
977}