// noodles_bcf/async/io/reader.rs

1pub mod header;
2mod query;
3mod record;
4
5use futures::{stream, Stream};
6use noodles_bgzf as bgzf;
7use noodles_core::Region;
8use noodles_csi::BinningIndex;
9use noodles_vcf as vcf;
10use tokio::io::{self, AsyncRead, AsyncSeek};
11
12use self::{header::read_header, query::query, record::read_record};
13use crate::Record;
14
/// An async BCF reader.
///
/// The reader owns an underlying async source `R` and reads the header and records from it.
///
/// # Examples
///
/// ```no_run
/// # use std::io;
/// #
/// # #[tokio::main]
/// # async fn main() -> io::Result<()> {
/// use futures::TryStreamExt;
/// use noodles_bcf as bcf;
/// use tokio::fs::File;
///
/// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
/// reader.read_header().await?;
///
/// let mut records = reader.records();
///
/// while let Some(record) = records.try_next().await? {
///     // ...
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Reader<R> {
    // The wrapped source all reads are delegated to.
    inner: R,
}
42
43impl<R> Reader<R> {
44    /// Returns a reference to the underlying reader.
45    ///
46    /// # Examples
47    ///
48    /// ```
49    /// use noodles_bcf as bcf;
50    /// use tokio::io;
51    /// let reader = bcf::r#async::io::Reader::from(io::empty());
52    /// let _inner = reader.get_ref();
53    /// ```
54    pub fn get_ref(&self) -> &R {
55        &self.inner
56    }
57
58    /// Returns a mutable reference to the underlying reader.
59    ///
60    /// # Examples
61    ///
62    /// ```
63    /// use noodles_bcf as bcf;
64    /// use tokio::io;
65    /// let mut reader = bcf::r#async::io::Reader::from(io::empty());
66    /// let _inner = reader.get_mut();
67    /// ```
68    pub fn get_mut(&mut self) -> &mut R {
69        &mut self.inner
70    }
71
72    /// Returns the underlying reader.
73    ///
74    /// # Examples
75    ///
76    /// ```
77    /// use noodles_bcf as bcf;
78    /// use tokio::io;
79    /// let reader = bcf::r#async::io::Reader::from(io::empty());
80    /// let _inner = reader.into_inner();
81    /// ```
82    pub fn into_inner(self) -> R {
83        self.inner
84    }
85}
86
87impl<R> Reader<R>
88where
89    R: AsyncRead + Unpin,
90{
91    /// Returns a BCF header reader.
92    ///
93    /// # Examples
94    ///
95    /// ```no_run
96    /// # #[tokio::main]
97    /// # async fn main() -> tokio::io::Result<()> {
98    /// use noodles_bcf as bcf;
99    /// use tokio::{fs::File, io::AsyncReadExt};
100    ///
101    /// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
102    ///
103    /// let mut header_reader = reader.header_reader();
104    /// header_reader.read_magic_number().await?;
105    /// header_reader.read_format_version().await?;
106    ///
107    /// let mut raw_vcf_header_reader = header_reader.raw_vcf_header_reader().await?;
108    /// let mut raw_header = String::new();
109    /// raw_vcf_header_reader.read_to_string(&mut raw_header).await?;
110    /// raw_vcf_header_reader.discard_to_end().await?;
111    /// # Ok(())
112    /// # }
113    /// ```
114    pub fn header_reader(&mut self) -> header::Reader<&mut R> {
115        header::Reader::new(&mut self.inner)
116    }
117
118    /// Reads the VCF header.
119    ///
120    /// The BCF magic number is checked, and the file format version is discarded.
121    ///
122    /// The position of the stream is expected to be at the start.
123    ///
124    /// # Examples
125    ///
126    /// ```no_run
127    /// # #[tokio::main]
128    /// # async fn main() -> tokio::io::Result<()> {
129    /// use noodles_bcf as bcf;
130    /// use tokio::fs::File;
131    ///
132    /// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
133    /// let header = reader.read_header().await?;
134    /// # Ok(())
135    /// # }
136    /// ```
137    pub async fn read_header(&mut self) -> io::Result<vcf::Header> {
138        read_header(&mut self.inner).await
139    }
140
141    /// Reads a single record without decoding (most of) its fields.
142    ///
143    /// The stream is expected to be directly after the header or at the start of another record.
144    ///
145    /// It is more ergonomic to read records using a stream (see [`Self::records`]), but using
146    /// this method directly allows the reuse of a single [`Record`] buffer.
147    ///
148    /// If successful, the record size is returned. If a record size of 0 is returned, the stream
149    /// reached EOF.
150    ///
151    /// ```no_run
152    /// # #[tokio::main]
153    /// # async fn main() -> tokio::io::Result<()> {
154    /// use noodles_bcf as bcf;
155    /// use tokio::fs::File;
156    ///
157    /// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
158    /// reader.read_header().await?;
159    ///
160    /// let mut record = bcf::Record::default();
161    /// reader.read_record(&mut record).await?;
162    /// # Ok(())
163    /// # }
164    /// ```
165    pub async fn read_record(&mut self, record: &mut Record) -> io::Result<usize> {
166        read_record(&mut self.inner, record).await
167    }
168
169    /// Returns an (async) stream over lazy records starting from the current (input) stream
170    /// position.
171    ///
172    /// The (input) stream is expected to be directly after the header or at the start of another
173    /// record.
174    ///
175    /// # Examples
176    ///
177    /// ```no_run
178    /// # #[tokio::main]
179    /// # async fn main() -> tokio::io::Result<()> {
180    /// use futures::TryStreamExt;
181    /// use noodles_bcf as bcf;
182    /// use tokio::fs::File;
183    ///
184    /// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
185    /// reader.read_header().await?;
186    ///
187    /// let mut records = reader.records();
188    ///
189    /// while let Some(record) = records.try_next().await? {
190    ///     // ...
191    /// }
192    /// # Ok(())
193    /// # }
194    /// ```
195    pub fn records(&mut self) -> impl Stream<Item = io::Result<Record>> + '_ {
196        Box::pin(stream::try_unfold(
197            (&mut self.inner, Record::default()),
198            |(mut reader, mut record)| async {
199                read_record(&mut reader, &mut record)
200                    .await
201                    .map(|n| match n {
202                        0 => None,
203                        _ => Some((record.clone(), (reader, record))),
204                    })
205            },
206        ))
207    }
208}
209
210impl<R> Reader<bgzf::AsyncReader<R>>
211where
212    R: AsyncRead + Unpin,
213{
214    /// Creates an async BCF reader.
215    ///
216    /// # Examples
217    ///
218    /// ```
219    /// use noodles_bcf as bcf;
220    /// use tokio::io;
221    /// let reader = bcf::r#async::io::Reader::new(io::empty());
222    /// ```
223    pub fn new(inner: R) -> Self {
224        Self::from(bgzf::AsyncReader::new(inner))
225    }
226}
227
228impl<R> Reader<bgzf::AsyncReader<R>>
229where
230    R: AsyncRead + AsyncSeek + Unpin,
231{
232    /// Returns a stream over records that intersect the given region.
233    ///
234    /// # Examples
235    ///
236    /// ```no_run
237    /// # #[tokio::main]
238    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
239    /// use futures::TryStreamExt;
240    /// use noodles_bcf as bcf;
241    /// use noodles_core::Region;
242    /// use noodles_csi as csi;
243    /// use tokio::fs::File;
244    ///
245    /// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
246    /// let header = reader.read_header().await?;
247    ///
248    /// let index = csi::r#async::read("sample.bcf.csi").await?;
249    /// let region = "sq0:8-13".parse()?;
250    /// let mut query = reader.query(&header, &index, &region)?;
251    ///
252    /// while let Some(record) = query.try_next().await? {
253    ///     // ...
254    /// }
255    /// # Ok(())
256    /// # }
257    /// ```
258    pub fn query<I>(
259        &mut self,
260        header: &vcf::Header,
261        index: &I,
262        region: &Region,
263    ) -> io::Result<impl Stream<Item = io::Result<Record>> + '_>
264    where
265        I: BinningIndex,
266    {
267        use crate::io::reader::resolve_region;
268
269        let reference_sequence_id = resolve_region(header.string_maps().contigs(), region)?;
270        let chunks = index.query(reference_sequence_id, region.interval())?;
271
272        Ok(query(
273            self,
274            chunks,
275            reference_sequence_id,
276            region.interval(),
277        ))
278    }
279}
280
281impl<R> From<R> for Reader<R> {
282    fn from(inner: R) -> Self {
283        Self { inner }
284    }
285}