noodles_vcf/async/io/
reader.rs

1pub mod header;
2mod query;
3mod record;
4
5use futures::{stream, Stream};
6use noodles_bgzf as bgzf;
7use noodles_core::Region;
8use noodles_csi::BinningIndex;
9use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncSeek};
10
11use self::{header::read_header, query::query, record::read_record};
12use crate::{io::reader::resolve_region, variant::RecordBuf, Header, Record};
13
// Line terminator stripped from the end of each line read by `read_line`.
const LINE_FEED: char = '\n';
// Stripped by `read_line` only when it directly precedes a trailing line feed (i.e., CRLF).
const CARRIAGE_RETURN: char = '\r';
16
/// An async VCF reader.
///
/// A VCF stream has two main parts: 1) a header followed by 2) a list of VCF records.
///
/// Every header line is prefixed with a `#` (number sign). The header ends with the header
/// header (`#CHROM`...), which is itself part of the header.
///
/// VCF records are line-based and follow directly after the header until EOF.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
/// use futures::TryStreamExt;
/// use noodles_vcf as vcf;
/// use tokio::{fs::File, io::BufReader};
///
/// let mut reader = File::open("sample.vcf")
///     .await
///     .map(BufReader::new)
///     .map(vcf::r#async::io::Reader::new)?;
///
/// let _header = reader.read_header().await?;
///
/// let mut records = reader.records();
///
/// while let Some(_record) = records.try_next().await? {
///     // ...
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Reader<R> {
    // The underlying reader.
    inner: R,
    // Line buffer owned by the reader so record reads can reuse one allocation.
    buf: String,
}
54
55impl<R> Reader<R> {
56    /// Returns a reference to the underlying reader.
57    ///
58    /// # Examples
59    ///
60    /// ```
61    /// use noodles_vcf as vcf;
62    /// let data = [];
63    /// let reader = vcf::r#async::io::Reader::new(&data[..]);
64    /// assert!(reader.get_ref().is_empty());
65    /// ```
66    pub fn get_ref(&self) -> &R {
67        &self.inner
68    }
69
70    /// Returns a mutable reference to the underlying reader.
71    ///
72    /// # Examples
73    ///
74    /// ```
75    /// use noodles_vcf as vcf;
76    /// let data = [];
77    /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
78    /// assert!(reader.get_mut().is_empty());
79    /// ```
80    pub fn get_mut(&mut self) -> &mut R {
81        &mut self.inner
82    }
83
84    /// Returns the underlying reader.
85    ///
86    /// # Examples
87    ///
88    /// ```
89    /// use noodles_vcf as vcf;
90    /// let data = [];
91    /// let reader = vcf::r#async::io::Reader::new(&data[..]);
92    /// assert!(reader.into_inner().is_empty());
93    /// ```
94    pub fn into_inner(self) -> R {
95        self.inner
96    }
97}
98
99impl<R> Reader<R>
100where
101    R: AsyncBufRead + Unpin,
102{
103    /// Creates an async VCF reader.
104    ///
105    /// # Examples
106    ///
107    /// ```
108    /// use noodles_vcf as vcf;
109    /// let data = [];
110    /// let reader = vcf::r#async::io::Reader::new(&data[..]);
111    /// ```
112    pub fn new(inner: R) -> Self {
113        Self {
114            inner,
115            buf: String::new(),
116        }
117    }
118
119    /// Returns an async VCF header reader.
120    ///
121    /// This creates an adapter that reads at most the length of the header, i.e., all lines
122    /// prefixed with a `#` (number sign).
123    ///
124    /// It is more ergonomic to read and parse the header using [`Self::read_header`], but using
125    /// this adapter allows for control of how the header is read, e.g., to read the raw VCF
126    /// header.
127    ///
128    /// The position of the stream is expected to be at the start.
129    ///
130    /// # Examples
131    ///
132    /// ```
133    /// # #[tokio::main]
134    /// # async fn main() -> tokio::io::Result<()> {
135    /// use noodles_vcf as vcf;
136    /// use tokio::io::AsyncReadExt;
137    ///
138    /// let data = b"##fileformat=VCFv4.3
139    /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
140    /// sq0\t1\t.\tA\t.\t.\tPASS\t.
141    /// ";
142    ///
143    /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
144    /// let mut header_reader = reader.header_reader();
145    ///
146    /// let mut raw_header = String::new();
147    /// header_reader.read_to_string(&mut raw_header).await?;
148    ///
149    /// assert_eq!(raw_header, "##fileformat=VCFv4.3\n#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n");
150    /// # Ok(())
151    /// # }
152    /// ```
153    pub fn header_reader(&mut self) -> header::Reader<&mut R> {
154        header::Reader::new(&mut self.inner)
155    }
156
157    /// Reads the VCF header.
158    ///
159    /// This reads all header lines prefixed with a `#` (number sign), which includes the header
160    /// header (`#CHROM`...), and parses it as a [`crate::Header`].
161    ///
162    /// The position of the stream is expected to be at the start.
163    ///
164    /// # Examples
165    ///
166    /// ```
167    /// # use std::io;
168    /// #
169    /// # #[tokio::main]
170    /// # async fn main() -> io::Result<()> {
171    /// use noodles_vcf as vcf;
172    ///
173    /// let data = b"##fileformat=VCFv4.3
174    /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
175    /// sq0\t1\t.\tA\t.\t.\tPASS\t.
176    /// ";
177    ///
178    /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
179    /// let header = reader.read_header().await?;
180    /// # Ok(())
181    /// # }
182    /// ```
183    pub async fn read_header(&mut self) -> io::Result<Header> {
184        read_header(&mut self.inner).await
185    }
186
187    /// Reads a single VCF record.
188    ///
189    /// This reads a line from the underlying stream until a newline is reached and parses that
190    /// line into the given record.
191    ///
192    /// The stream is expected to be directly after the header or at the start of another record.
193    ///
194    /// It is more ergonomic to read records using a stream (see [`Self::records`] and
195    /// [`Self::query`]), but using this method allows control of the record buffer.
196    ///
197    /// If successful, the number of bytes read is returned. If the number of bytes read is 0, the
198    /// stream reached EOF.
199    ///
200    /// # Examples
201    ///
202    /// ```
203    /// # use std::io;
204    /// #
205    /// # #[tokio::main]
206    /// # async fn main() -> io::Result<()> {
207    /// use noodles_vcf as vcf;
208    ///
209    /// let data = b"##fileformat=VCFv4.3
210    /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
211    /// sq0\t1\t.\tA\t.\t.\tPASS\t.
212    /// ";
213    ///
214    /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
215    /// let header = reader.read_header().await?;
216    ///
217    /// let mut record = vcf::variant::RecordBuf::default();
218    /// reader.read_record_buf(&header, &mut record).await?;
219    /// # Ok(())
220    /// # }
221    /// ```
222    pub async fn read_record_buf(
223        &mut self,
224        header: &Header,
225        record: &mut RecordBuf,
226    ) -> io::Result<usize> {
227        use crate::io::reader::parse_record_buf;
228
229        self.buf.clear();
230
231        match read_line(&mut self.inner, &mut self.buf).await? {
232            0 => Ok(0),
233            n => {
234                parse_record_buf(&self.buf, header, record)
235                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
236
237                Ok(n)
238            }
239        }
240    }
241
242    /// Reads a single record without eagerly parsing its fields.
243    ///
244    /// The reads VCF record fields from the underlying stream into the given record's buffer until
245    /// a newline is reached. No fields are parsed, meaning the record is not necessarily valid.
246    /// However, the structure of the line is guaranteed to be record-like.
247    ///
248    /// The stream is expected to be directly after the header or at the start of another record.
249    ///
250    /// If successful, the number of bytes read is returned. If the number of bytes read is 0, the
251    /// stream reached EOF.
252    ///
253    /// # Examples
254    ///
255    /// ```
256    /// # #[tokio::main]
257    /// # async fn main() -> std::io::Result<()> {
258    /// use noodles_vcf as vcf;
259    ///
260    /// let data = b"##fileformat=VCFv4.3
261    /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
262    /// sq0\t1\t.\tA\t.\t.\tPASS\t.
263    /// ";
264    ///
265    /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
266    /// reader.read_header().await?;
267    ///
268    /// let mut record = vcf::Record::default();
269    /// reader.read_record(&mut record).await?;
270    /// # Ok::<_, std::io::Error>(())
271    /// # }
272    /// ```
273    pub async fn read_record(&mut self, record: &mut Record) -> io::Result<usize> {
274        read_record(&mut self.inner, &mut self.buf, record).await
275    }
276
277    /// Returns a stream over records.
278    ///
279    /// The (input) stream is expected to be directly after the header or at the start of another
280    /// record.
281    ///
282    /// # Examples
283    ///
284    /// ```
285    /// # #[tokio::main]
286    /// # async fn main() -> std::io::Result<()> {
287    /// use futures::TryStreamExt;
288    /// use noodles_vcf as vcf;
289    ///
290    /// const DATA: &[u8] = b"##fileformat=VCFv4.3
291    /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
292    /// sq0\t1\t.\tA\t.\t.\tPASS\t.
293    /// ";
294    ///
295    /// let mut reader = vcf::r#async::io::Reader::new(DATA);
296    /// let header = reader.read_header().await?;
297    ///
298    /// let mut records = reader.records();
299    ///
300    /// while let Some(record) = records.try_next().await? {
301    ///     // ...
302    /// }
303    /// # Ok(())
304    /// # }
305    /// ```
306    pub fn records(&mut self) -> impl Stream<Item = io::Result<Record>> + '_ {
307        Box::pin(stream::try_unfold(self, move |reader| async move {
308            let mut record = Record::default();
309
310            reader.read_record(&mut record).await.map(|n| match n {
311                0 => None,
312                _ => Some((record, reader)),
313            })
314        }))
315    }
316
317    /// Returns an (async) stream over records starting from the current (input) stream position.
318    ///
319    /// The (input) stream is expected to be directly after the header or at the start of another
320    /// record.
321    ///
322    /// Unlike [`Self::read_record`], each record is parsed as a [`Record`].
323    ///
324    /// # Examples
325    ///
326    /// ```
327    /// # #[tokio::main]
328    /// # async fn main() -> std::io::Result<()> {
329    /// use futures::TryStreamExt;
330    /// use noodles_vcf as vcf;
331    ///
332    /// let data = b"##fileformat=VCFv4.3
333    /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
334    /// sq0\t1\t.\tA\t.\t.\tPASS\t.
335    /// ";
336    ///
337    /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
338    /// let header = reader.read_header().await?;
339    ///
340    /// let mut records = reader.record_bufs(&header);
341    ///
342    /// while let Some(record) = records.try_next().await? {
343    ///     // ...
344    /// }
345    /// # Ok(())
346    /// # }
347    /// ```
348    pub fn record_bufs<'r, 'h: 'r>(
349        &'r mut self,
350        header: &'h Header,
351    ) -> impl Stream<Item = io::Result<RecordBuf>> + 'r {
352        Box::pin(stream::try_unfold(self, move |reader| async move {
353            let mut record = RecordBuf::default();
354
355            reader
356                .read_record_buf(header, &mut record)
357                .await
358                .map(|n| match n {
359                    0 => None,
360                    _ => Some((record, reader)),
361                })
362        }))
363    }
364}
365
366impl<R> Reader<bgzf::AsyncReader<R>>
367where
368    R: AsyncRead + AsyncSeek + Unpin,
369{
370    /// Returns a stream over records that intersects the given region.
371    ///
372    /// The position of the (input) stream is expected to after the header or at the start of
373    /// another record.
374    ///
375    /// # Examples
376    ///
377    /// ```no_run
378    /// # #[tokio::main]
379    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
380    /// use futures::TryStreamExt;
381    /// use noodles_bgzf as bgzf;
382    /// use noodles_core::Region;
383    /// use noodles_tabix as tabix;
384    /// use noodles_vcf as vcf;
385    /// use tokio::fs::File;
386    ///
387    /// let mut reader = File::open("sample.vcf.gz")
388    ///     .await
389    ///     .map(bgzf::AsyncReader::new)
390    ///     .map(vcf::r#async::io::Reader::new)?;
391    ///
392    /// let header = reader.read_header().await?;
393    ///
394    /// let index = tabix::r#async::read("sample.vcf.gz.tbi").await?;
395    /// let region = "sq0:8-13".parse()?;
396    /// let mut query = reader.query(&header, &index, &region)?;
397    ///
398    /// while let Some(record) = query.try_next().await? {
399    ///     // ...
400    /// }
401    /// # Ok(())
402    /// # }
403    /// ```
404    pub fn query<'r, I>(
405        &'r mut self,
406        header: &'r Header,
407        index: &I,
408        region: &Region,
409    ) -> io::Result<impl Stream<Item = io::Result<Record>> + 'r>
410    where
411        I: BinningIndex,
412    {
413        let (reference_sequence_id, reference_sequence_name) = resolve_region(index, region)?;
414
415        let chunks = index.query(reference_sequence_id, region.interval())?;
416
417        Ok(query(
418            self,
419            chunks,
420            reference_sequence_name,
421            region.interval(),
422            header,
423        ))
424    }
425}
426
427async fn read_line<R>(reader: &mut R, buf: &mut String) -> io::Result<usize>
428where
429    R: AsyncBufRead + Unpin,
430{
431    match reader.read_line(buf).await {
432        Ok(0) => Ok(0),
433        Ok(n) => {
434            if buf.ends_with(LINE_FEED) {
435                buf.pop();
436
437                if buf.ends_with(CARRIAGE_RETURN) {
438                    buf.pop();
439                }
440            }
441
442            Ok(n)
443        }
444        Err(e) => Err(e),
445    }
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    // `read_line` must strip LF and CRLF terminators and accept a final unterminated line.
    #[tokio::test]
    async fn test_read_line() -> io::Result<()> {
        let cases: [(&[u8], &str); 3] = [
            (&b"noodles\n"[..], "noodles"),
            (&b"noodles\r\n"[..], "noodles"),
            (&b"noodles"[..], "noodles"),
        ];

        let mut buf = String::new();

        for (mut src, expected) in cases {
            buf.clear();
            read_line(&mut src, &mut buf).await?;
            assert_eq!(buf, expected);
        }

        Ok(())
    }
}