// polars_parquet/parquet/write/file.rs
use std::io::Write;
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;
use polars_parquet_format::RowGroup;
use super::indexes::{write_column_index, write_offset_index};
use super::page::PageWriteSpec;
use super::row_group::write_row_group;
use super::{RowGroupIterColumns, WriteOptions};
use crate::parquet::error::{ParquetError, ParquetResult};
pub use crate::parquet::metadata::KeyValue;
use crate::parquet::metadata::{SchemaDescriptor, ThriftFileMetadata};
use crate::parquet::write::State;
use crate::parquet::{FOOTER_SIZE, PARQUET_MAGIC};
/// Writes the parquet magic number at the start of the file.
///
/// Returns the number of bytes written (the length of the magic).
pub(super) fn start_file<W: Write>(writer: &mut W) -> ParquetResult<u64> {
    let magic = PARQUET_MAGIC;
    writer.write_all(&magic)?;
    Ok(magic.len() as u64)
}
/// Serializes the thrift `metadata` into `writer` followed by the parquet
/// footer (4-byte little-endian metadata length + magic), finishing the file.
///
/// Returns the number of bytes written by this call (metadata + footer).
///
/// # Errors
/// Propagates any serialization or I/O error from the underlying writer.
pub(super) fn end_file<W: Write>(
    writer: &mut W,
    metadata: &ThriftFileMetadata,
) -> ParquetResult<u64> {
    // Write the thrift-encoded file metadata directly into the writer.
    let mut protocol = TCompactOutputProtocol::new(&mut *writer);
    let metadata_len = metadata.write_to_out_protocol(&mut protocol)? as i32;

    // Footer layout: [metadata length: i32 LE][PARQUET_MAGIC].
    // `copy_from_slice` replaces the previous manual per-byte copy loop.
    let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
    footer_buffer[..4].copy_from_slice(&metadata_len.to_le_bytes());
    footer_buffer[4..].copy_from_slice(&PARQUET_MAGIC);

    writer.write_all(&footer_buffer)?;
    writer.flush()?;
    Ok(metadata_len as u64 + FOOTER_SIZE)
}
/// Builds one `ColumnOrder` entry per leaf column of the schema.
///
/// Every column uses the default type-defined ordering.
fn create_column_orders(schema_desc: &SchemaDescriptor) -> Vec<polars_parquet_format::ColumnOrder> {
    let num_columns = schema_desc.columns().len();
    std::iter::repeat_with(|| {
        polars_parquet_format::ColumnOrder::TYPEORDER(polars_parquet_format::TypeDefinedOrder {})
    })
    .take(num_columns)
    .collect()
}
/// An interface to write a parquet file to a [`Write`] sink.
///
/// Tracks the byte offset into the sink and accumulates per-row-group
/// metadata until [`FileWriter::end`] writes the footer.
pub struct FileWriter<W: Write> {
    // The underlying sink; all bytes go through this writer.
    writer: W,
    // Parquet schema describing the columns of every row group.
    schema: SchemaDescriptor,
    options: WriteOptions,
    // Optional "created_by" string embedded in the file metadata.
    created_by: Option<String>,
    // Number of bytes written to `writer` so far; 0 means not started.
    offset: u64,
    // Thrift metadata of every row group written so far.
    row_groups: Vec<RowGroup>,
    // Per row group, per column, the specs of the written pages
    // (used later to write column/offset indexes).
    page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
    // Lifecycle guard: Initialised -> Started -> Finished.
    state: State,
    // Set by `end`; available afterwards via `metadata`/`into_inner_and_metadata`.
    metadata: Option<ThriftFileMetadata>,
}
/// Writes a metadata "sidecar" file: a valid parquet file containing only
/// the magic header and the footer (no row-group data).
///
/// Returns the total number of bytes written.
pub fn write_metadata_sidecar<W: Write>(
    writer: &mut W,
    metadata: &ThriftFileMetadata,
) -> ParquetResult<u64> {
    let header_len = start_file(writer)?;
    let footer_len = end_file(writer, metadata)?;
    Ok(header_len + footer_len)
}
impl<W: Write> FileWriter<W> {
    /// The file's thrift metadata. `None` until [`FileWriter::end`] has run.
    pub fn metadata(&self) -> Option<&ThriftFileMetadata> {
        self.metadata.as_ref()
    }

    /// The write options this writer was configured with.
    pub fn options(&self) -> &WriteOptions {
        &self.options
    }

    /// The parquet schema of the file being written.
    pub fn schema(&self) -> &SchemaDescriptor {
        &self.schema
    }
}
impl<W: Write> FileWriter<W> {
    /// Creates a new [`FileWriter`] over `writer` for the given `schema`.
    ///
    /// Nothing is written to the sink until [`Self::write`] or
    /// [`Self::end`] is called.
    pub fn new(
        writer: W,
        schema: SchemaDescriptor,
        options: WriteOptions,
        created_by: Option<String>,
    ) -> Self {
        Self {
            writer,
            schema,
            options,
            created_by,
            offset: 0,
            row_groups: vec![],
            page_specs: vec![],
            state: State::Initialised,
            metadata: None,
        }
    }

