noodles_cram/async/io/reader.rs
1//! Async CRAM reader.
2
3mod builder;
4mod container;
5mod crc_reader;
6pub mod header;
7mod num;
8mod query;
9mod records;
10
11use futures::Stream;
12use noodles_core::Region;
13use noodles_fasta as fasta;
14use noodles_sam as sam;
15use tokio::io::{self, AsyncRead, AsyncSeek, AsyncSeekExt, SeekFrom};
16
17pub use self::builder::Builder;
18use self::{container::read_container, crc_reader::CrcReader, header::read_header};
19use crate::{crai, io::reader::Container, FileDefinition};
20
21/// An async CRAM reader.
22pub struct Reader<R> {
23 inner: R,
24 reference_sequence_repository: fasta::Repository,
25}
26
27impl<R> Reader<R> {
28 /// Returns a reference to the underlying reader.
29 ///
30 /// # Examples
31 ///
32 /// ```
33 /// use noodles_cram as cram;
34 /// use tokio::io;
35 /// let reader = cram::r#async::io::Reader::new(io::empty());
36 /// let _inner = reader.get_ref();
37 /// ```
38 pub fn get_ref(&self) -> &R {
39 &self.inner
40 }
41
42 /// Returns a mutable reference to the underlying reader.
43 ///
44 /// # Examples
45 ///
46 /// ```
47 /// use noodles_cram as cram;
48 /// use tokio::io;
49 /// let mut reader = cram::r#async::io::Reader::new(io::empty());
50 /// let _inner = reader.get_mut();
51 /// ```
52 pub fn get_mut(&mut self) -> &mut R {
53 &mut self.inner
54 }
55
56 /// Returns the underlying reader.
57 ///
58 /// # Examples
59 ///
60 /// ```
61 /// use noodles_cram as cram;
62 /// use tokio::io;
63 /// let reader = cram::r#async::io::Reader::new(io::empty());
64 /// let _inner = reader.into_inner();
65 /// ```
66 pub fn into_inner(self) -> R {
67 self.inner
68 }
69}
70
71impl<R> Reader<R>
72where
73 R: AsyncRead + Unpin,
74{
75 /// Creates an async CRAM reader.
76 ///
77 /// # Examples
78 ///
79 /// ```
80 /// use noodles_cram as cram;
81 /// use tokio::io;
82 /// let reader = cram::r#async::io::Reader::new(io::empty());
83 /// ```
84 pub fn new(inner: R) -> Self {
85 Builder::default().build_from_reader(inner)
86 }
87
88 /// Returns an async CRAM header reader.
89 ///
90 /// # Examples
91 ///
92 /// ```no_run
93 /// # #[tokio::main]
94 /// # async fn main() -> tokio::io::Result<()> {
95 /// use noodles_cram as cram;
96 /// use tokio::{fs::File, io::AsyncReadExt};
97 ///
98 /// let mut reader = File::open("sample.cram")
99 /// .await
100 /// .map(cram::r#async::io::Reader::new)?;
101 ///
102 /// let mut header_reader = reader.header_reader();
103 /// header_reader.read_magic_number().await?;
104 /// header_reader.read_format_version().await?;
105 /// header_reader.read_file_id().await?;
106 ///
107 /// let mut container_reader = header_reader.container_reader().await?;
108 ///
109 /// let _raw_header = {
110 /// let mut raw_sam_header_reader = container_reader.raw_sam_header_reader().await?;
111 /// let mut raw_header = String::new();
112 /// raw_sam_header_reader.read_to_string(&mut raw_header).await?;
113 /// raw_sam_header_reader.discard_to_end().await?;
114 /// raw_header
115 /// };
116 ///
117 /// container_reader.discard_to_end().await?;
118 /// # Ok(())
119 /// # }
120 /// ```
121 pub fn header_reader(&mut self) -> header::Reader<&mut R> {
122 header::Reader::new(&mut self.inner)
123 }
124
125 /// Reads the CRAM file definition.
126 ///
127 /// This also checks the magic number.
128 ///
129 /// The position of the stream is expected to be at the start.
130 ///
131 /// # Examples
132 ///
133 /// ```no_run
134 /// # #[tokio::main]
135 /// # async fn main() -> tokio::io::Result<()> {
136 /// use noodles_cram as cram;
137 /// use tokio::fs::File;
138 /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
139 /// let file_definition = reader.read_file_definition().await?;
140 /// # Ok(())
141 /// # }
142 /// ```
143 pub async fn read_file_definition(&mut self) -> io::Result<FileDefinition> {
144 header::read_file_definition(&mut self.inner).await
145 }
146
147 /// Reads the SAM header.
148 ///
149 /// The position of the stream is expected to be at the CRAM header container, i.e., directly
150 /// after the file definition.
151 ///
152 /// # Examples
153 ///
154 /// ```no_run
155 /// # #[tokio::main]
156 /// # async fn main() -> tokio::io::Result<()> {
157 /// use noodles_cram as cram;
158 /// use tokio::fs::File;
159 ///
160 /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
161 /// reader.read_file_definition().await?;
162 ///
163 /// let header = reader.read_file_header().await?;
164 /// # Ok(())
165 /// # }
166 /// ```
167 pub async fn read_file_header(&mut self) -> io::Result<sam::Header> {
168 header::read_file_header(&mut self.inner).await
169 }
170
171 /// Reads the SAM header.
172 ///
173 /// This verifies the CRAM magic number, discards the file definition, and reads and parses the
174 /// file header as a SAM header.
175 ///
176 /// The position of the stream is expected to be at the start.
177 ///
178 /// # Examples
179 ///
180 /// ```no_run
181 /// # #[tokio::main]
182 /// # async fn main() -> tokio::io::Result<()> {
183 /// use noodles_cram as cram;
184 /// use tokio::fs::File;
185 /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
186 /// let _header = reader.read_header().await?;
187 /// # Ok(())
188 /// # }
189 /// ```
190 pub async fn read_header(&mut self) -> io::Result<sam::Header> {
191 read_header(&mut self.inner).await
192 }
193
194 /// Reads a container.
195 ///
196 /// This returns `None` if the container header is the EOF container header, which signals the
197 /// end of the stream.
198 ///
199 /// # Examples
200 ///
201 /// ```no_run
202 /// # #[tokio::main]
203 /// # async fn main() -> tokio::io::Result<()> {
204 /// use noodles_cram::{self as cram, io::reader::Container};
205 /// use tokio::fs::File;
206 ///
207 /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
208 /// reader.read_header().await?;
209 ///
210 /// let mut container = Container::default();
211 ///
212 /// while reader.read_container(&mut container).await? != 0 {
213 /// // ...
214 /// }
215 /// # Ok(())
216 /// # }
217 /// ```
218 pub async fn read_container(&mut self, container: &mut Container) -> io::Result<usize> {
219 read_container(&mut self.inner, container).await
220 }
221
222 /// Reads a container.
223 #[deprecated(since = "0.78.0", note = "Use `Reader::read_container` instead.")]
224 pub async fn read_data_container(&mut self) -> io::Result<Option<Container>> {
225 let mut container = Container::default();
226
227 read_container(&mut self.inner, &mut container)
228 .await
229 .map(|n| match n {
230 0 => None,
231 _ => Some(container),
232 })
233 }
234
235 /// Returns an (async) stream over records starting from the current (input) stream position.
236 ///
237 /// The (input) stream position is expected to be at the start of a container.
238 ///
239 /// # Examples
240 ///
241 /// ```no_run
242 /// # #[tokio::main]
243 /// # async fn main() -> tokio::io::Result<()> {
244 /// use futures::TryStreamExt;
245 /// use noodles_cram as cram;
246 /// use tokio::fs::File;
247 ///
248 /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
249 /// let header = reader.read_header().await?;
250 ///
251 /// let mut records = reader.records(&header);
252 ///
253 /// while let Some(record) = records.try_next().await? {
254 /// // ...
255 /// }
256 /// # Ok(())
257 /// # }
258 /// ```
259 pub fn records<'r, 'h: 'r>(
260 &'r mut self,
261 header: &'h sam::Header,
262 ) -> impl Stream<Item = io::Result<sam::alignment::RecordBuf>> + 'r {
263 use self::records::records;
264
265 records(self, header)
266 }
267}
268
269impl<R> Reader<R>
270where
271 R: AsyncRead + AsyncSeek + Unpin,
272{
273 /// Seeks the underlying reader to the given position.
274 ///
275 /// Positions typically come from an associated CRAM index file.
276 ///
277 /// # Examples
278 ///
279 /// ```
280 /// # #[tokio::main]
281 /// # async fn main() -> tokio::io::Result<()> {
282 /// use noodles_cram as cram;
283 /// use tokio::io::{self, SeekFrom};
284 /// let mut reader = cram::r#async::io::Reader::new(io::empty());
285 /// reader.seek(SeekFrom::Start(0)).await?;
286 /// # Ok(())
287 /// # }
288 /// ```
289 pub async fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
290 self.inner.seek(pos).await
291 }
292
293 /// Returns the current position of the underlying reader.
294 ///
295 /// # Examples
296 ///
297 /// ```
298 /// # #[tokio::main]
299 /// # async fn main() -> tokio::io::Result<()> {
300 /// use tokio::io;
301 /// use noodles_cram as cram;
302 /// let mut reader = cram::r#async::io::Reader::new(io::empty());
303 /// assert_eq!(reader.position().await?, 0);
304 /// # Ok(())
305 /// # }
306 /// ```
307 pub async fn position(&mut self) -> io::Result<u64> {
308 self.inner.stream_position().await
309 }
310
311 /// Returns a stream over records that intersects the given region.
312 ///
313 /// # Examples
314 ///
315 /// ```no_run
316 /// # #[tokio::main]
317 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
318 /// use futures::TryStreamExt;
319 /// use noodles_core::Region;
320 /// use noodles_cram::{self as cram, crai};
321 /// use tokio::fs::File;
322 ///
323 /// let mut reader = File::open("sample.cram").await.map(cram::r#async::io::Reader::new)?;
324 /// let header = reader.read_header().await?;
325 ///
326 /// let index = crai::r#async::read("sample.cram.crai").await?;
327 /// let region = "sq0:8-13".parse()?;
328 /// let mut query = reader.query(&header, &index, ®ion)?;
329 ///
330 /// while let Some(record) = query.try_next().await? {
331 /// // ...
332 /// }
333 /// # Ok(())
334 /// # }
335 /// ```
336 pub fn query<'r, 'h: 'r, 'i: 'r>(
337 &'r mut self,
338 header: &'h sam::Header,
339 index: &'i crai::Index,
340 region: &Region,
341 ) -> io::Result<impl Stream<Item = io::Result<sam::alignment::RecordBuf>> + 'r> {
342 use self::query::query;
343
344 let reference_sequence_id = header
345 .reference_sequences()
346 .get_index_of(region.name())
347 .ok_or_else(|| {
348 io::Error::new(
349 io::ErrorKind::InvalidInput,
350 "invalid reference sequence name",
351 )
352 })?;
353
354 Ok(query(
355 self,
356 header,
357 index,
358 reference_sequence_id,
359 region.interval(),
360 ))
361 }
362}