noodles_cram/crai/async/io/
reader.rs

1use async_compression::tokio::bufread::GzipDecoder;
2use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, BufReader};
3
4use crate::crai::Index;
5
/// An async CRAM index reader.
///
/// The wrapped stream is passed through a gzip decoder and then buffered, so
/// the decompressed text can be consumed line by line.
pub struct Reader<R> {
    // Layering, innermost first: a buffer over the raw stream (the decoder is
    // taken from the `bufread` module, so it is fed from a buffered source), the
    // gzip decoder, and an outer buffer over the decompressed bytes.
    inner: BufReader<GzipDecoder<BufReader<R>>>,
}
10
11impl<R> Reader<R>
12where
13    R: AsyncRead + Unpin,
14{
15    /// Creates an async CRAM index reader.
16    ///
17    /// # Examples
18    ///
19    /// ```
20    /// use noodles_cram::crai;
21    /// let data = [];
22    /// let reader = crai::r#async::io::Reader::new(&data[..]);
23    /// ```
24    pub fn new(inner: R) -> Self {
25        Self {
26            inner: BufReader::new(GzipDecoder::new(BufReader::new(inner))),
27        }
28    }
29
30    /// Reads a CRAM index.
31    ///
32    /// The position of the stream is expected to be at the start.
33    ///
34    /// # Examples
35    ///
36    /// ```no_run
37    /// # use std::io;
38    /// #
39    /// # #[tokio::main]
40    /// # async fn main() -> io::Result<()> {
41    /// use noodles_cram::crai;
42    /// use tokio::fs::File;
43    ///
44    /// let mut reader = File::open("sample.cram.crai")
45    ///     .await
46    ///     .map(crai::r#async::io::Reader::new)?;
47    ///
48    /// let index = reader.read_index().await?;
49    /// # Ok(())
50    /// # }
51    /// ```
52    pub async fn read_index(&mut self) -> io::Result<Index> {
53        read_index(&mut self.inner).await
54    }
55}
56
57async fn read_index<R>(reader: &mut R) -> io::Result<Index>
58where
59    R: AsyncBufRead + Unpin,
60{
61    let mut index = Vec::new();
62    let mut buf = String::new();
63
64    loop {
65        buf.clear();
66
67        match read_line(reader, &mut buf).await {
68            Ok(0) => break,
69            Ok(_) => {
70                let record = buf
71                    .parse()
72                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
73
74                index.push(record);
75            }
76            Err(e) => return Err(e),
77        }
78    }
79
80    Ok(index)
81}
82
83async fn read_line<R>(reader: &mut R, buf: &mut String) -> io::Result<usize>
84where
85    R: AsyncBufRead + Unpin,
86{
87    const LINE_FEED: char = '\n';
88
89    match reader.read_line(buf).await {
90        Ok(0) => Ok(0),
91        Ok(n) => {
92            if buf.ends_with(LINE_FEED) {
93                buf.pop();
94            }
95
96            Ok(n)
97        }
98        Err(e) => Err(e),
99    }
100}
101
#[cfg(test)]
mod tests {
    use noodles_core::Position;

    use super::*;
    use crate::crai::Record;

    /// A single well-formed line decodes to exactly one record.
    #[tokio::test]
    async fn test_read_index() -> Result<(), Box<dyn std::error::Error>> {
        // The free `read_index` function is exercised directly, so the input is
        // plain tab-separated text rather than a gzip stream.
        let mut src: &[u8] = b"0\t10946\t6765\t17711\t233\t317811\n";

        let record = Record::new(Some(0), Position::new(10946), 6765, 17711, 233, 317811);

        assert_eq!(read_index(&mut src).await?, vec![record]);

        Ok(())
    }
}