1use std::{
4 cell::RefCell,
5 pin::Pin,
6 rc::Rc,
7 task::{Context, Poll},
8};
9
10use actix_web::{
11 dev,
12 error::{ParseError, PayloadError},
13 http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue},
14 web::Bytes,
15 HttpRequest,
16};
17use futures_core::stream::Stream;
18use mime::Mime;
19
20use crate::{
21 error::Error,
22 field::InnerField,
23 payload::{PayloadBuffer, PayloadRef},
24 safety::Safety,
25 Field,
26};
27
/// Size of the scratch header array handed to `httparse` when parsing one field's header block.
const MAX_HEADERS: usize = 32;
29
/// Reader for a multipart request body.
///
/// Implements [`Stream`], yielding `Result<Field, Error>` items: one [`Field`] per part, or an
/// [`Error`] when the request headers or the body could not be processed.
pub struct Multipart {
    /// Either the in-flight parser state or an error captured at construction time.
    flow: Flow,
    /// Guard that is consulted whenever the payload buffer is accessed; a clone of it is passed
    /// to every [`Field`] that is handed out.
    safety: Safety,
}
39
/// Operating mode of a [`Multipart`] stream.
enum Flow {
    /// The request headers were accepted and the body is being parsed.
    InFlight(Inner),

    /// Construction failed. The error is `Some` until it has been handed out once; taking it a
    /// second time panics.
    Error(Option<Error>),
}
46
47impl Multipart {
48 pub fn new<S>(headers: &HeaderMap, stream: S) -> Self
50 where
51 S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
52 {
53 match Self::find_ct_and_boundary(headers) {
54 Ok((ct, boundary)) => Self::from_ct_and_boundary(ct, boundary, stream),
55 Err(err) => Self::from_error(err),
56 }
57 }
58
59 pub(crate) fn from_req(req: &HttpRequest, payload: &mut dev::Payload) -> Self {
61 match Self::find_ct_and_boundary(req.headers()) {
62 Ok((ct, boundary)) => Self::from_ct_and_boundary(ct, boundary, payload.take()),
63 Err(err) => Self::from_error(err),
64 }
65 }
66
67 pub(crate) fn find_ct_and_boundary(headers: &HeaderMap) -> Result<(Mime, String), Error> {
69 let content_type = headers
70 .get(&header::CONTENT_TYPE)
71 .ok_or(Error::ContentTypeMissing)?
72 .to_str()
73 .ok()
74 .and_then(|content_type| content_type.parse::<Mime>().ok())
75 .ok_or(Error::ContentTypeParse)?;
76
77 if content_type.type_() != mime::MULTIPART {
78 return Err(Error::ContentTypeIncompatible);
79 }
80
81 let boundary = content_type
82 .get_param(mime::BOUNDARY)
83 .ok_or(Error::BoundaryMissing)?
84 .as_str()
85 .to_owned();
86
87 Ok((content_type, boundary))
88 }
89
90 pub(crate) fn from_ct_and_boundary<S>(ct: Mime, boundary: String, stream: S) -> Multipart
92 where
93 S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
94 {
95 Multipart {
96 safety: Safety::new(),
97 flow: Flow::InFlight(Inner {
98 payload: PayloadRef::new(PayloadBuffer::new(stream)),
99 content_type: ct,
100 boundary,
101 state: State::FirstBoundary,
102 item: Item::None,
103 }),
104 }
105 }
106
107 pub(crate) fn from_error(err: Error) -> Multipart {
109 Multipart {
110 flow: Flow::Error(Some(err)),
111 safety: Safety::new(),
112 }
113 }
114
115 pub(crate) fn content_type_or_bail(&mut self) -> Result<mime::Mime, Error> {
117 match self.flow {
118 Flow::InFlight(ref inner) => Ok(inner.content_type.clone()),
119 Flow::Error(ref mut err) => Err(err
120 .take()
121 .expect("error should not be taken after it was returned")),
122 }
123 }
124}
125
126impl Stream for Multipart {
127 type Item = Result<Field, Error>;
128
129 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130 let this = self.get_mut();
131
132 match this.flow {
133 Flow::InFlight(ref mut inner) => {
134 if let Some(mut buffer) = inner.payload.get_mut(&this.safety) {
135 buffer.poll_stream(cx)?;
137 } else if !this.safety.is_clean() {
138 return Poll::Ready(Some(Err(Error::NotConsumed)));
140 } else {
141 return Poll::Pending;
142 }
143
144 inner.poll(&this.safety, cx)
145 }
146
147 Flow::Error(ref mut err) => Poll::Ready(Some(Err(err
148 .take()
149 .expect("Multipart polled after finish")))),
150 }
151 }
152}
153
/// Parser position within the multipart body.
#[derive(PartialEq, Debug)]
enum State {
    /// Initial state: lines are skipped until the first boundary line is found.
    FirstBoundary,

