noodles_sam/async/io/reader.rs
1pub mod header;
2mod query;
3mod record;
4mod record_buf;
5
6use futures::{stream, Stream, TryStreamExt};
7use noodles_bgzf as bgzf;
8use noodles_core::Region;
9use noodles_csi::BinningIndex;
10use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncSeek};
11
12use self::{header::read_header, query::query, record::read_record, record_buf::read_record_buf};
13use crate::{alignment::RecordBuf, Header, Record};
14
/// An async SAM reader.
///
/// This pairs an underlying (async) reader with a scratch buffer that is handed to the
/// record-reading routines each time a record is read.
pub struct Reader<R> {
    // The wrapped input stream.
    inner: R,
    // Scratch space passed to the record readers alongside `inner`.
    buf: Vec<u8>,
}
20
21impl<R> Reader<R> {
22 /// Returns a reference to the underlying reader.
23 ///
24 /// # Examples
25 ///
26 /// ```
27 /// use noodles_sam as sam;
28 /// use tokio::io;
29 /// let reader = sam::r#async::io::Reader::new(io::empty());
30 /// let _inner = reader.get_ref();
31 /// ```
32 pub fn get_ref(&self) -> &R {
33 &self.inner
34 }
35
36 /// Returns a mutable reference to the underlying reader.
37 ///
38 /// # Examples
39 ///
40 /// ```
41 /// use noodles_sam as sam;
42 /// use tokio::io;
43 /// let mut reader = sam::r#async::io::Reader::new(io::empty());
44 /// let _inner = reader.get_mut();
45 /// ```
46 pub fn get_mut(&mut self) -> &mut R {
47 &mut self.inner
48 }
49
50 /// Returns the underlying reader.
51 ///
52 /// # Examples
53 ///
54 /// ```
55 /// use noodles_sam as sam;
56 /// use tokio::io;
57 /// let reader = sam::r#async::io::Reader::new(io::empty());
58 /// let _inner = reader.into_inner();
59 /// ```
60 pub fn into_inner(self) -> R {
61 self.inner
62 }
63}
64
65impl<R> Reader<R>
66where
67 R: AsyncBufRead + Unpin,
68{
69 /// Creates an async SAM reader.
70 ///
71 /// # Examples
72 ///
73 /// ```
74 /// use noodles_sam as sam;
75 /// use tokio::io;
76 /// let reader = sam::r#async::io::Reader::new(io::empty());
77 /// ```
78 pub fn new(inner: R) -> Self {
79 Self {
80 inner,
81 buf: Vec::new(),
82 }
83 }
84
85 /// Returns a SAM header reader.
86 ///
87 /// This creates an adapter that reads at most the length of the header, i.e., all lines
88 /// prefixed with a `@` (at sign).
89 ///
90 /// It is more ergonomic to read and parse the header using [`Self::read_header`], but using
91 /// this adapter allows for control of how the header is read, e.g., to read the raw SAM
92 /// header.
93 ///
94 /// The position of the stream is expected to be at the start.
95 ///
96 /// # Examples
97 ///
98 /// ```
99 /// # #[tokio::main]
100 /// # async fn main() -> tokio::io::Result<()> {
101 /// use noodles_sam as sam;
102 /// use tokio::io::AsyncReadExt;
103 ///
104 /// let data = b"@HD\tVN:1.6
105 /// *\t4\t*\t0\t255\t*\t*\t0\t0\t*\t*
106 /// ";
107 ///
108 /// let mut reader = sam::r#async::io::Reader::new(&data[..]);
109 /// let mut header_reader = reader.header_reader();
110 ///
111 /// let mut raw_header = String::new();
112 /// header_reader.read_to_string(&mut raw_header).await?;
113 ///
114 /// assert_eq!(raw_header, "@HD\tVN:1.6\n");
115 /// # Ok(())
116 /// # }
117 /// ```
118 pub fn header_reader(&mut self) -> header::Reader<&mut R> {
119 header::Reader::new(&mut self.inner)
120 }
121
122 /// Reads the SAM header.
123 ///
124 /// The position of the stream is expected to be at the start.
125 ///
126 /// The SAM header is optional, and if it is missing, an empty [`Header`] is returned.
127 ///
128 /// # Examples
129 ///
130 /// ```
131 /// # use std::io;
132 /// #
133 /// # #[tokio::main]
134 /// # async fn main() -> io::Result<()> {
135 /// use noodles_sam as sam;
136 ///
137 /// let data = b"@HD\tVN:1.6
138 /// *\t4\t*\t0\t255\t*\t*\t0\t0\t*\t*
139 /// ";
140 ///
141 /// let mut reader = sam::r#async::io::Reader::new(&data[..]);
142 /// let header = reader.read_header().await?;
143 /// # Ok(())
144 /// # }
145 /// ```
146 pub async fn read_header(&mut self) -> io::Result<Header> {
147 read_header(&mut self.inner).await
148 }
149
150 /// Reads a record into an alignment record buffer.
151 ///
152 /// This reads a line from the underlying stream until a newline is reached and parses that
153 /// line into the given record.
154 ///
155 /// The stream is expected to be directly after the header or at the start of another record.
156 ///
157 /// It is more ergonomic to read records using an iterator (see [`Self::records`]), but using
158 /// this method directly allows reuse of a [`RecordBuf`].
159 ///
160 /// If successful, the number of bytes read is returned. If the number of bytes read is 0, the
161 /// stream reached EOF.
162 ///
163 /// # Examples
164 ///
165 /// ```
166 /// # #[tokio::main]
167 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
168 /// use noodles_sam::{self as sam, alignment::RecordBuf};
169 ///
170 /// let data = b"@HD\tVN:1.6
171 /// *\t4\t*\t0\t255\t*\t*\t0\t0\t*\t*
172 /// ";
173 ///
174 /// let mut reader = sam::r#async::io::Reader::new(&data[..]);
175 /// let header = reader.read_header().await?;
176 ///
177 /// let mut record = RecordBuf::default();
178 /// reader.read_record_buf(&header, &mut record).await?;
179 ///
180 /// assert_eq!(record, RecordBuf::default());
181 /// # Ok(())
182 /// # }
183 /// ```
184 pub async fn read_record_buf(
185 &mut self,
186 header: &Header,
187 record: &mut RecordBuf,
188 ) -> io::Result<usize> {
189 read_record_buf(&mut self.inner, &mut self.buf, header, record).await
190 }
191
192 /// Returns an (async) stream over alignment record buffers starting from the current (input)
193 /// stream position.
194 ///
195 /// The (input) stream is expected to be directly after the header or at the start of another
196 /// record.
197 ///
198 /// # Examples
199 ///
200 /// ```
201 /// # #[tokio::main]
202 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
203 /// use futures::TryStreamExt;
204 /// use noodles_sam as sam;
205 ///
206 /// let data = b"@HD\tVN:1.6
207 /// *\t4\t*\t0\t255\t*\t*\t0\t0\t*\t*
208 /// ";
209 ///
210 /// let mut reader = sam::r#async::io::Reader::new(&data[..]);
211 /// let header = reader.read_header().await?;
212 ///
213 /// let mut records = reader.record_bufs(&header);
214 ///
215 /// while let Some(record) = records.try_next().await? {
216 /// // ...
217 /// }
218 /// # Ok(())
219 /// # }
220 /// ```
221 pub fn record_bufs<'a>(
222 &'a mut self,
223 header: &'a Header,
224 ) -> impl Stream<Item = io::Result<RecordBuf>> + 'a {
225 Box::pin(stream::try_unfold(
226 (self, RecordBuf::default()),
227 |(reader, mut record)| async {
228 match reader.read_record_buf(header, &mut record).await? {
229 0 => Ok(None),
230 _ => Ok(Some((record.clone(), (reader, record)))),
231 }
232 },
233 ))
234 }
235
236 /// Reads a record.
237 ///
238 /// This reads SAM fields from the underlying stream into the given record's buffer until a
239 /// newline is reached. No fields are decoded, meaning the record is not necessarily valid.
240 /// However, the structure of the buffer is guaranteed to be record-like.
241 ///
242 /// The stream is expected to be directly after the header or at the start of another record.
243 ///
244 /// If successful, the number of bytes read is returned. If the number of bytes read is 0, the
245 /// stream reached EOF.
246 ///
247 /// # Examples
248 ///
249 /// ```
250 /// # #[tokio::main]
251 /// # async fn main() -> std::io::Result<()> {
252 /// use noodles_sam as sam;
253 ///
254 /// let data = b"@HD\tVN:1.6
255 /// *\t4\t*\t0\t255\t*\t*\t0\t0\t*\t*
256 /// ";
257 ///
258 /// let mut reader = sam::r#async::io::Reader::new(&data[..]);
259 /// let header = reader.read_header().await?;
260 ///
261 /// let mut record = sam::Record::default();
262 /// reader.read_record(&mut record).await?;
263 ///
264 /// assert_eq!(record, sam::Record::default());
265 /// # Ok(())
266 /// # }
267 /// ```
268 pub async fn read_record(&mut self, record: &mut Record) -> io::Result<usize> {
269 read_record(&mut self.inner, &mut self.buf, record).await
270 }
271
272 /// Returns an (async) stream over records.
273 ///
274 /// The (input) stream is expected to be directly after the header or at the start of a record.
275 ///
276 /// # Examples
277 ///
278 /// ```
279 /// # use tokio::io;
280 /// #
281 /// # #[tokio::main]
282 /// # async fn main() -> io::Result<()> {
283 /// use futures::TryStreamExt;
284 /// use noodles_sam as sam;
285 ///
286 /// let mut reader = sam::r#async::io::Reader::new(io::empty());
287 /// let header = reader.read_header().await?;
288 ///
289 /// let mut records = reader.records();
290 ///
291 /// while let Some(record) = records.try_next().await? {
292 /// // ...
293 /// }
294 /// # Ok(())
295 /// # }
296 /// ```
297 pub fn records(&mut self) -> impl Stream<Item = io::Result<Record>> + '_ {
298 Box::pin(stream::try_unfold(
299 (self, Record::default()),
300 |(reader, mut record)| async {
301 match reader.read_record(&mut record).await? {
302 0 => Ok(None),
303 _ => Ok(Some((record.clone(), (reader, record)))),
304 }
305 },
306 ))
307 }
308}
309
310impl<R> Reader<bgzf::AsyncReader<R>>
311where
312 R: AsyncRead + AsyncSeek + Unpin,
313{
314 // Seeks to the first record by setting the cursor to the beginning of the stream and
315 // (re)reading the header.
316 async fn seek_to_first_record(&mut self) -> io::Result<bgzf::VirtualPosition> {
317 self.get_mut()
318 .seek(bgzf::VirtualPosition::default())
319 .await?;
320
321 self.read_header().await?;
322
323 Ok(self.get_ref().virtual_position())
324 }
325
326 /// Returns a stream over records that intersect the given region.
327 ///
328 /// To query for unmapped records, use [`Self::query_unmapped`].
329 ///
330 /// # Examples
331 ///
332 /// ```no_run
333 /// # #[tokio::main]
334 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
335 /// use futures::TryStreamExt;
336 /// use noodles_bgzf as bgzf;
337 /// use noodles_csi as csi;
338 /// use noodles_sam as sam;
339 /// use tokio::fs::File;
340 ///
341 /// let mut reader = File::open("sample.sam")
342 /// .await
343 /// .map(bgzf::AsyncReader::new)
344 /// .map(sam::r#async::io::Reader::new)?;
345 ///
346 /// let header = reader.read_header().await?;
347 ///
348 /// let index = csi::r#async::read("sample.sam.csi").await?;
349 /// let region = "sq0:8-13".parse()?;
350 /// let mut query = reader.query(&header, &index, ®ion)?;
351 ///
352 /// while let Some(record) = query.try_next().await? {
353 /// // ...
354 /// }
355 /// # Ok(())
356 /// # }
357 /// ```
358 pub fn query<'r, 'h: 'r, I>(
359 &'r mut self,
360 header: &'h Header,
361 index: &I,
362 region: &Region,
363 ) -> io::Result<impl Stream<Item = io::Result<Record>> + 'r>
364 where
365 I: BinningIndex,
366 {
367 use crate::io::reader::resolve_region;
368
369 let reference_sequence_id = resolve_region(header.reference_sequences(), region)?;
370 let chunks = index.query(reference_sequence_id, region.interval())?;
371
372 Ok(query(
373 self,
374 chunks,
375 header,
376 reference_sequence_id,
377 region.interval(),
378 ))
379 }
380
381 /// Returns an iterator of unmapped records after querying for the unmapped region.
382 ///
383 ///
384 /// # Examples
385 ///
386 /// ```no_run
387 /// # #[tokio::main]
388 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
389 /// use futures::TryStreamExt;
390 /// use noodles_bgzf as bgzf;
391 /// use noodles_csi as csi;
392 /// use noodles_sam as sam;
393 /// use tokio::fs::File;
394 ///
395 /// let mut reader = File::open("sample.sam")
396 /// .await
397 /// .map(bgzf::AsyncReader::new)
398 /// .map(sam::r#async::io::Reader::new)?;
399 ///
400 /// let index = csi::r#async::read("sample.sam.csi").await?;
401 /// let mut query = reader.query_unmapped(&index).await?;
402 ///
403 /// while let Some(record) = query.try_next().await? {
404 /// // ...
405 /// }
406 /// # Ok(())
407 /// # }
408 /// ```
409 pub async fn query_unmapped<I>(
410 &mut self,
411 index: &I,
412 ) -> io::Result<impl Stream<Item = io::Result<Record>> + '_>
413 where
414 I: BinningIndex,
415 {
416 if let Some(pos) = index.last_first_record_start_position() {
417 self.get_mut().seek(pos).await?;
418 } else {
419 self.seek_to_first_record().await?;
420 }
421
422 Ok(Box::pin(stream::try_unfold(
423 self.records(),
424 |mut records| async {
425 loop {
426 match records.try_next().await? {
427 Some(record) => {
428 if record.flags()?.is_unmapped() {
429 return Ok(Some((record, records)));
430 }
431 }
432 None => return Ok(None),
433 }
434 }
435 },
436 )))
437 }
438}
439
440async fn read_line<R>(reader: &mut R, buf: &mut Vec<u8>) -> io::Result<usize>
441where
442 R: AsyncBufRead + Unpin,
443{
444 const LINE_FEED: u8 = b'\n';
445 const CARRIAGE_RETURN: u8 = b'\r';
446
447 match reader.read_until(LINE_FEED, buf).await? {
448 0 => Ok(0),
449 n => {
450 if buf.ends_with(&[LINE_FEED]) {
451 buf.pop();
452
453 if buf.ends_with(&[CARRIAGE_RETURN]) {
454 buf.pop();
455 }
456 }
457
458 Ok(n)
459 }
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    // Checks that `read_line` strips `\n` and `\r\n` terminators and accepts a final line
    // that has no terminator.
    #[tokio::test]
    async fn test_read_line() -> io::Result<()> {
        // (input, expected buffer contents after reading one line)
        let cases: [(&[u8], &[u8]); 3] = [
            (&b"noodles\n"[..], &b"noodles"[..]),
            (&b"noodles\r\n"[..], &b"noodles"[..]),
            (&b"noodles"[..], &b"noodles"[..]),
        ];

        let mut buf = Vec::new();

        for &(input, expected) in &cases {
            let mut src = input;

            buf.clear();
            read_line(&mut src, &mut buf).await?;

            assert_eq!(buf, expected);
        }

        Ok(())
    }
}