use std::path::PathBuf;
use polars_core::prelude::*;
use crate::prelude::*;
/// Key under which the `Scan` nodes of a logical plan are grouped by the file-caching pass.
///
/// Scans that read the same `paths` with the same `predicate` and the same `slice` map to the
/// same fingerprint; the pass then aggregates a scan count and a column union per fingerprint
/// (see `process_with_columns` and `FileCacher`).
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct FileFingerPrint {
/// The files read by the scan.
pub paths: Arc<[PathBuf]>,
/// The scan's predicate, converted from its expression-arena node into an `Expr`.
pub predicate: Option<Expr>,
/// `(skip_rows, n_rows)`: rows skipped by the scan type and the optional row limit from the
/// scan's file options.
pub slice: (usize, Option<usize>),
}
/// Registers a single scan in `file_count_and_column_union`.
///
/// The scan is keyed by a [`FileFingerPrint`] built from `paths`, `predicate` and `slice`.
/// The entry's scan counter is incremented by one, and the scan's projected columns are
/// appended (in order, without duplicates) to the entry's column union. A scan without an
/// explicit projection (`with_columns == None`) contributes every column name of `schema`.
#[allow(clippy::type_complexity)]
fn process_with_columns(
    paths: &Arc<[PathBuf]>,
    with_columns: Option<&Vec<String>>,
    predicate: Option<Expr>,
    slice: (usize, Option<usize>),
    file_count_and_column_union: &mut PlHashMap<FileFingerPrint, (FileCount, PlIndexSet<String>)>,
    schema: &Schema,
) {
    let key = FileFingerPrint {
        paths: Arc::clone(paths),
        predicate,
        slice,
    };
    let (scan_count, column_union) = file_count_and_column_union
        .entry(key)
        .or_insert_with(|| (0, PlIndexSet::with_capacity_and_hasher(32, Default::default())));
    *scan_count += 1;

    if let Some(projected) = with_columns {
        column_union.extend(projected.iter().cloned());
    } else {
        // No projection on this scan: it needs every column of the file.
        column_union.extend(schema.iter_names().map(|name| name.to_string()));
    }
}
#[allow(clippy::type_complexity)]
pub fn collect_fingerprints(
root: Node,
fps: &mut Vec<FileFingerPrint>,
lp_arena: &Arena<ALogicalPlan>,
expr_arena: &Arena<AExpr>,
) {
use ALogicalPlan::*;
match lp_arena.get(root) {
Scan {
paths,
file_options: options,
predicate,
scan_type,
..
} => {
let slice = (scan_type.skip_rows(), options.n_rows);
let predicate = predicate.map(|node| node_to_expr(node, expr_arena));
let fp = FileFingerPrint {
paths: paths.clone(),
predicate,
slice,
};
fps.push(fp);
},
lp => {
for input in lp.get_inputs() {
collect_fingerprints(input, fps, lp_arena, expr_arena)
}
},
}
}
#[allow(clippy::type_complexity)]
pub fn find_column_union_and_fingerprints(
root: Node,
columns: &mut PlHashMap<FileFingerPrint, (FileCount, PlIndexSet<String>)>,
lp_arena: &Arena<ALogicalPlan>,
expr_arena: &Arena<AExpr>,
) {
use ALogicalPlan::*;
match lp_arena.get(root) {
Scan {
paths,
file_options: options,
predicate,
file_info,
scan_type,
..
} => {
let slice = (scan_type.skip_rows(), options.n_rows);
let predicate = predicate.map(|node| node_to_expr(node, expr_arena));
process_with_columns(
paths,
options.with_columns.as_deref(),
predicate,
slice,
columns,
&file_info.schema,
);
},
lp => {
for input in lp.get_inputs() {
find_column_union_and_fingerprints(input, columns, lp_arena, expr_arena)
}
},
}
}
/// Rewrites the `Scan` nodes of a logical plan using per-fingerprint information: each scan is
/// given the number of scans that share its [`FileFingerPrint`] and the union of the columns
/// those scans project.
pub struct FileCacher {
/// Per fingerprint: the number of scans with that fingerprint and the ordered union of the
/// columns they project.
file_count_and_column_union: PlHashMap<FileFingerPrint, (FileCount, Arc<Vec<String>>)>,
}
impl FileCacher {
    /// Creates a cacher from the aggregated scan information produced by
    /// `find_column_union_and_fingerprints`.
    ///
    /// Each ordered column union is frozen into an `Arc<Vec<String>>`. It can then be attached
    /// to every rewritten scan with a reference-count bump instead of a copy.
    pub(crate) fn new(
        columns: PlHashMap<FileFingerPrint, (FileCount, PlIndexSet<String>)>,
    ) -> Self {
        let file_count_and_column_union = columns
            .into_iter()
            .map(|(finger_print, (file_count, column_union))| {
                // The index set is owned here, so move its strings out (insertion order is
                // preserved) instead of cloning each of them.
                let column_union: Vec<String> = column_union.into_iter().collect();
                (finger_print, (file_count, Arc::new(column_union)))
            })
            .collect();
        Self {
            file_count_and_column_union,
        }
    }

