quick_xml/reader/
async_tokio.rs

//! This is an implementation of [`Reader`] for reading from an [`AsyncBufRead`]
//! as the underlying byte stream. This reader fully implements async/await, so
//! reading can use non-blocking I/O.
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};
9
10use crate::errors::{Error, Result, SyntaxError};
11use crate::events::Event;
12use crate::name::{QName, ResolveResult};
13use crate::parser::{ElementParser, Parser, PiParser};
14use crate::reader::buffered_reader::impl_buffered_source;
15use crate::reader::{BangType, BinaryStream, NsReader, ParseState, ReadTextResult, Reader, Span};
16use crate::utils::is_whitespace;
17
/// A struct for reading XML asynchronously from an [`AsyncBufRead`].
///
/// Having our own struct allows us to implement anything without risk of name
/// conflicts and does not suffer from the impossibility of having `async` in traits.
///
/// It only holds a mutable borrow of the underlying reader for the duration of
/// one reading operation.
struct TokioAdapter<'a, R>(&'a mut R);

impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {
    // The buffered-source reading methods are generated by the shared
    // `impl_buffered_source!` macro. NOTE(review): the `async, await` tokens
    // presumably select the asynchronous variant of the generated methods —
    // confirm against the macro definition in `reader::buffered_reader`.
    impl_buffered_source!('b, 0, async, await);
}
27
28////////////////////////////////////////////////////////////////////////////////////////////////////
29
30impl<'r, R> AsyncRead for BinaryStream<'r, R>
31where
32    R: AsyncRead + Unpin,
33{
34    fn poll_read(
35        self: Pin<&mut Self>,
36        cx: &mut Context<'_>,
37        buf: &mut ReadBuf<'_>,
38    ) -> Poll<io::Result<()>> {
39        let start = buf.remaining();
40        let this = self.get_mut();
41        let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);
42
43        // If something was read, update offset
44        if let Poll::Ready(Ok(_)) = poll {
45            let amt = start - buf.remaining();
46            *this.offset += amt as u64;
47        }
48        poll
49    }
50}
51
52impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
53where
54    R: AsyncBufRead + Unpin,
55{
56    #[inline]
57    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
58        Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
59    }
60
61    #[inline]
62    fn consume(self: Pin<&mut Self>, amt: usize) {
63        let this = self.get_mut();
64        this.inner.consume(amt);
65        *this.offset += amt as u64;
66    }
67}
68
69////////////////////////////////////////////////////////////////////////////////////////////////////
70
impl<R: AsyncBufRead + Unpin> Reader<R> {
    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
    /// the given buffer.
    ///
    /// This is the main entry point for reading XML `Event`s when using an async reader.
    /// The returned event may borrow from `buf` (both share the lifetime `'b`), so
    /// the buffer must stay alive and untouched while the event is in use.
    ///
    /// See the documentation of [`read_event_into()`] for more information.
    ///
    /// # Examples
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::Event;
    /// use quick_xml::reader::Reader;
    ///
    /// // This explicitly uses `from_reader("...".as_bytes())` to use a buffered
    /// // reader instead of relying on the zero-copy optimizations for reading
    /// // from byte slices, which provides the sync interface anyway.
    /// let mut reader = Reader::from_reader(r#"
    ///     <tag1 att1 = "test">
    ///        <tag2><!--Test comment-->Test</tag2>
    ///        <tag2>Test 2</tag2>
    ///     </tag1>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    ///
    /// let mut count = 0;
    /// let mut buf = Vec::new();
    /// let mut txt = Vec::new();
    /// loop {
    ///     match reader.read_event_into_async(&mut buf).await {
    ///         Ok(Event::Start(_)) => count += 1,
    ///         Ok(Event::Text(e)) => txt.push(e.unescape().unwrap().into_owned()),
    ///         Err(e) => panic!("Error at position {}: {:?}", reader.error_position(), e),
    ///         Ok(Event::Eof) => break,
    ///         _ => (),
    ///     }
    ///     buf.clear();
    /// }
    /// assert_eq!(count, 3);
    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_event_into()`]: Reader::read_event_into
    pub async fn read_event_into_async<'b>(
        &mut self,
        // NOTE(review): the `mut` binding is presumably required by the
        // `read_event_impl!` expansion — confirm before removing it.
        mut buf: &'b mut Vec<u8>,
    ) -> Result<Event<'b>> {
        // Shared event-reading logic. The source is a `TokioAdapter` over the
        // inner reader, markup is handed to `read_until_close_async` (which
        // expects to be called right after a `<`), and the trailing `await`
        // token presumably makes the expansion `.await` its I/O calls.
        read_event_impl!(
            self, buf,
            TokioAdapter(&mut self.reader),
            read_until_close_async,
            await
        )
    }

    /// An asynchronous version of [`read_to_end_into()`].
    /// Reads asynchronously until the end element is found, using the provided
    /// buffer as intermediate storage for event content. This function is
    /// supposed to be called after you have already read a [`Start`] event.
    ///
    /// Returns the `Span` produced by the shared `read_to_end!` logic; see
    /// [`read_to_end_into()`] for more information, including what the span covers.
    ///
    /// # Examples
    ///
    /// This example shows how you can skip XML content after you read the
    /// start event.
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::{BytesStart, Event};
    /// use quick_xml::reader::Reader;
    ///
    /// let mut reader = Reader::from_reader(r#"
    ///     <outer>
    ///         <inner>
    ///             <inner></inner>
    ///             <inner/>
    ///             <outer></outer>
    ///             <outer/>
    ///         </inner>
    ///     </outer>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    /// let mut buf = Vec::new();
    ///
    /// let start = BytesStart::new("outer");
    /// let end   = start.to_end().into_owned();
    ///
    /// // First, we read a start event...
    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
    ///
    /// // ...then, we could skip all events to the corresponding end event.
    /// // This call will correctly handle nested <outer> elements.
    /// // Note, however, that this method does not handle namespaces.
    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
    ///
    /// // At the end we should get an Eof event, because we ate the whole XML
    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_to_end_into()`]: Self::read_to_end_into
    /// [`Start`]: Event::Start
    pub async fn read_to_end_into_async<'n>(
        &mut self,
        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
        end: QName<'n>,
        buf: &mut Vec<u8>,
    ) -> Result<Span> {
        // Events are pulled with `read_event_into_async`. The `{ buf.clear(); }`
        // block is handed to the macro, presumably to run between events so that
        // `buf` does not accumulate skipped content — confirm in `read_to_end!`.
        Ok(read_to_end!(self, end, buf, read_event_into_async, { buf.clear(); }, await))
    }

    /// Private function to read until `>` is found. This function expects that
    /// it was called just after encountering a `<` symbol.
    async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
        read_until_close!(self, buf, TokioAdapter(&mut self.reader), await)
    }
}
193
194////////////////////////////////////////////////////////////////////////////////////////////////////
195
impl<R: AsyncBufRead + Unpin> NsReader<R> {
    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
    /// the given buffer.
    ///
    /// This method manages namespaces but doesn't resolve them automatically.
    /// You should call [`resolve_element()`] if you want to get a namespace.
    ///
    /// You can also use [`read_resolved_event_into_async()`] instead if you want
    /// to resolve the namespace as soon as you get an event.
    ///
    /// # Examples
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::Event;
    /// use quick_xml::name::{Namespace, ResolveResult::*};
    /// use quick_xml::reader::NsReader;
    ///
    /// let mut reader = NsReader::from_reader(r#"
    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
    ///        <y:tag2><!--Test comment-->Test</y:tag2>
    ///        <y:tag2>Test 2</y:tag2>
    ///     </x:tag1>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    ///
    /// let mut count = 0;
    /// let mut buf = Vec::new();
    /// let mut txt = Vec::new();
    /// loop {
    ///     match reader.read_event_into_async(&mut buf).await.unwrap() {
    ///         Event::Start(e) => {
    ///             count += 1;
    ///             let (ns, local) = reader.resolve_element(e.name());
    ///             match local.as_ref() {
    ///                 b"tag1" => assert_eq!(ns, Bound(Namespace(b"www.xxxx"))),
    ///                 b"tag2" => assert_eq!(ns, Bound(Namespace(b"www.yyyy"))),
    ///                 _ => unreachable!(),
    ///             }
    ///         }
    ///         Event::Text(e) => {
    ///             txt.push(e.unescape().unwrap().into_owned())
    ///         }
    ///         Event::Eof => break,
    ///         _ => (),
    ///     }
    ///     buf.clear();
    /// }
    /// assert_eq!(count, 3);
    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_event_into()`]: NsReader::read_event_into
    /// [`resolve_element()`]: Self::resolve_element
    /// [`read_resolved_event_into_async()`]: Self::read_resolved_event_into_async
    pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
        // NOTE(review): presumably finishes a namespace scope left pending by the
        // previous event (e.g. bindings of an element whose `End` was just
        // returned) — confirm against `NsReader::pop`.
        self.pop();
        // Read the raw event with the inner `Reader`. The result is kept in a
        // local (not passed inline) because `process_event` needs `&mut self`
        // while the read borrows `self.reader`.
        let event = self.reader.read_event_into_async(buf).await;
        // NOTE(review): presumably updates namespace bindings from the event
        // before returning it — confirm against `NsReader::process_event`.
        self.process_event(event)
    }

    /// An asynchronous version of [`read_to_end_into()`].
    /// Reads asynchronously until the end element is found, using the provided
    /// buffer as intermediate storage for event content. This function is
    /// supposed to be called after you have already read a [`Start`] event.
    ///
    /// See the documentation of [`read_to_end_into()`] for more information.
    ///
    /// # Examples
    ///
    /// This example shows how you can skip XML content after you read the
    /// start event.
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::name::{Namespace, ResolveResult};
    /// use quick_xml::events::{BytesStart, Event};
    /// use quick_xml::reader::NsReader;
    ///
    /// let mut reader = NsReader::from_reader(r#"
    ///     <outer xmlns="namespace 1">
    ///         <inner xmlns="namespace 2">
    ///             <outer></outer>
    ///         </inner>
    ///         <inner>
    ///             <inner></inner>
    ///             <inner/>
    ///             <outer></outer>
    ///             <p:outer xmlns:p="ns"></p:outer>
    ///             <outer/>
    ///         </inner>
    ///     </outer>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    /// let mut buf = Vec::new();
    ///
    /// let ns = Namespace(b"namespace 1");
    /// let start = BytesStart::from_content(r#"outer xmlns="namespace 1""#, 5);
    /// let end   = start.to_end().into_owned();
    ///
    /// // First, we read a start event...
    /// assert_eq!(
    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
    ///     (ResolveResult::Bound(ns), Event::Start(start))
    /// );
    ///
    /// // ...then, we could skip all events to the corresponding end event.
    /// // This call will correctly handle nested <outer> elements.
    /// // Note, however, that this method does not handle namespaces.
    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
    ///
    /// // At the end we should get an Eof event, because we ate the whole XML
    /// assert_eq!(
    ///     reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
    ///     (ResolveResult::Unbound, Event::Eof)
    /// );
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_to_end_into()`]: Self::read_to_end_into
    /// [`Start`]: Event::Start
    pub async fn read_to_end_into_async<'n>(
        &mut self,
        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
        end: QName<'n>,
        buf: &mut Vec<u8>,
    ) -> Result<Span> {
        // According to https://www.w3.org/TR/xml11/#dt-etag, the end name must
        // match the start name literally (see the `Config::check_end_names`
        // documentation), so no namespace resolution is needed here and the
        // work is delegated to the inner `Reader`.
        self.reader.read_to_end_into_async(end, buf).await
    }

    /// An asynchronous version of [`read_resolved_event_into()`]. Reads the next
    /// event into the given buffer asynchronously and resolves its namespace (if applicable).
    ///
    /// Namespace is resolved only for [`Start`], [`Empty`] and [`End`] events.
    /// For all other events the concept of namespace is not defined, so
    /// a [`ResolveResult::Unbound`] is returned.
    ///
    /// If you are not interested in namespaces, you can use [`read_event_into_async()`]
    /// which will not automatically resolve namespaces for you.
    ///
    /// # Examples
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::Event;
    /// use quick_xml::name::{Namespace, QName, ResolveResult::*};
    /// use quick_xml::reader::NsReader;
    ///
    /// let mut reader = NsReader::from_reader(r#"
    ///     <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
    ///        <y:tag2><!--Test comment-->Test</y:tag2>
    ///        <y:tag2>Test 2</y:tag2>
    ///     </x:tag1>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    ///
    /// let mut count = 0;
    /// let mut buf = Vec::new();
    /// let mut txt = Vec::new();
    /// loop {
    ///     match reader.read_resolved_event_into_async(&mut buf).await.unwrap() {
    ///         (Bound(Namespace(b"www.xxxx")), Event::Start(e)) => {
    ///             count += 1;
    ///             assert_eq!(e.local_name(), QName(b"tag1").into());
    ///         }
    ///         (Bound(Namespace(b"www.yyyy")), Event::Start(e)) => {
    ///             count += 1;
    ///             assert_eq!(e.local_name(), QName(b"tag2").into());
    ///         }
    ///         (_, Event::Start(_)) => unreachable!(),
    ///
    ///         (_, Event::Text(e)) => {
    ///             txt.push(e.unescape().unwrap().into_owned())
    ///         }
    ///         (_, Event::Eof) => break,
    ///         _ => (),
    ///     }
    ///     buf.clear();
    /// }
    /// assert_eq!(count, 3);
    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_resolved_event_into()`]: NsReader::read_resolved_event_into
    /// [`Start`]: Event::Start
    /// [`Empty`]: Event::Empty
    /// [`End`]: Event::End
    /// [`read_event_into_async()`]: Self::read_event_into_async
    pub async fn read_resolved_event_into_async<'ns, 'b>(
        // Name 'ns lifetime, because otherwise we get an error
        // "implicit elided lifetime not allowed here" on ResolveResult
        &'ns mut self,
        buf: &'b mut Vec<u8>,
    ) -> Result<(ResolveResult<'ns>, Event<'b>)> {
        // Read first (this goes through the namespace bookkeeping of
        // `read_event_into_async`), then resolve the event's name; the returned
        // `ResolveResult` borrows from `self` for `'ns`.
        let event = self.read_event_into_async(buf).await;
        self.resolve_event(event)
    }
}
401
#[cfg(test)]
mod test {
    use super::TokioAdapter;
    use crate::reader::test::check;

