noodles_bcf/async/io/reader.rs
1pub mod header;
2mod query;
3mod record;
4
5use futures::{stream, Stream};
6use noodles_bgzf as bgzf;
7use noodles_core::Region;
8use noodles_csi::BinningIndex;
9use noodles_vcf as vcf;
10use tokio::io::{self, AsyncRead, AsyncSeek};
11
12use self::{header::read_header, query::query, record::read_record};
13use crate::Record;
14
/// An asynchronous reader for BCF input.
///
/// The reader owns an underlying source of type `R` and reads the header and records from it.
///
/// # Examples
///
/// ```no_run
/// # use std::io;
/// #
/// # #[tokio::main]
/// # async fn main() -> io::Result<()> {
/// use futures::TryStreamExt;
/// use noodles_bcf as bcf;
/// use tokio::fs::File;
///
/// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
/// reader.read_header().await?;
///
/// let mut records = reader.records();
///
/// while let Some(record) = records.try_next().await? {
///     // ...
/// }
/// # Ok(())
/// # }
/// ```
pub struct Reader<R> {
    // The wrapped source that all reads are delegated to.
    inner: R,
}
42
43impl<R> Reader<R> {
44 /// Returns a reference to the underlying reader.
45 ///
46 /// # Examples
47 ///
48 /// ```
49 /// use noodles_bcf as bcf;
50 /// use tokio::io;
51 /// let reader = bcf::r#async::io::Reader::from(io::empty());
52 /// let _inner = reader.get_ref();
53 /// ```
54 pub fn get_ref(&self) -> &R {
55 &self.inner
56 }
57
58 /// Returns a mutable reference to the underlying reader.
59 ///
60 /// # Examples
61 ///
62 /// ```
63 /// use noodles_bcf as bcf;
64 /// use tokio::io;
65 /// let mut reader = bcf::r#async::io::Reader::from(io::empty());
66 /// let _inner = reader.get_mut();
67 /// ```
68 pub fn get_mut(&mut self) -> &mut R {
69 &mut self.inner
70 }
71
72 /// Returns the underlying reader.
73 ///
74 /// # Examples
75 ///
76 /// ```
77 /// use noodles_bcf as bcf;
78 /// use tokio::io;
79 /// let reader = bcf::r#async::io::Reader::from(io::empty());
80 /// let _inner = reader.into_inner();
81 /// ```
82 pub fn into_inner(self) -> R {
83 self.inner
84 }
85}
86
87impl<R> Reader<R>
88where
89 R: AsyncRead + Unpin,
90{
91 /// Returns a BCF header reader.
92 ///
93 /// # Examples
94 ///
95 /// ```no_run
96 /// # #[tokio::main]
97 /// # async fn main() -> tokio::io::Result<()> {
98 /// use noodles_bcf as bcf;
99 /// use tokio::{fs::File, io::AsyncReadExt};
100 ///
101 /// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
102 ///
103 /// let mut header_reader = reader.header_reader();
104 /// header_reader.read_magic_number().await?;
105 /// header_reader.read_format_version().await?;
106 ///
107 /// let mut raw_vcf_header_reader = header_reader.raw_vcf_header_reader().await?;
108 /// let mut raw_header = String::new();
109 /// raw_vcf_header_reader.read_to_string(&mut raw_header).await?;
110 /// raw_vcf_header_reader.discard_to_end().await?;
111 /// # Ok(())
112 /// # }
113 /// ```
114 pub fn header_reader(&mut self) -> header::Reader<&mut R> {
115 header::Reader::new(&mut self.inner)
116 }
117
118 /// Reads the VCF header.
119 ///
120 /// The BCF magic number is checked, and the file format version is discarded.
121 ///
122 /// The position of the stream is expected to be at the start.
123 ///
124 /// # Examples
125 ///
126 /// ```no_run
127 /// # #[tokio::main]
128 /// # async fn main() -> tokio::io::Result<()> {
129 /// use noodles_bcf as bcf;
130 /// use tokio::fs::File;
131 ///
132 /// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
133 /// let header = reader.read_header().await?;
134 /// # Ok(())
135 /// # }
136 /// ```
137 pub async fn read_header(&mut self) -> io::Result<vcf::Header> {
138 read_header(&mut self.inner).await
139 }
140
141 /// Reads a single record without decoding (most of) its fields.
142 ///
143 /// The stream is expected to be directly after the header or at the start of another record.
144 ///
145 /// It is more ergonomic to read records using a stream (see [`Self::records`]), but using
146 /// this method directly allows the reuse of a single [`Record`] buffer.
147 ///
148 /// If successful, the record size is returned. If a record size of 0 is returned, the stream
149 /// reached EOF.
150 ///
151 /// ```no_run
152 /// # #[tokio::main]
153 /// # async fn main() -> tokio::io::Result<()> {
154 /// use noodles_bcf as bcf;
155 /// use tokio::fs::File;
156 ///
157 /// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
158 /// reader.read_header().await?;
159 ///
160 /// let mut record = bcf::Record::default();
161 /// reader.read_record(&mut record).await?;
162 /// # Ok(())
163 /// # }
164 /// ```
165 pub async fn read_record(&mut self, record: &mut Record) -> io::Result<usize> {
166 read_record(&mut self.inner, record).await
167 }
168
169 /// Returns an (async) stream over lazy records starting from the current (input) stream
170 /// position.
171 ///
172 /// The (input) stream is expected to be directly after the header or at the start of another
173 /// record.
174 ///
175 /// # Examples
176 ///
177 /// ```no_run
178 /// # #[tokio::main]
179 /// # async fn main() -> tokio::io::Result<()> {
180 /// use futures::TryStreamExt;
181 /// use noodles_bcf as bcf;
182 /// use tokio::fs::File;
183 ///
184 /// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
185 /// reader.read_header().await?;
186 ///
187 /// let mut records = reader.records();
188 ///
189 /// while let Some(record) = records.try_next().await? {
190 /// // ...
191 /// }
192 /// # Ok(())
193 /// # }
194 /// ```
195 pub fn records(&mut self) -> impl Stream<Item = io::Result<Record>> + '_ {
196 Box::pin(stream::try_unfold(
197 (&mut self.inner, Record::default()),
198 |(mut reader, mut record)| async {
199 read_record(&mut reader, &mut record)
200 .await
201 .map(|n| match n {
202 0 => None,
203 _ => Some((record.clone(), (reader, record))),
204 })
205 },
206 ))
207 }
208}
209
210impl<R> Reader<bgzf::AsyncReader<R>>
211where
212 R: AsyncRead + Unpin,
213{
214 /// Creates an async BCF reader.
215 ///
216 /// # Examples
217 ///
218 /// ```
219 /// use noodles_bcf as bcf;
220 /// use tokio::io;
221 /// let reader = bcf::r#async::io::Reader::new(io::empty());
222 /// ```
223 pub fn new(inner: R) -> Self {
224 Self::from(bgzf::AsyncReader::new(inner))
225 }
226}
227
228impl<R> Reader<bgzf::AsyncReader<R>>
229where
230 R: AsyncRead + AsyncSeek + Unpin,
231{
232 /// Returns a stream over records that intersect the given region.
233 ///
234 /// # Examples
235 ///
236 /// ```no_run
237 /// # #[tokio::main]
238 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
239 /// use futures::TryStreamExt;
240 /// use noodles_bcf as bcf;
241 /// use noodles_core::Region;
242 /// use noodles_csi as csi;
243 /// use tokio::fs::File;
244 ///
245 /// let mut reader = File::open("sample.bcf").await.map(bcf::r#async::io::Reader::new)?;
246 /// let header = reader.read_header().await?;
247 ///
248 /// let index = csi::r#async::read("sample.bcf.csi").await?;
249 /// let region = "sq0:8-13".parse()?;
250 /// let mut query = reader.query(&header, &index, ®ion)?;
251 ///
252 /// while let Some(record) = query.try_next().await? {
253 /// // ...
254 /// }
255 /// # Ok(())
256 /// # }
257 /// ```
258 pub fn query<I>(
259 &mut self,
260 header: &vcf::Header,
261 index: &I,
262 region: &Region,
263 ) -> io::Result<impl Stream<Item = io::Result<Record>> + '_>
264 where
265 I: BinningIndex,
266 {
267 use crate::io::reader::resolve_region;
268
269 let reference_sequence_id = resolve_region(header.string_maps().contigs(), region)?;
270 let chunks = index.query(reference_sequence_id, region.interval())?;
271
272 Ok(query(
273 self,
274 chunks,
275 reference_sequence_id,
276 region.interval(),
277 ))
278 }
279}
280
281impl<R> From<R> for Reader<R> {
282 fn from(inner: R) -> Self {
283 Self { inner }
284 }
285}