quick_xml/reader/async_tokio.rs
//! This is an implementation of [`Reader`] for reading from an [`AsyncBufRead`]
//! as the underlying byte stream. This reader fully implements async/await, so reading
//! can use non-blocking I/O.
4
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};
9
10use crate::errors::{Error, Result, SyntaxError};
11use crate::events::Event;
12use crate::name::{QName, ResolveResult};
13use crate::parser::{ElementParser, Parser, PiParser};
14use crate::reader::buffered_reader::impl_buffered_source;
15use crate::reader::{BangType, BinaryStream, NsReader, ParseState, ReadTextResult, Reader, Span};
16use crate::utils::is_whitespace;
17
/// Adapter for reading XML asynchronously from an [`AsyncBufRead`].
///
/// The single tuple field is a mutable borrow of the underlying reader.
///
/// Having our own struct allows us to implement anything without risk of name conflicts
/// and does not suffer from the impossibility of having `async` in traits.
struct TokioAdapter<'a, R>(&'a mut R);

// The reading methods of this adapter are generated by `impl_buffered_source!`
// (imported from `reader::buffered_reader`). The `async`/`await` arguments presumably
// select the asynchronous flavour of the generated code — TODO confirm against the macro.
impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {
    impl_buffered_source!('b, 0, async, await);
}
27
28////////////////////////////////////////////////////////////////////////////////////////////////////
29
30impl<'r, R> AsyncRead for BinaryStream<'r, R>
31where
32 R: AsyncRead + Unpin,
33{
34 fn poll_read(
35 self: Pin<&mut Self>,
36 cx: &mut Context<'_>,
37 buf: &mut ReadBuf<'_>,
38 ) -> Poll<io::Result<()>> {
39 let start = buf.remaining();
40 let this = self.get_mut();
41 let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);
42
43 // If something was read, update offset
44 if let Poll::Ready(Ok(_)) = poll {
45 let amt = start - buf.remaining();
46 *this.offset += amt as u64;
47 }
48 poll
49 }
50}
51
52impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
53where
54 R: AsyncBufRead + Unpin,
55{
56 #[inline]
57 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
58 Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
59 }
60
61 #[inline]
62 fn consume(self: Pin<&mut Self>, amt: usize) {
63 let this = self.get_mut();
64 this.inner.consume(amt);
65 *this.offset += amt as u64;
66 }
67}
68
69////////////////////////////////////////////////////////////////////////////////////////////////////
70
impl<R: AsyncBufRead + Unpin> Reader<R> {
    /// An asynchronous version of [`read_event_into()`]. Reads the next event into
    /// the given buffer.
    ///
    /// This is the main entry point for reading XML `Event`s when using an async reader.
    ///
    /// See the documentation of [`read_event_into()`] for more information.
    ///
    /// # Examples
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::Event;
    /// use quick_xml::reader::Reader;
    ///
    /// // This explicitly uses `from_reader("...".as_bytes())` to use a buffered
    /// // reader instead of relying on the zero-copy optimizations for reading
    /// // from byte slices, which provides the sync interface anyway.
    /// let mut reader = Reader::from_reader(r#"
    ///     <tag1 att1 = "test">
    ///        <tag2><!--Test comment-->Test</tag2>
    ///        <tag2>Test 2</tag2>
    ///     </tag1>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    ///
    /// let mut count = 0;
    /// let mut buf = Vec::new();
    /// let mut txt = Vec::new();
    /// loop {
    ///     match reader.read_event_into_async(&mut buf).await {
    ///         Ok(Event::Start(_)) => count += 1,
    ///         Ok(Event::Text(e)) => txt.push(e.unescape().unwrap().into_owned()),
    ///         Err(e) => panic!("Error at position {}: {:?}", reader.error_position(), e),
    ///         Ok(Event::Eof) => break,
    ///         _ => (),
    ///     }
    ///     buf.clear();
    /// }
    /// assert_eq!(count, 3);
    /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_event_into()`]: Reader::read_event_into
    pub async fn read_event_into_async<'b>(
        &mut self,
        // `mut` is kept because the `read_event_impl!` expansion presumably rebinds
        // or reborrows `buf` — confirm against the macro before removing it.
        mut buf: &'b mut Vec<u8>,
    ) -> Result<Event<'b>> {
        // The event-reading logic lives in the shared `read_event_impl!` macro.
        // This instantiation reads through `TokioAdapter`, `.await`s every I/O step,
        // and hands markup after a `<` to `read_until_close_async`.
        read_event_impl!(
            self, buf,
            TokioAdapter(&mut self.reader),
            read_until_close_async,
            await
        )
    }

    /// An asynchronous version of [`read_to_end_into()`].
    /// Reads asynchronously until the end element is found, using the provided buffer as
    /// intermediate storage for event content. This function is supposed to be
    /// called after you have already read a [`Start`] event.
    ///
    /// See the documentation of [`read_to_end_into()`] for more information.
    ///
    /// # Examples
    ///
    /// This example shows how you can skip XML content after you read the
    /// start event.
    ///
    /// ```
    /// # tokio_test::block_on(async {
    /// # use pretty_assertions::assert_eq;
    /// use quick_xml::events::{BytesStart, Event};
    /// use quick_xml::reader::Reader;
    ///
    /// let mut reader = Reader::from_reader(r#"
    ///     <outer>
    ///         <inner>
    ///             <inner></inner>
    ///             <inner/>
    ///             <outer></outer>
    ///             <outer/>
    ///         </inner>
    ///     </outer>
    /// "#.as_bytes());
    /// reader.config_mut().trim_text(true);
    /// let mut buf = Vec::new();
    ///
    /// let start = BytesStart::new("outer");
    /// let end = start.to_end().into_owned();
    ///
    /// // First, we read a start event...
    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start));
    ///
    /// // ...then, we could skip all events to the corresponding end event.
    /// // This call will correctly handle nested <outer> elements.
    /// // Note, however, that this method does not handle namespaces.
    /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
    ///
    /// // At the end we should get an Eof event, because we ate the whole XML
    /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof);
    /// # }) // tokio_test::block_on
    /// ```
    ///
    /// [`read_to_end_into()`]: Self::read_to_end_into
    /// [`Start`]: Event::Start
    pub async fn read_to_end_into_async<'n>(
        &mut self,
        // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033
        end: QName<'n>,
        buf: &mut Vec<u8>,
    ) -> Result<Span> {
        // The skipping loop is the shared `read_to_end!` macro, driven by
        // `read_event_into_async` and awaited. The `{ buf.clear(); }` block is the hook
        // the macro runs to reset `buf`, presumably between skipped events so the
        // buffer does not keep growing — confirm against the macro.
        Ok(read_to_end!(self, end, buf, read_event_into_async, { buf.clear(); }, await))
    }

    /// Private function to read until `>` is found. This function expects that
    /// it was called just after encountering a `<` symbol.
    async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
        // Shared `read_until_close!` logic, reading through `TokioAdapter` and awaiting I/O.
        read_until_close!(self, buf, TokioAdapter(&mut self.reader), await)
    }
}
193
194////////////////////////////////////////////////////////////////////////////////////////////////////
195
196impl<R: AsyncBufRead + Unpin> NsReader<R> {
197 /// An asynchronous version of [`read_event_into()`]. Reads the next event into
198 /// given buffer.
199 ///
200 /// This method manages namespaces but doesn't resolve them automatically.
201 /// You should call [`resolve_element()`] if you want to get a namespace.
202 ///
203 /// You also can use [`read_resolved_event_into_async()`] instead if you want
204 /// to resolve namespace as soon as you get an event.
205 ///
206 /// # Examples
207 ///
208 /// ```
209 /// # tokio_test::block_on(async {
210 /// # use pretty_assertions::assert_eq;
211 /// use quick_xml::events::Event;
212 /// use quick_xml::name::{Namespace, ResolveResult::*};
213 /// use quick_xml::reader::NsReader;
214 ///
215 /// let mut reader = NsReader::from_reader(r#"
216 /// <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
217 /// <y:tag2><!--Test comment-->Test</y:tag2>
218 /// <y:tag2>Test 2</y:tag2>
219 /// </x:tag1>
220 /// "#.as_bytes());
221 /// reader.config_mut().trim_text(true);
222 ///
223 /// let mut count = 0;
224 /// let mut buf = Vec::new();
225 /// let mut txt = Vec::new();
226 /// loop {
227 /// match reader.read_event_into_async(&mut buf).await.unwrap() {
228 /// Event::Start(e) => {
229 /// count += 1;
230 /// let (ns, local) = reader.resolve_element(e.name());
231 /// match local.as_ref() {
232 /// b"tag1" => assert_eq!(ns, Bound(Namespace(b"www.xxxx"))),
233 /// b"tag2" => assert_eq!(ns, Bound(Namespace(b"www.yyyy"))),
234 /// _ => unreachable!(),
235 /// }
236 /// }
237 /// Event::Text(e) => {
238 /// txt.push(e.unescape().unwrap().into_owned())
239 /// }
240 /// Event::Eof => break,
241 /// _ => (),
242 /// }
243 /// buf.clear();
244 /// }
245 /// assert_eq!(count, 3);
246 /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
247 /// # }) // tokio_test::block_on
248 /// ```
249 ///
250 /// [`read_event_into()`]: NsReader::read_event_into
251 /// [`resolve_element()`]: Self::resolve_element
252 /// [`read_resolved_event_into_async()`]: Self::read_resolved_event_into_async
253 pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
254 self.pop();
255 let event = self.reader.read_event_into_async(buf).await;
256 self.process_event(event)
257 }
258
259 /// An asynchronous version of [`read_to_end_into()`].
260 /// Reads asynchronously until end element is found using provided buffer as
261 /// intermediate storage for events content. This function is supposed to be
262 /// called after you already read a [`Start`] event.
263 ///
264 /// See the documentation of [`read_to_end_into()`] for more information.
265 ///
266 /// # Examples
267 ///
268 /// This example shows, how you can skip XML content after you read the
269 /// start event.
270 ///
271 /// ```
272 /// # tokio_test::block_on(async {
273 /// # use pretty_assertions::assert_eq;
274 /// use quick_xml::name::{Namespace, ResolveResult};
275 /// use quick_xml::events::{BytesStart, Event};
276 /// use quick_xml::reader::NsReader;
277 ///
278 /// let mut reader = NsReader::from_reader(r#"
279 /// <outer xmlns="namespace 1">
280 /// <inner xmlns="namespace 2">
281 /// <outer></outer>
282 /// </inner>
283 /// <inner>
284 /// <inner></inner>
285 /// <inner/>
286 /// <outer></outer>
287 /// <p:outer xmlns:p="ns"></p:outer>
288 /// <outer/>
289 /// </inner>
290 /// </outer>
291 /// "#.as_bytes());
292 /// reader.config_mut().trim_text(true);
293 /// let mut buf = Vec::new();
294 ///
295 /// let ns = Namespace(b"namespace 1");
296 /// let start = BytesStart::from_content(r#"outer xmlns="namespace 1""#, 5);
297 /// let end = start.to_end().into_owned();
298 ///
299 /// // First, we read a start event...
300 /// assert_eq!(
301 /// reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
302 /// (ResolveResult::Bound(ns), Event::Start(start))
303 /// );
304 ///
305 /// // ...then, we could skip all events to the corresponding end event.
306 /// // This call will correctly handle nested <outer> elements.
307 /// // Note, however, that this method does not handle namespaces.
308 /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap();
309 ///
310 /// // At the end we should get an Eof event, because we ate the whole XML
311 /// assert_eq!(
312 /// reader.read_resolved_event_into_async(&mut buf).await.unwrap(),
313 /// (ResolveResult::Unbound, Event::Eof)
314 /// );
315 /// # }) // tokio_test::block_on
316 /// ```
317 ///
318 /// [`read_to_end_into()`]: Self::read_to_end_into
319 /// [`Start`]: Event::Start
320 pub async fn read_to_end_into_async<'n>(
321 &mut self,
322 // We should name that lifetime due to https://github.com/rust-lang/rust/issues/63033`
323 end: QName<'n>,
324 buf: &mut Vec<u8>,
325 ) -> Result<Span> {
326 // According to the https://www.w3.org/TR/xml11/#dt-etag, end name should
327 // match literally the start name. See `Config::check_end_names` documentation
328 self.reader.read_to_end_into_async(end, buf).await
329 }
330
331 /// An asynchronous version of [`read_resolved_event_into()`]. Reads the next
332 /// event into given buffer asynchronously and resolves its namespace (if applicable).
333 ///
334 /// Namespace is resolved only for [`Start`], [`Empty`] and [`End`] events.
335 /// For all other events the concept of namespace is not defined, so
336 /// a [`ResolveResult::Unbound`] is returned.
337 ///
338 /// If you are not interested in namespaces, you can use [`read_event_into_async()`]
339 /// which will not automatically resolve namespaces for you.
340 ///
341 /// # Examples
342 ///
343 /// ```
344 /// # tokio_test::block_on(async {
345 /// # use pretty_assertions::assert_eq;
346 /// use quick_xml::events::Event;
347 /// use quick_xml::name::{Namespace, QName, ResolveResult::*};
348 /// use quick_xml::reader::NsReader;
349 ///
350 /// let mut reader = NsReader::from_reader(r#"
351 /// <x:tag1 xmlns:x="www.xxxx" xmlns:y="www.yyyy" att1 = "test">
352 /// <y:tag2><!--Test comment-->Test</y:tag2>
353 /// <y:tag2>Test 2</y:tag2>
354 /// </x:tag1>
355 /// "#.as_bytes());
356 /// reader.config_mut().trim_text(true);
357 ///
358 /// let mut count = 0;
359 /// let mut buf = Vec::new();
360 /// let mut txt = Vec::new();
361 /// loop {
362 /// match reader.read_resolved_event_into_async(&mut buf).await.unwrap() {
363 /// (Bound(Namespace(b"www.xxxx")), Event::Start(e)) => {
364 /// count += 1;
365 /// assert_eq!(e.local_name(), QName(b"tag1").into());
366 /// }
367 /// (Bound(Namespace(b"www.yyyy")), Event::Start(e)) => {
368 /// count += 1;
369 /// assert_eq!(e.local_name(), QName(b"tag2").into());
370 /// }
371 /// (_, Event::Start(_)) => unreachable!(),
372 ///
373 /// (_, Event::Text(e)) => {
374 /// txt.push(e.unescape().unwrap().into_owned())
375 /// }
376 /// (_, Event::Eof) => break,
377 /// _ => (),
378 /// }
379 /// buf.clear();
380 /// }
381 /// assert_eq!(count, 3);
382 /// assert_eq!(txt, vec!["Test".to_string(), "Test 2".to_string()]);
383 /// # }) // tokio_test::block_on
384 /// ```
385 ///
386 /// [`read_resolved_event_into()`]: NsReader::read_resolved_event_into
387 /// [`Start`]: Event::Start
388 /// [`Empty`]: Event::Empty
389 /// [`End`]: Event::End
390 /// [`read_event_into_async()`]: Self::read_event_into_async
391 pub async fn read_resolved_event_into_async<'ns, 'b>(
392 // Name 'ns lifetime, because otherwise we get an error
393 // "implicit elided lifetime not allowed here" on ResolveResult
394 &'ns mut self,
395 buf: &'b mut Vec<u8>,
396 ) -> Result<(ResolveResult<'ns>, Event<'b>)> {
397 let event = self.read_event_into_async(buf).await;
398 self.resolve_event(event)
399 }
400}
401
#[cfg(test)]
mod test {
    use super::TokioAdapter;
    use crate::reader::test::check;

    check!(
        #[tokio::test]
        read_event_into_async,
        read_until_close_async,
        TokioAdapter,
        &mut Vec::new(),
        async, await
    );

    /// Compile-time check that the future returned by `read_event_into_async`
    /// is `Send`. Nothing meaningful happens at runtime; the test passes as long
    /// as it type-checks.
    #[test]
    fn test_future_is_send() {
        use super::*;
        use tokio::io::BufReader;

        fn assert_send<F: Send>(_future: F) {}

        let source: &[u8] = &[];
        let mut scratch = Vec::new();
        let mut reader = Reader::from_reader(BufReader::new(source));

        assert_send(reader.read_event_into_async(&mut scratch));
    }
}