    /// The next thing to read is a boundary line (either between fields or the final one).
    Boundary,

    /// The next thing to read is a field's header block.
    Headers,

    /// The final boundary was read; the stream yields nothing further.
    Eof,
}
168
/// The field currently in flight, if any.
enum Item {
    /// No field is outstanding.
    None,
    /// Shared state of the field most recently handed out; it is polled to completion before the
    /// next boundary is read.
    Field(Rc<RefCell<InnerField>>),
}
173
/// Parser state of an in-flight multipart body.
struct Inner {
    /// Handle to the buffered request payload; a clone is given to each field.
    payload: PayloadRef,

    /// The request's parsed `Content-Type`. Construction via `find_ct_and_boundary` only accepts
    /// `multipart/*` types.
    content_type: Mime,

    /// Boundary string taken from the `Content-Type` header (without the leading `--`).
    boundary: String,

    /// Current parser position.
    state: State,
    /// Field currently in flight, if any.
    item: Item,
}
189
190impl Inner {
191 fn read_field_headers(payload: &mut PayloadBuffer) -> Result<Option<HeaderMap>, Error> {
192 match payload.read_until(b"\r\n\r\n")? {
193 None => {
194 if payload.eof {
195 Err(Error::Incomplete)
196 } else {
197 Ok(None)
198 }
199 }
200
201 Some(bytes) => {
202 let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS];
203
204 match httparse::parse_headers(&bytes, &mut hdrs).map_err(ParseError::from)? {
205 httparse::Status::Complete((_, hdrs)) => {
206 let mut headers = HeaderMap::with_capacity(hdrs.len());
208
209 for h in hdrs {
210 let name =
211 HeaderName::try_from(h.name).map_err(|_| ParseError::Header)?;
212 let value =
213 HeaderValue::try_from(h.value).map_err(|_| ParseError::Header)?;
214 headers.append(name, value);
215 }
216
217 Ok(Some(headers))
218 }
219
220 httparse::Status::Partial => Err(ParseError::Header.into()),
221 }
222 }
223 }
224 }
225
226 fn read_boundary(payload: &mut PayloadBuffer, boundary: &str) -> Result<Option<bool>, Error> {
242 let chunk = match payload.readline_or_eof()? {
244 None => return Ok(payload.eof.then_some(true)),
246 Some(chunk) => chunk,
247 };
248
249 const BOUNDARY_MARKER: &[u8] = b"--";
250 const LINE_BREAK: &[u8] = b"\r\n";
251
252 let boundary_len = boundary.len();
253
254 if chunk.len() < boundary_len + 2 + 2
255 || !chunk.starts_with(BOUNDARY_MARKER)
256 || &chunk[2..boundary_len + 2] != boundary.as_bytes()
257 {
258 return Err(Error::BoundaryMissing);
259 }
260
261 if &chunk[boundary_len + 2..] == LINE_BREAK {
267 return Ok(Some(false));
269 }
270
271 if &chunk[boundary_len + 2..boundary_len + 4] == BOUNDARY_MARKER
273 && (
274 chunk.len() == boundary_len + 2 + 2
276 || &chunk[boundary_len + 4..] == LINE_BREAK
278 )
279 {
280 return Ok(Some(true));
281 }
282
283 Err(Error::BoundaryMissing)
284 }
285
286 fn skip_until_boundary(
287 payload: &mut PayloadBuffer,
288 boundary: &str,
289 ) -> Result<Option<bool>, Error> {
290 let mut eof = false;
291
292 loop {
293 match payload.readline()? {
294 Some(chunk) => {
295 if chunk.is_empty() {
296 return Err(Error::BoundaryMissing);
297 }
298 if chunk.len() < boundary.len() {
299 continue;
300 }
301 if &chunk[..2] == b"--" && &chunk[2..chunk.len() - 2] == boundary.as_bytes() {
302 break;
303 } else {
304 if chunk.len() < boundary.len() + 2 {
305 continue;
306 }
307 let b: &[u8] = boundary.as_ref();
308 if &chunk[..boundary.len()] == b
309 && &chunk[boundary.len()..boundary.len() + 2] == b"--"
310 {
311 eof = true;
312 break;
313 }
314 }
315 }
316 None => {
317 return if payload.eof {
318 Err(Error::Incomplete)
319 } else {
320 Ok(None)
321 };
322 }
323 }
324 }
325 Ok(Some(eof))
326 }
327
328 fn poll(&mut self, safety: &Safety, cx: &Context<'_>) -> Poll<Option<Result<Field, Error>>> {
329 if self.state == State::Eof {
330 Poll::Ready(None)
331 } else {
332 loop {
334 if safety.current() {
337 let stop = match self.item {
338 Item::Field(ref mut field) => match field.borrow_mut().poll(safety) {
339 Poll::Pending => return Poll::Pending,
340 Poll::Ready(Some(Ok(_))) => continue,
341 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
342 Poll::Ready(None) => true,
343 },
344 Item::None => false,
345 };
346 if stop {
347 self.item = Item::None;
348 }
349 if let Item::None = self.item {
350 break;
351 }
352 }
353 }
354
355 let field_headers = if let Some(mut payload) = self.payload.get_mut(safety) {
356 match self.state {
357 State::FirstBoundary => {
359 match Inner::skip_until_boundary(&mut payload, &self.boundary)? {
360 None => return Poll::Pending,
361 Some(eof) => {
362 if eof {
363 self.state = State::Eof;
364 return Poll::Ready(None);
365 } else {
366 self.state = State::Headers;
367 }
368 }
369 }
370 }
371
372 State::Boundary => match Inner::read_boundary(&mut payload, &self.boundary)? {
374 None => return Poll::Pending,
375 Some(eof) => {
376 if eof {
377 self.state = State::Eof;
378 return Poll::Ready(None);
379 } else {
380 self.state = State::Headers;
381 }
382 }
383 },
384
385 _ => {}
386 }
387
388 if self.state == State::Headers {
390 if let Some(headers) = Inner::read_field_headers(&mut payload)? {
391 self.state = State::Boundary;
392 headers
393 } else {
394 return Poll::Pending;
395 }
396 } else {
397 unreachable!()
398 }
399 } else {
400 log::debug!("NotReady: field is in flight");
401 return Poll::Pending;
402 };
403
404 let field_content_disposition = field_headers
405 .get(&header::CONTENT_DISPOSITION)
406 .and_then(|cd| ContentDisposition::from_raw(cd).ok())
407 .filter(|content_disposition| {
408 matches!(
409 content_disposition.disposition,
410 header::DispositionType::FormData,
411 )
412 });
413
414 let form_field_name = if self.content_type.subtype() == mime::FORM_DATA {
415 let Some(cd) = &field_content_disposition else {
420 return Poll::Ready(Some(Err(Error::ContentDispositionMissing)));
421 };
422
423 let Some(field_name) = cd.get_name() else {
424 return Poll::Ready(Some(Err(Error::ContentDispositionNameMissing)));
425 };
426
427 Some(field_name.to_owned())
428 } else {
429 None
430 };
431
432 let field_content_type: Option<Mime> = field_headers
435 .get(&header::CONTENT_TYPE)
436 .and_then(|ct| ct.to_str().ok())
437 .and_then(|ct| ct.parse().ok());
438
439 self.state = State::Boundary;
440
441 if let Some(mime) = &field_content_type {
443 if mime.type_() == mime::MULTIPART {
444 return Poll::Ready(Some(Err(Error::Nested)));
445 }
446 }
447
448 let field_inner =
449 InnerField::new_in_rc(self.payload.clone(), self.boundary.clone(), &field_headers)?;
450
451 self.item = Item::Field(Rc::clone(&field_inner));
452
453 Poll::Ready(Some(Ok(Field::new(
454 field_content_type,
455 field_content_disposition,
456 form_field_name,
457 field_headers,
458 safety.clone(cx),
459 field_inner,
460 ))))
461 }
462 }
463}
464
465impl Drop for Inner {
466 fn drop(&mut self) {
467 self.item = Item::None;
469 }
470}
471
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use actix_http::h1;
    use actix_web::{
        http::header::{DispositionParam, DispositionType},
        rt,
        test::TestRequest,
        web::{BufMut as _, BytesMut},
        FromRequest,
    };
    use assert_matches::assert_matches;
    use futures_test::stream::StreamTestExt as _;
    use futures_util::{future::lazy, stream, StreamExt as _};
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::UnboundedReceiverStream;

    use super::*;

    /// Boundary used by the hand-written request bodies below.
    const BOUNDARY: &str = "abbc761f78ff4d7cb7573b5a23f96ef0";

    /// Each failure mode of `find_ct_and_boundary`, followed by the success case.
    #[actix_rt::test]
    async fn test_boundary() {
        let headers = HeaderMap::new();
        match Multipart::find_ct_and_boundary(&headers) {
            Err(Error::ContentTypeMissing) => {}
            _ => unreachable!("should not happen"),
        }

        let mut headers = HeaderMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            header::HeaderValue::from_static("test"),
        );

        match Multipart::find_ct_and_boundary(&headers) {
            Err(Error::ContentTypeParse) => {}
            _ => unreachable!("should not happen"),
        }

        let mut headers = HeaderMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            header::HeaderValue::from_static("multipart/mixed"),
        );
        match Multipart::find_ct_and_boundary(&headers) {
            Err(Error::BoundaryMissing) => {}
            _ => unreachable!("should not happen"),
        }

        let mut headers = HeaderMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            header::HeaderValue::from_static(
                "multipart/mixed; boundary=\"5c02368e880e436dab70ed54e1c58209\"",
            ),
        );

        assert_eq!(
            Multipart::find_ct_and_boundary(&headers).unwrap().1,
            "5c02368e880e436dab70ed54e1c58209",
        );
    }

    /// Creates a channel-backed payload stream together with its sending half.
    fn create_stream() -> (
        mpsc::UnboundedSender<Result<Bytes, PayloadError>>,
        impl Stream<Item = Result<Bytes, PayloadError>>,
    ) {
        let (tx, rx) = mpsc::unbounded_channel();

        (
            tx,
            UnboundedReceiverStream::new(rx).map(|res| res.map_err(|_| panic!())),
        )
    }

    /// Single-field form-data body preceded by one line of preamble.
    fn create_simple_request_with_header() -> (Bytes, HeaderMap) {
        let (body, headers) = crate::test::create_form_data_payload_and_headers_with_boundary(
            BOUNDARY,
            "file",
            Some("fn.txt".to_owned()),
            Some(mime::TEXT_PLAIN_UTF_8),
            Bytes::from_static(b"data"),
        );

        let mut buf = BytesMut::with_capacity(body.len() + 14);

        // preamble line that the parser has to skip
        buf.put("testasdadsad\r\n".as_bytes());

        buf.put(body);

        (buf.freeze(), headers)
    }

    /// Two-field `multipart/mixed` body preceded by one line of preamble.
    fn create_double_request_with_header() -> (Bytes, HeaderMap) {
        let bytes = Bytes::from(
            "testasdadsad\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
             Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
             test\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
             Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
             data\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n",
        );
        let mut headers = HeaderMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            header::HeaderValue::from_static(
                "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
            ),
        );
        (bytes, headers)
    }

    /// The final boundary may be terminated by end of payload instead of a CRLF.
    #[actix_rt::test]
    async fn test_multipart_no_end_crlf() {
        let (sender, payload) = create_stream();
        let (mut bytes, headers) = create_double_request_with_header();

        // drop the trailing CRLF so the body ends directly after the final boundary
        let bytes_stripped = bytes.split_to(bytes.len() - 2);

        sender.send(Ok(bytes_stripped)).unwrap();
        // closing the channel signals end of payload
        drop(sender);

        let mut multipart = Multipart::new(&headers, payload);

        match multipart.next().await.unwrap() {
            Ok(_) => {}
            _ => unreachable!(),
        }

        match multipart.next().await.unwrap() {
            Ok(_) => {}
            _ => unreachable!(),
        }

        match multipart.next().await {
            None => {}
            _ => unreachable!(),
        }
    }

    /// Both fields are yielded with their metadata and contents, then the stream ends.
    #[actix_rt::test]
    async fn test_multipart() {
        let (sender, payload) = create_stream();
        let (bytes, headers) = create_double_request_with_header();

        sender.send(Ok(bytes)).unwrap();

        let mut multipart = Multipart::new(&headers, payload);
        match multipart.next().await {
            Some(Ok(mut field)) => {
                let cd = field.content_disposition().unwrap();
                assert_eq!(cd.disposition, DispositionType::FormData);
                assert_eq!(cd.parameters[0], DispositionParam::Name("file".into()));

                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);

                match field.next().await.unwrap() {
                    Ok(chunk) => assert_eq!(chunk, "test"),
                    _ => unreachable!(),
                }
                match field.next().await {
                    None => {}
                    _ => unreachable!(),
                }
            }
            _ => unreachable!(),
        }

        match multipart.next().await.unwrap() {
            Ok(mut field) => {
                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);

                match field.next().await {
                    Some(Ok(chunk)) => assert_eq!(chunk, "data"),
                    _ => unreachable!(),
                }
                match field.next().await {
                    None => {}
                    _ => unreachable!(),
                }
            }
            _ => unreachable!(),
        }

        match multipart.next().await {
            None => {}
            _ => unreachable!(),
        }
    }

    /// Collects every chunk of `field` into a single buffer.
    async fn get_whole_field(field: &mut Field) -> BytesMut {
        let mut b = BytesMut::new();
        loop {
            match field.next().await {
                Some(Ok(chunk)) => b.extend_from_slice(&chunk),
                None => return b,
                _ => unreachable!(),
            }
        }
    }

    /// Same body as `test_multipart`, delivered one byte at a time with pending polls between.
    #[actix_rt::test]
    async fn test_stream() {
        let (bytes, headers) = create_double_request_with_header();
        let payload = stream::iter(bytes)
            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
            .interleave_pending();

        let mut multipart = Multipart::new(&headers, payload);
        match multipart.next().await.unwrap() {
            Ok(mut field) => {
                let cd = field.content_disposition().unwrap();
                assert_eq!(cd.disposition, DispositionType::FormData);
                assert_eq!(cd.parameters[0], DispositionParam::Name("file".into()));

                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);

                assert_eq!(get_whole_field(&mut field).await, "test");
            }
            _ => unreachable!(),
        }

        match multipart.next().await {
            Some(Ok(mut field)) => {
                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);

                assert_eq!(get_whole_field(&mut field).await, "data");
            }
            _ => unreachable!(),
        }

        match multipart.next().await {
            None => {}
            _ => unreachable!(),
        }
    }

    /// An empty, still-open payload yields no data.
    #[actix_rt::test]
    async fn test_basic() {
        let (_, payload) = h1::Payload::create(false);
        let mut payload = PayloadBuffer::new(payload);

        assert_eq!(payload.buf.len(), 0);
        lazy(|cx| payload.poll_stream(cx)).await.unwrap();
        assert_eq!(None, payload.read_max(1).unwrap());
    }

    /// Reading past the end of a finished payload is an error.
    #[actix_rt::test]
    async fn test_eof() {
        let (mut sender, payload) = h1::Payload::create(false);
        let mut payload = PayloadBuffer::new(payload);

        assert_eq!(None, payload.read_max(4).unwrap());
        sender.feed_data(Bytes::from("data"));
        sender.feed_eof();
        lazy(|cx| payload.poll_stream(cx)).await.unwrap();

        assert_eq!(Some(Bytes::from("data")), payload.read_max(4).unwrap());
        assert_eq!(payload.buf.len(), 0);
        assert!(payload.read_max(1).is_err());
        assert!(payload.eof);
    }

    /// A payload error surfaces from `poll_stream`.
    #[actix_rt::test]
    async fn test_err() {
        let (mut sender, payload) = h1::Payload::create(false);
        let mut payload = PayloadBuffer::new(payload);
        assert_eq!(None, payload.read_max(1).unwrap());
        sender.set_error(PayloadError::Incomplete(None));
        lazy(|cx| payload.poll_stream(cx)).await.err().unwrap();
    }

    /// `read_max` hands out at most the requested number of buffered bytes.
    #[actix_rt::test]
    async fn read_max() {
        let (mut sender, payload) = h1::Payload::create(false);
        let mut payload = PayloadBuffer::new(payload);

        sender.feed_data(Bytes::from("line1"));
        sender.feed_data(Bytes::from("line2"));
        lazy(|cx| payload.poll_stream(cx)).await.unwrap();
        assert_eq!(payload.buf.len(), 10);

        assert_eq!(Some(Bytes::from("line1")), payload.read_max(5).unwrap());
        assert_eq!(payload.buf.len(), 5);

        assert_eq!(Some(Bytes::from("line2")), payload.read_max(5).unwrap());
        assert_eq!(payload.buf.len(), 0);
    }

    /// `read_exact` hands out exactly the requested number of bytes, across chunk borders.
    #[actix_rt::test]
    async fn read_exactly() {
        let (mut sender, payload) = h1::Payload::create(false);
        let mut payload = PayloadBuffer::new(payload);

        assert_eq!(None, payload.read_exact(2));

        sender.feed_data(Bytes::from("line1"));
        sender.feed_data(Bytes::from("line2"));
        lazy(|cx| payload.poll_stream(cx)).await.unwrap();

        assert_eq!(Some(Bytes::from_static(b"li")), payload.read_exact(2));
        assert_eq!(payload.buf.len(), 8);

        assert_eq!(Some(Bytes::from_static(b"ne1l")), payload.read_exact(4));
        assert_eq!(payload.buf.len(), 4);
    }

    /// `read_until` hands out everything up to and including the needle.
    #[actix_rt::test]
    async fn read_until() {
        let (mut sender, payload) = h1::Payload::create(false);
        let mut payload = PayloadBuffer::new(payload);

        assert_eq!(None, payload.read_until(b"ne").unwrap());

        sender.feed_data(Bytes::from("line1"));
        sender.feed_data(Bytes::from("line2"));
        lazy(|cx| payload.poll_stream(cx)).await.unwrap();

        assert_eq!(
            Some(Bytes::from("line")),
            payload.read_until(b"ne").unwrap()
        );
        assert_eq!(payload.buf.len(), 6);

        assert_eq!(
            Some(Bytes::from("1line2")),
            payload.read_until(b"2").unwrap()
        );
        assert_eq!(payload.buf.len(), 0);
    }

    /// A reader built from an error yields that error.
    #[actix_rt::test]
    async fn test_multipart_from_error() {
        let err = Error::ContentTypeMissing;
        let mut multipart = Multipart::from_error(err);
        assert!(multipart.next().await.unwrap().is_err())
    }

    /// Construction from an already extracted content type and boundary does not panic.
    #[actix_rt::test]
    async fn test_multipart_from_boundary() {
        let (_, payload) = create_stream();
        let (_, headers) = create_simple_request_with_header();
        let (ct, boundary) = Multipart::find_ct_and_boundary(&headers).unwrap();
        let _ = Multipart::from_ct_and_boundary(ct, boundary, payload);
    }

    /// With unusable request headers the extractor reports an error and leaves the payload in
    /// place.
    #[actix_rt::test]
    async fn test_multipart_payload_consumption() {
        let (_, inner_payload) = h1::Payload::create(false);
        let mut payload = actix_web::dev::Payload::from(inner_payload);
        let req = TestRequest::default().to_http_request();

        let mut mp = Multipart::from_request(&req, &mut payload).await.unwrap();
        assert!(mp.next().await.unwrap().is_err());

        match payload {
            // the payload was not taken by the extractor
            actix_web::dev::Payload::H1 { .. } => {}
            _ => unreachable!(),
        }
    }

    /// A form-data field without a Content-Disposition header is rejected.
    #[actix_rt::test]
    async fn no_content_disposition_form_data() {
        let bytes = Bytes::from(
            "testasdadsad\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Type: text/plain; charset=utf-8\r\n\
             Content-Length: 4\r\n\
             \r\n\
             test\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
        );
        let mut headers = HeaderMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            header::HeaderValue::from_static(
                "multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
            ),
        );
        let payload = stream::iter(bytes)
            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
            .interleave_pending();

        let mut multipart = Multipart::new(&headers, payload);
        let res = multipart.next().await.unwrap();
        assert_matches!(
            res.expect_err(
                "according to RFC 7578, form-data fields require a content-disposition header"
            ),
            Error::ContentDispositionMissing
        );
    }

    /// Outside of form-data requests a field without Content-Disposition is accepted.
    #[actix_rt::test]
    async fn no_content_disposition_non_form_data() {
        let bytes = Bytes::from(
            "testasdadsad\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Type: text/plain; charset=utf-8\r\n\
             Content-Length: 4\r\n\
             \r\n\
             test\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
        );
        let mut headers = HeaderMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            header::HeaderValue::from_static(
                "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
            ),
        );
        let payload = stream::iter(bytes)
            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
            .interleave_pending();

        let mut multipart = Multipart::new(&headers, payload);
        let res = multipart.next().await.unwrap();
        res.unwrap();
    }

    /// A form-data field whose Content-Disposition lacks a name is rejected.
    #[actix_rt::test]
    async fn no_name_in_form_data_content_disposition() {
        let bytes = Bytes::from(
            "testasdadsad\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Disposition: form-data; filename=\"fn.txt\"\r\n\
             Content-Type: text/plain; charset=utf-8\r\n\
             Content-Length: 4\r\n\
             \r\n\
             test\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
        );
        let mut headers = HeaderMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            header::HeaderValue::from_static(
                "multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
            ),
        );
        let payload = stream::iter(bytes)
            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
            .interleave_pending();

        let mut multipart = Multipart::new(&headers, payload);
        let res = multipart.next().await.unwrap();
        assert_matches!(
            res.expect_err("according to RFC 7578, form-data fields require a name attribute"),
            Error::ContentDispositionNameMissing
        );
    }

    /// Polling a field after its `Multipart` was dropped reports an error instead of hanging.
    #[actix_rt::test]
    async fn test_drop_multipart_dont_hang() {
        let (sender, payload) = create_stream();
        let (bytes, headers) = create_simple_request_with_header();
        sender.send(Ok(bytes)).unwrap();
        // closing the channel signals end of payload
        drop(sender);

        let mut multipart = Multipart::new(&headers, payload);
        let mut field = multipart.next().await.unwrap().unwrap();

        drop(multipart);

        match field.next().await {
            Some(Err(Error::NotConsumed)) => {}
            _ => panic!(),
        };
    }

    /// Dropping a field from another task lets a pending `Multipart::next` make progress.
    #[actix_rt::test]
    async fn test_drop_field_awaken_multipart() {
        let (sender, payload) = create_stream();
        let (bytes, headers) = create_double_request_with_header();
        sender.send(Ok(bytes)).unwrap();
        // closing the channel signals end of payload
        drop(sender);

        let mut multipart = Multipart::new(&headers, payload);
        let mut field = multipart.next().await.unwrap().unwrap();

        let task = rt::spawn(async move {
            rt::time::sleep(Duration::from_millis(500)).await;
            assert_eq!(field.next().await.unwrap().unwrap(), "test");
            drop(field);
        });

        // waits until the spawned task has dropped the first field
        let _ = multipart.next().await.unwrap().unwrap();

        task.await.unwrap();
    }
}