actix_web/types/
readlines.rs

1//! For request line reader documentation, see [`Readlines`].
2
3use std::{
4    borrow::Cow,
5    pin::Pin,
6    str,
7    task::{Context, Poll},
8};
9
10use bytes::{Bytes, BytesMut};
11use encoding_rs::{Encoding, UTF_8};
12use futures_core::{ready, stream::Stream};
13
14use crate::{
15    dev::Payload,
16    error::{PayloadError, ReadlinesError},
17    HttpMessage,
18};
19
20/// Stream that reads request line by line.
21pub struct Readlines<T: HttpMessage> {
22    stream: Payload<T::Stream>,
23    buf: BytesMut,
24    limit: usize,
25    checked_buff: bool,
26    encoding: &'static Encoding,
27    err: Option<ReadlinesError>,
28}
29
30impl<T> Readlines<T>
31where
32    T: HttpMessage,
33    T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
34{
35    /// Create a new stream to read request line by line.
36    pub fn new(req: &mut T) -> Self {
37        let encoding = match req.encoding() {
38            Ok(enc) => enc,
39            Err(err) => return Self::err(err.into()),
40        };
41
42        Readlines {
43            stream: req.take_payload(),
44            buf: BytesMut::with_capacity(262_144),
45            limit: 262_144,
46            checked_buff: true,
47            err: None,
48            encoding,
49        }
50    }
51
52    /// Set maximum accepted payload size. The default limit is 256kB.
53    pub fn limit(mut self, limit: usize) -> Self {
54        self.limit = limit;
55        self
56    }
57
58    fn err(err: ReadlinesError) -> Self {
59        Readlines {
60            stream: Payload::None,
61            buf: BytesMut::new(),
62            limit: 262_144,
63            checked_buff: true,
64            encoding: UTF_8,
65            err: Some(err),
66        }
67    }
68}
69
70impl<T> Stream for Readlines<T>
71where
72    T: HttpMessage,
73    T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
74{
75    type Item = Result<String, ReadlinesError>;
76
77    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78        let this = self.get_mut();
79
80        if let Some(err) = this.err.take() {
81            return Poll::Ready(Some(Err(err)));
82        }
83
84        // check if there is a newline in the buffer
85        if !this.checked_buff {
86            let mut found: Option<usize> = None;
87            for (ind, b) in this.buf.iter().enumerate() {
88                if *b == b'\n' {
89                    found = Some(ind);
90                    break;
91                }
92            }
93            if let Some(ind) = found {
94                // check if line is longer than limit
95                if ind + 1 > this.limit {
96                    return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
97                }
98                let line = if this.encoding == UTF_8 {
99                    str::from_utf8(&this.buf.split_to(ind + 1))
100                        .map_err(|_| ReadlinesError::EncodingError)?
101                        .to_owned()
102                } else {
103                    this.encoding
104                        .decode_without_bom_handling_and_without_replacement(
105                            &this.buf.split_to(ind + 1),
106                        )
107                        .map(Cow::into_owned)
108                        .ok_or(ReadlinesError::EncodingError)?
109                };
110                return Poll::Ready(Some(Ok(line)));
111            }
112            this.checked_buff = true;
113        }
114
115        // poll req for more bytes
116        match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
117            Some(Ok(mut bytes)) => {
118                // check if there is a newline in bytes
119                let mut found: Option<usize> = None;
120                for (ind, b) in bytes.iter().enumerate() {
121                    if *b == b'\n' {
122                        found = Some(ind);
123                        break;
124                    }
125                }
126                if let Some(ind) = found {
127                    // check if line is longer than limit
128                    if ind + 1 > this.limit {
129                        return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
130                    }
131                    let line = if this.encoding == UTF_8 {
132                        str::from_utf8(&bytes.split_to(ind + 1))
133                            .map_err(|_| ReadlinesError::EncodingError)?
134                            .to_owned()
135                    } else {
136                        this.encoding
137                            .decode_without_bom_handling_and_without_replacement(
138                                &bytes.split_to(ind + 1),
139                            )
140                            .map(Cow::into_owned)
141                            .ok_or(ReadlinesError::EncodingError)?
142                    };
143                    // extend buffer with rest of the bytes;
144                    this.buf.extend_from_slice(&bytes);
145                    this.checked_buff = false;
146                    return Poll::Ready(Some(Ok(line)));
147                }
148                this.buf.extend_from_slice(&bytes);
149                Poll::Pending
150            }
151
152            None => {
153                if this.buf.is_empty() {
154                    return Poll::Ready(None);
155                }
156                if this.buf.len() > this.limit {
157                    return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
158                }
159                let line = if this.encoding == UTF_8 {
160                    str::from_utf8(&this.buf)
161                        .map_err(|_| ReadlinesError::EncodingError)?
162                        .to_owned()
163                } else {
164                    this.encoding
165                        .decode_without_bom_handling_and_without_replacement(&this.buf)
166                        .map(Cow::into_owned)
167                        .ok_or(ReadlinesError::EncodingError)?
168                };
169                this.buf.clear();
170                Poll::Ready(Some(Ok(line)))
171            }
172
173            Some(Err(err)) => Poll::Ready(Some(Err(ReadlinesError::from(err)))),
174        }
175    }
176}
177
#[cfg(test)]
mod tests {
    use futures_util::StreamExt as _;

    use super::*;
    use crate::test::TestRequest;

    /// Reads a three-line payload and checks that every line comes back in order, with the
    /// newline kept on terminated lines and the unterminated tail returned as the last line.
    #[actix_rt::test]
    async fn test_readlines() {
        let payload = Bytes::from_static(
            b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\
              industry. Lorem Ipsum has been the industry's standard dummy\n\
              Contrary to popular belief, Lorem Ipsum is not simply random text.",
        );
        let mut req = TestRequest::default().set_payload(payload).to_request();
        let mut stream = Readlines::new(&mut req);

        let expected = [
            "Lorem Ipsum is simply dummy text of the printing and typesetting\n",
            "industry. Lorem Ipsum has been the industry's standard dummy\n",
            "Contrary to popular belief, Lorem Ipsum is not simply random text.",
        ];

        for &line in &expected {
            assert_eq!(stream.next().await.unwrap().unwrap(), line);
        }
    }
}