// polars_parquet/parquet/read/metadata.rs
use std::cmp::min;
use std::io::{Read, Seek, SeekFrom};
use polars_parquet_format::thrift::protocol::TCompactInputProtocol;
use polars_parquet_format::FileMetaData as TFileMetadata;
use super::super::metadata::FileMetadata;
use super::super::{DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, HEADER_SIZE, PARQUET_MAGIC};
use crate::parquet::error::{ParquetError, ParquetResult};
/// Decodes the little-endian `i32` stored in the four bytes that start eight
/// bytes before position `len` of `buffer` (i.e. `buffer[len - 8..len - 4]`).
///
/// # Panics
///
/// Panics if `len` is smaller than 8 or if `len - 4` exceeds `buffer.len()`.
pub(super) fn metadata_len(buffer: &[u8], len: usize) -> i32 {
    let (start, end) = (len - 8, len - 4);
    let mut raw = [0u8; 4];
    // The range is exactly four bytes wide, so the copy cannot length-mismatch.
    raw.copy_from_slice(&buffer[start..end]);
    i32::from_le_bytes(raw)
}
/// Returns the total length of the stream in bytes.
///
/// The length is found by seeking to the end; afterwards the cursor is moved
/// back to where it was when the function was called.
///
/// # Errors
///
/// Returns any I/O error raised while querying the position or seeking.
fn stream_len(seek: &mut impl Seek) -> std::result::Result<u64, std::io::Error> {
    let original_pos = seek.stream_position()?;
    let end_pos = seek.seek(SeekFrom::End(0))?;

    // Already at the end: nothing to restore, so skip the extra seek.
    if end_pos == original_pos {
        return Ok(end_pos);
    }

    seek.seek(SeekFrom::Start(original_pos))?;
    Ok(end_pos)
}
/// Reads the parquet [`FileMetadata`] from `reader`.
///
/// The size of the stream is determined by seeking, after which the work is
/// delegated to [`read_metadata_with_size`].
///
/// # Errors
///
/// Returns an error if the stream length cannot be determined or if
/// [`read_metadata_with_size`] fails.
pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> ParquetResult<FileMetadata> {
    let total_bytes = stream_len(reader)?;
    read_metadata_with_size(reader, total_bytes)
}
/// Reads the parquet [`FileMetadata`] from `reader`, given the size of the
/// underlying file in bytes.
///
/// The function first reads up to `DEFAULT_FOOTER_READ_SIZE` bytes from the end
/// of the stream, checks that they end with `PARQUET_MAGIC` and decodes the
/// metadata length stored just before the magic. If the footer does not fit in
/// those bytes, a second read covering the whole footer is issued. The footer
/// bytes are then handed to [`deserialize_metadata`].
///
/// # Errors
///
/// Returns an error when:
/// * `file_size` is smaller than `HEADER_SIZE + FOOTER_SIZE`;
/// * seeking or reading fails;
/// * fewer bytes than requested could be read from the end of the stream;
/// * the stream does not end with `PARQUET_MAGIC`;
/// * the encoded metadata length is negative or the footer is larger than
///   `file_size`;
/// * the footer buffer cannot be allocated or the metadata fails to
///   deserialize.
pub fn read_metadata_with_size<R: Read + Seek>(
    reader: &mut R,
    file_size: u64,
) -> ParquetResult<FileMetadata> {
    if file_size < HEADER_SIZE + FOOTER_SIZE {
        return Err(ParquetError::oos(
            "A parquet file must contain a header and footer with at least 12 bytes",
        ));
    }

    // Read (and keep) the tail of the file; in the common case it already
    // contains the entire footer, which saves a second read.
    let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
    reader.seek(SeekFrom::End(-(default_end_len as i64)))?;

    let mut buffer = Vec::with_capacity(default_end_len);
    reader
        .by_ref()
        .take(default_end_len as u64)
        .read_to_end(&mut buffer)?;

    // `read_to_end` stops at EOF, so it may deliver fewer bytes than asked for
    // (e.g. the stream shrank after the seek). Without this guard the slice
    // index below would panic instead of reporting a malformed file.
    if buffer.len() < default_end_len {
        return Err(ParquetError::oos(
            "The file ended before the expected footer bytes could be read",
        ));
    }

    // The last four bytes must be the parquet magic.
    if buffer[default_end_len - 4..] != PARQUET_MAGIC {
        return Err(ParquetError::oos("The file must end with PAR1"));
    }

    // The length is stored as a signed integer; a negative value fails the
    // conversion and is reported as an error.
    let metadata_bytes: u64 = metadata_len(&buffer, default_end_len).try_into()?;

    let footer_len = FOOTER_SIZE + metadata_bytes;
    if footer_len > file_size {
        return Err(ParquetError::oos(
            "The footer size must be smaller or equal to the file's size",
        ));
    }

    let reader: &[u8] = if (footer_len as usize) < buffer.len() {
        // The whole footer is already inside the bytes read above.
        let remaining = buffer.len() - footer_len as usize;
        &buffer[remaining..]
    } else {
        // The initial tail read was too short: read again, this time covering
        // the complete footer.
        reader.seek(SeekFrom::End(-(footer_len as i64)))?;

        buffer.clear();
        buffer.try_reserve(footer_len as usize)?;
        reader.take(footer_len).read_to_end(&mut buffer)?;

        &buffer
    };

    // NOTE(review): `max_size` is forwarded to the thrift protocol reader,
    // presumably as an allocation budget derived from the footer size — confirm
    // against `TCompactInputProtocol::new`.
    let max_size = reader.len() * 2 + 1024;

    deserialize_metadata(reader, max_size)
}
/// Deserializes a parquet [`FileMetadata`] from thrift compact-protocol bytes
/// supplied by `reader`.
///
/// `max_size` is passed unchanged to `TCompactInputProtocol::new`
/// (presumably a cap on how much the decoder may allocate — verify against
/// that constructor).
///
/// # Errors
///
/// Returns an error if the thrift payload cannot be decoded or if the decoded
/// thrift structure cannot be converted into a [`FileMetadata`].
pub fn deserialize_metadata<R: Read>(reader: R, max_size: usize) -> ParquetResult<FileMetadata> {
    let mut protocol = TCompactInputProtocol::new(reader, max_size);
    let thrift_metadata = TFileMetadata::read_from_in_protocol(&mut protocol)?;

    FileMetadata::try_from_thrift(thrift_metadata)
}