use std::io::{Read, Seek, SeekFrom};
use prost::Message;
use crate::error::Error;
use crate::proto::stream::Kind;
use crate::proto::{CompressionKind, Footer, Metadata, PostScript, StripeFooter};
mod column;
pub mod decode;
pub mod decompress;
pub use column::Column;
/// Upper bound, in bytes, on the first read `read_metadata` performs on the end of the
/// stream (16 KiB); streams shorter than this are read in full.
const DEFAULT_FOOTER_SIZE: u64 = 16 * 1024;
/// Returns the total length of `seek` in bytes, leaving the cursor where it was found.
///
/// The length is obtained by seeking to the end of the stream; the original position is
/// restored afterwards unless the cursor already sat at the end.
///
/// # Errors
///
/// Propagates any I/O error raised by the underlying seeks.
fn stream_len(seek: &mut impl Seek) -> std::result::Result<u64, std::io::Error> {
    let origin = seek.seek(SeekFrom::Current(0))?;
    let end = seek.seek(SeekFrom::End(0))?;
    if origin == end {
        // Already at the end: no need to seek back.
        return Ok(end);
    }
    seek.seek(SeekFrom::Start(origin)).map(|_| end)
}
/// The decoded sections found at the end of a file, as assembled by `read_metadata`.
#[derive(Debug)]
pub struct FileMetadata {
/// The postscript; it carries the footer/metadata lengths and the compression kind
/// that the other readers in this module consult.
pub postscript: PostScript,
/// The file footer; among other things it lists the stripes of the file.
pub footer: Footer,
/// The file-level metadata section that precedes the footer.
pub metadata: Metadata,
}
pub fn read_metadata<R>(reader: &mut R) -> Result<FileMetadata, Error>
where
R: Read + Seek,
{
let file_len = stream_len(reader)?;
let footer_len = if file_len < DEFAULT_FOOTER_SIZE {
file_len
} else {
DEFAULT_FOOTER_SIZE
};
reader.seek(SeekFrom::End(-(footer_len as i64)))?;
let mut tail_bytes = Vec::with_capacity(footer_len as usize);
reader.take(footer_len).read_to_end(&mut tail_bytes)?;
let postscript_len = tail_bytes[tail_bytes.len() - 1] as usize;
tail_bytes.truncate(tail_bytes.len() - 1);
let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len..])?;
tail_bytes.truncate(tail_bytes.len() - postscript_len);
let footer_length = postscript.footer_length.ok_or(Error::OutOfSpec)? as usize; let footer = &tail_bytes[tail_bytes.len() - footer_length..];
let footer = deserialize_footer(footer, postscript.compression())?;
tail_bytes.truncate(tail_bytes.len() - footer_length);
let metadata_length = postscript.metadata_length.ok_or(Error::OutOfSpec)? as usize; let metadata = &tail_bytes[tail_bytes.len() - metadata_length..];
let metadata = deserialize_footer_metadata(metadata, postscript.compression())?;
Ok(FileMetadata {
postscript,
footer,
metadata,
})
}
/// Reads and decodes the footer of the stripe at index `stripe`.
///
/// The footer is located right after the stripe's index and data sections. Its raw bytes
/// are read into `scratch` (which is cleared first) and then decoded using the compression
/// declared in the file's postscript.
///
/// # Panics
///
/// Panics if `stripe` is not a valid index into `metadata.footer.stripes`.
///
/// # Errors
///
/// Returns an error if seeking or reading fails, or if the footer cannot be decoded.
pub fn read_stripe_footer<R: Read + Seek>(
    reader: &mut R,
    metadata: &FileMetadata,
    stripe: usize,
    scratch: &mut Vec<u8>,
) -> Result<StripeFooter, Error> {
    let info = &metadata.footer.stripes[stripe];
    let footer_start = info.offset() + info.index_length() + info.data_length();
    let footer_len = info.footer_length();

    reader.seek(SeekFrom::Start(footer_start))?;

    // Reuse the caller's buffer for the raw footer bytes.
    scratch.clear();
    scratch.reserve(footer_len as usize);
    reader.take(footer_len).read_to_end(scratch)?;

    let compression = metadata.postscript.compression();
    deserialize_stripe_footer(scratch, compression)
}
pub fn read_stripe_column<R: Read + Seek>(
reader: &mut R,
metadata: &FileMetadata,
stripe: usize,
footer: StripeFooter,
column: u32,
mut scratch: Vec<u8>,
) -> Result<Column, Error> {
let stripe = &metadata.footer.stripes[stripe];
let mut start = 0; let start = footer
.streams
.iter()
.map(|stream| {
start += stream.length();
(start, stream)
})
.find(|(_, stream)| stream.column() == column && stream.kind() != Kind::RowIndex)
.map(|(start, stream)| start - stream.length())
.ok_or(Error::InvalidColumn(column))?;
let length = footer
.streams
.iter()
.filter(|stream| stream.column() == column && stream.kind() != Kind::RowIndex)
.fold(0, |acc, stream| acc + stream.length());
let start = stripe.offset() + start;
reader.seek(SeekFrom::Start(start))?;
scratch.clear();
scratch.reserve(length as usize);
reader.take(length).read_to_end(&mut scratch)?;
Ok(Column::new(
scratch,
column,
stripe.number_of_rows(),
footer,
metadata.postscript.compression(),
))
}
/// Decompresses `bytes` according to `compression` and decodes the result as a [`Footer`].
///
/// # Errors
///
/// Returns an error if decompression fails or the decompressed bytes are not a valid
/// `Footer` message.
fn deserialize_footer(bytes: &[u8], compression: CompressionKind) -> Result<Footer, Error> {
    let mut decompressed = Vec::new();
    let mut decompressor = decompress::Decompressor::new(bytes, compression, Vec::new());
    decompressor.read_to_end(&mut decompressed)?;
    let footer = Footer::decode(decompressed.as_slice())?;
    Ok(footer)
}
/// Decompresses `bytes` according to `compression` and decodes the result as a
/// [`Metadata`] message.
///
/// # Errors
///
/// Returns an error if decompression fails or the decompressed bytes are not a valid
/// `Metadata` message.
fn deserialize_footer_metadata(
    bytes: &[u8],
    compression: CompressionKind,
) -> Result<Metadata, Error> {
    let mut decompressed = Vec::new();
    let mut decompressor = decompress::Decompressor::new(bytes, compression, Vec::new());
    decompressor.read_to_end(&mut decompressed)?;
    let metadata = Metadata::decode(decompressed.as_slice())?;
    Ok(metadata)
}
/// Decompresses `bytes` according to `compression` and decodes the result as a
/// [`StripeFooter`].
///
/// # Errors
///
/// Returns an error if decompression fails or the decompressed bytes are not a valid
/// `StripeFooter` message.
fn deserialize_stripe_footer(
    bytes: &[u8],
    compression: CompressionKind,
) -> Result<StripeFooter, Error> {
    let mut decompressed = Vec::new();
    let mut decompressor = decompress::Decompressor::new(bytes, compression, Vec::new());
    decompressor.read_to_end(&mut decompressed)?;
    let stripe_footer = StripeFooter::decode(decompressed.as_slice())?;
    Ok(stripe_footer)
}