quick_xml/reader/async_tokio.rs
//! This is an implementation of [`Reader`] for reading from an [`AsyncBufRead`]
//! as the underlying byte stream. This reader fully implements async/await, so reading
//! can use non-blocking I/O.
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};
9
10use crate::errors::{Error, Result, SyntaxError};
11use crate::events::Event;
12use crate::name::{QName, ResolveResult};
13use crate::parser::{ElementParser, Parser, PiParser};
14use crate::reader::buffered_reader::impl_buffered_source;
15use crate::reader::{BangType, BinaryStream, NsReader, ParseState, ReadTextResult, Reader, Span};
16use crate::utils::is_whitespace;
17
/// A struct for reading XML asynchronously from an [`AsyncBufRead`].
///
/// Having its own struct allows us to implement anything without risk of name
/// conflicts and does not suffer from the impossibility of having `async` in traits.
///
/// The adapter only holds a mutable borrow of the underlying reader; its
/// reading methods are generated by the `impl_buffered_source!` macro (shared
/// with the synchronous buffered reader) in the `impl` block below.
struct TokioAdapter<'a, R>(&'a mut R);

impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {
    // Instantiate the shared buffered-source implementation; the trailing
    // `async, await` arguments presumably select the async flavour of the
    // generated methods (see `impl_buffered_source!` to confirm).
    impl_buffered_source!('b, 0, async, await);
}
27
28////////////////////////////////////////////////////////////////////////////////////////////////////
29
30impl<'r, R> AsyncRead for BinaryStream<'r, R>
31where
32 R: AsyncRead + Unpin,
33{
34 fn poll_read(
35 self: Pin<&mut Self>,
36 cx: &mut Context<'_>,
37 buf: &mut ReadBuf<'_>,
38 ) -> Poll<io::Result<()>> {
39 let start = buf.remaining();
40 let this = self.get_mut();
41 let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);
42
43 // If something was read, update offset
44 if let Poll::Ready(Ok(_)) = poll {
45 let amt = start - buf.remaining();
46 *this.offset += amt as u64;
47 }
48 poll
49 }
50}
51
52impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
53where
54 R: AsyncBufRead + Unpin,
55{
56 #[inline]
57 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
58 Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
59 }
60
61 #[inline]
62 fn consume(self: Pin<&mut Self>, amt: usize) {
63 let this = self.get_mut();
64 this.inner.consume(amt);
65 *this.offset += amt as u64;
66 }
67}
68
69////////////////////////////////////////////////////////////////////////////////////////////////////
70
impl<R: AsyncBufRead + Unpin> Reader<R> {
    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
    /// the given buffer.
    ///
    /// This is the main entry point for reading XML `Event`s when using an async reader.
    ///
    /// The returned [`Event`] shares the lifetime `'b` with `buf`, so it may borrow
    /// from the buffer; `buf` cannot be reused until the event is dropped. The
    /// example below clears `buf` after handling each event.
    ///
    /// See the documentation of [`read_event_into()`] for more information.
    ///
    /// # Errors
    ///
    /// Propagates any error produced while reading or parsing the next event.
    ///
    /// # Examples
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::Event;
    /// use quick_xml::reader::Reader;
    ///
    /// // This explicitly uses `from_reader("...".as_bytes())` to use a buffered
    /// // reader instead of relying on the zero-copy optimizations for reading
    /// // from byte slices, which provides the sync interface anyway.
    /// let mut reader = Reader::from_reader(r#"
    ///     <tag1 att1 = "test">
    ///        <tag2><!--Test comment-->Test</tag2>
    ///        <tag2>Test 2</tag2>
    ///     </tag1>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    ///
    /// let mut count = 0;
    /// let mut buf = Vec::new();
    /// let mut txt = Vec::new();
    /// loop {
    ///     match reader.read_event_into_async(&mut buf).await {
    ///         Ok(Event::Start(_)) => count += 1,
    ///         Ok(Event::Text(e)) => txt.push(e.unescape().unwrap().into_owned()),
    ///         Err(e) => panic!("Error at position {}: {:?}", reader.error_position(), e),
    ///         Ok(Event::Eof) => break,
    ///         _ => (),
    ///     }
    ///     buf.clear();
    /// }
    /// assert_eq!(count, 3);
    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_event_into()`]: Reader::read_event_into
    pub async fn read_event_into_async<'b>(
        &mut self,
        mut buf: &'b mut Vec<u8>,
    ) -> Result<Event<'b>> {
        // Reuse the parsing logic shared with the synchronous reader, plugging in
        // the Tokio adapter as the byte source and the async `read_until_close`
        // variant; `await` is forwarded so the expansion can await its I/O.
        read_event_impl!(
            self,
            buf,
            TokioAdapter(&mut self.reader),
            read_until_close_async,
            await
        )
    }

    /// An asynchronous version of [`read_to_end_into()`].
    /// Reads asynchronously until the end element is found, using the provided
    /// buffer as intermediate storage for event content. This function is
    /// supposed to be called after you have already read a [`Start`] event.
    ///
    /// See the documentation of [`read_to_end_into()`] for more information.
    ///
    /// # Errors
    ///
    /// Propagates any error produced while reading the skipped events.
    ///
    /// # Examples
    ///
    /// This example shows how you can skip XML content after you read the
    /// start event.
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::{BytesStart, Event};
    /// use quick_xml::reader::Reader;
    ///
    /// let mut reader = Reader::from_reader(r#"
    ///     <outer>
    ///         <inner>
    ///             <inner></inner>
    ///             <inner/>
    ///             <outer></outer>
    ///             <outer/>
    ///         </inner>
    ///     </outer>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    /// let mut buf = Vec::new();
    ///
    /// let start = BytesStart::new("outer");
    /// let end = start.to_end().into_owned();
    ///
    /// // First, we read a start event...
    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
    ///
    /// // ...then, we could skip all events to the corresponding end event.
    /// // This call will correctly handle nested <outer> elements.
    /// // Note, however, that this method does not handle namespaces.
    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
    ///
    /// // At the end we should get an Eof event, because we ate the whole XML
    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_to_end_into()`]: Self::read_to_end_into
    /// [`Start`]: Event::Start
    pub async fn read_to_end_into_async<'n>(
        &mut self,
        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
        end: QName<'n>,
        buf: &mut Vec<u8>,
    ) -> Result<Span> {
        // The `{ buf.clear(); }` block is handed to `read_to_end!` so the buffer
        // can be reset while events are skipped (presumably between reads — see
        // the macro definition to confirm where it runs).
        Ok(read_to_end!(
            self,
            end,
            buf,
            read_event_into_async,
            {
                buf.clear();
            },
            await
        ))
    }

    /// Private function to read until `>` is found. This function expects that
    /// it is called just after encountering a `<` symbol.
    async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
        read_until_close!(self, buf, TokioAdapter(&mut self.reader), await)
    }
}
203
204////////////////////////////////////////////////////////////////////////////////////////////////////
205
impl<R: AsyncBufRead + Unpin> NsReader<R> {
    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
    /// the given buffer.
    ///
    /// This method manages namespaces but doesn't resolve them automatically.
    /// You should call [`resolve_element()`] if you want to get a namespace.
    ///
    /// You can also use [`read_resolved_event_into_async()`] instead if you want
    /// to resolve the namespace as soon as you get an event.
    ///
    /// # Errors
    ///
    /// Propagates any error produced while reading or processing the next event.
    ///
    /// # Examples
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::Event;
    /// use quick_xml::name::{Namespace, ResolveResult::*};
    /// use quick_xml::reader::NsReader;
    ///
    /// let mut reader = NsReader::from_reader(r#"
    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
    ///        <y:tag2><!--Test comment-->Test</y:tag2>
    ///        <y:tag2>Test 2</y:tag2>
    ///     </x:tag1>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    ///
    /// let mut count = 0;
    /// let mut buf = Vec::new();
    /// let mut txt = Vec::new();
    /// loop {
    ///     match reader.read_event_into_async(&mut buf).await.unwrap() {
    ///         Event::Start(e) => {
    ///             count += 1;
    ///             let (ns, local) = reader.resolve_element(e.name());
    ///             match local.as_ref() {
    ///                 b"tag1" => assert_eq!(ns, Bound(Namespace(b"www.xxxx"))),
    ///                 b"tag2" => assert_eq!(ns, Bound(Namespace(b"www.yyyy"))),
    ///                 _ => unreachable!(),
    ///             }
    ///         }
    ///         Event::Text(e) => {
    ///             txt.push(e.unescape().unwrap().into_owned())
    ///         }
    ///         Event::Eof => break,
    ///         _ => (),
    ///     }
    ///     buf.clear();
    /// }
    /// assert_eq!(count, 3);
    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_event_into()`]: NsReader::read_event_into
    /// [`resolve_element()`]: Self::resolve_element
    /// [`read_resolved_event_into_async()`]: Self::read_resolved_event_into_async
    pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
        // Namespace bookkeeping from the previous event must be settled before a
        // new event is read (presumably popping the scope of an element that was
        // closed last time — see `NsReader::pop`).
        self.pop();
        let event = self.reader.read_event_into_async(buf).await;
        // `process_event` takes the raw result and returns it to the caller,
        // presumably after recording any namespace declarations it carries.
        self.process_event(event)
    }

    /// An asynchronous version of [`read_to_end_into()`].
    /// Reads asynchronously until the end element is found, using the provided
    /// buffer as intermediate storage for event content. This function is
    /// supposed to be called after you have already read a [`Start`] event.
    ///
    /// See the documentation of [`read_to_end_into()`] for more information.
    ///
    /// # Errors
    ///
    /// Propagates any error produced while reading the skipped events.
    ///
    /// # Examples
    ///
    /// This example shows how you can skip XML content after you read the
    /// start event.
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::name::{Namespace, ResolveResult};
    /// use quick_xml::events::{BytesStart, Event};
    /// use quick_xml::reader::NsReader;
    ///
    /// let mut reader = NsReader::from_reader(r#"
    ///     <outer xmlns="namespace 1">
    ///         <inner xmlns="namespace 2">
    ///             <outer></outer>
    ///         </inner>
    ///         <inner>
    ///             <inner></inner>
    ///             <inner/>
    ///             <outer></outer>
    ///             <p:outer xmlns:p="ns"></p:outer>
    ///             <outer/>
    ///         </inner>
    ///     </outer>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    /// let mut buf = Vec::new();
    ///
    /// let ns = Namespace(b"namespace 1");
    /// let start = BytesStart::from_content(r#"outer xmlns="namespace 1""#, 5);
    /// let end = start.to_end().into_owned();
    ///
    /// // First, we read a start event...
    /// assert_eq!(
    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
    ///     (ResolveResult::Bound(ns), Event::Start(start))
    /// );
    ///
    /// // ...then, we could skip all events to the corresponding end event.
    /// // This call will correctly handle nested <outer> elements.
    /// // Note, however, that this method does not handle namespaces.
    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
    ///
    /// // At the end we should get an Eof event, because we ate the whole XML
    /// assert_eq!(
    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
    ///     (ResolveResult::Unbound, Event::Eof)
    /// );
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_to_end_into()`]: Self::read_to_end_into
    /// [`Start`]: Event::Start
    pub async fn read_to_end_into_async<'n>(
        &mut self,
        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
        end: QName<'n>,
        buf: &mut Vec<u8>,
    ) -> Result<Span> {
        // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should
        // match literally the start name. See `Config::check_end_names` documentation
        self.reader.read_to_end_into_async(end, buf).await
    }

    /// An asynchronous version of [`read_resolved_event_into()`]. Reads the next
    /// event into the given buffer asynchronously and resolves its namespace (if applicable).
    ///
    /// The namespace is resolved only for [`Start`], [`Empty`] and [`End`] events.
    /// For all other events the concept of a namespace is not defined, so
    /// a [`ResolveResult::Unbound`] is returned.
    ///
    /// If you are not interested in namespaces, you can use [`read_event_into_async()`],
    /// which will not automatically resolve namespaces for you.
    ///
    /// # Errors
    ///
    /// Propagates any error produced while reading or resolving the next event.
    ///
    /// # Examples
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::Event;
    /// use quick_xml::name::{Namespace, QName, ResolveResult::*};
    /// use quick_xml::reader::NsReader;
    ///
    /// let mut reader = NsReader::from_reader(r#"
    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
    ///        <y:tag2><!--Test comment-->Test</y:tag2>
    ///        <y:tag2>Test 2</y:tag2>
    ///     </x:tag1>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    ///
    /// let mut count = 0;
    /// let mut buf = Vec::new();
    /// let mut txt = Vec::new();
    /// loop {
    ///     match reader.read_resolved_event_into_async(&mut buf).await.unwrap() {
    ///         (Bound(Namespace(b"www.xxxx")), Event::Start(e)) => {
    ///             count += 1;
    ///             assert_eq!(e.local_name(), QName(b"tag1").into());
    ///         }
    ///         (Bound(Namespace(b"www.yyyy")), Event::Start(e)) => {
    ///             count += 1;
    ///             assert_eq!(e.local_name(), QName(b"tag2").into());
    ///         }
    ///         (_, Event::Start(_)) => unreachable!(),
    ///
    ///         (_, Event::Text(e)) => {
    ///             txt.push(e.unescape().unwrap().into_owned())
    ///         }
    ///         (_, Event::Eof) => break,
    ///         _ => (),
    ///     }
    ///     buf.clear();
    /// }
    /// assert_eq!(count, 3);
    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_resolved_event_into()`]: NsReader::read_resolved_event_into
    /// [`Start`]: Event::Start
    /// [`Empty`]: Event::Empty
    /// [`End`]: Event::End
    /// [`read_event_into_async()`]: Self::read_event_into_async
    pub async fn read_resolved_event_into_async<'ns, 'b>(
        // Name 'ns lifetime, because otherwise we get an error
        // "implicit elided lifetime not allowed here" on ResolveResult
        &'ns mut self,
        buf: &'b mut Vec<u8>,
    ) -> Result<(ResolveResult<'ns>, Event<'b>)> {
        // Read through the namespace-aware path first, so namespace state is
        // up to date before the event is resolved.
        let event = self.read_event_into_async(buf).await;
        self.resolve_event(event)
    }
}
411
#[cfg(test)]
mod test {
    use super::TokioAdapter;
    use crate::reader::test::check;

    // Run the shared reader test-suite against the async Tokio adapter.
    check!(
        #[tokio::test]
        read_event_into_async,
        read_until_close_async,
        TokioAdapter,
        &mut Vec::new(),
        async,
        await
    );

    /// Compile-time check: the future returned by `read_event_into_async` must be
    /// `Send`, so that it can be spawned on a multi-threaded runtime. Nothing is
    /// polled or asserted at runtime; the test passes as long as it compiles.
    #[test]
    fn test_future_is_send() {
        use super::*;
        use tokio::io::BufReader;

        fn require_send<T: Send>(_: T) {}

        let source: Vec<u8> = Vec::new();
        let mut scratch = Vec::new();
        let mut reader = Reader::from_reader(BufReader::new(source.as_slice()));

        let future = reader.read_event_into_async(&mut scratch);
        require_send(future);
    }
}