    /// Optionally wraps the rewritten scan `lp` in a projection that restores the columns the
    /// scan originally selected.
    ///
    /// `with_columns` must be the projection the scan had *before* it was widened to its
    /// fingerprint's column union. A projection onto those columns is placed on top of `lp`
    /// when the scan is not behind a `Cache` node and either:
    /// - the original projection is narrower than the union, or
    /// - the fingerprint is not present in the map, in which case the scan was reset to read
    ///   every column.
    ///
    /// Otherwise, and whenever `with_columns` is `None`, `lp` is returned unchanged.
    fn finish_rewrite(
        &self,
        mut lp: ALogicalPlan,
        expr_arena: &mut Arena<AExpr>,
        lp_arena: &mut Arena<ALogicalPlan>,
        finger_print: &FileFingerPrint,
        with_columns: Option<Arc<Vec<String>>>,
        behind_cache: bool,
    ) -> ALogicalPlan {
        if let Some(mut with_columns) = with_columns {
            let do_projection = match self.file_count_and_column_union.get(finger_print) {
                Some((_file_count, agg_columns)) => with_columns.len() < agg_columns.len(),
                None => true,
            };
            if !behind_cache && do_projection {
                let node = lp_arena.add(lp);
                // Take the names out of the Arc, cloning only if it is shared.
                let projections = std::mem::take(Arc::make_mut(&mut with_columns))
                    .into_iter()
                    .map(|s| expr_arena.add(AExpr::Column(Arc::from(s))))
                    .collect();
                lp = ALogicalPlanBuilder::new(node, expr_arena, lp_arena)
                    .project(projections, Default::default())
                    .build();
            }
        }
        lp
    }

    /// Returns the scan count and column union recorded for `finger_print`, if any.
    fn extract_columns_and_count(
        &self,
        finger_print: &FileFingerPrint,
    ) -> Option<(FileCount, Arc<Vec<String>>)> {
        self.file_count_and_column_union.get(finger_print).cloned()
    }

    /// Rewrites every `Scan` node reachable from `root` in place.
    ///
    /// Each scan has its `file_counter` set to the number of scans sharing its fingerprint (0 if
    /// unknown). Its `with_columns` is replaced by the fingerprint's column union, or by `None`
    /// when the union covers the whole file schema or the fingerprint is unknown.
    /// `finish_rewrite` then receives the scan's *original* projection and may add a projection
    /// on top. That way, a scan widened here still outputs the columns it originally selected,
    /// unless it sits below a `Cache` node.
    ///
    /// `scratch` is used as a temporary buffer and is left empty.
    pub(crate) fn assign_unions(
        &mut self,
        root: Node,
        lp_arena: &mut Arena<ALogicalPlan>,
        expr_arena: &mut Arena<AExpr>,
        scratch: &mut Vec<Node>,
    ) {
        scratch.clear();
        // Explicit DFS stack of `(node, behind_cache)`. Once a `Cache` node has been passed,
        // every node below it is flagged so that no extra projection is inserted there.
        let mut stack = Vec::with_capacity(lp_arena.len() / 3 + 1);
        stack.push((root, false));
        while let Some((root, behind_cache)) = stack.pop() {
            let lp = lp_arena.take(root);
            match lp {
                ALogicalPlan::Scan {
                    paths,
                    file_info,
                    predicate,
                    output_schema,
                    scan_type,
                    file_options: mut options,
                } => {
                    let finger_print = FileFingerPrint {
                        paths,
                        predicate: predicate.map(|node| node_to_expr(node, expr_arena)),
                        slice: (scan_type.skip_rows(), options.n_rows),
                    };
                    let count_and_union = self.extract_columns_and_count(&finger_print);
                    options.file_counter =
                        count_and_union.as_ref().map_or(0, |(count, _)| *count);

                    // `options.with_columns` is about to be replaced by the union. Keep the
                    // projection this scan originally asked for: `finish_rewrite` needs it to
                    // decide whether, and onto which columns, to project back. (Passing the
                    // overwritten value made that projection unreachable.)
                    let original_with_columns = options.with_columns.take();
                    // A union that spans the whole schema is expressed as "no projection".
                    options.with_columns = count_and_union
                        .map(|(_, column_union)| column_union)
                        .filter(|column_union| column_union.len() != file_info.schema.len());

                    let lp = ALogicalPlan::Scan {
                        paths: finger_print.paths.clone(),
                        file_info,
                        output_schema,
                        predicate,
                        file_options: options,
                        scan_type,
                    };
                    let lp = self.finish_rewrite(
                        lp,
                        expr_arena,
                        lp_arena,
                        &finger_print,
                        original_with_columns,
                        behind_cache,
                    );
                    lp_arena.replace(root, lp);
                },
                lp => {
                    let behind_cache = behind_cache || matches!(&lp, ALogicalPlan::Cache { .. });
                    lp.copy_inputs(scratch);
                    // Draining `scratch` leaves it empty for the next node.
                    while let Some(input) = scratch.pop() {
                        stack.push((input, behind_cache));
                    }
                    lp_arena.replace(root, lp);
                },
            }
        }
        scratch.clear();
    }
}