noodles_sam/async/io/
reader.rs

1pub mod header;
2mod query;
3mod record;
4mod record_buf;
5
6use futures::{stream, Stream, TryStreamExt};
7use noodles_bgzf as bgzf;
8use noodles_core::Region;
9use noodles_csi::BinningIndex;
10use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncSeek};
11
12use self::{header::read_header, query::query, record::read_record, record_buf::read_record_buf};
13use crate::{alignment::RecordBuf, Header, Record};
14
/// An async SAM reader.
///
/// This pairs an underlying reader with a scratch buffer that is handed to the record reading
/// routines on each call.
#[derive(Debug)]
pub struct Reader<R> {
    // The wrapped source stream.
    inner: R,
    // Scratch buffer passed to `read_record_buf` and `read_record`.
    buf: Vec<u8>,
}
20
21impl<R> Reader<R> {
22    /// Returns a reference to the underlying reader.
23    ///
24    /// # Examples
25    ///
26    /// ```
27    /// use noodles_sam as sam;
28    /// use tokio::io;
29    /// let reader = sam::r#async::io::Reader::new(io::empty());
30    /// let _inner = reader.get_ref();
31    /// ```
32    pub fn get_ref(&self) -> &R {
33        &self.inner
34    }
35
36    /// Returns a mutable reference to the underlying reader.
37    ///
38    /// # Examples
39    ///
40    /// ```
41    /// use noodles_sam as sam;
42    /// use tokio::io;
43    /// let mut reader = sam::r#async::io::Reader::new(io::empty());
44    /// let _inner = reader.get_mut();
45    /// ```
46    pub fn get_mut(&mut self) -> &mut R {
47        &mut self.inner
48    }
49
50    /// Returns the underlying reader.
51    ///
52    /// # Examples
53    ///
54    /// ```
55    /// use noodles_sam as sam;
56    /// use tokio::io;
57    /// let reader = sam::r#async::io::Reader::new(io::empty());
58    /// let _inner = reader.into_inner();
59    /// ```
60    pub fn into_inner(self) -> R {
61        self.inner
62    }
63}
64
65impl<R> Reader<R>
66where
67    R: AsyncBufRead + Unpin,
68{
69    /// Creates an async SAM reader.
70    ///
71    /// # Examples
72    ///
73    /// ```
74    /// use noodles_sam as sam;
75    /// use tokio::io;
76    /// let reader = sam::r#async::io::Reader::new(io::empty());
77    /// ```
78    pub fn new(inner: R) -> Self {
79        Self {
80            inner,
81            buf: Vec::new(),
82        }
83    }
84
85    /// Returns a SAM header reader.
86    ///
87    /// This creates an adapter that reads at most the length of the header, i.e., all lines
88    /// prefixed with a `@` (at sign).
89    ///
90    /// It is more ergonomic to read and parse the header using [`Self::read_header`], but using
91    /// this adapter allows for control of how the header is read, e.g., to read the raw SAM
92    /// header.
93    ///
94    /// The position of the stream is expected to be at the start.
95    ///
96    /// # Examples
97    ///
98    /// ```
99    /// # #[tokio::main]
100    /// # async fn main() -> tokio::io::Result<()> {
101    /// use noodles_sam as sam;
102    /// use tokio::io::AsyncReadExt;
103    ///
104    /// let data = b"@HD\tVN:1.6
105    /// *\t4\t*\t0\t255\t*\t*\t0\t0\t*\t*
106    /// ";
107    ///
108    /// let mut reader = sam::r#async::io::Reader::new(&data[..]);
109    /// let mut header_reader = reader.header_reader();
110    ///
111    /// let mut raw_header = String::new();
112    /// header_reader.read_to_string(&mut raw_header).await?;
113    ///
114    /// assert_eq!(raw_header, "@HD\tVN:1.6\n");
115    /// # Ok(())
116    /// # }
117    /// ```
118    pub fn header_reader(&mut self) -> header::Reader<&mut R> {
119        header::Reader::new(&mut self.inner)
120    }
121
122    /// Reads the SAM header.
123    ///
124    /// The position of the stream is expected to be at the start.
125    ///
126    /// The SAM header is optional, and if it is missing, an empty [`Header`] is returned.
127    ///
128    /// # Examples
129    ///
130    /// ```
131    /// # use std::io;
132    /// #
133    /// # #[tokio::main]
134    /// # async fn main() -> io::Result<()> {
135    /// use noodles_sam as sam;
136    ///
137    /// let data = b"@HD\tVN:1.6
138    /// *\t4\t*\t0\t255\t*\t*\t0\t0\t*\t*
139    /// ";
140    ///
141    /// let mut reader = sam::r#async::io::Reader::new(&data[..]);
142    /// let header = reader.read_header().await?;
143    /// # Ok(())
144    /// # }
145    /// ```
146    pub async fn read_header(&mut self) -> io::Result<Header> {
147        read_header(&mut self.inner).await
148    }
149
150    /// Reads a record into an alignment record buffer.
151    ///
152    /// This reads a line from the underlying stream until a newline is reached and parses that
153    /// line into the given record.
154    ///
155    /// The stream is expected to be directly after the header or at the start of another record.
156    ///
157    /// It is more ergonomic to read records using an iterator (see [`Self::records`]), but using
158    /// this method directly allows reuse of a [`RecordBuf`].
159    ///
160    /// If successful, the number of bytes read is returned. If the number of bytes read is 0, the
161    /// stream reached EOF.
162    ///
163    /// # Examples
164    ///
165    /// ```
166    /// # #[tokio::main]
167    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
168    /// use noodles_sam::{self as sam, alignment::RecordBuf};
169    ///
170    /// let data = b"@HD\tVN:1.6
171    /// *\t4\t*\t0\t255\t*\t*\t0\t0\t*\t*
172    /// ";
173    ///
174    /// let mut reader = sam::r#async::io::Reader::new(&data[..]);
175    /// let header = reader.read_header().await?;
176    ///
177    /// let mut record = RecordBuf::default();
178    /// reader.read_record_buf(&header, &mut record).await?;
179    ///
180    /// assert_eq!(record, RecordBuf::default());
181    /// # Ok(())
182    /// # }
183    /// ```
184    pub async fn read_record_buf(
185        &mut self,
186        header: &Header,
187        record: &mut RecordBuf,
188    ) -> io::Result<usize> {
189        read_record_buf(&mut self.inner, &mut self.buf, header, record).await
190    }
191
192    /// Returns an (async) stream over alignment record buffers starting from the current (input)
193    /// stream position.
194    ///
195    /// The (input) stream is expected to be directly after the header or at the start of another
196    /// record.
197    ///
198    /// # Examples
199    ///
200    /// ```
201    /// # #[tokio::main]
202    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
203    /// use futures::TryStreamExt;
204    /// use noodles_sam as sam;
205    ///
206    /// let data = b"@HD\tVN:1.6
207    /// *\t4\t*\t0\t255\t*\t*\t0\t0\t*\t*
208    /// ";
209    ///
210    /// let mut reader = sam::r#async::io::Reader::new(&data[..]);
211    /// let header = reader.read_header().await?;
212    ///
213    /// let mut records = reader.record_bufs(&header);
214    ///
215    /// while let Some(record) = records.try_next().await? {
216    ///     // ...
217    /// }
218    /// # Ok(())
219    /// # }
220    /// ```
221    pub fn record_bufs<'a>(
222        &'a mut self,
223        header: &'a Header,
224    ) -> impl Stream<Item = io::Result<RecordBuf>> + 'a {
225        Box::pin(stream::try_unfold(
226            (self, RecordBuf::default()),
227            |(reader, mut record)| async {
228                match reader.read_record_buf(header, &mut record).await? {
229                    0 => Ok(None),
230                    _ => Ok(Some((record.clone(), (reader, record)))),
231                }
232            },
233        ))
234    }
235
236    /// Reads a record.
237    ///
238    /// This reads SAM fields from the underlying stream into the given record's buffer until a
239    /// newline is reached. No fields are decoded, meaning the record is not necessarily valid.
240    /// However, the structure of the buffer is guaranteed to be record-like.
241    ///
242    /// The stream is expected to be directly after the header or at the start of another record.
243    ///
244    /// If successful, the number of bytes read is returned. If the number of bytes read is 0, the
245    /// stream reached EOF.
246    ///
247    /// # Examples
248    ///
249    /// ```
250    /// # #[tokio::main]
251    /// # async fn main() -> std::io::Result<()> {
252    /// use noodles_sam as sam;
253    ///
254    /// let data = b"@HD\tVN:1.6
255    /// *\t4\t*\t0\t255\t*\t*\t0\t0\t*\t*
256    /// ";
257    ///
258    /// let mut reader = sam::r#async::io::Reader::new(&data[..]);
259    /// let header = reader.read_header().await?;
260    ///
261    /// let mut record = sam::Record::default();
262    /// reader.read_record(&mut record).await?;
263    ///
264    /// assert_eq!(record, sam::Record::default());
265    /// # Ok(())
266    /// # }
267    /// ```
268    pub async fn read_record(&mut self, record: &mut Record) -> io::Result<usize> {
269        read_record(&mut self.inner, &mut self.buf, record).await
270    }
271
272    /// Returns an (async) stream over records.
273    ///
274    /// The (input) stream is expected to be directly after the header or at the start of a record.
275    ///
276    /// # Examples
277    ///
278    /// ```
279    /// # use tokio::io;
280    /// #
281    /// # #[tokio::main]
282    /// # async fn main() -> io::Result<()> {
283    /// use futures::TryStreamExt;
284    /// use noodles_sam as sam;
285    ///
286    /// let mut reader = sam::r#async::io::Reader::new(io::empty());
287    /// let header = reader.read_header().await?;
288    ///
289    /// let mut records = reader.records();
290    ///
291    /// while let Some(record) = records.try_next().await? {
292    ///     // ...
293    /// }
294    /// # Ok(())
295    /// # }
296    /// ```
297    pub fn records(&mut self) -> impl Stream<Item = io::Result<Record>> + '_ {
298        Box::pin(stream::try_unfold(
299            (self, Record::default()),
300            |(reader, mut record)| async {
301                match reader.read_record(&mut record).await? {
302                    0 => Ok(None),
303                    _ => Ok(Some((record.clone(), (reader, record)))),
304                }
305            },
306        ))
307    }
308}
309
310impl<R> Reader<bgzf::AsyncReader<R>>
311where
312    R: AsyncRead + AsyncSeek + Unpin,
313{
314    // Seeks to the first record by setting the cursor to the beginning of the stream and
315    // (re)reading the header.
316    async fn seek_to_first_record(&mut self) -> io::Result<bgzf::VirtualPosition> {
317        self.get_mut()
318            .seek(bgzf::VirtualPosition::default())
319            .await?;
320
321        self.read_header().await?;
322
323        Ok(self.get_ref().virtual_position())
324    }
325
326    /// Returns a stream over records that intersect the given region.
327    ///
328    /// To query for unmapped records, use [`Self::query_unmapped`].
329    ///
330    /// # Examples
331    ///
332    /// ```no_run
333    /// # #[tokio::main]
334    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
335    /// use futures::TryStreamExt;
336    /// use noodles_bgzf as bgzf;
337    /// use noodles_csi as csi;
338    /// use noodles_sam as sam;
339    /// use tokio::fs::File;
340    ///
341    /// let mut reader = File::open("sample.sam")
342    ///     .await
343    ///     .map(bgzf::AsyncReader::new)
344    ///     .map(sam::r#async::io::Reader::new)?;
345    ///
346    /// let header = reader.read_header().await?;
347    ///
348    /// let index = csi::r#async::read("sample.sam.csi").await?;
349    /// let region = "sq0:8-13".parse()?;
350    /// let mut query = reader.query(&header, &index, &region)?;
351    ///
352    /// while let Some(record) = query.try_next().await? {
353    ///     // ...
354    /// }
355    /// # Ok(())
356    /// # }
357    /// ```
358    pub fn query<'r, 'h: 'r, I>(
359        &'r mut self,
360        header: &'h Header,
361        index: &I,
362        region: &Region,
363    ) -> io::Result<impl Stream<Item = io::Result<Record>> + 'r>
364    where
365        I: BinningIndex,
366    {
367        use crate::io::reader::resolve_region;
368
369        let reference_sequence_id = resolve_region(header.reference_sequences(), region)?;
370        let chunks = index.query(reference_sequence_id, region.interval())?;
371
372        Ok(query(
373            self,
374            chunks,
375            header,
376            reference_sequence_id,
377            region.interval(),
378        ))
379    }
380
381    /// Returns an iterator of unmapped records after querying for the unmapped region.
382    ///
383    ///
384    /// # Examples
385    ///
386    /// ```no_run
387    /// # #[tokio::main]
388    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
389    /// use futures::TryStreamExt;
390    /// use noodles_bgzf as bgzf;
391    /// use noodles_csi as csi;
392    /// use noodles_sam as sam;
393    /// use tokio::fs::File;
394    ///
395    /// let mut reader = File::open("sample.sam")
396    ///     .await
397    ///     .map(bgzf::AsyncReader::new)
398    ///     .map(sam::r#async::io::Reader::new)?;
399    ///
400    /// let index = csi::r#async::read("sample.sam.csi").await?;
401    /// let mut query = reader.query_unmapped(&index).await?;
402    ///
403    /// while let Some(record) = query.try_next().await? {
404    ///     // ...
405    /// }
406    /// # Ok(())
407    /// # }
408    /// ```
409    pub async fn query_unmapped<I>(
410        &mut self,
411        index: &I,
412    ) -> io::Result<impl Stream<Item = io::Result<Record>> + '_>
413    where
414        I: BinningIndex,
415    {
416        if let Some(pos) = index.last_first_record_start_position() {
417            self.get_mut().seek(pos).await?;
418        } else {
419            self.seek_to_first_record().await?;
420        }
421
422        Ok(Box::pin(stream::try_unfold(
423            self.records(),
424            |mut records| async {
425                loop {
426                    match records.try_next().await? {
427                        Some(record) => {
428                            if record.flags()?.is_unmapped() {
429                                return Ok(Some((record, records)));
430                            }
431                        }
432                        None => return Ok(None),
433                    }
434                }
435            },
436        )))
437    }
438}
439
440async fn read_line<R>(reader: &mut R, buf: &mut Vec<u8>) -> io::Result<usize>
441where
442    R: AsyncBufRead + Unpin,
443{
444    const LINE_FEED: u8 = b'\n';
445    const CARRIAGE_RETURN: u8 = b'\r';
446
447    match reader.read_until(LINE_FEED, buf).await? {
448        0 => Ok(0),
449        n => {
450            if buf.ends_with(&[LINE_FEED]) {
451                buf.pop();
452
453                if buf.ends_with(&[CARRIAGE_RETURN]) {
454                    buf.pop();
455                }
456            }
457
458            Ok(n)
459        }
460    }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    // Checks that `read_line` strips `\n` and `\r\n` terminators and leaves an unterminated
    // final line as is.
    #[tokio::test]
    async fn test_read_line() -> io::Result<()> {
        // (input, expected buffer contents after the read)
        let cases: [(&[u8], &[u8]); 3] = [
            (&b"noodles\n"[..], &b"noodles"[..]),
            (&b"noodles\r\n"[..], &b"noodles"[..]),
            (&b"noodles"[..], &b"noodles"[..]),
        ];

        let mut buf = Vec::new();

        for (data, expected) in cases {
            let mut src = data;

            buf.clear();
            read_line(&mut src, &mut buf).await?;

            assert_eq!(buf, expected);
        }

        Ok(())
    }
}