    // Instantiates the shared reader test-suite from `crate::reader::test::check!`
    // for the Tokio adapter. The `#[tokio::test]` attribute and the `async, await`
    // tokens are forwarded to the macro, presumably so the generated tests are
    // async and run on the Tokio test runtime.
    check!(
        #[tokio::test]
        read_event_into_async,
        read_until_close_async,
        TokioAdapter,
        &mut Vec::new(),
        async, await
    );

    /// Compile-time check that the futures returned by the public async reading
    /// methods are `Send`, so callers can move them across threads (for example
    /// into `tokio::spawn` on a multi-threaded runtime).
    #[test]
    fn test_future_is_send() {
        // This test should just compile, no actual runtime checks are performed here.
        use super::*;
        use tokio::io::BufReader;
        fn check_send<T: Send>(_: T) {}

        let input = vec![];
        let mut reading_buf = vec![];
        let mut reader = Reader::from_reader(BufReader::new(input.as_slice()));

        check_send(reader.read_event_into_async(&mut reading_buf));

        // `NsReader` wraps `Reader` and adds namespace state; its futures must
        // stay `Send` as well, or a non-`Send` field added there would go unnoticed.
        // Each future is dropped at the end of its statement, so `reading_buf`
        // can be borrowed again for the next check.
        let mut ns_reader = NsReader::from_reader(BufReader::new(input.as_slice()));
        check_send(ns_reader.read_event_into_async(&mut reading_buf));
        check_send(ns_reader.read_resolved_event_into_async(&mut reading_buf));
    }
}