use std::io::{Read, Seek};
use std::path::PathBuf;

use arrow::datatypes::{ArrowSchemaRef, Metadata};
use arrow::io::ipc::read::{self, get_row_count};
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use crate::hive::materialize_hive_partitions;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::shared::{finish_reader, ArrowReader};
use crate::RowIndex;
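
/// Options for scanning Arrow IPC files. Currently a unit struct; the scan
/// behaviour is configured on the reader itself.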
#[derive(Clone, Debug, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IpcScanOptions;
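/// Read Arrow's IPC (Feather v2) format into a [`DataFrame`].
///
/// # Example
///
/// A minimal sketch; `file.ipc` is a placeholder path.
///
/// ```
/// use std::fs::File;
///
/// use polars_core::prelude::*;
/// use polars_io::ipc::IpcReader;
/// use polars_io::SerReader;
///
/// fn example() -> PolarsResult<DataFrame> {
///     let file = File::open("file.ipc").expect("file not found");
///
///     IpcReader::new(file).finish()
/// }
/// ```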
#[must_use]
pub struct IpcReader<R: MmapBytesReader> {
pub(super) reader: R,
rechunk: bool,
pub(super) n_rows: Option<usize>,
pub(super) projection: Option<Vec<usize>>,
pub(crate) columns: Option<Vec<String>>,
hive_partition_columns: Option<Vec<Series>>,
include_file_path: Option<(PlSmallStr, Arc<str>)>,
pub(super) row_index: Option<RowIndex>,
pub(super) memory_map: Option<PathBuf>,
metadata: Option<read::FileMetadata>,
schema: Option<ArrowSchemaRef>,
}
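/// If memory-mapping failed because the file is compressed, emit a warning and
/// return `Ok(())` so the caller can fall back to a buffered read; propagate
/// any other error unchanged.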
fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
if let PolarsError::ComputeError(s) = &err {
if s.as_ref() == "memory_map can only be done on uncompressed IPC files" {
eprintln!(
"Could not memory_map compressed IPC file, defaulting to normal read. \
Toggle off 'memory_map' to silence this warning."
);
return Ok(());
}
}
Err(err)
}
impl<R: MmapBytesReader> IpcReader<R> {
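    /// Read the file metadata once and cache it, along with the schema.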
fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
if self.metadata.is_none() {
let metadata = read::read_file_metadata(&mut self.reader)?;
self.schema = Some(metadata.schema.clone());
self.metadata = Some(metadata);
}
Ok(self.metadata.as_ref().unwrap())
}
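    /// Get the Arrow schema of the IPC file; cheaper than building a full
    /// Polars schema.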
pub fn schema(&mut self) -> PolarsResult<ArrowSchemaRef> {
self.get_metadata()?;
Ok(self.schema.as_ref().unwrap().clone())
}
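    /// Get the custom schema-level metadata embedded in the IPC file, if any.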
pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
self.get_metadata()?;
Ok(self
.metadata
.as_ref()
.and_then(|meta| meta.custom_schema_metadata.clone()))
}
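    /// Stop reading once `num_rows` rows have been read.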
pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
self.n_rows = num_rows;
self
}
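    /// Columns to select/project, by name.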
pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
self.columns = columns;
self
}
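    /// Hive partition columns to materialize into the output, broadcast to
    /// the height of the resulting `DataFrame`.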
pub fn with_hive_partition_columns(mut self, columns: Option<Vec<Series>>) -> Self {
self.hive_partition_columns = columns;
self
}
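    /// Append a string column with the given name containing the source file
    /// path.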
pub fn with_include_file_path(
mut self,
include_file_path: Option<(PlSmallStr, Arc<str>)>,
) -> Self {
self.include_file_path = include_file_path;
self
}
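    /// Add a row index column.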
pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
self.row_index = row_index;
self
}
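    /// Set the reader's column projection: indices count from 0, so
    /// `vec![0, 4]` selects the first and fifth column.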
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self
}
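    /// Memory-map the file at `path_buf` while reading. Only works for
    /// uncompressed files; compressed files fall back to a buffered read.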
pub fn memory_mapped(mut self, path_buf: Option<PathBuf>) -> Self {
self.memory_map = path_buf;
self
}
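    /// Finish the reader as part of a lazy scan, pushing an optional predicate
    /// down into the read. A memory-mapped read is attempted first; compressed
    /// files fall back to a buffered read.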
#[cfg(feature = "lazy")]
pub fn finish_with_scan_ops(
mut self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
verbose: bool,
) -> PolarsResult<DataFrame> {
if self.memory_map.is_some() && self.reader.to_file().is_some() {
if verbose {
eprintln!("memory map ipc file")
}
match self.finish_memmapped(predicate.clone()) {
Ok(df) => return Ok(df),
Err(err) => check_mmap_err(err)?,
}
}
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;
if let Some(columns) = &self.columns {
self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
}
let schema = if let Some(projection) = &self.projection {
Arc::new(apply_projection(&metadata.schema, projection))
} else {
metadata.schema.clone()
};
let reader = read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
finish_reader(reader, rechunk, None, predicate, &schema, self.row_index)
}
}
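// Expose the IPC `FileReader` iterator through the `ArrowReader` interface
// consumed by `finish_reader`.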
impl<R: MmapBytesReader> ArrowReader for read::FileReader<R>
where
R: Read + Seek,
{
fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
self.next().map_or(Ok(None), |v| v.map(Some))
}
}
impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
fn new(reader: R) -> Self {
IpcReader {
reader,
rechunk: true,
n_rows: None,
columns: None,
hive_partition_columns: None,
include_file_path: None,
projection: None,
row_index: None,
memory_map: None,
metadata: None,
schema: None,
}
}
fn set_rechunk(mut self, rechunk: bool) -> Self {
self.rechunk = rechunk;
self
}
fn finish(mut self) -> PolarsResult<DataFrame> {
let reader_schema = if let Some(ref schema) = self.schema {
schema.clone()
} else {
self.get_metadata()?.schema.clone()
};
let reader_schema = reader_schema.as_ref();
let hive_partition_columns = self.hive_partition_columns.take();
let include_file_path = self.include_file_path.take();
let (mut df, row_count) = (|| {
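            // Fast path: an empty projection needs only the row count, no column data.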
if self
.projection
.as_ref()
.map(|x| x.is_empty())
.unwrap_or(false)
{
return PolarsResult::Ok((
Default::default(),
get_row_count(&mut self.reader)? as usize,
));
}
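            // Prefer a zero-copy memory-mapped read; compressed files fall back
            // to the buffered read below.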
if self.memory_map.is_some() && self.reader.to_file().is_some() {
match self.finish_memmapped(None) {
Ok(df) => {
let n = df.height();
return Ok((df, n));
},
Err(err) => check_mmap_err(err)?,
}
}
let rechunk = self.rechunk;
let schema = self.get_metadata()?.schema.clone();
if let Some(columns) = &self.columns {
let prj = columns_to_projection(columns, schema.as_ref())?;
self.projection = Some(prj);
}
let schema = if let Some(projection) = &self.projection {
Arc::new(apply_projection(schema.as_ref(), projection))
} else {
schema
};
let metadata = self.get_metadata()?.clone();
let ipc_reader =
read::FileReader::new(self.reader, metadata, self.projection, self.n_rows);
let df = finish_reader(ipc_reader, rechunk, None, None, &schema, self.row_index)?;
let n = df.height();
Ok((df, n))
})()?;
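        // Broadcast the hive partition values to the frame's full height.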
if let Some(hive_cols) = hive_partition_columns {
materialize_hive_partitions(
&mut df,
reader_schema,
Some(hive_cols.as_slice()),
row_count,
);
}
if let Some((col, value)) = include_file_path {
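            // SAFETY: the scalar column is built with `row_count` rows,
            // matching the frame's height.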
unsafe {
df.with_column_unchecked(Column::new_scalar(
col,
Scalar::new(
DataType::String,
AnyValue::StringOwned(value.as_ref().into()),
),
row_count,
))
};
}
Ok(df)
}
}