polars_arrow/io/ipc/write/
stream.rsuse std::io::Write;
use std::sync::Arc;
use polars_error::{PolarsError, PolarsResult};
use super::super::IpcField;
use super::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions};
use super::common_sync::{write_continuation, write_message};
use super::{default_ipc_fields, schema_to_bytes};
use crate::array::Array;
use crate::datatypes::*;
use crate::record_batch::RecordBatchT;
pub struct StreamWriter<W: Write> {
writer: W,
write_options: WriteOptions,
finished: bool,
dictionary_tracker: DictionaryTracker,
custom_schema_metadata: Option<Arc<Metadata>>,
ipc_fields: Option<Vec<IpcField>>,
}
impl<W: Write> StreamWriter<W> {
pub fn new(writer: W, write_options: WriteOptions) -> Self {
Self {
writer,
write_options,
finished: false,
dictionary_tracker: DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: false,
},
ipc_fields: None,
custom_schema_metadata: None,
}
}
pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
self.custom_schema_metadata = Some(custom_metadata);
}
pub fn start(
&mut self,
schema: &ArrowSchema,
ipc_fields: Option<Vec<IpcField>>,
) -> PolarsResult<()> {
self.ipc_fields = Some(if let Some(ipc_fields) = ipc_fields {
ipc_fields
} else {
default_ipc_fields(schema.iter_values())
});
let encoded_message = EncodedData {
ipc_message: schema_to_bytes(
schema,
self.ipc_fields.as_ref().unwrap(),
self.custom_schema_metadata.as_deref(),
),
arrow_data: vec![],
};
write_message(&mut self.writer, &encoded_message)?;
Ok(())
}
pub fn write(
&mut self,
columns: &RecordBatchT<Box<dyn Array>>,
ipc_fields: Option<&[IpcField]>,
) -> PolarsResult<()> {
if self.finished {
let io_err = std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Cannot write to a finished stream".to_string(),
);
return Err(PolarsError::from(io_err));
}
#[allow(clippy::or_fun_call)]
let fields = ipc_fields.unwrap_or(self.ipc_fields.as_ref().unwrap());
let (encoded_dictionaries, encoded_message) = encode_chunk(
columns,
fields,
&mut self.dictionary_tracker,
&self.write_options,
)?;
for encoded_dictionary in encoded_dictionaries {
write_message(&mut self.writer, &encoded_dictionary)?;
}
write_message(&mut self.writer, &encoded_message)?;
Ok(())
}
pub fn finish(&mut self) -> PolarsResult<()> {
write_continuation(&mut self.writer, 0)?;
self.finished = true;
Ok(())
}
pub fn into_inner(self) -> W {
self.writer
}
}