broker_tokio/io/util/async_buf_read_ext.rs
1use crate::io::util::lines::{lines, Lines};
2use crate::io::util::read_line::{read_line, ReadLine};
3use crate::io::util::read_until::{read_until, ReadUntil};
4use crate::io::util::split::{split, Split};
5use crate::io::AsyncBufRead;
6
7cfg_io_util! {
8 /// An extension trait which adds utility methods to [`AsyncBufRead`] types.
9 ///
10 /// [`AsyncBufRead`]: crate::io::AsyncBufRead
11 pub trait AsyncBufReadExt: AsyncBufRead {
12 /// Read all bytes into `buf` until the delimiter `byte` or EOF is reached.
13 ///
14 /// Equivalent to:
15 ///
16 /// ```ignore
17 /// async fn read_until(&mut self, buf: &mut Vec<u8>) -> io::Result<usize>;
18 /// ```
19 ///
20 /// This function will read bytes from the underlying stream until the
21 /// delimiter or EOF is found. Once found, all bytes up to, and including,
22 /// the delimiter (if found) will be appended to `buf`.
23 ///
24 /// If successful, this function will return the total number of bytes read.
25 ///
26 /// # Errors
27 ///
28 /// This function will ignore all instances of [`ErrorKind::Interrupted`] and
29 /// will otherwise return any errors returned by [`fill_buf`].
30 ///
31 /// If an I/O error is encountered then all bytes read so far will be
32 /// present in `buf` and its length will have been adjusted appropriately.
33 ///
34 /// [`fill_buf`]: AsyncBufRead::poll_fill_buf
35 /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
36 ///
37 /// # Examples
38 ///
39 /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In
40 /// this example, we use [`Cursor`] to read all the bytes in a byte slice
41 /// in hyphen delimited segments:
42 ///
43 /// [`Cursor`]: std::io::Cursor
44 ///
45 /// ```
46 /// use tokio::io::AsyncBufReadExt;
47 ///
48 /// use std::io::Cursor;
49 ///
50 /// #[tokio::main]
51 /// async fn main() {
52 /// let mut cursor = Cursor::new(b"lorem-ipsum");
53 /// let mut buf = vec![];
54 ///
55 /// // cursor is at 'l'
56 /// let num_bytes = cursor.read_until(b'-', &mut buf)
57 /// .await
58 /// .expect("reading from cursor won't fail");
59 ///
60 /// assert_eq!(num_bytes, 6);
61 /// assert_eq!(buf, b"lorem-");
62 /// buf.clear();
63 ///
64 /// // cursor is at 'i'
65 /// let num_bytes = cursor.read_until(b'-', &mut buf)
66 /// .await
67 /// .expect("reading from cursor won't fail");
68 ///
69 /// assert_eq!(num_bytes, 5);
70 /// assert_eq!(buf, b"ipsum");
71 /// buf.clear();
72 ///
73 /// // cursor is at EOF
74 /// let num_bytes = cursor.read_until(b'-', &mut buf)
75 /// .await
76 /// .expect("reading from cursor won't fail");
77 /// assert_eq!(num_bytes, 0);
78 /// assert_eq!(buf, b"");
79 /// }
80 /// ```
81 fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
82 where
83 Self: Unpin,
84 {
85 read_until(self, byte, buf)
86 }
87
88 /// Read all bytes until a newline (the 0xA byte) is reached, and append
89 /// them to the provided buffer.
90 ///
91 /// Equivalent to:
92 ///
93 /// ```ignore
94 /// async fn read_line(&mut self, buf: &mut String) -> io::Result<usize>;
95 /// ```
96 ///
97 /// This function will read bytes from the underlying stream until the
98 /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
99 /// up to, and including, the delimiter (if found) will be appended to
100 /// `buf`.
101 ///
102 /// If successful, this function will return the total number of bytes read.
103 ///
104 /// If this function returns `Ok(0)`, the stream has reached EOF.
105 ///
106 /// # Errors
107 ///
108 /// This function has the same error semantics as [`read_until`] and will
109 /// also return an error if the read bytes are not valid UTF-8. If an I/O
110 /// error is encountered then `buf` may contain some bytes already read in
111 /// the event that all data read so far was valid UTF-8.
112 ///
113 /// [`read_until`]: AsyncBufReadExt::read_until
114 ///
115 /// # Examples
116 ///
117 /// [`std::io::Cursor`][`Cursor`] is a type that implements
118 /// `AsyncBufRead`. In this example, we use [`Cursor`] to read all the
119 /// lines in a byte slice:
120 ///
121 /// [`Cursor`]: std::io::Cursor
122 ///
123 /// ```
124 /// use tokio::io::AsyncBufReadExt;
125 ///
126 /// use std::io::Cursor;
127 ///
128 /// #[tokio::main]
129 /// async fn main() {
130 /// let mut cursor = Cursor::new(b"foo\nbar");
131 /// let mut buf = String::new();
132 ///
133 /// // cursor is at 'f'
134 /// let num_bytes = cursor.read_line(&mut buf)
135 /// .await
136 /// .expect("reading from cursor won't fail");
137 ///
138 /// assert_eq!(num_bytes, 4);
139 /// assert_eq!(buf, "foo\n");
140 /// buf.clear();
141 ///
142 /// // cursor is at 'b'
143 /// let num_bytes = cursor.read_line(&mut buf)
144 /// .await
145 /// .expect("reading from cursor won't fail");
146 ///
147 /// assert_eq!(num_bytes, 3);
148 /// assert_eq!(buf, "bar");
149 /// buf.clear();
150 ///
151 /// // cursor is at EOF
152 /// let num_bytes = cursor.read_line(&mut buf)
153 /// .await
154 /// .expect("reading from cursor won't fail");
155 ///
156 /// assert_eq!(num_bytes, 0);
157 /// assert_eq!(buf, "");
158 /// }
159 /// ```
160 fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
161 where
162 Self: Unpin,
163 {
164 read_line(self, buf)
165 }
166
167 /// Returns a stream of the contents of this reader split on the byte
168 /// `byte`.
169 ///
170 /// This method is the asynchronous equivalent to
171 /// [`BufRead::split`](std::io::BufRead::split).
172 ///
173 /// The stream returned from this function will yield instances of
174 /// [`io::Result`]`<`[`Vec<u8>`]`>`. Each vector returned will *not* have
175 /// the delimiter byte at the end.
176 ///
177 /// [`io::Result`]: std::io::Result
178 /// [`Vec<u8>`]: std::vec::Vec
179 ///
180 /// # Errors
181 ///
182 /// Each item of the stream has the same error semantics as
183 /// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until).
184 ///
185 /// # Examples
186 ///
187 /// ```
188 /// # use tokio::io::AsyncBufRead;
189 /// use tokio::io::AsyncBufReadExt;
190 ///
191 /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
192 /// let mut segments = my_buf_read.split(b'f');
193 ///
194 /// while let Some(segment) = segments.next_segment().await? {
195 /// println!("length = {}", segment.len())
196 /// }
197 /// # Ok(())
198 /// # }
199 /// ```
200 fn split(self, byte: u8) -> Split<Self>
201 where
202 Self: Sized + Unpin,
203 {
204 split(self, byte)
205 }
206
207 /// Returns a stream over the lines of this reader.
208 /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
209 ///
210 /// The stream returned from this function will yield instances of
211 /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
212 /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
213 ///
214 /// [`io::Result`]: std::io::Result
215 /// [`String`]: String
216 ///
217 /// # Errors
218 ///
219 /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
220 ///
221 /// # Examples
222 ///
223 /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In
224 /// this example, we use [`Cursor`] to iterate over all the lines in a byte
225 /// slice.
226 ///
227 /// [`Cursor`]: std::io::Cursor
228 ///
229 /// ```
230 /// use tokio::io::AsyncBufReadExt;
231 /// use tokio::stream::StreamExt;
232 ///
233 /// use std::io::Cursor;
234 ///
235 /// #[tokio::main]
236 /// async fn main() {
237 /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
238 ///
239 /// let mut lines = cursor.lines().map(|res| res.unwrap());
240 ///
241 /// assert_eq!(lines.next().await, Some(String::from("lorem")));
242 /// assert_eq!(lines.next().await, Some(String::from("ipsum")));
243 /// assert_eq!(lines.next().await, Some(String::from("dolor")));
244 /// assert_eq!(lines.next().await, None);
245 /// }
246 /// ```
247 ///
248 /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
249 fn lines(self) -> Lines<Self>
250 where
251 Self: Sized,
252 {
253 lines(self)
254 }
255 }
256}
257
258impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}