1use std::{
2 cell::RefCell,
3 cmp, fmt,
4 future::poll_fn,
5 mem,
6 pin::Pin,
7 rc::Rc,
8 task::{ready, Context, Poll},
9};
10
11use actix_web::{
12 error::PayloadError,
13 http::header::{self, ContentDisposition, HeaderMap},
14 web::{Bytes, BytesMut},
15};
16use derive_more::{Display, Error};
17use futures_core::Stream;
18use mime::Mime;
19
20use crate::{
21 error::Error,
22 payload::{PayloadBuffer, PayloadRef},
23 safety::Safety,
24};
25
26#[derive(Debug, Display, Error)]
28#[display(fmt = "size limit exceeded while collecting field data")]
29#[non_exhaustive]
30pub struct LimitExceeded;
31
32pub struct Field {
34 content_type: Option<Mime>,
36
37 content_disposition: Option<ContentDisposition>,
39
40 pub(crate) form_field_name: String,
47
48 headers: HeaderMap,
50
51 safety: Safety,
52 inner: Rc<RefCell<InnerField>>,
53}
54
55impl Field {
56 pub(crate) fn new(
57 content_type: Option<Mime>,
58 content_disposition: Option<ContentDisposition>,
59 form_field_name: Option<String>,
60 headers: HeaderMap,
61 safety: Safety,
62 inner: Rc<RefCell<InnerField>>,
63 ) -> Self {
64 Field {
65 content_type,
66 content_disposition,
67 form_field_name: form_field_name.unwrap_or_default(),
68 headers,
69 inner,
70 safety,
71 }
72 }
73
74 pub fn headers(&self) -> &HeaderMap {
76 &self.headers
77 }
78
79 pub fn content_type(&self) -> Option<&Mime> {
85 self.content_type.as_ref()
86 }
87
88 pub fn content_disposition(&self) -> Option<&ContentDisposition> {
105 self.content_disposition.as_ref()
106 }
107
108 pub fn name(&self) -> Option<&str> {
113 self.content_disposition()?.get_name()
114 }
115
116 pub async fn bytes(&mut self, limit: usize) -> Result<Result<Bytes, Error>, LimitExceeded> {
127 const INITIAL_ALLOC_BYTES: usize = 2 * 1024;
129
130 let mut exceeded_limit = false;
131 let mut buf = BytesMut::with_capacity(INITIAL_ALLOC_BYTES);
132
133 let mut field = Pin::new(self);
134
135 match poll_fn(|cx| loop {
136 match ready!(field.as_mut().poll_next(cx)) {
137 Some(Ok(_chunk)) if exceeded_limit => {}
139
140 Some(Ok(chunk)) if buf.len() + chunk.len() > limit => {
142 exceeded_limit = true;
143 let _ = mem::take(&mut buf);
145 }
146
147 Some(Ok(chunk)) => buf.extend_from_slice(&chunk),
148
149 None => return Poll::Ready(Ok(())),
150 Some(Err(err)) => return Poll::Ready(Err(err)),
151 }
152 })
153 .await
154 {
155 Err(err) => Ok(Err(err)),
157
158 Ok(()) if exceeded_limit => Err(LimitExceeded),
160
161 Ok(()) => Ok(Ok(buf.freeze())),
163 }
164 }
165}
166
167impl Stream for Field {
168 type Item = Result<Bytes, Error>;
169
170 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
171 let this = self.get_mut();
172 let mut inner = this.inner.borrow_mut();
173
174 if let Some(mut buffer) = inner
175 .payload
176 .as_ref()
177 .expect("Field should not be polled after completion")
178 .get_mut(&this.safety)
179 {
180 buffer.poll_stream(cx)?;
182 } else if !this.safety.is_clean() {
183 return Poll::Ready(Some(Err(Error::NotConsumed)));
185 } else {
186 return Poll::Pending;
187 }
188
189 inner.poll(&this.safety)
190 }
191}
192
193impl fmt::Debug for Field {
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 if let Some(ct) = &self.content_type {
196 writeln!(f, "\nField: {}", ct)?;
197 } else {
198 writeln!(f, "\nField:")?;
199 }
200 writeln!(f, " boundary: {}", self.inner.borrow().boundary)?;
201 writeln!(f, " headers:")?;
202 for (key, val) in self.headers.iter() {
203 writeln!(f, " {:?}: {:?}", key, val)?;
204 }
205 Ok(())
206 }
207}
208
209pub(crate) struct InnerField {
210 payload: Option<PayloadRef>,
212 boundary: String,
213 eof: bool,
214 length: Option<u64>,
215}
216
217impl InnerField {
218 pub(crate) fn new_in_rc(
219 payload: PayloadRef,
220 boundary: String,
221 headers: &HeaderMap,
222 ) -> Result<Rc<RefCell<InnerField>>, PayloadError> {
223 Self::new(payload, boundary, headers).map(|this| Rc::new(RefCell::new(this)))
224 }
225
226 pub(crate) fn new(
227 payload: PayloadRef,
228 boundary: String,
229 headers: &HeaderMap,
230 ) -> Result<InnerField, PayloadError> {
231 let len = if let Some(len) = headers.get(&header::CONTENT_LENGTH) {
232 match len.to_str().ok().and_then(|len| len.parse::<u64>().ok()) {
233 Some(len) => Some(len),
234 None => return Err(PayloadError::Incomplete(None)),
235 }
236 } else {
237 None
238 };
239
240 Ok(InnerField {
241 boundary,
242 payload: Some(payload),
243 eof: false,
244 length: len,
245 })
246 }
247
248 pub(crate) fn read_len(
252 payload: &mut PayloadBuffer,
253 size: &mut u64,
254 ) -> Poll<Option<Result<Bytes, Error>>> {
255 if *size == 0 {
256 Poll::Ready(None)
257 } else {
258 match payload.read_max(*size)? {
259 Some(mut chunk) => {
260 let len = cmp::min(chunk.len() as u64, *size);
261 *size -= len;
262 let ch = chunk.split_to(len as usize);
263 if !chunk.is_empty() {
264 payload.unprocessed(chunk);
265 }
266 Poll::Ready(Some(Ok(ch)))
267 }
268 None => {
269 if payload.eof && (*size != 0) {
270 Poll::Ready(Some(Err(Error::Incomplete)))
271 } else {
272 Poll::Pending
273 }
274 }
275 }
276 }
277 }
278
279 pub(crate) fn read_stream(
283 payload: &mut PayloadBuffer,
284 boundary: &str,
285 ) -> Poll<Option<Result<Bytes, Error>>> {
286 let mut pos = 0;
287
288 let len = payload.buf.len();
289 if len == 0 {
290 return if payload.eof {
291 Poll::Ready(Some(Err(Error::Incomplete)))
292 } else {
293 Poll::Pending
294 };
295 }
296
297 if len > 4 && payload.buf[0] == b'\r' {
299 let b_len = if &payload.buf[..2] == b"\r\n" && &payload.buf[2..4] == b"--" {
300 Some(4)
301 } else if &payload.buf[1..3] == b"--" {
302 Some(3)
303 } else {
304 None
305 };
306
307 if let Some(b_len) = b_len {
308 let b_size = boundary.len() + b_len;
309 if len < b_size {
310 return Poll::Pending;
311 } else if &payload.buf[b_len..b_size] == boundary.as_bytes() {
312 return Poll::Ready(None);
314 }
315 }
316 }
317
318 loop {
319 return if let Some(idx) = memchr::memmem::find(&payload.buf[pos..], b"\r") {
320 let cur = pos + idx;
321
322 if cur + 4 > len {
324 if cur > 0 {
325 Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze())))
326 } else {
327 Poll::Pending
328 }
329 } else {
330 if (&payload.buf[cur..cur + 2] == b"\r\n"
332 && &payload.buf[cur + 2..cur + 4] == b"--")
333 || (&payload.buf[cur..=cur] == b"\r"
334 && &payload.buf[cur + 1..cur + 3] == b"--")
335 {
336 if cur != 0 {
337 Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze())))
339 } else {
340 pos = cur + 1;
341 continue;
342 }
343 } else {
344 pos = cur + 1;
346 continue;
347 }
348 }
349 } else {
350 Poll::Ready(Some(Ok(payload.buf.split().freeze())))
351 };
352 }
353 }
354
355 pub(crate) fn poll(&mut self, safety: &Safety) -> Poll<Option<Result<Bytes, Error>>> {
356 if self.payload.is_none() {
357 return Poll::Ready(None);
358 }
359
360 let result = if let Some(mut payload) = self
361 .payload
362 .as_ref()
363 .expect("Field should not be polled after completion")
364 .get_mut(safety)
365 {
366 if !self.eof {
367 let res = if let Some(ref mut len) = self.length {
368 InnerField::read_len(&mut payload, len)
369 } else {
370 InnerField::read_stream(&mut payload, &self.boundary)
371 };
372
373 match res {
374 Poll::Pending => return Poll::Pending,
375 Poll::Ready(Some(Ok(bytes))) => return Poll::Ready(Some(Ok(bytes))),
376 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
377 Poll::Ready(None) => self.eof = true,
378 }
379 }
380
381 match payload.readline() {
382 Ok(None) => Poll::Pending,
383 Ok(Some(line)) => {
384 if line.as_ref() != b"\r\n" {
385 log::warn!("multipart field did not read all the data or it is malformed");
386 }
387 Poll::Ready(None)
388 }
389 Err(err) => Poll::Ready(Some(Err(err))),
390 }
391 } else {
392 Poll::Pending
393 };
394
395 if let Poll::Ready(None) = result {
396 let _ = self.payload.take();
398 }
399
400 result
401 }
402}
403
#[cfg(test)]
mod tests {
    use futures_util::{stream, StreamExt as _};

