noodles_vcf/async/io/reader.rs
1pub mod header;
2mod query;
3mod record;
4
5use futures::{stream, Stream};
6use noodles_bgzf as bgzf;
7use noodles_core::Region;
8use noodles_csi::BinningIndex;
9use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncSeek};
10
11use self::{header::read_header, query::query, record::read_record};
12use crate::{io::reader::resolve_region, variant::RecordBuf, Header, Record};
13
/// Line feed (`\n`); a single trailing one is stripped from each line by `read_line`.
const LINE_FEED: char = b'\n' as char;
/// Carriage return (`\r`); stripped by `read_line` only when it directly precedes the stripped
/// line feed, i.e., for CRLF line endings.
const CARRIAGE_RETURN: char = b'\r' as char;
16
/// An async VCF reader.
///
/// The VCF format has two main parts: 1) a header and 2) a list of VCF records.
///
/// Each header line is prefixed with a `#` (number sign) and is terminated by the header header
/// (`#CHROM`...; inclusive).
///
/// VCF records are line-based and follow directly after the header until EOF.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
/// use futures::TryStreamExt;
/// use noodles_vcf as vcf;
/// use tokio::{fs::File, io::BufReader};
///
/// let mut reader = File::open("sample.vcf")
///     .await
///     .map(BufReader::new)
///     .map(vcf::r#async::io::Reader::new)?;
///
/// let _header = reader.read_header().await?;
///
/// let mut records = reader.records();
///
/// while let Some(_record) = records.try_next().await? {
///     // ...
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Reader<R> {
    /// The wrapped input stream.
    inner: R,
    /// Line buffer that is cleared and reused for each record read, so reading does not
    /// allocate a fresh string per line.
    buf: String,
}
54
55impl<R> Reader<R> {
56 /// Returns a reference to the underlying reader.
57 ///
58 /// # Examples
59 ///
60 /// ```
61 /// use noodles_vcf as vcf;
62 /// let data = [];
63 /// let reader = vcf::r#async::io::Reader::new(&data[..]);
64 /// assert!(reader.get_ref().is_empty());
65 /// ```
66 pub fn get_ref(&self) -> &R {
67 &self.inner
68 }
69
70 /// Returns a mutable reference to the underlying reader.
71 ///
72 /// # Examples
73 ///
74 /// ```
75 /// use noodles_vcf as vcf;
76 /// let data = [];
77 /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
78 /// assert!(reader.get_mut().is_empty());
79 /// ```
80 pub fn get_mut(&mut self) -> &mut R {
81 &mut self.inner
82 }
83
84 /// Returns the underlying reader.
85 ///
86 /// # Examples
87 ///
88 /// ```
89 /// use noodles_vcf as vcf;
90 /// let data = [];
91 /// let reader = vcf::r#async::io::Reader::new(&data[..]);
92 /// assert!(reader.into_inner().is_empty());
93 /// ```
94 pub fn into_inner(self) -> R {
95 self.inner
96 }
97}
98
99impl<R> Reader<R>
100where
101 R: AsyncBufRead + Unpin,
102{
103 /// Creates an async VCF reader.
104 ///
105 /// # Examples
106 ///
107 /// ```
108 /// use noodles_vcf as vcf;
109 /// let data = [];
110 /// let reader = vcf::r#async::io::Reader::new(&data[..]);
111 /// ```
112 pub fn new(inner: R) -> Self {
113 Self {
114 inner,
115 buf: String::new(),
116 }
117 }
118
119 /// Returns an async VCF header reader.
120 ///
121 /// This creates an adapter that reads at most the length of the header, i.e., all lines
122 /// prefixed with a `#` (number sign).
123 ///
124 /// It is more ergonomic to read and parse the header using [`Self::read_header`], but using
125 /// this adapter allows for control of how the header is read, e.g., to read the raw VCF
126 /// header.
127 ///
128 /// The position of the stream is expected to be at the start.
129 ///
130 /// # Examples
131 ///
132 /// ```
133 /// # #[tokio::main]
134 /// # async fn main() -> tokio::io::Result<()> {
135 /// use noodles_vcf as vcf;
136 /// use tokio::io::AsyncReadExt;
137 ///
138 /// let data = b"##fileformat=VCFv4.3
139 /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
140 /// sq0\t1\t.\tA\t.\t.\tPASS\t.
141 /// ";
142 ///
143 /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
144 /// let mut header_reader = reader.header_reader();
145 ///
146 /// let mut raw_header = String::new();
147 /// header_reader.read_to_string(&mut raw_header).await?;
148 ///
149 /// assert_eq!(raw_header, "##fileformat=VCFv4.3\n#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\n");
150 /// # Ok(())
151 /// # }
152 /// ```
153 pub fn header_reader(&mut self) -> header::Reader<&mut R> {
154 header::Reader::new(&mut self.inner)
155 }
156
157 /// Reads the VCF header.
158 ///
159 /// This reads all header lines prefixed with a `#` (number sign), which includes the header
160 /// header (`#CHROM`...), and parses it as a [`crate::Header`].
161 ///
162 /// The position of the stream is expected to be at the start.
163 ///
164 /// # Examples
165 ///
166 /// ```
167 /// # use std::io;
168 /// #
169 /// # #[tokio::main]
170 /// # async fn main() -> io::Result<()> {
171 /// use noodles_vcf as vcf;
172 ///
173 /// let data = b"##fileformat=VCFv4.3
174 /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
175 /// sq0\t1\t.\tA\t.\t.\tPASS\t.
176 /// ";
177 ///
178 /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
179 /// let header = reader.read_header().await?;
180 /// # Ok(())
181 /// # }
182 /// ```
183 pub async fn read_header(&mut self) -> io::Result<Header> {
184 read_header(&mut self.inner).await
185 }
186
187 /// Reads a single VCF record.
188 ///
189 /// This reads a line from the underlying stream until a newline is reached and parses that
190 /// line into the given record.
191 ///
192 /// The stream is expected to be directly after the header or at the start of another record.
193 ///
194 /// It is more ergonomic to read records using a stream (see [`Self::records`] and
195 /// [`Self::query`]), but using this method allows control of the record buffer.
196 ///
197 /// If successful, the number of bytes read is returned. If the number of bytes read is 0, the
198 /// stream reached EOF.
199 ///
200 /// # Examples
201 ///
202 /// ```
203 /// # use std::io;
204 /// #
205 /// # #[tokio::main]
206 /// # async fn main() -> io::Result<()> {
207 /// use noodles_vcf as vcf;
208 ///
209 /// let data = b"##fileformat=VCFv4.3
210 /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
211 /// sq0\t1\t.\tA\t.\t.\tPASS\t.
212 /// ";
213 ///
214 /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
215 /// let header = reader.read_header().await?;
216 ///
217 /// let mut record = vcf::variant::RecordBuf::default();
218 /// reader.read_record_buf(&header, &mut record).await?;
219 /// # Ok(())
220 /// # }
221 /// ```
222 pub async fn read_record_buf(
223 &mut self,
224 header: &Header,
225 record: &mut RecordBuf,
226 ) -> io::Result<usize> {
227 use crate::io::reader::parse_record_buf;
228
229 self.buf.clear();
230
231 match read_line(&mut self.inner, &mut self.buf).await? {
232 0 => Ok(0),
233 n => {
234 parse_record_buf(&self.buf, header, record)
235 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
236
237 Ok(n)
238 }
239 }
240 }
241
242 /// Reads a single record without eagerly parsing its fields.
243 ///
244 /// The reads VCF record fields from the underlying stream into the given record's buffer until
245 /// a newline is reached. No fields are parsed, meaning the record is not necessarily valid.
246 /// However, the structure of the line is guaranteed to be record-like.
247 ///
248 /// The stream is expected to be directly after the header or at the start of another record.
249 ///
250 /// If successful, the number of bytes read is returned. If the number of bytes read is 0, the
251 /// stream reached EOF.
252 ///
253 /// # Examples
254 ///
255 /// ```
256 /// # #[tokio::main]
257 /// # async fn main() -> std::io::Result<()> {
258 /// use noodles_vcf as vcf;
259 ///
260 /// let data = b"##fileformat=VCFv4.3
261 /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
262 /// sq0\t1\t.\tA\t.\t.\tPASS\t.
263 /// ";
264 ///
265 /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
266 /// reader.read_header().await?;
267 ///
268 /// let mut record = vcf::Record::default();
269 /// reader.read_record(&mut record).await?;
270 /// # Ok::<_, std::io::Error>(())
271 /// # }
272 /// ```
273 pub async fn read_record(&mut self, record: &mut Record) -> io::Result<usize> {
274 read_record(&mut self.inner, &mut self.buf, record).await
275 }
276
277 /// Returns a stream over records.
278 ///
279 /// The (input) stream is expected to be directly after the header or at the start of another
280 /// record.
281 ///
282 /// # Examples
283 ///
284 /// ```
285 /// # #[tokio::main]
286 /// # async fn main() -> std::io::Result<()> {
287 /// use futures::TryStreamExt;
288 /// use noodles_vcf as vcf;
289 ///
290 /// const DATA: &[u8] = b"##fileformat=VCFv4.3
291 /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
292 /// sq0\t1\t.\tA\t.\t.\tPASS\t.
293 /// ";
294 ///
295 /// let mut reader = vcf::r#async::io::Reader::new(DATA);
296 /// let header = reader.read_header().await?;
297 ///
298 /// let mut records = reader.records();
299 ///
300 /// while let Some(record) = records.try_next().await? {
301 /// // ...
302 /// }
303 /// # Ok(())
304 /// # }
305 /// ```
306 pub fn records(&mut self) -> impl Stream<Item = io::Result<Record>> + '_ {
307 Box::pin(stream::try_unfold(self, move |reader| async move {
308 let mut record = Record::default();
309
310 reader.read_record(&mut record).await.map(|n| match n {
311 0 => None,
312 _ => Some((record, reader)),
313 })
314 }))
315 }
316
317 /// Returns an (async) stream over records starting from the current (input) stream position.
318 ///
319 /// The (input) stream is expected to be directly after the header or at the start of another
320 /// record.
321 ///
322 /// Unlike [`Self::read_record`], each record is parsed as a [`Record`].
323 ///
324 /// # Examples
325 ///
326 /// ```
327 /// # #[tokio::main]
328 /// # async fn main() -> std::io::Result<()> {
329 /// use futures::TryStreamExt;
330 /// use noodles_vcf as vcf;
331 ///
332 /// let data = b"##fileformat=VCFv4.3
333 /// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
334 /// sq0\t1\t.\tA\t.\t.\tPASS\t.
335 /// ";
336 ///
337 /// let mut reader = vcf::r#async::io::Reader::new(&data[..]);
338 /// let header = reader.read_header().await?;
339 ///
340 /// let mut records = reader.record_bufs(&header);
341 ///
342 /// while let Some(record) = records.try_next().await? {
343 /// // ...
344 /// }
345 /// # Ok(())
346 /// # }
347 /// ```
348 pub fn record_bufs<'r, 'h: 'r>(
349 &'r mut self,
350 header: &'h Header,
351 ) -> impl Stream<Item = io::Result<RecordBuf>> + 'r {
352 Box::pin(stream::try_unfold(self, move |reader| async move {
353 let mut record = RecordBuf::default();
354
355 reader
356 .read_record_buf(header, &mut record)
357 .await
358 .map(|n| match n {
359 0 => None,
360 _ => Some((record, reader)),
361 })
362 }))
363 }
364}
365
366impl<R> Reader<bgzf::AsyncReader<R>>
367where
368 R: AsyncRead + AsyncSeek + Unpin,
369{
370 /// Returns a stream over records that intersects the given region.
371 ///
372 /// The position of the (input) stream is expected to after the header or at the start of
373 /// another record.
374 ///
375 /// # Examples
376 ///
377 /// ```no_run
378 /// # #[tokio::main]
379 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
380 /// use futures::TryStreamExt;
381 /// use noodles_bgzf as bgzf;
382 /// use noodles_core::Region;
383 /// use noodles_tabix as tabix;
384 /// use noodles_vcf as vcf;
385 /// use tokio::fs::File;
386 ///
387 /// let mut reader = File::open("sample.vcf.gz")
388 /// .await
389 /// .map(bgzf::AsyncReader::new)
390 /// .map(vcf::r#async::io::Reader::new)?;
391 ///
392 /// let header = reader.read_header().await?;
393 ///
394 /// let index = tabix::r#async::read("sample.vcf.gz.tbi").await?;
395 /// let region = "sq0:8-13".parse()?;
396 /// let mut query = reader.query(&header, &index, ®ion)?;
397 ///
398 /// while let Some(record) = query.try_next().await? {
399 /// // ...
400 /// }
401 /// # Ok(())
402 /// # }
403 /// ```
404 pub fn query<'r, I>(
405 &'r mut self,
406 header: &'r Header,
407 index: &I,
408 region: &Region,
409 ) -> io::Result<impl Stream<Item = io::Result<Record>> + 'r>
410 where
411 I: BinningIndex,
412 {
413 let (reference_sequence_id, reference_sequence_name) = resolve_region(index, region)?;
414
415 let chunks = index.query(reference_sequence_id, region.interval())?;
416
417 Ok(query(
418 self,
419 chunks,
420 reference_sequence_name,
421 region.interval(),
422 header,
423 ))
424 }
425}
426
427async fn read_line<R>(reader: &mut R, buf: &mut String) -> io::Result<usize>
428where
429 R: AsyncBufRead + Unpin,
430{
431 match reader.read_line(buf).await {
432 Ok(0) => Ok(0),
433 Ok(n) => {
434 if buf.ends_with(LINE_FEED) {
435 buf.pop();
436
437 if buf.ends_with(CARRIAGE_RETURN) {
438 buf.pop();
439 }
440 }
441
442 Ok(n)
443 }
444 Err(e) => Err(e),
445 }
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_read_line() -> io::Result<()> {
        // Reads one line from `data` into a cleared `buf` and checks both the resulting text and
        // the reported byte count (which includes any stripped terminator).
        async fn t(
            buf: &mut String,
            mut data: &[u8],
            expected: &str,
            expected_len: usize,
        ) -> io::Result<()> {
            buf.clear();
            let n = read_line(&mut data, buf).await?;
            assert_eq!(buf, expected);
            assert_eq!(n, expected_len);
            Ok(())
        }

        let mut buf = String::new();

        t(&mut buf, b"noodles\n", "noodles", 8).await?;
        t(&mut buf, b"noodles\r\n", "noodles", 9).await?;
        t(&mut buf, b"noodles", "noodles", 7).await?;

        // A carriage return that is not followed by a line feed is not a terminator.
        t(&mut buf, b"noodles\r", "noodles\r", 8).await?;

        // Only the first line is consumed.
        t(&mut buf, b"noodles\nvcf\n", "noodles", 8).await?;

        // EOF reads nothing.
        t(&mut buf, b"", "", 0).await?;

        Ok(())
    }
}