actix_web/types/
readlines.rs1use std::{
4 borrow::Cow,
5 pin::Pin,
6 str,
7 task::{Context, Poll},
8};
9
10use bytes::{Bytes, BytesMut};
11use encoding_rs::{Encoding, UTF_8};
12use futures_core::{ready, stream::Stream};
13
14use crate::{
15 dev::Payload,
16 error::{PayloadError, ReadlinesError},
17 HttpMessage,
18};
19
20pub struct Readlines<T: HttpMessage> {
22 stream: Payload<T::Stream>,
23 buf: BytesMut,
24 limit: usize,
25 checked_buff: bool,
26 encoding: &'static Encoding,
27 err: Option<ReadlinesError>,
28}
29
30impl<T> Readlines<T>
31where
32 T: HttpMessage,
33 T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
34{
35 pub fn new(req: &mut T) -> Self {
37 let encoding = match req.encoding() {
38 Ok(enc) => enc,
39 Err(err) => return Self::err(err.into()),
40 };
41
42 Readlines {
43 stream: req.take_payload(),
44 buf: BytesMut::with_capacity(262_144),
45 limit: 262_144,
46 checked_buff: true,
47 err: None,
48 encoding,
49 }
50 }
51
52 pub fn limit(mut self, limit: usize) -> Self {
54 self.limit = limit;
55 self
56 }
57
58 fn err(err: ReadlinesError) -> Self {
59 Readlines {
60 stream: Payload::None,
61 buf: BytesMut::new(),
62 limit: 262_144,
63 checked_buff: true,
64 encoding: UTF_8,
65 err: Some(err),
66 }
67 }
68}
69
70impl<T> Stream for Readlines<T>
71where
72 T: HttpMessage,
73 T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
74{
75 type Item = Result<String, ReadlinesError>;
76
77 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78 let this = self.get_mut();
79
80 if let Some(err) = this.err.take() {
81 return Poll::Ready(Some(Err(err)));
82 }
83
84 if !this.checked_buff {
86 let mut found: Option<usize> = None;
87 for (ind, b) in this.buf.iter().enumerate() {
88 if *b == b'\n' {
89 found = Some(ind);
90 break;
91 }
92 }
93 if let Some(ind) = found {
94 if ind + 1 > this.limit {
96 return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
97 }
98 let line = if this.encoding == UTF_8 {
99 str::from_utf8(&this.buf.split_to(ind + 1))
100 .map_err(|_| ReadlinesError::EncodingError)?
101 .to_owned()
102 } else {
103 this.encoding
104 .decode_without_bom_handling_and_without_replacement(
105 &this.buf.split_to(ind + 1),
106 )
107 .map(Cow::into_owned)
108 .ok_or(ReadlinesError::EncodingError)?
109 };
110 return Poll::Ready(Some(Ok(line)));
111 }
112 this.checked_buff = true;
113 }
114
115 match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
117 Some(Ok(mut bytes)) => {
118 let mut found: Option<usize> = None;
120 for (ind, b) in bytes.iter().enumerate() {
121 if *b == b'\n' {
122 found = Some(ind);
123 break;
124 }
125 }
126 if let Some(ind) = found {
127 if ind + 1 > this.limit {
129 return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
130 }
131 let line = if this.encoding == UTF_8 {
132 str::from_utf8(&bytes.split_to(ind + 1))
133 .map_err(|_| ReadlinesError::EncodingError)?
134 .to_owned()
135 } else {
136 this.encoding
137 .decode_without_bom_handling_and_without_replacement(
138 &bytes.split_to(ind + 1),
139 )
140 .map(Cow::into_owned)
141 .ok_or(ReadlinesError::EncodingError)?
142 };
143 this.buf.extend_from_slice(&bytes);
145 this.checked_buff = false;
146 return Poll::Ready(Some(Ok(line)));
147 }
148 this.buf.extend_from_slice(&bytes);
149 Poll::Pending
150 }
151
152 None => {
153 if this.buf.is_empty() {
154 return Poll::Ready(None);
155 }
156 if this.buf.len() > this.limit {
157 return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
158 }
159 let line = if this.encoding == UTF_8 {
160 str::from_utf8(&this.buf)
161 .map_err(|_| ReadlinesError::EncodingError)?
162 .to_owned()
163 } else {
164 this.encoding
165 .decode_without_bom_handling_and_without_replacement(&this.buf)
166 .map(Cow::into_owned)
167 .ok_or(ReadlinesError::EncodingError)?
168 };
169 this.buf.clear();
170 Poll::Ready(Some(Ok(line)))
171 }
172
173 Some(Err(err)) => Poll::Ready(Some(Err(ReadlinesError::from(err)))),
174 }
175 }
176}
177
#[cfg(test)]
mod tests {
    use futures_util::StreamExt as _;

    use super::*;
    use crate::test::TestRequest;

    /// A single-chunk payload with two newline-terminated lines and an
    /// unterminated tail must be yielded as three lines, in order.
    #[actix_rt::test]
    async fn test_readlines() {
        let expected_lines = [
            "Lorem Ipsum is simply dummy text of the printing and typesetting\n",
            "industry. Lorem Ipsum has been the industry's standard dummy\n",
            "Contrary to popular belief, Lorem Ipsum is not simply random text.",
        ];

        let payload = Bytes::from_static(
            b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\
            industry. Lorem Ipsum has been the industry's standard dummy\n\
            Contrary to popular belief, Lorem Ipsum is not simply random text.",
        );
        let mut req = TestRequest::default().set_payload(payload).to_request();

        let mut stream = Readlines::new(&mut req);
        for &want in &expected_lines {
            let got = stream.next().await.unwrap().unwrap();
            assert_eq!(got, want);
        }
    }
}