quick_xml/reader/
async_tokio.rs

//! This is an implementation of [`Reader`] for reading from an [`AsyncBufRead`]
//! as the underlying byte stream. This reader fully implements async/await, so reading
//! can use non-blocking I/O.
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};
9
10use crate::errors::{Error, Result, SyntaxError};
11use crate::events::Event;
12use crate::name::{QName, ResolveResult};
13use crate::parser::{ElementParser, Parser, PiParser};
14use crate::reader::buffered_reader::impl_buffered_source;
15use crate::reader::{BangType, BinaryStream, NsReader, ParseState, ReadTextResult, Reader, Span};
16use crate::utils::is_whitespace;
17
/// A struct for reading XML asynchronously from an [`AsyncBufRead`].
///
/// Having our own wrapper struct allows us to implement anything without risk of
/// name conflicts, and it does not suffer from the impossibility of having `async`
/// in traits.
struct TokioAdapter<'a, R>(&'a mut R);
23
24impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {
25    impl_buffered_source!('b, 0, async, await);
26}
27
28////////////////////////////////////////////////////////////////////////////////////////////////////
29
30impl<'r, R> AsyncRead for BinaryStream<'r, R>
31where
32    R: AsyncRead + Unpin,
33{
34    fn poll_read(
35        self: Pin<&mut Self>,
36        cx: &mut Context<'_>,
37        buf: &mut ReadBuf<'_>,
38    ) -> Poll<io::Result<()>> {
39        let start = buf.remaining();
40        let this = self.get_mut();
41        let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);
42
43        // If something was read, update offset
44        if let Poll::Ready(Ok(_)) = poll {
45            let amt = start - buf.remaining();
46            *this.offset += amt as u64;
47        }
48        poll
49    }
50}
51
52impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
53where
54    R: AsyncBufRead + Unpin,
55{
56    #[inline]
57    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
58        Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
59    }
60
61    #[inline]
62    fn consume(self: Pin<&mut Self>, amt: usize) {
63        let this = self.get_mut();
64        this.inner.consume(amt);
65        *this.offset += amt as u64;
66    }
67}
68
69////////////////////////////////////////////////////////////////////////////////////////////////////
70
71impl<R: AsyncBufRead + Unpin> Reader<R> {
72    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
73    /// given buffer.
74    ///
75    /// This is the main entry point for reading XML `Event`s when using an async reader.
76    ///
77    /// See the documentation of [`read_event_into()`] for more information.
78    ///
79    /// # Examples
80    ///
81    /// ```
82    /// # tokio_test::block_on(async {
83    /// # use pretty_assertions::assert_eq;
84    /// use quick_xml::events::Event;
85    /// use quick_xml::reader::Reader;
86    ///
87    /// // This explicitly uses `from_reader("...".as_bytes())` to use a buffered
88    /// // reader instead of relying on the zero-copy optimizations for reading
89    /// // from byte slices, which provides the sync interface anyway.
90    /// let mut reader = Reader::from_reader(r#"
91    ///     <tag1 att1 = "test">
92    ///        <tag2><!--Test comment-->Test</tag2>
93    ///        <tag2>Test 2</tag2>
94    ///     </tag1>
95    /// "#.as_bytes());
96    /// reader.config_mut().trim_text(true);
97    ///
98    /// let mut count = 0;
99    /// let mut buf = Vec::new();
100    /// let mut txt = Vec::new();
101    /// loop {
102    ///     match reader.read_event_into_async(&mut buf).await {
103    ///         Ok(Event::Start(_)) => count += 1,
104    ///         Ok(Event::Text(e)) => txt.push(e.unescape().unwrap().into_owned()),
105    ///         Err(e) => panic!("Error at position {}: {:?}", reader.error_position(), e),
106    ///         Ok(Event::Eof) => break,
107    ///         _ => (),
108    ///     }
109    ///     buf.clear();
110    /// }
111    /// assert_eq!(count, 3);
112    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
113    /// # }) // tokio_test::block_on
114    /// ```
115    ///
116    /// [`read_event_into()`]: Reader::read_event_into
117    pub async fn read_event_into_async<'b>(
118        &mut self,
119        mut buf: &'b mut Vec<u8>,
120    ) -> Result<Event<'b>> {
121        read_event_impl!(
122            self,
123            buf,
124            TokioAdapter(&mut self.reader),
125            read_until_close_async,
126            await
127        )
128    }
129
130    /// An asynchronous version of [`read_to_end_into()`].
131    /// Reads asynchronously until end element is found using provided buffer as
132    /// intermediate storage for events content. This function is supposed to be
133    /// called after you already read a [`Start`] event.
134    ///
135    /// See the documentation of [`read_to_end_into()`] for more information.
136    ///
137    /// # Examples
138    ///
139    /// This example shows, how you can skip XML content after you read the
140    /// start event.
141    ///
142    /// ```
143    /// # tokio_test::block_on(async {
144    /// # use pretty_assertions::assert_eq;
145    /// use quick_xml::events::{BytesStart, Event};
146    /// use quick_xml::reader::Reader;
147    ///
148    /// let mut reader = Reader::from_reader(r#"
149    ///     <outer>
150    ///         <inner>
151    ///             <inner></inner>
152    ///             <inner/>
153    ///             <outer></outer>
154    ///             <outer/>
155    ///         </inner>
156    ///     </outer>
157    /// "#.as_bytes());
158    /// reader.config_mut().trim_text(true);
159    /// let mut buf = Vec::new();
160    ///
161    /// let start = BytesStart::new("outer");
162    /// let end   = start.to_end().into_owned();
163    ///
164    /// // First, we read a start event...
165    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
166    ///
167    /// // ...then, we could skip all events to the corresponding end event.
168    /// // This call will correctly handle nested <outer> elements.
169    /// // Note, however, that this method does not handle namespaces.
170    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
171    ///
172    /// // At the end we should get an Eof event, because we ate the whole XML
173    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
174    /// # }) // tokio_test::block_on
175    /// ```
176    ///
177    /// [`read_to_end_into()`]: Self::read_to_end_into
178    /// [`Start`]: Event::Start
179    pub async fn read_to_end_into_async<'n>(
180        &mut self,
181        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033`
182        end: QName<'n>,
183        buf: &mut Vec<u8>,
184    ) -> Result<Span> {
185        Ok(read_to_end!(
186            self,
187            end,
188            buf,
189            read_event_into_async,
190            {
191                buf.clear();
192            },
193            await
194        ))
195    }
196
197    /// Private function to read until `>` is found. This function expects that
198    /// it was called just after encounter a `<` symbol.
199    async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
200        read_until_close!(self, buf, TokioAdapter(&mut self.reader), await)
201    }
202}
203
204////////////////////////////////////////////////////////////////////////////////////////////////////
205
206impl<R: AsyncBufRead + Unpin> NsReader<R> {
207    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
208    /// given buffer.
209    ///
210    /// This method manages namespaces but doesn't resolve them automatically.
211    /// You should call [`resolve_element()`] if you want to get a namespace.
212    ///
213    /// You also can use [`read_resolved_event_into_async()`] instead if you want
214    /// to resolve namespace as soon as you get an event.
215    ///
216    /// # Examples
217    ///
218    /// ```
219    /// # tokio_test::block_on(async {
220    /// # use pretty_assertions::assert_eq;
221    /// use quick_xml::events::Event;
222    /// use quick_xml::name::{Namespace, ResolveResult::*};
223    /// use quick_xml::reader::NsReader;
224    ///
225    /// let mut reader = NsReader::from_reader(r#"
226    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
227    ///        <y:tag2><!--Test comment-->Test</y:tag2>
228    ///        <y:tag2>Test 2</y:tag2>
229    ///     </x:tag1>
230    /// "#.as_bytes());
231    /// reader.config_mut().trim_text(true);
232    ///
233    /// let mut count = 0;
234    /// let mut buf = Vec::new();
235    /// let mut txt = Vec::new();
236    /// loop {
237    ///     match reader.read_event_into_async(&mut buf).await.unwrap() {
238    ///         Event::Start(e) => {
239    ///             count += 1;
240    ///             let (ns, local) = reader.resolve_element(e.name());
241    ///             match local.as_ref() {
242    ///                 b"tag1" => assert_eq!(ns, Bound(Namespace(b"www.xxxx"))),
243    ///                 b"tag2" => assert_eq!(ns, Bound(Namespace(b"www.yyyy"))),
244    ///                 _ => unreachable!(),
245    ///             }
246    ///         }
247    ///         Event::Text(e) => {
248    ///             txt.push(e.unescape().unwrap().into_owned())
249    ///         }
250    ///         Event::Eof => break,
251    ///         _ => (),
252    ///     }
253    ///     buf.clear();
254    /// }
255    /// assert_eq!(count, 3);
256    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
257    /// # }) // tokio_test::block_on
258    /// ```
259    ///
260    /// [`read_event_into()`]: NsReader::read_event_into
261    /// [`resolve_element()`]: Self::resolve_element
262    /// [`read_resolved_event_into_async()`]: Self::read_resolved_event_into_async
263    pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
264        self.pop();
265        let event = self.reader.read_event_into_async(buf).await;
266        self.process_event(event)
267    }
268
269    /// An asynchronous version of [`read_to_end_into()`].
270    /// Reads asynchronously until end element is found using provided buffer as
271    /// intermediate storage for events content. This function is supposed to be
272    /// called after you already read a [`Start`] event.
273    ///
274    /// See the documentation of [`read_to_end_into()`] for more information.
275    ///
276    /// # Examples
277    ///
278    /// This example shows, how you can skip XML content after you read the
279    /// start event.
280    ///
281    /// ```
282    /// # tokio_test::block_on(async {
283    /// # use pretty_assertions::assert_eq;
284    /// use quick_xml::name::{Namespace, ResolveResult};
285    /// use quick_xml::events::{BytesStart, Event};
286    /// use quick_xml::reader::NsReader;
287    ///
288    /// let mut reader = NsReader::from_reader(r#"
289    ///     <outer xmlns="namespace 1">
290    ///         <inner xmlns="namespace 2">
291    ///             <outer></outer>
292    ///         </inner>
293    ///         <inner>
294    ///             <inner></inner>
295    ///             <inner/>
296    ///             <outer></outer>
297    ///             <p:outer xmlns:p="ns"></p:outer>
298    ///             <outer/>
299    ///         </inner>
300    ///     </outer>
301    /// "#.as_bytes());
302    /// reader.config_mut().trim_text(true);
303    /// let mut buf = Vec::new();
304    ///
305    /// let ns = Namespace(b"namespace 1");
306    /// let start = BytesStart::from_content(r#"outer xmlns="namespace 1""#, 5);
307    /// let end   = start.to_end().into_owned();
308    ///
309    /// // First, we read a start event...
310    /// assert_eq!(
311    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
312    ///     (ResolveResult::Bound(ns), Event::Start(start))
313    /// );
314    ///
315    /// // ...then, we could skip all events to the corresponding end event.
316    /// // This call will correctly handle nested <outer> elements.
317    /// // Note, however, that this method does not handle namespaces.
318    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
319    ///
320    /// // At the end we should get an Eof event, because we ate the whole XML
321    /// assert_eq!(
322    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
323    ///     (ResolveResult::Unbound, Event::Eof)
324    /// );
325    /// # }) // tokio_test::block_on
326    /// ```
327    ///
328    /// [`read_to_end_into()`]: Self::read_to_end_into
329    /// [`Start`]: Event::Start
330    pub async fn read_to_end_into_async<'n>(
331        &mut self,
332        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033`
333        end: QName<'n>,
334        buf: &mut Vec<u8>,
335    ) -> Result<Span> {
336        // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should
337        // match literally the start name. See `Config::check_end_names` documentation
338        self.reader.read_to_end_into_async(end, buf).await
339    }
340
341    /// An asynchronous version of [`read_resolved_event_into()`]. Reads the next
342    /// event into given buffer asynchronously and resolves its namespace (if applicable).
343    ///
344    /// Namespace is resolved only for [`Start`], [`Empty`] and [`End`] events.
345    /// For all other events the concept of namespace is not defined, so
346    /// a [`ResolveResult::Unbound`] is returned.
347    ///
348    /// If you are not interested in namespaces, you can use [`read_event_into_async()`]
349    /// which will not automatically resolve namespaces for you.
350    ///
351    /// # Examples
352    ///
353    /// ```
354    /// # tokio_test::block_on(async {
355    /// # use pretty_assertions::assert_eq;
356    /// use quick_xml::events::Event;
357    /// use quick_xml::name::{Namespace, QName, ResolveResult::*};
358    /// use quick_xml::reader::NsReader;
359    ///
360    /// let mut reader = NsReader::from_reader(r#"
361    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
362    ///        <y:tag2><!--Test comment-->Test</y:tag2>
363    ///        <y:tag2>Test 2</y:tag2>
364    ///     </x:tag1>
365    /// "#.as_bytes());
366    /// reader.config_mut().trim_text(true);
367    ///
368    /// let mut count = 0;
369    /// let mut buf = Vec::new();
370    /// let mut txt = Vec::new();
371    /// loop {
372    ///     match reader.read_resolved_event_into_async(&mut buf).await.unwrap() {
373    ///         (Bound(Namespace(b"www.xxxx")), Event::Start(e)) => {
374    ///             count += 1;
375    ///             assert_eq!(e.local_name(), QName(b"tag1").into());
376    ///         }
377    ///         (Bound(Namespace(b"www.yyyy")), Event::Start(e)) => {
378    ///             count += 1;
379    ///             assert_eq!(e.local_name(), QName(b"tag2").into());
380    ///         }
381    ///         (_, Event::Start(_)) => unreachable!(),
382    ///
383    ///         (_, Event::Text(e)) => {
384    ///             txt.push(e.unescape().unwrap().into_owned())
385    ///         }
386    ///         (_, Event::Eof) => break,
387    ///         _ => (),
388    ///     }
389    ///     buf.clear();
390    /// }
391    /// assert_eq!(count, 3);
392    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
393    /// # }) // tokio_test::block_on
394    /// ```
395    ///
396    /// [`read_resolved_event_into()`]: NsReader::read_resolved_event_into
397    /// [`Start`]: Event::Start
398    /// [`Empty`]: Event::Empty
399    /// [`End`]: Event::End
400    /// [`read_event_into_async()`]: Self::read_event_into_async
401    pub async fn read_resolved_event_into_async<'ns, 'b>(
402        // Name 'ns lifetime, because otherwise we get an error
403        // "implicit elided lifetime not allowed here" on ResolveResult
404        &'ns mut self,
405        buf: &'b mut Vec<u8>,
406    ) -> Result<(ResolveResult<'ns>, Event<'b>)> {
407        let event = self.read_event_into_async(buf).await;
408        self.resolve_event(event)
409    }
410}
411
#[cfg(test)]
mod test {
    use super::TokioAdapter;
    use crate::reader::test::check;

    check!(
        #[tokio::test]
        read_event_into_async,
        read_until_close_async,
        TokioAdapter,
        &mut Vec::new(),
        async,
        await
    );

    /// Compile-time guard: the future returned by `Reader::read_event_into_async`
    /// must be `Send` (e.g. so it can be handed to `tokio::spawn`). Nothing is
    /// checked at runtime; the test only has to compile.
    #[test]
    fn test_future_is_send() {
        use super::*;
        use tokio::io::BufReader;

        // Accepts only `Send` values; calling it is the whole assertion.
        fn assert_send<F: Send>(_future: F) {}

        let source: Vec<u8> = Vec::new();
        let mut event_buf = Vec::new();
        let mut reader = Reader::from_reader(BufReader::new(source.as_slice()));

        assert_send(reader.read_event_into_async(&mut event_buf));
    }
}