use std::borrow::Cow;
use std::collections::VecDeque;
use std::ops::{Deref, Range};
use arrow::array::BooleanArray;
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowSchemaRef;
use polars_core::chunked_array::builder::NullChunkedBuilder;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::{config, POOL};
use polars_parquet::parquet::error::ParquetResult;
use polars_parquet::parquet::statistics::Statistics;
use polars_parquet::read::{
self, ColumnChunkMetadata, FileMetadata, Filter, PhysicalType, RowGroupMetadata,
};
use rayon::prelude::*;
#[cfg(feature = "cloud")]
use super::async_impl::FetchRowGroupsFromObjectStore;
use super::mmap::{mmap_columns, ColumnStore};
use super::predicates::read_this_row_group;
use super::to_metadata::ToMetadata;
use super::utils::materialize_empty_df;
use super::{mmap, ParallelStrategy};
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetadataRef;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::predicates::{apply_predicate, PhysicalIoExpr};
use crate::utils::get_reader_bytes;
use crate::utils::slice::split_slice_at_file;
use crate::RowIndex;
/// Debug-only check of the Arrow dtype of a field that is about to be deserialized.
///
/// Hits `unreachable!()` for `Utf8`, `Binary`, `LargeUtf8`, `LargeBinary`, `Float16`, `List`
/// and `Map`, and recurses into the inner dtype(s) of dictionary, extension, large-list,
/// fixed-size-list and struct types. Every other dtype is accepted as-is.
#[cfg(debug_assertions)]
fn assert_dtypes(dtype: &ArrowDataType) {
    use ArrowDataType as D;

    match dtype {
        // None of these dtypes may show up here.
        D::Utf8
        | D::Binary
        | D::LargeUtf8
        | D::LargeBinary
        | D::Float16
        | D::List(_)
        | D::Map(_, _) => unreachable!(),

        // Wrapper types: validate what they wrap.
        D::Dictionary(_, inner, _) => assert_dtypes(inner),
        D::Extension(_, inner, _) => assert_dtypes(inner),
        D::LargeList(child) => assert_dtypes(&child.dtype),
        D::FixedSizeList(child, _) => assert_dtypes(&child.dtype),
        D::Struct(fields) => {
            for child in fields.iter() {
                assert_dtypes(child.dtype());
            }
        },

        _ => {},
    }
}
/// Returns `true` when `dtype` is one of the signed or unsigned integer types
/// (8, 16, 32 or 64 bit); these are the only dtypes for which a parquet sorting
/// flag is copied onto the resulting series.
fn should_copy_sortedness(dtype: &DataType) -> bool {
    use DataType as D;

    match dtype {
        D::Int8 | D::Int16 | D::Int32 | D::Int64 => true,
        D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64 => true,
        _ => false,
    }
}
/// Copies the sortedness recorded for column `col_idx` in `sorting_map` onto `series`.
///
/// Nothing happens when the column has no entry in `sorting_map` or when the dtype of
/// `series` is rejected by [`should_copy_sortedness`]. In verbose mode the decision is
/// logged to stderr.
pub fn try_set_sorted_flag(
    series: &mut Series,
    col_idx: usize,
    sorting_map: &PlHashMap<usize, IsSorted>,
) {
    let Some(is_sorted) = sorting_map.get(&col_idx) else {
        return;
    };
    if !should_copy_sortedness(series.dtype()) {
        return;
    }

    if config::verbose() {
        eprintln!(
            "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
            series.name()
        );
    }
    series.set_sorted_flag(*is_sorted);
}
/// Builds a map from column index to sort direction out of the sorting columns
/// recorded in the row-group metadata `md`.
///
/// The map is empty when the row group declares no sorting columns. In debug builds
/// a column index that appears more than once triggers an assertion failure.
pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
    let n_sorting_columns = md.sorting_columns().map_or(0, |cols| cols.len());
    let mut sorting_map = PlHashMap::with_capacity(n_sorting_columns);

    for sorting in md.sorting_columns().into_iter().flatten() {
        let direction = if sorting.descending {
            IsSorted::Descending
        } else {
            IsSorted::Ascending
        };
        let prev_value = sorting_map.insert(sorting.column_idx as usize, direction);
        debug_assert!(prev_value.is_none());
    }

    sorting_map
}
/// Deserializes the column chunks `field_md` of schema column `column_i` into a [`Series`].
///
/// * `filter` is handed to the deserializer unchanged.
/// * `file_schema` supplies the Arrow field for `column_i` (panics if the index is invalid).
/// * `store` is the byte store the column chunks are read from.
///
/// When no filter was applied and the field is not a list/struct type, the parquet
/// statistics of the last column chunk (if they could be read) are attached to the series
/// as metadata for the supported (dtype, physical type) combinations.
fn column_idx_to_series(
    column_i: usize,
    field_md: &[&ColumnChunkMetadata],
    filter: Option<Filter>,
    file_schema: &ArrowSchema,
    store: &mmap::ColumnStore,
) -> PolarsResult<Series> {
    use ArrowDataType as AD;
    use {DataType as D, PhysicalType as P};

    let did_filter = filter.is_some();
    let field = file_schema.get_at_index(column_i).unwrap().1;

    #[cfg(debug_assertions)]
    {
        assert_dtypes(field.dtype())
    }

    let columns = mmap_columns(store, field_md);
    // The statistics must be collected before `columns` is moved into the deserializer.
    let stats = columns
        .iter()
        .map(|(col_md, _)| col_md.statistics().transpose())
        .collect::<ParquetResult<Vec<Option<Statistics>>>>();
    let array = mmap::to_deserializer(columns, field.clone(), filter)?;
    let mut series = Series::try_from((field, array))?;

    // No statistics are attached for list/struct fields or for filtered reads.
    let is_list_or_struct = matches!(
        field.dtype(),
        AD::List(_) | AD::LargeList(_) | AD::Struct(_) | AD::FixedSizeList(_, _)
    );
    if is_list_or_struct || did_filter {
        return Ok(series);
    }

    // Use the statistics of the last column chunk; bail out quietly if unavailable.
    let stats = match stats {
        Ok(mut per_chunk) => match per_chunk.pop().flatten() {
            Some(stats) => stats,
            None => return Ok(series),
        },
        Err(_) => return Ok(series),
    };

    let series_ref = series.as_ref();

    macro_rules! set_metadata_from_stats {
        ($(($dtype:pat, $phystype:pat) => ($accessor:ident, $pldtype:ty),)+) => {
            match (series_ref.dtype(), stats.physical_type()) {
                $(
                    ($dtype, $phystype) => {
                        series.try_set_metadata(
                            ToMetadata::<$pldtype>::to_metadata(stats.$accessor())
                        );
                    })+
                _ => {},
            }
        };
    }

    set_metadata_from_stats! {
        (D::Boolean, P::Boolean  ) => (expect_as_boolean, BooleanType),
        (D::UInt8,   P::Int32    ) => (expect_as_int32,   UInt8Type  ),
        (D::UInt16,  P::Int32    ) => (expect_as_int32,   UInt16Type ),
        (D::UInt32,  P::Int32    ) => (expect_as_int32,   UInt32Type ),
        (D::UInt64,  P::Int64    ) => (expect_as_int64,   UInt64Type ),
        (D::Int8,    P::Int32    ) => (expect_as_int32,   Int8Type   ),
        (D::Int16,   P::Int32    ) => (expect_as_int32,   Int16Type  ),
        (D::Int32,   P::Int32    ) => (expect_as_int32,   Int32Type  ),
        (D::Int64,   P::Int64    ) => (expect_as_int64,   Int64Type  ),
        (D::Float32, P::Float    ) => (expect_as_float,   Float32Type),
        (D::Float64, P::Double   ) => (expect_as_double,  Float64Type),
        (D::String,  P::ByteArray) => (expect_as_binary,  StringType ),
        (D::Binary,  P::ByteArray) => (expect_as_binary,  BinaryType ),
    }

    Ok(series)
}
/// Reads the row groups `row_group_start..row_group_end` into one or more data frames,
/// dispatching to the implementation selected by `parallel`.
///
/// * An empty `projection` combined with a `row_index` short-circuits: a single frame
///   holding only the row-index column (`slice.1` rows, offset by `slice.0`) is returned.
/// * `ParallelStrategy::Prefiltered` with a predicate that reports live variables goes to
///   [`rg_to_dfs_prefiltered`].
/// * `Columns` and `None` go to [`rg_to_dfs_optionally_par_over_columns`].
/// * Everything else goes to [`rg_to_dfs_par_over_rg`].
///
/// `previous_row_count` is forwarded to the chosen implementation, which updates it.
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    predicate: Option<&dyn PhysicalIoExpr>,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    use ParallelStrategy as S;

    if projection.is_empty() {
        if let Some(row_index) = row_index {
            // Only the row index is requested: build it on top of a temporary null column.
            let placeholder =
                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), slice.1).finish();
            let df = DataFrame::new(vec![placeholder.into_series().into_column()])?;
            let first_index = row_index.offset + IdxSize::try_from(slice.0).unwrap();
            let df = df.with_row_index(row_index.name.clone(), Some(first_index))?;
            let df = df.select(std::iter::once(row_index.name))?;
            return Ok(vec![df]);
        }
    }

    if matches!(parallel, S::Prefiltered) {
        // Prefiltering needs a predicate that can tell which columns it reads.
        let live_variables = predicate.and_then(|p| p.live_variables().map(|live| (p, live)));
        if let Some((predicate, live_variables)) = live_variables {
            return rg_to_dfs_prefiltered(
                store,
                previous_row_count,
                row_group_start,
                row_group_end,
                file_metadata,
                schema,
                live_variables,
                predicate,
                row_index,
                projection,
                use_statistics,
                hive_partition_columns,
            );
        }
    }

    if matches!(parallel, S::Columns | S::None) {
        rg_to_dfs_optionally_par_over_columns(
            store,
            previous_row_count,
            row_group_start,
            row_group_end,
            slice,
            file_metadata,
            schema,
            predicate,
            row_index,
            parallel,
            projection,
            use_statistics,
            hive_partition_columns,
        )
    } else {
        rg_to_dfs_par_over_rg(
            store,
            row_group_start,
            row_group_end,
            previous_row_count,
            slice,
            file_metadata,
            schema,
            predicate,
            row_index,
            projection,
            use_statistics,
            hive_partition_columns,
        )
    }
}
/// Reads the row groups `row_group_start..row_group_end` with the "prefiltered" strategy.
///
/// For every row group (processed in parallel on `POOL`):
///
/// 1. If `use_statistics` is set, the row group is skipped when `read_this_row_group`
///    says the predicate cannot match.
/// 2. The *live* columns (projected columns whose name is in `live_variables`) are decoded
///    in full and the predicate is evaluated on them to obtain a boolean mask.
/// 3. The live columns are filtered with the mask. Row groups with no selected rows yield
///    no frame at all.
/// 4. The *dead* columns (all other projected columns) are decoded either with the mask
///    pushed into the decoder ("pre") or decoded fully and filtered afterwards ("post"),
///    as decided by [`PrefilterMaskSetting`].
/// 5. Live and dead columns are merged back in ascending schema-index order.
///
/// `previous_row_count` is advanced by the number of rows in the returned frames.
///
/// # Errors
///
/// Fails when `row_group_end` exceeds `u32::MAX`, when decoding or predicate evaluation
/// fails, or when the row count overflows `IdxSize`.
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_prefiltered(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    live_variables: Vec<PlSmallStr>,
    predicate: &dyn PhysicalIoExpr,
    row_index: Option<RowIndex>,
    projection: &[usize],
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if row_group_end > u32::MAX as usize {
        polars_bail!(ComputeError: "Parquet file contains too many row groups (> {})", u32::MAX);
    }

    // Row offset of every row group in `row_group_start..row_group_end`, only needed when a
    // row index is requested. Entry `k` belongs to row group `row_group_start + k`.
    let mut row_offset = *previous_row_count;
    let rg_offsets: Vec<IdxSize> = match row_index {
        None => Vec::new(),
        Some(_) => (row_group_start..row_group_end)
            .map(|index| {
                let md = &file_metadata.row_groups[index];

                let current_offset = row_offset;
                let current_row_count = md.num_rows() as IdxSize;
                row_offset += current_row_count;

                current_offset
            })
            .collect(),
    };

    // Split the (sorted) projection into live columns, which the predicate reads, and dead
    // columns, which it does not.
    let live_variables = live_variables
        .iter()
        .map(Deref::deref)
        .collect::<PlHashSet<_>>();

    let num_live_columns = live_variables.len();
    let num_dead_columns = projection.len() - num_live_columns;

    let mut projection_sorted = projection.to_vec();
    projection_sorted.sort();

    let mut live_idx_to_col_idx = Vec::with_capacity(num_live_columns);
    let mut dead_idx_to_col_idx = Vec::with_capacity(num_dead_columns);
    for &i in projection_sorted.iter() {
        let name = schema.get_at_index(i).unwrap().0.as_str();

        if live_variables.contains(name) {
            live_idx_to_col_idx.push(i);
        } else {
            dead_idx_to_col_idx.push(i);
        }
    }

    debug_assert_eq!(live_idx_to_col_idx.len(), num_live_columns);
    debug_assert_eq!(dead_idx_to_col_idx.len(), num_dead_columns);

    let mask_setting = PrefilterMaskSetting::init_from_env();

    let dfs: Vec<Option<DataFrame>> = POOL.install(move || {
        (row_group_start..row_group_end)
            .into_par_iter()
            .map(|rg_idx| {
                let md = &file_metadata.row_groups[rg_idx];

                if use_statistics {
                    match read_this_row_group(Some(predicate), md, schema) {
                        Ok(false) => return Ok(None),
                        Ok(true) => {},
                        Err(e) => return Err(e),
                    }
                }

                let sorting_map = create_sorting_map(md);

                // Decode the live columns in full; the predicate needs every row.
                let live_columns = (0..num_live_columns)
                    .into_par_iter()
                    .map(|i| {
                        let col_idx = live_idx_to_col_idx[i];

                        let (name, field) = schema.get_at_index(col_idx).unwrap();

                        // The column is absent from this row group: substitute nulls.
                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                md.num_rows(),
                                &DataType::from_arrow(&field.dtype, true),
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let mut series =
                            column_idx_to_series(col_idx, part.as_slice(), None, schema, store)?;

                        try_set_sorted_flag(&mut series, col_idx, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                // SAFETY: NOTE(review): relies on every live column having `md.num_rows()`
                // rows, which holds for the null columns built above and is expected of an
                // unfiltered decode of this row group.
                let mut df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) };

                materialize_hive_partitions(
                    &mut df,
                    schema.as_ref(),
                    hive_partition_columns,
                    md.num_rows(),
                );
                let s = predicate.evaluate_io(&df)?;
                let mask = s.bool().expect("filter predicates was not of type boolean");

                if let Some(rc) = &row_index {
                    // `rg_offsets` starts at `row_group_start`, so translate the absolute
                    // row-group index into a position in that vector.
                    df.with_row_index_mut(
                        rc.name.clone(),
                        Some(rg_offsets[rg_idx - row_group_start] + rc.offset),
                    );
                }
                df = df.filter(mask)?;

                // Flatten the mask chunks into one bitmap; null mask values count as `false`.
                let mut filter_mask = MutableBitmap::with_capacity(mask.len());
                for chunk in mask.downcast_iter() {
                    match chunk.validity() {
                        None => filter_mask.extend_from_bitmap(chunk.values()),
                        Some(validity) => {
                            filter_mask.extend_from_bitmap(&(validity & chunk.values()))
                        },
                    }
                }
                let filter_mask = filter_mask.freeze();

                debug_assert_eq!(md.num_rows(), filter_mask.len());
                debug_assert_eq!(df.height(), filter_mask.set_bits());

                // Nothing selected: this row group contributes no frame.
                if filter_mask.set_bits() == 0 {
                    return Ok(None);
                }

                // Only live columns were projected, so the filtered frame is the result.
                if num_dead_columns == 0 {
                    return Ok(Some(df));
                }

                // The cost is only consulted in `Auto` mode; otherwise it stays at 0.0.
                let prefilter_cost = matches!(mask_setting, PrefilterMaskSetting::Auto)
                    .then(|| calc_prefilter_cost(&filter_mask))
                    .unwrap_or_default();

                #[cfg(debug_assertions)]
                {
                    let md = &file_metadata.row_groups[rg_idx];
                    debug_assert_eq!(md.num_rows(), mask.len());
                }

                let n_rows_in_result = filter_mask.set_bits();

                let mut dead_columns = (0..num_dead_columns)
                    .into_par_iter()
                    .map(|i| {
                        let col_idx = dead_idx_to_col_idx[i];

                        let (name, field) = schema.get_at_index(col_idx).unwrap();

                        // The column is absent from this row group: substitute nulls, already
                        // at the filtered length.
                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                n_rows_in_result,
                                &DataType::from_arrow(&field.dtype, true),
                            ));
                        };

                        let field_md = iter.collect::<Vec<_>>();

                        // "pre": push the mask into the decoder.
                        let pre = || {
                            column_idx_to_series(
                                col_idx,
                                field_md.as_slice(),
                                Some(Filter::new_masked(filter_mask.clone())),
                                schema,
                                store,
                            )
                        };
                        // "post": decode everything, then filter with the mask.
                        let post = || {
                            let array = column_idx_to_series(
                                col_idx,
                                field_md.as_slice(),
                                None,
                                schema,
                                store,
                            )?;

                            debug_assert_eq!(array.len(), mask.len());

                            let mask_arr = BooleanArray::new(
                                ArrowDataType::Boolean,
                                filter_mask.clone(),
                                None,
                            );
                            let mask_arr = BooleanChunked::from(mask_arr);
                            array.filter(&mask_arr)
                        };

                        let mut series = if mask_setting.should_prefilter(
                            prefilter_cost,
                            &schema.get_at_index(col_idx).unwrap().1.dtype,
                        ) {
                            pre()?
                        } else {
                            post()?
                        };

                        debug_assert_eq!(series.len(), filter_mask.set_bits());

                        try_set_sorted_flag(&mut series, col_idx, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<Column>>>()?;

                debug_assert!(dead_columns.iter().all(|v| v.len() == df.height()));

                let height = df.height();
                let mut live_columns = df.take_columns();

                // NOTE(review): `live_columns` is everything currently in `df`; if the row
                // index or hive partition columns were added above they are counted here as
                // well. Confirm this assertion is meant to hold in those cases.
                assert_eq!(
                    live_columns.len() + dead_columns.len(),
                    projection_sorted.len()
                );

                // Merge live and dead columns back into ascending schema-index order.
                let mut live_idx = 0;
                let mut dead_idx = 0;
                let columns = projection_sorted
                    .iter()
                    .map(|&i| {
                        let name = schema.get_at_index(i).unwrap().0.as_str();

                        if live_variables.contains(name) {
                            debug_assert!(live_idx < live_columns.len());
                            // SAFETY: `live_idx` is below `live_columns.len()` (asserted in
                            // debug builds) and every slot is read at most once; the vector's
                            // length is reset to 0 below so the moved-out values are not
                            // dropped a second time.
                            let column = unsafe { live_columns.as_ptr().add(live_idx).read() };
                            live_idx += 1;
                            column
                        } else {
                            debug_assert!(dead_idx < dead_columns.len());
                            // SAFETY: same argument as for `live_columns` above.
                            let column = unsafe { dead_columns.as_ptr().add(dead_idx).read() };
                            dead_idx += 1;
                            column
                        }
                    })
                    .collect::<Vec<Column>>();

                debug_assert_eq!(live_idx, live_columns.len());
                debug_assert_eq!(dead_idx, dead_columns.len());
                debug_assert_eq!(columns.len(), projection_sorted.len());

                // SAFETY: all elements were moved out above; forget them instead of dropping.
                unsafe {
                    live_columns.set_len(0);
                    dead_columns.set_len(0);
                }

                // SAFETY: NOTE(review): relies on all merged columns having `height` rows;
                // the dead columns are debug-asserted to match `df.height()` above.
                let df = unsafe { DataFrame::new_no_checks(height, columns) };

                PolarsResult::Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<Option<DataFrame>>>>()
    })?;

    let dfs: Vec<DataFrame> = dfs.into_iter().flatten().collect();

    // Account for the rows that were produced.
    let row_count: usize = dfs.iter().map(|df| df.height()).sum();
    let row_count = IdxSize::try_from(row_count).map_err(|_| ROW_COUNT_OVERFLOW_ERR)?;
    *previous_row_count = previous_row_count
        .checked_add(row_count)
        .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

    Ok(dfs)
}
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_optionally_par_over_columns(
store: &mmap::ColumnStore,
previous_row_count: &mut IdxSize,
row_group_start: usize,
row_group_end: usize,
slice: (usize, usize),
file_metadata: &FileMetadata,
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
parallel: ParallelStrategy,
projection: &[usize],
use_statistics: bool,
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
let mut dfs = Vec::with_capacity(row_group_end - row_group_start);
let mut n_rows_processed: usize = (0..row_group_start)
.map(|i| file_metadata.row_groups[i].num_rows())
.sum();
let slice_end = slice.0 + slice.1;
for rg_idx in row_group_start..row_group_end {
let md = &file_metadata.row_groups[rg_idx];
let rg_slice =
split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
let current_row_count = md.num_rows() as IdxSize;
if use_statistics
&& !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
{
*previous_row_count += rg_slice.1 as IdxSize;
continue;
}
#[cfg(debug_assertions)]
{
assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
}
let sorting_map = create_sorting_map(md);
let columns = if let ParallelStrategy::Columns = parallel {
POOL.install(|| {
projection
.par_iter()
.map(|column_i| {
let (name, field) = schema.get_at_index(*column_i).unwrap();
let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
rg_slice.1,
&DataType::from_arrow(&field.dtype, true),
));
};
let part = iter.collect::<Vec<_>>();
let mut series = column_idx_to_series(
*column_i,
part.as_slice(),
Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
schema,
store,
)?;
try_set_sorted_flag(&mut series, *column_i, &sorting_map);
Ok(series.into_column())
})
.collect::<PolarsResult<Vec<_>>>()
})?
} else {
projection
.iter()
.map(|column_i| {
let (name, field) = schema.get_at_index(*column_i).unwrap();
let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
rg_slice.1,
&DataType::from_arrow(&field.dtype, true),
));
};
let part = iter.collect::<Vec<_>>();
let mut series = column_idx_to_series(
*column_i,
part.as_slice(),
Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
schema,
store,
)?;
try_set_sorted_flag(&mut series, *column_i, &sorting_map);
Ok(series.into_column())
})
.collect::<PolarsResult<Vec<_>>>()?
};
let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
if let Some(rc) = &row_index {
df.with_row_index_mut(rc.name.clone(), Some(*previous_row_count + rc.offset));
}
materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1);
apply_predicate(&mut df, predicate, true)?;
*previous_row_count = previous_row_count.checked_add(current_row_count).ok_or_else(||
polars_err!(
ComputeError: "Parquet file produces more than pow(2, 32) rows; \
consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
or set 'streaming'"
),
)?;
dfs.push(df);
if *previous_row_count as usize >= slice_end {
break;
}
}
Ok(dfs)
}
/// Reads the row groups `row_group_start..row_group_end` in parallel, one task per row
/// group, decoding the projected columns of each row group sequentially.
///
/// A first sequential pass computes, for every row group, the part of `slice`
/// (`(offset, length)` in rows over the whole file) that falls inside it and the row count
/// preceding it; row groups outside the slice are dropped. `previous_row_count` is
/// advanced by the sliced length of every visited row group. The second pass decodes the
/// remaining row groups on `POOL`, skipping those ruled out by statistics when
/// `use_statistics` is set.
///
/// # Errors
///
/// Fails when decoding, statistics evaluation or predicate application fails, or when the
/// running row count overflows `IdxSize`.
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_par_over_rg(
    store: &mmap::ColumnStore,
    row_group_start: usize,
    row_group_end: usize,
    previous_row_count: &mut IdxSize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    predicate: Option<&dyn PhysicalIoExpr>,
    row_index: Option<RowIndex>,
    projection: &[usize],
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    // (metadata, slice inside the row group, row count before the row group)
    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);

    // Rows contained in the row groups that precede `row_group_start`.
    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    for i in row_group_start..row_group_end {
        let row_count_start = *previous_row_count;
        let rg_md = &file_metadata.row_groups[i];
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, rg_md.num_rows(), slice.0, slice_end);
        *previous_row_count = previous_row_count
            .checked_add(rg_slice.1 as IdxSize)
            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

        // The slice does not touch this row group.
        if rg_slice.1 == 0 {
            continue;
        }

        row_groups.push((rg_md, rg_slice, row_count_start));
    }

    let dfs = POOL.install(|| {
        row_groups
            .into_par_iter()
            .map(|(md, slice, row_count_start)| {
                if slice.1 == 0 || use_statistics && !read_this_row_group(predicate, md, schema)? {
                    return Ok(None);
                }

                // Test hook: lets tests assert that no row group is actually parsed.
                #[cfg(debug_assertions)]
                {
                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
                }

                let sorting_map = create_sorting_map(md);

                let columns = projection
                    .iter()
                    .map(|column_i| {
                        let (name, field) = schema.get_at_index(*column_i).unwrap();

                        // The column is absent from this row group: substitute nulls. The
                        // length must be the sliced length (not the full row-group length)
                        // so that it matches the other columns and the frame height below.
                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                slice.1,
                                &DataType::from_arrow(&field.dtype, true),
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let mut series = column_idx_to_series(
                            *column_i,
                            part.as_slice(),
                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                            schema,
                            store,
                        )?;

                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                // SAFETY: every column was produced for exactly `slice.1` rows, either as a
                // null column of that length or through a ranged filter of that length.
                // NOTE(review): assumes the decoder honours the ranged filter length — confirm.
                let mut df = unsafe { DataFrame::new_no_checks(slice.1, columns) };

                if let Some(rc) = &row_index {
                    df.with_row_index_mut(rc.name.clone(), Some(row_count_start + rc.offset));
                }

                materialize_hive_partitions(
                    &mut df,
                    schema.as_ref(),
                    hive_partition_columns,
                    slice.1,
                );
                apply_predicate(&mut df, predicate, false)?;

                Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;

    Ok(dfs.into_iter().flatten().collect())
}
/// Reads a parquet file from `reader` into a single [`DataFrame`].
///
/// * `slice` is `(offset, length)` in rows; a zero length returns an empty frame at once.
/// * `projection` selects schema column indices; `None` selects every column of
///   `reader_schema`.
/// * `metadata` is used when given, otherwise the file metadata is read from `reader`.
/// * `parallel` may be rewritten before reading: to `Prefiltered` when the
///   `POLARS_PARQUET_AUTO_PREFILTERED=1` heuristic applies, from `Auto` to
///   `RowGroups`/`Columns`, and from `Columns` to `None` for a single projected column.
///
/// If no row group produces a frame, an empty frame with the expected schema is returned.
#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
    mut reader: R,
    slice: (usize, usize),
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchemaRef,
    metadata: Option<FileMetadataRef>,
    predicate: Option<&dyn PhysicalIoExpr>,
    mut parallel: ParallelStrategy,
    row_index: Option<RowIndex>,
    use_statistics: bool,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
    // Nothing to read for an empty slice.
    if slice.1 == 0 {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }

    let file_metadata = match metadata {
        Some(known) => known,
        None => Arc::new(read::read_metadata(&mut reader)?),
    };
    let n_row_groups = file_metadata.row_groups.len();

    // With more than one row group, hold the string cache (when categoricals are compiled
    // in) until this function returns.
    let _sc = if n_row_groups > 1 {
        #[cfg(feature = "dtype-categorical")]
        {
            Some(polars_core::StringCacheHolder::hold())
        }
        #[cfg(not(feature = "dtype-categorical"))]
        {
            Some(0u8)
        }
    } else {
        None
    };

    let materialized_projection = match projection {
        Some(indices) => Cow::Borrowed(indices),
        None => Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()),
    };

    // Opt-in heuristic: switch to prefiltering when there is enough work for all threads.
    if let Some(predicate) = predicate {
        let opted_in =
            std::env::var("POLARS_PARQUET_AUTO_PREFILTERED").is_ok_and(|v| v == "1");
        if opted_in {
            let n_live = predicate.live_variables().map_or(0, |v| v.len());
            if n_live * n_row_groups >= POOL.current_num_threads() {
                parallel = ParallelStrategy::Prefiltered;
            }
        }
    }

    // Resolve `Auto` into a concrete strategy.
    if ParallelStrategy::Auto == parallel {
        let prefer_row_groups = n_row_groups > materialized_projection.len()
            || n_row_groups > POOL.current_num_threads();
        parallel = if prefer_row_groups {
            ParallelStrategy::RowGroups
        } else {
            ParallelStrategy::Columns
        };
    }

    // A single column cannot be parallelized over columns.
    if materialized_projection.len() == 1 {
        if let ParallelStrategy::Columns = parallel {
            parallel = ParallelStrategy::None;
        }
    }

    let reader = ReaderBytes::from(&mut reader);
    // SAFETY: NOTE(review): extends the borrow of `reader` to `'static`; this relies on the
    // resulting store not outliving `reader`, which stays alive until this function
    // returns — confirm `to_memslice` keeps that true.
    let store = mmap::ColumnStore::Local(unsafe {
        std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
    });

    let dfs = rg_to_dfs(
        &store,
        &mut 0,
        0,
        n_row_groups,
        slice,
        &file_metadata,
        reader_schema,
        predicate,
        row_index.clone(),
        parallel,
        &materialized_projection,
        use_statistics,
        hive_partition_columns,
    )?;

    if dfs.is_empty() {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }
    accumulate_dataframes_vertical(dfs)
}
/// Row-group fetcher for local, file-backed readers.
///
/// Wraps the bytes obtained from the reader (with the lifetime extended to `'static` in
/// `new`) and hands them out as a `ColumnStore::Local` on every fetch.
pub struct FetchRowGroupsFromMmapReader(ReaderBytes<'static>);
impl FetchRowGroupsFromMmapReader {
pub fn new(mut reader: Box<dyn MmapBytesReader>) -> PolarsResult<Self> {
assert!(reader.to_file().is_some());
let reader_ptr = unsafe {
std::mem::transmute::<&mut dyn MmapBytesReader, &'static mut dyn MmapBytesReader>(
reader.as_mut(),
)
};
let reader_bytes = get_reader_bytes(reader_ptr)?;
Ok(FetchRowGroupsFromMmapReader(reader_bytes))
}
fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
Ok(mmap::ColumnStore::Local(self.0.to_memslice()))
}
}
/// Source of row-group bytes for the batched reader.
pub enum RowGroupFetcher {
// Fetches row groups through `FetchRowGroupsFromObjectStore`; only available with the
// `cloud` feature.
#[cfg(feature = "cloud")]
ObjectStore(FetchRowGroupsFromObjectStore),
// Serves row groups from a local reader via `FetchRowGroupsFromMmapReader`.
Local(FetchRowGroupsFromMmapReader),
}
/// Wraps an object-store fetcher in the `ObjectStore` variant.
#[cfg(feature = "cloud")]
impl From<FetchRowGroupsFromObjectStore> for RowGroupFetcher {
    fn from(fetcher: FetchRowGroupsFromObjectStore) -> Self {
        Self::ObjectStore(fetcher)
    }
}
impl From<FetchRowGroupsFromMmapReader> for RowGroupFetcher {
fn from(value: FetchRowGroupsFromMmapReader) -> Self {
RowGroupFetcher::Local(value)
}
}
impl RowGroupFetcher {
async fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
match self {
RowGroupFetcher::Local(f) => f.fetch_row_groups(_row_groups),
#[cfg(feature = "cloud")]
RowGroupFetcher::ObjectStore(f) => f.fetch_row_groups(_row_groups).await,
}
}
}
/// Narrows the row-group range `row_group_start..row_group_end` to the row groups needed
/// for `slice` (`(offset, length)` in rows over the whole file).
///
/// `row_group_end` is clamped to `row_groups.len()`. The start is advanced past row groups
/// whose cumulative row count stays below the slice offset, and the end is extended until
/// the cumulative row count reaches the end of the slice or the clamped end is hit.
pub(super) fn compute_row_group_range(
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    row_groups: &[RowGroupMetadata],
) -> std::ops::Range<usize> {
    // Rows in all row groups before `row_group_start`.
    let mut cum_rows: usize = row_groups[..row_group_start]
        .iter()
        .map(|rg| rg.num_rows())
        .sum();
    let row_group_end = row_groups.len().min(row_group_end);

    // Find the first row group that reaches the slice offset.
    let mut start = row_group_start;
    while start != row_group_end {
        cum_rows += row_groups[start].num_rows();
        if cum_rows >= slice.0 {
            break;
        }
        start += 1;
    }

    // Extend the end until the slice is covered.
    let slice_end = slice.0 + slice.1;
    let mut end = (1 + start).min(row_group_end);
    while end != row_group_end && cum_rows < slice_end {
        cum_rows += row_groups[end].num_rows();
        end += 1;
    }

    start..end
}
/// Reads a parquet file in batches of row groups, buffering the decoded frames in a FIFO.
pub struct BatchedParquetReader {
// Provides the bytes of the row groups that are about to be decoded.
#[allow(dead_code)]
row_group_fetcher: RowGroupFetcher,
// `(offset, length)` in rows over the whole file.
slice: (usize, usize),
// Schema indices of the columns to read.
projection: Arc<[usize]>,
schema: ArrowSchemaRef,
metadata: FileMetadataRef,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_index: Option<RowIndex>,
// Running row count; passed to `rg_to_dfs` as `previous_row_count`, which updates it.
rows_read: IdxSize,
// Index of the first row group that has not been requested yet.
row_group_offset: usize,
// Total number of row groups in the file metadata.
n_row_groups: usize,
// Decoded frames waiting to be handed out by `next_batches`.
chunks_fifo: VecDeque<DataFrame>,
parallel: ParallelStrategy,
// Frames taller than this are split into `height / chunk_size` pieces before buffering.
chunk_size: usize,
use_statistics: bool,
hive_partition_columns: Option<Arc<[Series]>>,
// Constant column holding the file path, appended to every frame when set.
include_file_path: Option<StringChunked>,
// Whether `next_batches` has already handed out at least one result.
has_returned: bool,
}
impl BatchedParquetReader {
/// Creates a batched reader.
///
/// `projection` defaults to all columns of `schema` when `None`. `parallel` is resolved
/// up front: `Auto` becomes `RowGroups` when there are more row groups than projected
/// columns or than pool threads and `Columns` otherwise, and `Columns` becomes `None`
/// when only one column is projected. `include_file_path` is `(column name, path)`.
#[allow(clippy::too_many_arguments)]
pub fn new(
row_group_fetcher: RowGroupFetcher,
metadata: FileMetadataRef,
schema: ArrowSchemaRef,
slice: (usize, usize),
projection: Option<Vec<usize>>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_index: Option<RowIndex>,
chunk_size: usize,
use_statistics: bool,
hive_partition_columns: Option<Vec<Series>>,
include_file_path: Option<(PlSmallStr, Arc<str>)>,
mut parallel: ParallelStrategy,
) -> PolarsResult<Self> {
let n_row_groups = metadata.row_groups.len();
let projection = projection
.map(Arc::from)
.unwrap_or_else(|| (0usize..schema.len()).collect::<Arc<[_]>>());
parallel = match parallel {
ParallelStrategy::Auto => {
if n_row_groups > projection.len() || n_row_groups > POOL.current_num_threads() {
ParallelStrategy::RowGroups
} else {
ParallelStrategy::Columns
}
},
_ => parallel,
};
// A single column cannot be parallelized over columns.
if let (ParallelStrategy::Columns, true) = (parallel, projection.len() == 1) {
parallel = ParallelStrategy::None;
}
Ok(BatchedParquetReader {
row_group_fetcher,
slice,
projection,
schema,
metadata,
row_index,
rows_read: 0,
predicate,
row_group_offset: 0,
n_row_groups,
chunks_fifo: VecDeque::with_capacity(POOL.current_num_threads()),
parallel,
chunk_size,
use_statistics,
hive_partition_columns: hive_partition_columns.map(Arc::from),
// Starts as a length-1 column; it is grown on demand in `next_batches`.
include_file_path: include_file_path
.map(|(col, path)| StringChunked::full(col, &path, 1)),
has_returned: false,
})
}
/// Returns the Arrow schema this reader was created with.
pub fn schema(&self) -> &ArrowSchemaRef {
&self.schema
}
/// Returns `true` once all row groups have been requested.
pub fn is_finished(&self) -> bool {
self.row_group_offset >= self.n_row_groups
}
/// Returns `true` when requesting `n` more row groups would go past the last row group.
pub fn finishes_this_batch(&self, n: usize) -> bool {
self.row_group_offset + n > self.n_row_groups
}
/// Returns up to `n` buffered frames, decoding up to `n` further row groups first when the
/// buffer holds fewer than `n` frames and the slice is not exhausted.
///
/// Returns `Ok(None)` when nothing is left. A single empty frame with the expected schema
/// is returned when no data was produced and nothing has been returned before.
pub async fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
// The slice is fully read and something was already returned: only drain the buffer.
if self.rows_read as usize == self.slice.0 + self.slice.1 && self.has_returned {
return if self.chunks_fifo.is_empty() {
Ok(None)
} else {
let n_drainable = std::cmp::min(n, self.chunks_fifo.len());
Ok(Some(self.chunks_fifo.drain(..n_drainable).collect()))
};
}
let mut skipped_all_rgs = false;
// Decode more row groups only if rows are still missing, row groups remain and the
// buffer cannot satisfy the request on its own.
if (self.rows_read as usize) < self.slice.0 + self.slice.1
&& self.row_group_offset < self.n_row_groups
&& self.chunks_fifo.len() < n
{
let row_group_range = compute_row_group_range(
self.row_group_offset,
self.row_group_offset + n,
self.slice,
&self.metadata.row_groups,
);
let store = self
.row_group_fetcher
.fetch_row_groups(row_group_range.clone())
.await?;
let mut dfs = match store {
// Local bytes: decode right here.
ColumnStore::Local(_) => rg_to_dfs(
&store,
&mut self.rows_read,
row_group_range.start,
row_group_range.end,
self.slice,
&self.metadata,
&self.schema,
self.predicate.as_deref(),
self.row_index.clone(),
self.parallel,
&self.projection,
self.use_statistics,
self.hive_partition_columns.as_deref(),
),
// Fetched bytes: decode inside a closure that owns clones of the reader state and
// reports the frames and the updated row count through a oneshot channel.
#[cfg(feature = "async")]
ColumnStore::Fetched(b) => {
let store = ColumnStore::Fetched(b);
let (tx, rx) = tokio::sync::oneshot::channel();
let mut rows_read = self.rows_read;
let row_index = self.row_index.clone();
let predicate = self.predicate.clone();
let schema = self.schema.clone();
let metadata = self.metadata.clone();
let parallel = self.parallel;
let projection = self.projection.clone();
let use_statistics = self.use_statistics;
let hive_partition_columns = self.hive_partition_columns.clone();
let slice = self.slice;
let f = move || {
let dfs = rg_to_dfs(
&store,
&mut rows_read,
row_group_range.start,
row_group_range.end,
slice,
&metadata,
&schema,
predicate.as_deref(),
row_index,
parallel,
&projection,
use_statistics,
hive_partition_columns.as_deref(),
);
let _ = tx.send((dfs, rows_read));
};
// Already on a pool thread: run in place; otherwise hand the work to the pool.
if POOL.current_thread_index().is_some() {
tokio::task::block_in_place(f);
} else {
POOL.spawn(f);
};
let (dfs, rows_read) = rx.await.unwrap();
self.rows_read = rows_read;
dfs
},
}?;
// Append the file-path column, growing the cached column to the tallest frame first.
if let Some(ca) = self.include_file_path.as_mut() {
let mut max_len = 0;
if self.projection.is_empty() {
max_len = self.metadata.num_rows;
} else {
for df in &dfs {
max_len = std::cmp::max(max_len, df.height());
}
}
if ca.len() < max_len {
*ca = ca.new_from_index(0, max_len);
}
for df in &mut dfs {
// SAFETY: NOTE(review): relies on the sliced column matching the frame height and on
// the column name not clashing with an existing column — confirm.
unsafe {
df.with_column_unchecked(
ca.slice(
0,
if !self.projection.is_empty() {
df.height()
} else {
self.metadata.num_rows
},
)
.into_column(),
)
};
}
}
// Advance by the requested count, regardless of how many row groups were in the range.
self.row_group_offset += n;
// Nothing read at all so far: return one empty frame with the expected schema.
if self.rows_read == 0 && dfs.is_empty() {
let mut df = materialize_empty_df(
Some(self.projection.as_ref()),
&self.schema,
self.hive_partition_columns.as_deref(),
self.row_index.as_ref(),
);
if let Some(ca) = &self.include_file_path {
unsafe {
df.with_column_unchecked(ca.clear().into_column());
}
};
return Ok(Some(vec![df]));
}
skipped_all_rgs |= dfs.is_empty();
// Split frames that span more than one `chunk_size` and buffer everything.
for mut df in dfs {
let n = df.height() / self.chunk_size;
if n > 1 {
for df in split_df(&mut df, n, false) {
self.chunks_fifo.push_back(df)
}
} else {
self.chunks_fifo.push_back(df)
}
}
} else {
// Nothing was decoded in this call; treat it as "all skipped" only if nothing has been
// returned yet.
skipped_all_rgs = !self.has_returned;
};
if self.chunks_fifo.is_empty() {
if skipped_all_rgs {
// Hand out a single empty frame so the caller still sees the schema.
self.has_returned = true;
let mut df = materialize_empty_df(
Some(self.projection.as_ref()),
&self.schema,
self.hive_partition_columns.as_deref(),
self.row_index.as_ref(),
);
if let Some(ca) = &self.include_file_path {
unsafe {
df.with_column_unchecked(ca.clear().into_column());
}
};
Ok(Some(vec![df]))
} else {
Ok(None)
}
} else {
// Hand out at most `n` buffered frames, oldest first.
let mut chunks = Vec::with_capacity(n);
let mut i = 0;
while let Some(df) = self.chunks_fifo.pop_front() {
chunks.push(df);
i += 1;
if i == n {
break;
}
}
self.has_returned = true;
Ok(Some(chunks))
}
}
/// Turns the reader into a `BatchedParquetIter` that requests `batches_per_iter` batches
/// at a time.
#[cfg(feature = "async")]
pub fn iter(self, batches_per_iter: usize) -> BatchedParquetIter {
BatchedParquetIter {
batches_per_iter,
inner: self,
current_batch: vec![].into_iter(),
}
}
}
/// Frame-by-frame adapter over a `BatchedParquetReader`.
#[cfg(feature = "async")]
pub struct BatchedParquetIter {
// Number of batches requested from `inner` whenever `current_batch` runs dry.
batches_per_iter: usize,
inner: BatchedParquetReader,
// Frames of the most recently fetched batch that have not been yielded yet.
current_batch: std::vec::IntoIter<DataFrame>,
}
#[cfg(feature = "async")]
impl BatchedParquetIter {
    /// Yields the next frame, fetching a new batch from the inner reader once the current
    /// one is used up.
    ///
    /// Returns `None` when the reader has no more batches (or hands back an empty batch)
    /// and `Some(Err(_))` when fetching the next batch fails.
    pub(crate) async fn next_(&mut self) -> Option<PolarsResult<DataFrame>> {
        if let Some(df) = self.current_batch.next() {
            return Some(Ok(df));
        }

        match self.inner.next_batches(self.batches_per_iter).await {
            Err(e) => Some(Err(e)),
            Ok(None) => None,
            Ok(Some(batch)) => {
                self.current_batch = batch.into_iter();
                self.current_batch.next().map(Ok)
            },
        }
    }
}
/// Estimates how costly it is to push `mask` into the decoder, as the number of edges in
/// the mask (as reported by `Bitmap::num_edges`) divided by the mask length, clamped to
/// `[0.0, 1.0]`.
///
/// An empty mask has a cost of `0.0`. Without this guard the division would be `0.0 / 0.0`,
/// and the resulting NaN would pass through `clamp` unchanged.
pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
    let rg_len = mask.len();
    if rg_len == 0 {
        return 0.0;
    }

    let num_edges = mask.num_edges() as f64;
    (num_edges / rg_len as f64).clamp(0.0, 1.0)
}
/// How the filter mask is applied to the columns that are not read by the predicate in the
/// prefiltered strategy; selected through the `POLARS_PQ_PREFILTERED_MASK` environment
/// variable.
pub enum PrefilterMaskSetting {
// Decide per column from the prefilter cost and whether the dtype is nested.
Auto,
// Always prefilter.
Pre,
// Never prefilter.
Post,
}
impl PrefilterMaskSetting {
    /// Reads the setting from the `POLARS_PQ_PREFILTERED_MASK` environment variable.
    ///
    /// Accepts `auto`, `pre` and `post`; falls back to `Auto` when the variable cannot be
    /// read.
    ///
    /// # Panics
    ///
    /// Panics when the variable is set to any other value.
    pub fn init_from_env() -> Self {
        match std::env::var("POLARS_PQ_PREFILTERED_MASK") {
            Err(_) => Self::Auto,
            Ok(v) => match v.as_str() {
                "auto" => Self::Auto,
                "pre" => Self::Pre,
                "post" => Self::Post,
                _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
            },
        }
    }

    /// Decides whether a column of type `dtype` should be prefiltered.
    ///
    /// `Pre` always answers `true` and `Post` always `false`. `Auto` answers `true` when
    /// `prefilter_cost` is at most `0.01` for nested dtypes or at most `0.02` otherwise.
    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
        match self {
            Self::Pre => true,
            Self::Post => false,
            Self::Auto => {
                let max_cost = if dtype.is_nested() { 0.01 } else { 0.02 };
                prefilter_cost <= max_cost
            },
        }
    }
}