use polars_core::frame::column::ScalarColumn;
use polars_core::frame::DataFrame;
use polars_core::prelude::Column;
use polars_core::series::Series;
/// Attach hive-partition key columns to `df`, broadcast to `num_rows` rows.
///
/// Each partition `Series` carries a single value per file, so it is turned into
/// a `ScalarColumn` (its first value repeated `num_rows` times) before insertion.
///
/// Placement:
/// * Fast path — if the first hive column's name is absent from `reader_schema`,
///   or `df` has no columns yet, all hive columns are appended on the right.
/// * Otherwise the existing frame columns and the hive columns are two-way
///   merged by their index in `reader_schema`, so the output column order
///   follows the schema order.
///
/// NOTE(review): the merge path assumes both `df`'s columns and the hive
/// columns are themselves already in `reader_schema` order, and that at least
/// the leading column of each side is present in the schema (the `unwrap`s
/// below rely on this) — invariants upheld by the caller, not checked here.
pub(crate) fn materialize_hive_partitions<D>(
    df: &mut DataFrame,
    reader_schema: &polars_schema::Schema<D>,
    hive_partition_columns: Option<&[Series]>,
    num_rows: usize,
) {
    if let Some(hive_columns) = hive_partition_columns {
        // Nothing to do for an empty partition-column list.
        if hive_columns.is_empty() {
            return;
        }

        // Broadcast each single-value partition series to a constant column of
        // `num_rows` rows (value taken via `s.first()`).
        let hive_columns_sc = hive_columns
            .iter()
            .map(|s| ScalarColumn::new(s.name().clone(), s.first(), num_rows).into())
            .collect::<Vec<Column>>();

        // Fast path: hive columns are not part of the reader schema, or the
        // frame is still empty — just append them all at the end.
        if reader_schema.index_of(hive_columns[0].name()).is_none() || df.width() == 0 {
            if df.width() == 0 {
                // A width-0 frame has no columns to define its height, so set
                // it explicitly to match the columns we are about to add.
                // SAFETY: presumably sound because the frame has no columns
                // whose length could disagree — TODO confirm.
                unsafe { df.set_height(num_rows) };
            }
            // SAFETY: skips the usual name/height validation; assumes hive
            // column names don't collide with existing ones and every scalar
            // column was built with height `num_rows` — TODO confirm caller
            // guarantees name uniqueness.
            unsafe { df.hstack_mut_unchecked(&hive_columns_sc) };
            return;
        }

        let out_width: usize = df.width() + hive_columns.len();
        let df_columns = df.get_columns();
        let mut out_columns = Vec::with_capacity(out_width);

        // Two-way merge: `series_arr` holds the (shrinking) tails of both
        // inputs; `schema_idx_arr` caches the schema index of each tail's head.
        let mut series_arr = [df_columns, hive_columns_sc.as_slice()];
        let mut schema_idx_arr = [
            // Both unwraps rely on the heads being present in `reader_schema`;
            // the hive head was verified by the `is_none()` check above, the
            // frame head is an assumed caller invariant.
            reader_schema.index_of(series_arr[0][0].name()).unwrap(),
            reader_schema.index_of(series_arr[1][0].name()).unwrap(),
        ];

        loop {
            // Pick whichever side's head occurs earlier in the schema
            // (ties — which shouldn't occur for distinct names — favor side 1).
            let arg_min = if schema_idx_arr[0] < schema_idx_arr[1] {
                0
            } else {
                1
            };

            // Emit that head and advance the winning side.
            out_columns.push(series_arr[arg_min][0].clone());
            series_arr[arg_min] = &series_arr[arg_min][1..];

            // One side exhausted: the remainder of the other side is appended
            // wholesale below.
            if series_arr[arg_min].is_empty() {
                break;
            }

            // A head not present in the schema has no merge position; stop
            // merging and dump both remainders in order below.
            let Some(i) = reader_schema.index_of(series_arr[arg_min][0].name()) else {
                break;
            };

            schema_idx_arr[arg_min] = i;
        }

        // Append leftovers: remaining frame columns first, then remaining hive
        // columns (one of these slices is empty on the exhaustion break).
        out_columns.extend_from_slice(series_arr[0]);
        out_columns.extend_from_slice(series_arr[1]);

        // SAFETY: skips name-uniqueness/height validation; every column here
        // either came from `df` or was built with height `num_rows` — assumes
        // `df`'s columns already have height `num_rows`. TODO confirm.
        *df = unsafe { DataFrame::new_no_checks(num_rows, out_columns) };
    }
}