noodles_cram/async/io/
reader.rs

1//! Async CRAM reader.
2
3mod builder;
4mod container;
5mod crc_reader;
6pub mod header;
7mod num;
8mod query;
9mod records;
10
11use futures::Stream;
12use noodles_core::Region;
13use noodles_fasta as fasta;
14use noodles_sam as sam;
15use tokio::io::{self, AsyncRead, AsyncSeek, AsyncSeekExt, SeekFrom};
16
17pub use self::builder::Builder;
18use self::{container::read_container, crc_reader::CrcReader, header::read_header};
19use crate::{crai, io::reader::Container, FileDefinition};
20
21/// An async CRAM reader.
22pub struct Reader<R> {
23    inner: R,
24    reference_sequence_repository: fasta::Repository,
25}
26
27impl<R> Reader<R> {
28    /// Returns a reference to the underlying reader.
29    ///
30    /// # Examples
31    ///
32    /// ```
33    /// use noodles_cram as cram;
34    /// use tokio::io;
35    /// let reader = cram::r#async::io::Reader::new(io::empty());
36    /// let _inner = reader.get_ref();
37    /// ```
38    pub fn get_ref(&self) -> &R {
39        &self.inner
40    }
41
42    /// Returns a mutable reference to the underlying reader.
43    ///
44    /// # Examples
45    ///
46    /// ```
47    /// use noodles_cram as cram;
48    /// use tokio::io;
49    /// let mut reader = cram::r#async::io::Reader::new(io::empty());
50    /// let _inner = reader.get_mut();
51    /// ```
52    pub fn get_mut(&mut self) -> &mut R {
53        &mut self.inner
54    }
55
56    /// Returns the underlying reader.
57    ///
58    /// # Examples
59    ///
60    /// ```
61    /// use noodles_cram as cram;
62    /// use tokio::io;
63    /// let reader = cram::r#async::io::Reader::new(io::empty());
64    /// let _inner = reader.into_inner();
65    /// ```
66    pub fn into_inner(self) -> R {
67        self.inner
68    }
69}
70
71impl<R> Reader<R>
72where
73    R: AsyncRead + Unpin,
74{
75    /// Creates an async CRAM reader.
76    ///
77    /// # Examples
78    ///
79    /// ```
80    /// use noodles_cram as cram;
81    /// use tokio::io;
82    /// let reader = cram::r#async::io::Reader::new(io::empty());
83    /// ```
84    pub fn new(inner: R) -> Self {
85        Builder::default().build_from_reader(inner)
86    }
87
88    /// Returns an async CRAM header reader.
89    ///
90    /// # Examples
91    ///
92    /// ```no_run
93    /// # #[tokio::main]
94    /// # async fn main() -> tokio::io::Result<()> {
95    /// use noodles_cram as cram;
96    /// use tokio::{fs::File, io::AsyncReadExt};
97    ///
98    /// let mut reader = File::open("sample.cram")
99    ///     .await
100    ///     .map(cram::r#async::io::Reader::new)?;
101    ///
102    /// let mut header_reader = reader.header_reader();
103    /// header_reader.read_magic_number().await?;
104    /// header_reader.read_format_version().await?;
105    /// header_reader.read_file_id().await?;
106    ///
107    /// let mut container_reader = header_reader.container_reader().await?;
108    ///
109    /// let _raw_header = {
110    ///     let mut raw_sam_header_reader = container_reader.raw_sam_header_reader().await?;
111    ///     let mut raw_header = String::new();
112    ///     raw_sam_header_reader.read_to_string(&mut raw_header).await?;
113    ///     raw_sam_header_reader.discard_to_end().await?;
114    ///     raw_header
115    /// };
116    ///
117    /// container_reader.discard_to_end().await?;
118    /// # Ok(())
119    /// # }
120    /// ```
121    pub fn header_reader(&mut self) -> header::Reader<&mut R> {
122        header::Reader::new(&mut self.inner)
123    }
124
125    /// Reads the CRAM file definition.
126    ///
127    /// This also checks the magic number.
128    ///
129    /// The position of the stream is expected to be at the start.
130    ///
131    /// # Examples
132    ///
133    /// ```no_run
134    /// # #[tokio::main]
135    /// # async fn main() -> tokio::io::Result<()> {
136    /// use noodles_cram as cram;
137    /// use tokio::fs::File;
138    /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
139    /// let file_definition = reader.read_file_definition().await?;
140    /// # Ok(())
141    /// # }
142    /// ```
143    pub async fn read_file_definition(&mut self) -> io::Result<FileDefinition> {
144        header::read_file_definition(&mut self.inner).await
145    }
146
147    /// Reads the SAM header.
148    ///
149    /// The position of the stream is expected to be at the CRAM header container, i.e., directly
150    /// after the file definition.
151    ///
152    /// # Examples
153    ///
154    /// ```no_run
155    /// # #[tokio::main]
156    /// # async fn main() -> tokio::io::Result<()> {
157    /// use noodles_cram as cram;
158    /// use tokio::fs::File;
159    ///
160    /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
161    /// reader.read_file_definition().await?;
162    ///
163    /// let header = reader.read_file_header().await?;
164    /// # Ok(())
165    /// # }
166    /// ```
167    pub async fn read_file_header(&mut self) -> io::Result<sam::Header> {
168        header::read_file_header(&mut self.inner).await
169    }
170
171    /// Reads the SAM header.
172    ///
173    /// This verifies the CRAM magic number, discards the file definition, and reads and parses the
174    /// file header as a SAM header.
175    ///
176    /// The position of the stream is expected to be at the start.
177    ///
178    /// # Examples
179    ///
180    /// ```no_run
181    /// # #[tokio::main]
182    /// # async fn main() -> tokio::io::Result<()> {
183    /// use noodles_cram as cram;
184    /// use tokio::fs::File;
185    /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
186    /// let _header = reader.read_header().await?;
187    /// # Ok(())
188    /// # }
189    /// ```
190    pub async fn read_header(&mut self) -> io::Result<sam::Header> {
191        read_header(&mut self.inner).await
192    }
193
194    /// Reads a container.
195    ///
196    /// This returns `None` if the container header is the EOF container header, which signals the
197    /// end of the stream.
198    ///
199    /// # Examples
200    ///
201    /// ```no_run
202    /// # #[tokio::main]
203    /// # async fn main() -> tokio::io::Result<()> {
204    /// use noodles_cram::{self as cram, io::reader::Container};
205    /// use tokio::fs::File;
206    ///
207    /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
208    /// reader.read_header().await?;
209    ///
210    /// let mut container = Container::default();
211    ///
212    /// while reader.read_container(&mut container).await? != 0 {
213    ///     // ...
214    /// }
215    /// # Ok(())
216    /// # }
217    /// ```
218    pub async fn read_container(&mut self, container: &mut Container) -> io::Result<usize> {
219        read_container(&mut self.inner, container).await
220    }
221
222    /// Reads a container.
223    #[deprecated(since = "0.78.0", note = "Use `Reader::read_container` instead.")]
224    pub async fn read_data_container(&mut self) -> io::Result<Option<Container>> {
225        let mut container = Container::default();
226
227        read_container(&mut self.inner, &mut container)
228            .await
229            .map(|n| match n {
230                0 => None,
231                _ => Some(container),
232            })
233    }
234
235    /// Returns an (async) stream over records starting from the current (input) stream position.
236    ///
237    /// The (input) stream position is expected to be at the start of a container.
238    ///
239    /// # Examples
240    ///
241    /// ```no_run
242    /// # #[tokio::main]
243    /// # async fn main() -> tokio::io::Result<()> {
244    /// use futures::TryStreamExt;
245    /// use noodles_cram as cram;
246    /// use tokio::fs::File;
247    ///
248    /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
249    /// let header = reader.read_header().await?;
250    ///
251    /// let mut records = reader.records(&header);
252    ///
253    /// while let Some(record) = records.try_next().await? {
254    ///     // ...
255    /// }
256    /// # Ok(())
257    /// # }
258    /// ```
259    pub fn records<'r, 'h: 'r>(
260        &'r mut self,
261        header: &'h sam::Header,
262    ) -> impl Stream<Item = io::Result<sam::alignment::RecordBuf>> + 'r {
263        use self::records::records;
264
265        records(self, header)
266    }
267}
268
269impl<R> Reader<R>
270where
271    R: AsyncRead + AsyncSeek + Unpin,
272{
273    /// Seeks the underlying reader to the given position.
274    ///
275    /// Positions typically come from an associated CRAM index file.
276    ///
277    /// # Examples
278    ///
279    /// ```
280    /// # #[tokio::main]
281    /// # async fn main() -> tokio::io::Result<()> {
282    /// use noodles_cram as cram;
283    /// use tokio::io::{self, SeekFrom};
284    /// let mut reader = cram::r#async::io::Reader::new(io::empty());
285    /// reader.seek(SeekFrom::Start(0)).await?;
286    /// # Ok(())
287    /// # }
288    /// ```
289    pub async fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
290        self.inner.seek(pos).await
291    }
292
293    /// Returns the current position of the underlying reader.
294    ///
295    /// # Examples
296    ///
297    /// ```
298    /// # #[tokio::main]
299    /// # async fn main() -> tokio::io::Result<()> {
300    /// use tokio::io;
301    /// use noodles_cram as cram;
302    /// let mut reader = cram::r#async::io::Reader::new(io::empty());
303    /// assert_eq!(reader.position().await?, 0);
304    /// # Ok(())
305    /// # }
306    /// ```
307    pub async fn position(&mut self) -> io::Result<u64> {
308        self.inner.stream_position().await
309    }
310
311    /// Returns a stream over records that intersects the given region.
312    ///
313    /// # Examples
314    ///
315    /// ```no_run
316    /// # #[tokio::main]
317    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
318    /// use futures::TryStreamExt;
319    /// use noodles_core::Region;
320    /// use noodles_cram::{self as cram, crai};
321    /// use tokio::fs::File;
322    ///
323    /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
324    /// let header = reader.read_header().await?;
325    ///
326    /// let index = crai::r#async::read("sample.cram.crai").await?;
327    /// let region = "sq0:8-13".parse()?;
328    /// let mut query = reader.query(&header, &index, &region)?;
329    ///
330    /// while let Some(record) = query.try_next().await? {
331    ///     // ...
332    /// }
333    /// # Ok(())
334    /// # }
335    /// ```
336    pub fn query<'r, 'h: 'r, 'i: 'r>(
337        &'r mut self,
338        header: &'h sam::Header,
339        index: &'i crai::Index,
340        region: &Region,
341    ) -> io::Result<impl Stream<Item = io::Result<sam::alignment::RecordBuf>> + 'r> {
342        use self::query::query;
343
344        let reference_sequence_id = header
345            .reference_sequences()
346            .get_index_of(region.name())
347            .ok_or_else(|| {
348                io::Error::new(
349                    io::ErrorKind::InvalidInput,
350                    "invalid reference sequence name",
351                )
352            })?;
353
354        Ok(query(
355            self,
356            header,
357            index,
358            reference_sequence_id,
359            region.interval(),
360        ))
361    }
362}