use indexmap::IndexMap;
use noodles_bgzf as bgzf;
use tokio::io::{self, AsyncRead, AsyncReadExt};
use crate::{
binning_index::index::{
reference_sequence::{bin::Chunk, index::BinnedIndex, Bin, Metadata},
Header, ReferenceSequence,
},
Index,
};
pub struct Reader<R>
where
R: AsyncRead,
{
inner: bgzf::AsyncReader<R>,
}
impl<R> Reader<R>
where
R: AsyncRead + Unpin,
{
pub fn new(inner: R) -> Self {
Self {
inner: bgzf::AsyncReader::new(inner),
}
}
pub fn get_ref(&self) -> &bgzf::AsyncReader<R> {
&self.inner
}
pub fn get_mut(&mut self) -> &mut bgzf::AsyncReader<R> {
&mut self.inner
}
pub fn into_inner(self) -> bgzf::AsyncReader<R> {
self.inner
}
pub async fn read_index(&mut self) -> io::Result<Index> {
read_magic(&mut self.inner).await?;
let (min_shift, depth, header) = read_header(&mut self.inner).await?;
let reference_sequences = read_reference_sequences(&mut self.inner, depth).await?;
let unplaced_unmapped_record_count =
read_unplaced_unmapped_record_count(&mut self.inner).await?;
let mut builder = Index::builder()
.set_min_shift(min_shift)
.set_depth(depth)
.set_reference_sequences(reference_sequences);
if let Some(hdr) = header {
builder = builder.set_header(hdr);
}
if let Some(count) = unplaced_unmapped_record_count {
builder = builder.set_unplaced_unmapped_record_count(count);
}
Ok(builder.build())
}
}
async fn read_magic<R>(reader: &mut R) -> io::Result<()>
where
R: AsyncRead + Unpin,
{
use crate::io::MAGIC_NUMBER;
let mut magic = [0; 4];
reader.read_exact(&mut magic).await?;
if magic == MAGIC_NUMBER {
Ok(())
} else {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid CSI header",
))
}
}
async fn read_header<R>(reader: &mut R) -> io::Result<(u8, u8, Option<Header>)>
where
R: AsyncRead + Unpin,
{
let min_shift = reader
.read_i32_le()
.await
.and_then(|n| u8::try_from(n).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)))?;
let depth = reader
.read_i32_le()
.await
.and_then(|n| u8::try_from(n).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)))?;
let header = read_aux(reader).await?;
Ok((min_shift, depth, header))
}
async fn read_aux<R>(reader: &mut R) -> io::Result<Option<Header>>
where
R: AsyncRead + Unpin,
{
use crate::io::reader::index::header::read_header as read_tabix_header;
let l_aux = reader.read_i32_le().await.and_then(|len| {
usize::try_from(len).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
})?;
if l_aux > 0 {
let mut aux = vec![0; l_aux];
reader.read_exact(&mut aux).await?;
let mut rdr = &aux[..];
read_tabix_header(&mut rdr)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
.map(Some)
} else {
Ok(None)
}
}
async fn read_reference_sequences<R>(
reader: &mut R,
depth: u8,
) -> io::Result<Vec<ReferenceSequence<BinnedIndex>>>
where
R: AsyncRead + Unpin,
{
let n_ref = reader.read_i32_le().await.and_then(|n| {
usize::try_from(n).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
})?;
let mut reference_sequences = Vec::with_capacity(n_ref);
for _ in 0..n_ref {
let reference_sequence = read_reference_sequence(reader, depth).await?;
reference_sequences.push(reference_sequence);
}
Ok(reference_sequences)
}
async fn read_reference_sequence<R>(
reader: &mut R,
depth: u8,
) -> io::Result<ReferenceSequence<BinnedIndex>>
where
R: AsyncRead + Unpin,
{
let (bins, index, metadata) = read_bins(reader, depth).await?;
Ok(ReferenceSequence::new(bins, index, metadata))
}
async fn read_bins<R>(
reader: &mut R,
depth: u8,
) -> io::Result<(
IndexMap<usize, Bin>,
IndexMap<usize, bgzf::VirtualPosition>,
Option<Metadata>,
)>
where
R: AsyncRead + Unpin,
{
#[allow(clippy::type_complexity)]
fn duplicate_bin_error(
id: usize,
) -> io::Result<(
IndexMap<usize, Bin>,
IndexMap<usize, bgzf::VirtualPosition>,
Option<Metadata>,
)> {
Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("duplicate bin ID: {id}"),
))
}
let n_bin = reader.read_i32_le().await.and_then(|n| {
usize::try_from(n).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
})?;
let mut bins = IndexMap::with_capacity(n_bin);
let mut index = IndexMap::with_capacity(n_bin);
let metadata_id = Bin::metadata_id(depth);
let mut metadata = None;
for _ in 0..n_bin {
let id = reader.read_u32_le().await.and_then(|n| {
usize::try_from(n).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
})?;
let loffset = reader
.read_u64_le()
.await
.map(bgzf::VirtualPosition::from)?;
if id == metadata_id {
let m = read_metadata(reader).await?;
if metadata.replace(m).is_some() {
return duplicate_bin_error(id);
}
} else {
let chunks = read_chunks(reader).await?;
let bin = Bin::new(chunks);
if bins.insert(id, bin).is_some() {
return duplicate_bin_error(id);
}
index.insert(id, loffset);
}
}
Ok((bins, index, metadata))
}
async fn read_chunks<R>(reader: &mut R) -> io::Result<Vec<Chunk>>
where
R: AsyncRead + Unpin,
{
let n_chunk = reader.read_i32_le().await.and_then(|n| {
usize::try_from(n).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
})?;
let mut chunks = Vec::with_capacity(n_chunk);
for _ in 0..n_chunk {
let chunk = read_chunk(reader).await?;
chunks.push(chunk);
}
Ok(chunks)
}
async fn read_chunk<R>(reader: &mut R) -> io::Result<Chunk>
where
R: AsyncRead + Unpin,
{
let chunk_beg = reader
.read_u64_le()
.await
.map(bgzf::VirtualPosition::from)?;
let chunk_end = reader
.read_u64_le()
.await
.map(bgzf::VirtualPosition::from)?;
Ok(Chunk::new(chunk_beg, chunk_end))
}
async fn read_metadata<R>(reader: &mut R) -> io::Result<Metadata>
where
R: AsyncRead + Unpin,
{
use crate::binning_index::index::reference_sequence::bin::METADATA_CHUNK_COUNT;
let n_chunk = reader.read_u32_le().await?;
if n_chunk != METADATA_CHUNK_COUNT {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"invalid metadata pseudo-bin chunk count: expected {METADATA_CHUNK_COUNT}, got {n_chunk}"
),
));
}
let ref_beg = reader
.read_u64_le()
.await
.map(bgzf::VirtualPosition::from)?;
let ref_end = reader
.read_u64_le()
.await
.map(bgzf::VirtualPosition::from)?;
let n_mapped = reader.read_u64_le().await?;
let n_unmapped = reader.read_u64_le().await?;
Ok(Metadata::new(ref_beg, ref_end, n_mapped, n_unmapped))
}
async fn read_unplaced_unmapped_record_count<R>(reader: &mut R) -> io::Result<Option<u64>>
where
R: AsyncRead + Unpin,
{
match reader.read_u64_le().await {
Ok(n_no_coor) => Ok(Some(n_no_coor)),
Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
Err(e) => Err(e),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_read_magic() {
let data = b"CSI\x01";
let mut reader = &data[..];
assert!(read_magic(&mut reader).await.is_ok());
let data = [];
let mut reader = &data[..];
assert!(matches!(
read_magic(&mut reader).await,
Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof
));
let data = b"MThd";
let mut reader = &data[..];
assert!(matches!(
read_magic(&mut reader).await,
Err(ref e) if e.kind() == io::ErrorKind::InvalidData
));
}
#[tokio::test]
async fn test_read_header() -> io::Result<()> {
let data = [
0x0e, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ];
let mut reader = &data[..];
let (min_shift, depth, header) = read_header(&mut reader).await?;
assert_eq!(min_shift, 14);
assert_eq!(depth, 5);
assert!(header.is_none());
Ok(())
}
}