noodles_cram/crai/async/io/
reader.rs1use async_compression::tokio::bufread::GzipDecoder;
2use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, BufReader};
3
4use crate::crai::Index;
5
6pub struct Reader<R> {
8 inner: BufReader<GzipDecoder<BufReader<R>>>,
9}
10
11impl<R> Reader<R>
12where
13 R: AsyncRead + Unpin,
14{
15 pub fn new(inner: R) -> Self {
25 Self {
26 inner: BufReader::new(GzipDecoder::new(BufReader::new(inner))),
27 }
28 }
29
30 pub async fn read_index(&mut self) -> io::Result<Index> {
53 read_index(&mut self.inner).await
54 }
55}
56
57async fn read_index<R>(reader: &mut R) -> io::Result<Index>
58where
59 R: AsyncBufRead + Unpin,
60{
61 let mut index = Vec::new();
62 let mut buf = String::new();
63
64 loop {
65 buf.clear();
66
67 match read_line(reader, &mut buf).await {
68 Ok(0) => break,
69 Ok(_) => {
70 let record = buf
71 .parse()
72 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
73
74 index.push(record);
75 }
76 Err(e) => return Err(e),
77 }
78 }
79
80 Ok(index)
81}
82
83async fn read_line<R>(reader: &mut R, buf: &mut String) -> io::Result<usize>
84where
85 R: AsyncBufRead + Unpin,
86{
87 const LINE_FEED: char = '\n';
88
89 match reader.read_line(buf).await {
90 Ok(0) => Ok(0),
91 Ok(n) => {
92 if buf.ends_with(LINE_FEED) {
93 buf.pop();
94 }
95
96 Ok(n)
97 }
98 Err(e) => Err(e),
99 }
100}
101
#[cfg(test)]
mod tests {
    use noodles_core::Position;

    use super::*;
    use crate::crai::Record;

    // A single tab-separated, newline-terminated line must decode into
    // exactly one record carrying the same six field values.
    #[tokio::test]
    async fn test_read_index() -> Result<(), Box<dyn std::error::Error>> {
        let mut src: &[u8] = b"0\t10946\t6765\t17711\t233\t317811\n";

        let record = Record::new(Some(0), Position::new(10946), 6765, 17711, 233, 317811);
        let expected = vec![record];

        let actual = read_index(&mut src).await?;
        assert_eq!(actual, expected);

        Ok(())
    }
}