    use super::*;
    use crate::Multipart;

    /// Builds a multipart body holding two text fields (`one+one+one` and `two+two+two`)
    /// together with request headers announcing the matching boundary.
    fn create_double_request_with_header() -> (Bytes, HeaderMap) {
        let mut headers = HeaderMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            header::HeaderValue::from_static(
                "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
            ),
        );

        let body = Bytes::from(
            "testasdadsad\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
             Content-Type: text/plain; charset=utf-8\r\n\
             \r\n\
             one+one+one\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
             Content-Type: text/plain; charset=utf-8\r\n\
             \r\n\
             two+two+two\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n",
        );

        (body, headers)
    }

    /// Takes the next field off `multipart` and collects its data with the given size `limit`.
    async fn collect_next_field(
        multipart: &mut Multipart,
        limit: usize,
    ) -> Result<Result<Bytes, Error>, LimitExceeded> {
        let mut field = multipart
            .next()
            .await
            .expect("multipart should have two fields")
            .expect("multipart body should be well formatted");

        field.bytes(limit).await
    }

    #[actix_rt::test]
    async fn bytes_unlimited() {
        let (body, headers) = create_double_request_with_header();
        let mut multipart = Multipart::new(&headers, stream::iter([Ok(body)]));

        for expected in ["one+one+one", "two+two+two"] {
            let data = collect_next_field(&mut multipart, usize::MAX)
                .await
                .expect("field data should not be size limited")
                .expect("reading field data should not error");
            assert_eq!(data, expected);
        }
    }

    #[actix_rt::test]
    async fn bytes_limited() {
        let (body, headers) = create_double_request_with_header();
        let mut multipart = Multipart::new(&headers, stream::iter([Ok(body)]));

        // the first field is longer than 8 bytes, so collecting it must hit the limit
        collect_next_field(&mut multipart, 8)
            .await
            .expect_err("field data should be size limited");

        // the over-limit field was still drained, so the next field is readable
        let data = collect_next_field(&mut multipart, usize::MAX)
            .await
            .expect("field data should not be size limited")
            .expect("reading field data should not error");
        assert_eq!(data, "two+two+two");
    }
}