polars_parquet/parquet/metadata/column_chunk_metadata.rs

use polars_parquet_format::{ColumnChunk, ColumnMetaData, Encoding};
use super::column_descriptor::ColumnDescriptor;
use crate::parquet::compression::Compression;
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::schema::types::PhysicalType;
use crate::parquet::statistics::Statistics;
// Imports used only by the `serde_types` (de)serialization helpers below.
#[cfg(feature = "serde_types")]
mod serde_types {
pub use std::io::Cursor;
pub use polars_parquet_format::thrift::protocol::{
TCompactInputProtocol, TCompactOutputProtocol,
};
pub use serde::de::Error as DeserializeError;
pub use serde::ser::Error as SerializeError;
pub use serde::{Deserialize, Deserializer, Serialize, Serializer};
}
#[cfg(feature = "serde_types")]
use serde_types::*;
/// Metadata for a column chunk: the thrift [`ColumnChunk`] paired with the
/// [`ColumnDescriptor`] of the column it belongs to.
#[derive(Debug)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
pub struct ColumnChunkMetadata {
#[cfg_attr(
feature = "serde_types",
serde(serialize_with = "serialize_column_chunk")
)]
#[cfg_attr(
feature = "serde_types",
serde(deserialize_with = "deserialize_column_chunk")
)]
column_chunk: ColumnChunk,
column_descr: ColumnDescriptor,
}
/// Serializes a [`ColumnChunk`] by writing it with the thrift compact protocol and
/// emitting the resulting bytes.
#[cfg(feature = "serde_types")]
fn serialize_column_chunk<S>(
    column_chunk: &ColumnChunk,
    serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
    S: Serializer,
{
    let mut buf = vec![];
    {
        // A cursor over `&mut Vec<u8>` grows the buffer as the thrift writer appends;
        // a cursor over the empty slice `&mut buf[..]` would not accept any bytes.
        let cursor = Cursor::new(&mut buf);
        let mut protocol = TCompactOutputProtocol::new(cursor);
        column_chunk
            .write_to_out_protocol(&mut protocol)
            .map_err(S::Error::custom)?;
    }
    serializer.serialize_bytes(&buf)
}
/// Deserializes a [`ColumnChunk`] from bytes previously encoded with the thrift
/// compact protocol.
#[cfg(feature = "serde_types")]
fn deserialize_column_chunk<'de, D>(deserializer: D) -> std::result::Result<ColumnChunk, D::Error>
where
D: Deserializer<'de>,
{
let buf = Vec::<u8>::deserialize(deserializer)?;
let mut cursor = Cursor::new(&buf[..]);
let mut protocol = TCompactInputProtocol::new(&mut cursor, usize::MAX);
ColumnChunk::read_from_in_protocol(&mut protocol).map_err(D::Error::custom)
}
impl ColumnChunkMetadata {
pub fn new(column_chunk: ColumnChunk, column_descr: ColumnDescriptor) -> Self {
Self {
column_chunk,
column_descr,
}
}
pub fn file_path(&self) -> &Option<String> {
&self.column_chunk.file_path
}
pub fn file_offset(&self) -> i64 {
self.column_chunk.file_offset
}
pub fn column_chunk(&self) -> &ColumnChunk {
&self.column_chunk
}
    /// The thrift [`ColumnMetaData`] of this column chunk.
    ///
    /// # Panics
    /// Panics if the metadata is missing; [`Self::try_from_thrift`] guarantees it is set.
    pub fn metadata(&self) -> &ColumnMetaData {
        self.column_chunk.meta_data.as_ref().unwrap()
    }
pub fn descriptor(&self) -> &ColumnDescriptor {
&self.column_descr
}
pub fn physical_type(&self) -> PhysicalType {
self.column_descr.descriptor.primitive_type.physical_type
}
    /// Decodes the statistics of this column chunk, if any were written.
    pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {
        self.metadata().statistics.as_ref().map(|x| {
            Statistics::deserialize(x, self.column_descr.descriptor.primitive_type.clone())
        })
    }
pub fn num_values(&self) -> i64 {
self.metadata().num_values
}
    /// The compression codec of this column chunk.
    /// The conversion cannot fail: [`Self::try_from_thrift`] already validated the codec.
    pub fn compression(&self) -> Compression {
        self.metadata().codec.try_into().unwrap()
    }
pub fn compressed_size(&self) -> i64 {
self.metadata().total_compressed_size
}
pub fn uncompressed_size(&self) -> i64 {
self.metadata().total_uncompressed_size
}
pub fn data_page_offset(&self) -> i64 {
self.metadata().data_page_offset
}
pub fn has_index_page(&self) -> bool {
self.metadata().index_page_offset.is_some()
}
pub fn index_page_offset(&self) -> Option<i64> {
self.metadata().index_page_offset
}
pub fn dictionary_page_offset(&self) -> Option<i64> {
self.metadata().dictionary_page_offset
}
pub fn column_encoding(&self) -> &Vec<Encoding> {
&self.metadata().encodings
}
    /// The byte range within the file occupied by this column chunk's pages.
    pub fn byte_range(&self) -> core::ops::Range<u64> {
        column_metadata_byte_range(self.metadata())
    }
    /// Builds a [`ColumnChunkMetadata`] from its thrift representation, validating that
    /// the metadata is present, that sizes and offsets fit in `u64`, and that the
    /// compression codec is supported.
    pub(crate) fn try_from_thrift(
        column_descr: ColumnDescriptor,
        column_chunk: ColumnChunk,
    ) -> ParquetResult<Self> {
if let Some(meta) = &column_chunk.meta_data {
let _: u64 = meta.total_compressed_size.try_into()?;
if let Some(offset) = meta.dictionary_page_offset {
let _: u64 = offset.try_into()?;
}
let _: u64 = meta.data_page_offset.try_into()?;
let _: Compression = meta.codec.try_into()?;
} else {
return Err(ParquetError::oos("Column chunk requires metadata"));
}
Ok(Self {
column_chunk,
column_descr,
})
}
pub fn into_thrift(self) -> ColumnChunk {
self.column_chunk
}
}
/// Returns the byte range of a column chunk's pages within the file: it starts at the
/// dictionary page if one exists (the dictionary page always precedes the data pages),
/// otherwise at the first data page, and spans `total_compressed_size` bytes.
pub(super) fn column_metadata_byte_range(
    column_metadata: &ColumnMetaData,
) -> core::ops::Range<u64> {
    let offset = if let Some(dict_page_offset) = column_metadata.dictionary_page_offset {
        dict_page_offset as u64
    } else {
        column_metadata.data_page_offset as u64
    };
    let len = column_metadata.total_compressed_size as u64;
    offset..offset.checked_add(len).unwrap()
}