use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll};
use super::{
ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::common::can_project;
use crate::execution_plan::CardinalityEffect;
use crate::{
    metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
    DisplayFormatType, ExecutionPlan,
};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::stats::Precision;
use datafusion_common::{
internal_err, plan_err, project_schema, DataFusionError, Result,
};
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{
analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr,
};
use futures::stream::{Stream, StreamExt};
use log::trace;
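
/// FilterExec evaluates a boolean predicate against each input batch and
/// produces output containing only the rows for which the predicate
/// evaluates to `true` (rows yielding `false` or NULL are dropped). An
/// optional projection can narrow the output columns after filtering.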
#[derive(Debug, Clone)]
pub struct FilterExec {
    /// The expression to filter on; must evaluate to a boolean value
    predicate: Arc<dyn PhysicalExpr>,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Selectivity (the percentage of rows expected to pass the filter) used
    /// when the predicate cannot be analyzed; defaults to 20
    default_selectivity: u8,
    /// Cached plan properties (equivalence properties, partitioning, etc.)
    cache: PlanProperties,
    /// The indices of the columns to project from the input schema, if any
    projection: Option<Vec<usize>>,
}
impl FilterExec {
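    /// Create a FilterExec on an input. Errors if the predicate does not
    /// evaluate to a boolean; the selectivity estimate defaults to 20%.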
pub fn try_new(
predicate: Arc<dyn PhysicalExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
match predicate.data_type(input.schema().as_ref())? {
DataType::Boolean => {
let default_selectivity = 20;
let cache = Self::compute_properties(
&input,
&predicate,
default_selectivity,
None,
)?;
Ok(Self {
predicate,
input: Arc::clone(&input),
metrics: ExecutionPlanMetricsSet::new(),
default_selectivity,
cache,
projection: None,
})
}
other => {
plan_err!("Filter predicate must return BOOLEAN values, got {other:?}")
}
}
}
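
    /// Override the default selectivity estimate, given as a percentage in
    /// `[0, 100]`.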
pub fn with_default_selectivity(
mut self,
default_selectivity: u8,
) -> Result<Self, DataFusionError> {
if default_selectivity > 100 {
return plan_err!(
"Default filter selectivity value needs to be less than or equal to 100"
);
}
self.default_selectivity = default_selectivity;
Ok(self)
}
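
    /// Return a new FilterExec with an optional projection applied to the
    /// output. A new projection is composed with any existing one, so its
    /// indices always refer to the current output schema.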
pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> {
can_project(&self.schema(), projection.as_ref())?;
let projection = match projection {
Some(projection) => match &self.projection {
Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
None => Some(projection),
},
None => None,
};
let cache = Self::compute_properties(
&self.input,
&self.predicate,
self.default_selectivity,
projection.as_ref(),
)?;
Ok(Self {
predicate: Arc::clone(&self.predicate),
input: Arc::clone(&self.input),
metrics: self.metrics.clone(),
default_selectivity: self.default_selectivity,
cache,
projection,
})
}
    /// The expression to filter on; must evaluate to a boolean value
    pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// The default selectivity, as a percentage in `[0, 100]`
    pub fn default_selectivity(&self) -> u8 {
        self.default_selectivity
    }

    /// The projection indices applied to the output, if any
    pub fn projection(&self) -> Option<&Vec<usize>> {
        self.projection.as_ref()
}
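
    /// Calculates the output statistics of a filter by applying selectivity
    /// (either the default, or one estimated via interval analysis of the
    /// predicate) to the input statistics.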
fn statistics_helper(
input: &Arc<dyn ExecutionPlan>,
predicate: &Arc<dyn PhysicalExpr>,
default_selectivity: u8,
) -> Result<Statistics> {
let input_stats = input.statistics()?;
let schema = input.schema();
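        // If the predicate cannot be handled by interval analysis, scale the
        // input statistics by the default selectivity instead: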
if !check_support(predicate, &schema) {
let selectivity = default_selectivity as f64 / 100.0;
let mut stats = input_stats.to_inexact();
stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
stats.total_byte_size = stats
.total_byte_size
.with_estimated_selectivity(selectivity);
return Ok(stats);
}
let num_rows = input_stats.num_rows;
let total_byte_size = input_stats.total_byte_size;
let input_analysis_ctx = AnalysisContext::try_from_statistics(
&input.schema(),
&input_stats.column_statistics,
)?;
let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?;
let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
let num_rows = num_rows.with_estimated_selectivity(selectivity);
let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
let column_statistics = collect_new_statistics(
&input_stats.column_statistics,
analysis_ctx.boundaries,
);
Ok(Statistics {
num_rows,
total_byte_size,
column_statistics,
})
}
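
    /// Collects additional constants from equality conjuncts: in a conjunct
    /// such as `a = b`, if one side is already a known constant, the other
    /// side becomes constant as well.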
fn extend_constants(
input: &Arc<dyn ExecutionPlan>,
predicate: &Arc<dyn PhysicalExpr>,
) -> Vec<ConstExpr> {
let mut res_constants = Vec::new();
let input_eqs = input.equivalence_properties();
let conjunctions = split_conjunction(predicate);
for conjunction in conjunctions {
if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>() {
if binary.op() == &Operator::Eq {
if input_eqs.is_expr_constant(binary.left()) {
res_constants.push(
ConstExpr::from(binary.right()).with_across_partitions(true),
)
} else if input_eqs.is_expr_constant(binary.right()) {
res_constants.push(
ConstExpr::from(binary.left()).with_across_partitions(true),
)
}
}
}
}
res_constants
}
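
    /// Creates the cache object that stores the plan properties (equivalence
    /// properties, output partitioning, execution mode, etc.).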
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
predicate: &Arc<dyn PhysicalExpr>,
default_selectivity: u8,
projection: Option<&Vec<usize>>,
) -> Result<PlanProperties> {
let stats = Self::statistics_helper(input, predicate, default_selectivity)?;
let mut eq_properties = input.equivalence_properties().clone();
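        // Combine the equality predicates with the input equivalence properties: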
let (equal_pairs, _) = collect_columns_from_predicate(predicate);
for (lhs, rhs) in equal_pairs {
eq_properties.add_equal_conditions(lhs, rhs)?
}
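        // Columns reduced to a single viable value by the filter become constants: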
let constants = collect_columns(predicate)
.into_iter()
.filter(|column| stats.column_statistics[column.index()].is_singleton())
.map(|column| {
let expr = Arc::new(column) as _;
ConstExpr::new(expr).with_across_partitions(true)
});
eq_properties = eq_properties.with_constants(constants);
eq_properties =
eq_properties.with_constants(Self::extend_constants(input, predicate));
let mut output_partitioning = input.output_partitioning().clone();
if let Some(projection) = projection {
let schema = eq_properties.schema();
let projection_mapping = ProjectionMapping::from_indices(projection, schema)?;
let out_schema = project_schema(schema, Some(projection))?;
output_partitioning =
output_partitioning.project(&projection_mapping, &eq_properties);
eq_properties = eq_properties.project(&projection_mapping, out_schema);
}
Ok(PlanProperties::new(
eq_properties,
output_partitioning,
input.execution_mode(),
))
}
}
impl DisplayAs for FilterExec {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let display_projections = if let Some(projection) =
self.projection.as_ref()
{
format!(
", projection=[{}]",
projection
.iter()
.map(|index| format!(
"{}@{}",
self.input.schema().fields().get(*index).unwrap().name(),
index
))
.collect::<Vec<_>>()
.join(", ")
)
} else {
"".to_string()
};
write!(f, "FilterExec: {}{}", self.predicate, display_projections)
}
}
}
}
impl ExecutionPlan for FilterExec {
fn name(&self) -> &'static str {
"FilterExec"
}
fn as_any(&self) -> &dyn Any {
self
}
fn properties(&self) -> &PlanProperties {
&self.cache
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn maintains_input_order(&self) -> Vec<bool> {
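        // Tell the optimizer this operator doesn't reorder its input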
vec![true]
}
fn with_new_children(
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0))
            .and_then(|e| {
                // Carry over the selectivity configured on `self`; the freshly
                // built `FilterExec` would otherwise reset it to the default.
                let selectivity = self.default_selectivity();
                e.with_default_selectivity(selectivity)
            })
.and_then(|e| e.with_projection(self.projection().cloned()))
.map(|e| Arc::new(e) as _)
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
Ok(Box::pin(FilterExecStream {
schema: self.schema(),
predicate: Arc::clone(&self.predicate),
input: self.input.execute(partition, context)?,
baseline_metrics,
projection: self.projection.clone(),
}))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
fn statistics(&self) -> Result<Statistics> {
let stats = Self::statistics_helper(
&self.input,
self.predicate(),
self.default_selectivity,
)?;
Ok(stats.project(self.projection.as_ref()))
}
fn cardinality_effect(&self) -> CardinalityEffect {
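        // Filtering only removes rows, so output cardinality never exceeds input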
CardinalityEffect::LowerEqual
}
}
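
/// Rebuilds per-column statistics from the boundaries produced by interval
/// analysis. An interval that collapses to a single value yields exact
/// min/max estimates; all other estimates are marked inexact.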
fn collect_new_statistics(
input_column_stats: &[ColumnStatistics],
analysis_boundaries: Vec<ExprBoundaries>,
) -> Vec<ColumnStatistics> {
analysis_boundaries
.into_iter()
.enumerate()
.map(
|(
idx,
ExprBoundaries {
interval,
distinct_count,
..
},
)| {
let (lower, upper) = interval.into_bounds();
let (min_value, max_value) = if lower.eq(&upper) {
(Precision::Exact(lower), Precision::Exact(upper))
} else {
(Precision::Inexact(lower), Precision::Inexact(upper))
};
ColumnStatistics {
null_count: input_column_stats[idx].null_count.to_inexact(),
max_value,
min_value,
distinct_count: distinct_count.to_inexact(),
}
},
)
.collect()
}
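
/// The stream produced by [`FilterExec::execute`]; wraps the input stream and
/// applies the predicate (and optional projection) to each batch.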
struct FilterExecStream {
    /// Output schema after the optional projection is applied
    schema: SchemaRef,
    /// The expression to filter on; must evaluate to a boolean value
    predicate: Arc<dyn PhysicalExpr>,
    /// The input partition to filter
    input: SendableRecordBatchStream,
    /// Runtime metrics recording
    baseline_metrics: BaselineMetrics,
    /// The indices of the columns to project from the input schema, if any
    projection: Option<Vec<usize>>,
}
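
/// Filters a [`RecordBatch`] by evaluating `predicate` against it and keeping
/// only the rows where the result is `true`.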
pub fn batch_filter(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
) -> Result<RecordBatch> {
filter_and_project(batch, predicate, None, &batch.schema())
}
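
/// Evaluates `predicate` against `batch`; if projection indices are given,
/// the batch is narrowed to those columns (using `output_schema`) before the
/// boolean mask is applied.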
fn filter_and_project(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
projection: Option<&Vec<usize>>,
output_schema: &SchemaRef,
) -> Result<RecordBatch> {
predicate
.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
Ok(match (as_boolean_array(&array), projection) {
(Ok(filter_array), None) => filter_record_batch(batch, filter_array)?,
(Ok(filter_array), Some(projection)) => {
let projected_columns = projection
.iter()
.map(|i| Arc::clone(batch.column(*i)))
.collect();
let projected_batch = RecordBatch::try_new(
Arc::clone(output_schema),
projected_columns,
)?;
filter_record_batch(&projected_batch, filter_array)?
}
(Err(_), _) => {
return internal_err!(
"Cannot create filter_array from non-boolean predicates"
);
}
})
})
}
impl Stream for FilterExecStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll;
loop {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = self.baseline_metrics.elapsed_compute().timer();
let filtered_batch = filter_and_project(
&batch,
&self.predicate,
self.projection.as_ref(),
&self.schema,
)?;
timer.done();
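                        // Skip batches that were entirely filtered out instead
                        // of emitting empty output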
if filtered_batch.num_rows() == 0 {
continue;
}
poll = Poll::Ready(Some(Ok(filtered_batch)));
break;
}
value => {
poll = Poll::Ready(value);
break;
}
}
}
self.baseline_metrics.record_poll(poll)
}
    fn size_hint(&self) -> (usize, Option<usize>) {
        // Batches that filter down to zero rows are skipped entirely, so only
        // the input's upper bound remains valid for this stream.
        let (_, upper) = self.input.size_hint();
        (0, upper)
    }
}
impl RecordBatchStream for FilterExecStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
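
/// Splits the predicate into conjuncts and collects the `(left, right)`
/// expression pairs of equality (`=`) and inequality (`!=`) comparisons,
/// respectively.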
fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {
let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
let predicates = split_conjunction(predicate);
predicates.into_iter().for_each(|p| {
if let Some(binary) = p.as_any().downcast_ref::<BinaryExpr>() {
match binary.op() {
Operator::Eq => {
eq_predicate_columns.push((binary.left(), binary.right()))
}
Operator::NotEq => {
ne_predicate_columns.push((binary.left(), binary.right()))
}
_ => {}
}
}
});
(eq_predicate_columns, ne_predicate_columns)
}
/// A pair of references to the expressions on the two sides of a comparison
pub type PhysicalExprPairRef<'a> = (&'a Arc<dyn PhysicalExpr>, &'a Arc<dyn PhysicalExpr>);

/// The equality and inequality expression pairs collected from a predicate,
/// respectively
pub type EqualAndNonEqual<'a> =
    (Vec<PhysicalExprPairRef<'a>>, Vec<PhysicalExprPairRef<'a>>);
#[cfg(test)]
mod tests {
use super::*;
use crate::empty::EmptyExec;
use crate::expressions::*;
use crate::test;
use crate::test::exec::StatisticsExec;
use arrow::datatypes::{Field, Schema};
use arrow_schema::{UnionFields, UnionMode};
use datafusion_common::ScalarValue;
#[tokio::test]
async fn collect_columns_predicates() -> Result<()> {
let schema = test::aggr_test_schema();
let predicate: Arc<dyn PhysicalExpr> = binary(
binary(
binary(col("c2", &schema)?, Operator::GtEq, lit(1u32), &schema)?,
Operator::And,
binary(col("c2", &schema)?, Operator::Eq, lit(4u32), &schema)?,
&schema,
)?,
Operator::And,
binary(
binary(
col("c2", &schema)?,
Operator::Eq,
col("c9", &schema)?,
&schema,
)?,
Operator::And,
binary(
col("c1", &schema)?,
Operator::NotEq,
col("c13", &schema)?,
&schema,
)?,
&schema,
)?,
&schema,
)?;
let (equal_pairs, ne_pairs) = collect_columns_from_predicate(&predicate);
assert_eq!(2, equal_pairs.len());
assert!(equal_pairs[0].0.eq(&col("c2", &schema)?));
assert!(equal_pairs[0].1.eq(&lit(4u32)));
assert!(equal_pairs[1].0.eq(&col("c2", &schema)?));
assert!(equal_pairs[1].1.eq(&col("c9", &schema)?));
assert_eq!(1, ne_pairs.len());
assert!(ne_pairs[0].0.eq(&col("c1", &schema)?));
assert!(ne_pairs[0].1.eq(&col("c13", &schema)?));
Ok(())
}
#[tokio::test]
async fn test_filter_statistics_basic_expr() -> Result<()> {
let bytes_per_row = 4;
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(100),
total_byte_size: Precision::Inexact(100 * bytes_per_row),
column_statistics: vec![ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
}],
},
schema.clone(),
));
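        // WHERE a <= 25: with a in [1, 100], about 25% of rows should remain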
let predicate: Arc<dyn PhysicalExpr> =
binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Inexact(25));
assert_eq!(
statistics.total_byte_size,
Precision::Inexact(25 * bytes_per_row)
);
assert_eq!(
statistics.column_statistics,
vec![ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
..Default::default()
}]
);
Ok(())
}
#[tokio::test]
async fn test_filter_statistics_column_level_nested() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(100),
column_statistics: vec![ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
}],
total_byte_size: Precision::Absent,
},
schema.clone(),
));
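        // WHERE a <= 25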
let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
input,
)?);
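        // WHERE a >= 10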
let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
sub_filter,
)?);
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Inexact(16));
assert_eq!(
statistics.column_statistics,
vec![ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
..Default::default()
}]
);
Ok(())
}
#[tokio::test]
async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(100),
column_statistics: vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
..Default::default()
},
],
total_byte_size: Precision::Absent,
},
schema.clone(),
));
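        // WHERE a <= 25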
let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
input,
)?);
        // WHERE b > 45
        let b_gt_45: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
a_lte_25,
)?);
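        // WHERE a >= 10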
let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
            b_gt_45,
)?);
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Inexact(2));
assert_eq!(
statistics.column_statistics,
vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(25))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(46))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(50))),
..Default::default()
}
]
);
Ok(())
}
#[tokio::test]
async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics::new_unknown(&schema),
schema.clone(),
));
let predicate: Arc<dyn PhysicalExpr> =
binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Absent);
Ok(())
}
#[tokio::test]
async fn test_filter_statistics_multiple_columns() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Float32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(4000),
column_statistics: vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
max_value: Precision::Inexact(ScalarValue::Float32(Some(1100.0))),
..Default::default()
},
],
},
schema,
));
let predicate = Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::LtEq,
Arc::new(Literal::new(ScalarValue::Int32(Some(53)))),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
Operator::Eq,
Arc::new(Literal::new(ScalarValue::Int32(Some(3)))),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 2)),
Operator::LtEq,
Arc::new(Literal::new(ScalarValue::Float32(Some(1075.0)))),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Gt,
Arc::new(Column::new("b", 1)),
)),
)),
)),
));
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Inexact(134));
assert_eq!(statistics.total_byte_size, Precision::Inexact(533));
let exp_col_stats = vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(4))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(53))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Float32(Some(1000.0))),
max_value: Precision::Inexact(ScalarValue::Float32(Some(1075.0))),
..Default::default()
},
];
        // Float columns are compared within a small epsilon to tolerate
        // rounding introduced by the analysis; other columns must match
        // exactly. (Note: `for_each` is required here; a bare `.map()` is
        // lazy and would never run the assertions.)
        exp_col_stats
            .into_iter()
            .zip(statistics.column_statistics)
            .for_each(|(expected, actual)| {
                if let Some(val) = actual.min_value.get_value() {
                    if val.data_type().is_floating() {
                        let actual_min = actual.min_value.get_value().unwrap();
                        let actual_max = actual.max_value.get_value().unwrap();
                        let expected_min = expected.min_value.get_value().unwrap();
                        let expected_max = expected.max_value.get_value().unwrap();
                        let eps = ScalarValue::Float32(Some(1e-6));
                        assert!(actual_min.sub(expected_min).unwrap() < eps);
                        assert!(actual_max.sub(expected_max).unwrap() < eps);
                    } else {
                        assert_eq!(actual, expected);
                    }
                } else {
                    assert_eq!(actual, expected);
                }
            });
Ok(())
}
#[tokio::test]
async fn test_filter_statistics_full_selective() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(4000),
column_statistics: vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
..Default::default()
},
],
},
schema,
));
let predicate = Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Lt,
Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
Operator::LtEq,
Arc::new(Column::new("b", 1)),
)),
));
let expected = input.statistics()?.column_statistics;
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Inexact(1000));
assert_eq!(statistics.total_byte_size, Precision::Inexact(4000));
assert_eq!(statistics.column_statistics, expected);
Ok(())
}
#[tokio::test]
async fn test_filter_statistics_zero_selective() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(4000),
column_statistics: vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
..Default::default()
},
],
},
schema,
));
let predicate = Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(200)))),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
Operator::LtEq,
Arc::new(Column::new("b", 1)),
)),
));
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Inexact(0));
assert_eq!(statistics.total_byte_size, Precision::Inexact(0));
assert_eq!(
statistics.column_statistics,
vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(3))),
..Default::default()
},
]
);
Ok(())
}
#[tokio::test]
async fn test_filter_statistics_more_inputs() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(4000),
column_statistics: vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
},
],
},
schema,
));
let predicate = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Lt,
Arc::new(Literal::new(ScalarValue::Int32(Some(50)))),
));
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Inexact(490));
assert_eq!(statistics.total_byte_size, Precision::Inexact(1960));
assert_eq!(
statistics.column_statistics,
vec![
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(49))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
..Default::default()
},
]
);
Ok(())
}
#[tokio::test]
async fn test_empty_input_statistics() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics::new_unknown(&schema),
schema,
));
let predicate = Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::LtEq,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
)),
Operator::And,
Arc::new(BinaryExpr::new(
Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
Operator::LtEq,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Minus,
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
)),
)),
));
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let filter_statistics = filter.statistics()?;
let expected_filter_statistics = Statistics {
num_rows: Precision::Absent,
total_byte_size: Precision::Absent,
column_statistics: vec![ColumnStatistics {
null_count: Precision::Absent,
min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
max_value: Precision::Inexact(ScalarValue::Int32(Some(10))),
distinct_count: Precision::Absent,
}],
};
assert_eq!(filter_statistics, expected_filter_statistics);
Ok(())
}
#[tokio::test]
async fn test_statistics_with_constant_column() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics::new_unknown(&schema),
schema,
));
let predicate = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Eq,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
));
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, input)?);
let filter_statistics = filter.statistics()?;
assert!(filter_statistics.column_statistics[0].is_singleton());
Ok(())
}
#[tokio::test]
async fn test_validation_filter_selectivity() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics::new_unknown(&schema),
schema,
));
let predicate = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Eq,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
));
let filter = FilterExec::try_new(predicate, input)?;
assert!(filter.with_default_selectivity(120).is_err());
Ok(())
}
#[tokio::test]
async fn test_custom_filter_selectivity() -> Result<()> {
let schema =
Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(4000),
column_statistics: vec![ColumnStatistics {
..Default::default()
}],
},
schema,
));
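        // Equality on a Decimal128 column is not supported by interval
        // analysis, so the default selectivity (20%) scales the estimates;
        // `with_default_selectivity(40)` below raises it to 40%.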
let predicate = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Eq,
Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
));
let filter = FilterExec::try_new(predicate, input)?;
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Inexact(200));
assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
let filter = filter.with_default_selectivity(40)?;
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Inexact(400));
assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
Ok(())
}
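
    // Regression-style check: computing properties and statistics must
    // succeed for inputs containing a Union-typed column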
#[test]
fn test_equivalence_properties_union_type() -> Result<()> {
let union_type = DataType::Union(
UnionFields::new(
vec![0, 1],
vec![
Field::new("f1", DataType::Int32, true),
Field::new("f2", DataType::Utf8, true),
],
),
UnionMode::Sparse,
);
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", union_type, true),
]));
let exec = FilterExec::try_new(
binary(
binary(col("c1", &schema)?, Operator::GtEq, lit(1i32), &schema)?,
Operator::And,
binary(col("c1", &schema)?, Operator::LtEq, lit(4i32), &schema)?,
&schema,
)?,
Arc::new(EmptyExec::new(Arc::clone(&schema))),
)?;
exec.statistics().unwrap();
Ok(())
}
}