    /// Writes the parquet magic header and moves to `State::Started`.
    ///
    /// # Errors
    /// Returns `InvalidParameter` if called more than once (a non-zero
    /// `offset` means the header was already written).
    fn start(&mut self) -> ParquetResult<()> {
        if self.offset == 0 {
            self.offset = start_file(&mut self.writer)?;
            self.state = State::Started;
            Ok(())
        } else {
            Err(ParquetError::InvalidParameter(
                "Start cannot be called twice".to_string(),
            ))
        }
    }

    /// Writes one row group to the sink, recording its metadata and page
    /// specs for the footer/indexes written later by [`Self::end`].
    ///
    /// # Errors
    /// Propagates I/O and encoding errors, as well as errors produced by the
    /// column iterators themselves (converted via `ParquetError: From<E>`).
    pub fn write<E>(&mut self, row_group: RowGroupIterColumns<'_, E>) -> ParquetResult<()>
    where
        ParquetError: From<E>,
        E: std::error::Error,
    {
        // Lazily write the header on the first row group.
        if self.offset == 0 {
            self.start()?;
        }
        // Ordinal (0-based position) of this row group within the file.
        let ordinal = self.row_groups.len();
        let (group, specs, size) = write_row_group(
            &mut self.writer,
            self.offset,
            self.schema.columns(),
            row_group,
            ordinal,
        )?;
        self.offset += size;
        self.row_groups.push(group);
        self.page_specs.push(specs);
        Ok(())
    }

    /// Writes column indexes, offset indexes and the footer, returning the
    /// total number of bytes written to the file.
    ///
    /// # Errors
    /// Returns `InvalidParameter` when the file has already been ended.
    pub fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> ParquetResult<u64> {
        // An empty parquet file (no row groups) is still valid: make sure
        // the header exists before writing the footer.
        if self.offset == 0 {
            self.start()?;
        }
        if self.state != State::Started {
            return Err(ParquetError::InvalidParameter(
                "End cannot be called twice".to_string(),
            ));
        }
        let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();

        // Column indexes (page-level statistics) are only emitted when
        // statistics were requested; each column's metadata records where
        // its index was written and how long it is.
        if self.options.write_statistics {
            self.row_groups
                .iter_mut()
                .zip(self.page_specs.iter())
                .try_for_each(|(group, pages)| {
                    group.columns.iter_mut().zip(pages.iter()).try_for_each(
                        |(column, pages)| {
                            let offset = self.offset;
                            column.column_index_offset = Some(offset as i64);
                            self.offset += write_column_index(&mut self.writer, pages)?;
                            let length = self.offset - offset;
                            column.column_index_length = Some(length as i32);
                            ParquetResult::Ok(())
                        },
                    )?;
                    ParquetResult::Ok(())
                })?;
        };

        // Offset indexes (page locations) are always written, after any
        // column indexes, and likewise recorded in the column metadata.
        self.row_groups
            .iter_mut()
            .zip(self.page_specs.iter())
            .try_for_each(|(group, pages)| {
                group
                    .columns
                    .iter_mut()
                    .zip(pages.iter())
                    .try_for_each(|(column, pages)| {
                        let offset = self.offset;
                        column.offset_index_offset = Some(offset as i64);
                        self.offset += write_offset_index(&mut self.writer, pages)?;
                        column.offset_index_length = Some((self.offset - offset) as i32);
                        ParquetResult::Ok(())
                    })?;
                ParquetResult::Ok(())
            })?;

        // Assemble the thrift file metadata now that all index offsets are
        // known, then write the footer.
        let metadata = ThriftFileMetadata::new(
            self.options.version.into(),
            self.schema.clone().into_thrift(),
            num_rows,
            self.row_groups.clone(),
            key_value_metadata,
            self.created_by.clone(),
            Some(create_column_orders(&self.schema)),
            None,
            None,
        );

        let len = end_file(&mut self.writer, &metadata)?;
        self.state = State::Finished;
        // Keep the metadata so callers can retrieve it after ending.
        self.metadata = Some(metadata);
        // `self.offset` already includes the indexes; `len` adds the footer.
        Ok(self.offset + len)
    }

    /// Consumes the writer, returning the underlying sink.
    pub fn into_inner(self) -> W {
        self.writer
    }

    /// Consumes the writer, returning the sink and the file metadata.
    ///
    /// # Panics
    /// Panics if [`Self::end`] was not called first (no metadata exists).
    pub fn into_inner_and_metadata(self) -> (W, ThriftFileMetadata) {
        (self.writer, self.metadata.expect("File to have ended"))
    }
}