datafusion_physical_expr/
analysis.rs1use std::fmt::Debug;
21use std::sync::Arc;
22
23use crate::expressions::Column;
24use crate::intervals::cp_solver::{ExprIntervalGraph, PropagationResult};
25use crate::utils::collect_columns;
26use crate::PhysicalExpr;
27
28use arrow::datatypes::Schema;
29use datafusion_common::stats::Precision;
30use datafusion_common::{
31 internal_datafusion_err, internal_err, ColumnStatistics, Result, ScalarValue,
32};
33use datafusion_expr::interval_arithmetic::{cardinality_ratio, Interval};
34
35#[derive(Clone, Debug, PartialEq)]
38pub struct AnalysisContext {
39 pub boundaries: Vec<ExprBoundaries>,
42 pub selectivity: Option<f64>,
46}
47
48impl AnalysisContext {
49 pub fn new(boundaries: Vec<ExprBoundaries>) -> Self {
50 Self {
51 boundaries,
52 selectivity: None,
53 }
54 }
55
56 pub fn with_selectivity(mut self, selectivity: f64) -> Self {
57 self.selectivity = Some(selectivity);
58 self
59 }
60
61 pub fn try_from_statistics(
63 input_schema: &Schema,
64 statistics: &[ColumnStatistics],
65 ) -> Result<Self> {
66 statistics
67 .iter()
68 .enumerate()
69 .map(|(idx, stats)| ExprBoundaries::try_from_column(input_schema, stats, idx))
70 .collect::<Result<Vec<_>>>()
71 .map(Self::new)
72 }
73}
74
75#[derive(Clone, Debug, PartialEq)]
82pub struct ExprBoundaries {
83 pub column: Column,
84 pub interval: Option<Interval>,
90 pub distinct_count: Precision<usize>,
92}
93
94impl ExprBoundaries {
95 pub fn try_from_column(
97 schema: &Schema,
98 col_stats: &ColumnStatistics,
99 col_index: usize,
100 ) -> Result<Self> {
101 let field = schema.fields().get(col_index).ok_or_else(|| {
102 internal_datafusion_err!(
103 "Could not create `ExprBoundaries`: in `try_from_column` `col_index`
104 has gone out of bounds with a value of {col_index}, the schema has {} columns.",
105 schema.fields.len()
106 )
107 })?;
108 let empty_field =
109 ScalarValue::try_from(field.data_type()).unwrap_or(ScalarValue::Null);
110 let interval = Interval::try_new(
111 col_stats
112 .min_value
113 .get_value()
114 .cloned()
115 .unwrap_or(empty_field.clone()),
116 col_stats
117 .max_value
118 .get_value()
119 .cloned()
120 .unwrap_or(empty_field),
121 )?;
122 let column = Column::new(field.name(), col_index);
123 Ok(ExprBoundaries {
124 column,
125 interval: Some(interval),
126 distinct_count: col_stats.distinct_count,
127 })
128 }
129
130 pub fn try_new_unbounded(schema: &Schema) -> Result<Vec<Self>> {
133 schema
134 .fields()
135 .iter()
136 .enumerate()
137 .map(|(i, field)| {
138 Ok(Self {
139 column: Column::new(field.name(), i),
140 interval: Some(Interval::make_unbounded(field.data_type())?),
141 distinct_count: Precision::Absent,
142 })
143 })
144 .collect()
145 }
146}
147
148pub fn analyze(
164 expr: &Arc<dyn PhysicalExpr>,
165 context: AnalysisContext,
166 schema: &Schema,
167) -> Result<AnalysisContext> {
168 let initial_boundaries = &context.boundaries;
169 if initial_boundaries
170 .iter()
171 .all(|bound| bound.interval.is_none())
172 {
173 if initial_boundaries
174 .iter()
175 .any(|bound| bound.distinct_count != Precision::Exact(0))
176 {
177 return internal_err!(
178 "ExprBoundaries has a non-zero distinct count although it represents an empty table"
179 );
180 }
181 if context.selectivity != Some(0.0) {
182 return internal_err!(
183 "AnalysisContext has a non-zero selectivity although it represents an empty table"
184 );
185 }
186 Ok(context)
187 } else if initial_boundaries
188 .iter()
189 .any(|bound| bound.interval.is_none())
190 {
191 internal_err!(
192 "AnalysisContext is an inconsistent state. Some columns represent empty table while others don't"
193 )
194 } else {
195 let mut target_boundaries = context.boundaries;
196 let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?;
197 let columns = collect_columns(expr)
198 .into_iter()
199 .map(|c| Arc::new(c) as _)
200 .collect::<Vec<_>>();
201
202 let mut target_indices_and_boundaries = vec![];
203 let target_expr_and_indices = graph.gather_node_indices(columns.as_slice());
204
205 for (expr, index) in &target_expr_and_indices {
206 if let Some(column) = expr.as_any().downcast_ref::<Column>() {
207 if let Some(bound) =
208 target_boundaries.iter().find(|b| b.column == *column)
209 {
210 target_indices_and_boundaries
212 .push((*index, bound.interval.as_ref().unwrap().clone()));
213 }
214 }
215 }
216
217 match graph
218 .update_ranges(&mut target_indices_and_boundaries, Interval::CERTAINLY_TRUE)?
219 {
220 PropagationResult::Success => {
221 shrink_boundaries(graph, target_boundaries, target_expr_and_indices)
222 }
223 PropagationResult::Infeasible => {
224 target_boundaries
226 .iter_mut()
227 .for_each(|bound| bound.interval = None);
228 Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
229 }
230 PropagationResult::CannotPropagate => {
231 Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))
232 }
233 }
234 }
235}
236
237fn shrink_boundaries(
242 graph: ExprIntervalGraph,
243 mut target_boundaries: Vec<ExprBoundaries>,
244 target_expr_and_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
245) -> Result<AnalysisContext> {
246 let initial_boundaries = target_boundaries.clone();
247 target_expr_and_indices.iter().for_each(|(expr, i)| {
248 if let Some(column) = expr.as_any().downcast_ref::<Column>() {
249 if let Some(bound) = target_boundaries
250 .iter_mut()
251 .find(|bound| bound.column.eq(column))
252 {
253 bound.interval = Some(graph.get_interval(*i));
254 };
255 }
256 });
257
258 let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries)?;
259
260 if !(0.0..=1.0).contains(&selectivity) {
261 return internal_err!("Selectivity is out of limit: {}", selectivity);
262 }
263
264 Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity))
265}
266
267fn calculate_selectivity(
271 target_boundaries: &[ExprBoundaries],
272 initial_boundaries: &[ExprBoundaries],
273) -> Result<f64> {
274 if target_boundaries.len() != initial_boundaries.len() {
278 return Err(internal_datafusion_err!(
279 "The number of columns in the initial and target boundaries should be the same"
280 ));
281 }
282 let mut acc: f64 = 1.0;
283 for (initial, target) in initial_boundaries.iter().zip(target_boundaries) {
284 match (initial.interval.as_ref(), target.interval.as_ref()) {
285 (Some(initial), Some(target)) => {
286 acc *= cardinality_ratio(initial, target);
287 }
288 (None, Some(_)) => {
289 return internal_err!(
290 "Initial boundary cannot be None while having a Some() target boundary"
291 );
292 }
293 _ => return Ok(0.0),
294 }
295 }
296
297 Ok(acc)
298}
299
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::{assert_contains, DFSchema};
    use datafusion_expr::{
        col, execution_props::ExecutionProps, interval_arithmetic::Interval, lit, Expr,
    };

    use crate::{create_physical_expr, AnalysisContext};

    use super::{analyze, ExprBoundaries};

    /// Creates a non-nullable field.
    fn make_field(name: &str, data_type: DataType) -> Field {
        Field::new(name, data_type, false)
    }

    /// Schema with a single non-nullable `Int32` column named `a`.
    fn single_int32_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]))
    }

    /// Runs `analyze` for `expr` against `schema`, starting from unbounded
    /// column boundaries, and returns its raw result.
    fn analyze_unbounded(
        expr: &Expr,
        schema: &Arc<Schema>,
    ) -> datafusion_common::Result<AnalysisContext> {
        let boundaries = ExprBoundaries::try_new_unbounded(schema).unwrap();
        let df_schema = DFSchema::try_from(Arc::clone(schema)).unwrap();
        let physical_expr =
            create_physical_expr(expr, &df_schema, &ExecutionProps::new()).unwrap();
        analyze(
            &physical_expr,
            AnalysisContext::new(boundaries),
            df_schema.as_ref(),
        )
    }

    #[test]
    fn test_analyze_boundary_exprs() {
        let schema = single_int32_schema();

        // (predicate, expected lower bound, expected upper bound)
        type TestCase = (Expr, Option<i32>, Option<i32>);

        let test_cases: Vec<TestCase> = vec![
            // a > 10
            (col("a").gt(lit(10)), Some(11), None),
            // a < 20
            (col("a").lt(lit(20)), None, Some(19)),
            // a > 10 AND a < 20
            (
                col("a").gt(lit(10)).and(col("a").lt(lit(20))),
                Some(11),
                Some(19),
            ),
            // a >= 10
            (col("a").gt_eq(lit(10)), Some(10), None),
            // a <= 20
            (col("a").lt_eq(lit(20)), None, Some(20)),
            // a >= 10 AND a <= 20
            (
                col("a").gt_eq(lit(10)).and(col("a").lt_eq(lit(20))),
                Some(10),
                Some(20),
            ),
            // a > 10 AND a < 20 AND a < 15
            (
                col("a")
                    .gt(lit(10))
                    .and(col("a").lt(lit(20)))
                    .and(col("a").lt(lit(15))),
                Some(11),
                Some(14),
            ),
            // a > 10 AND a < 20 AND a > 15 AND a < 25
            (
                col("a")
                    .gt(lit(10))
                    .and(col("a").lt(lit(20)))
                    .and(col("a").gt(lit(15)))
                    .and(col("a").lt(lit(25))),
                Some(16),
                Some(19),
            ),
        ];

        for (expr, lower, upper) in test_cases {
            let analysis_result = analyze_unbounded(&expr, &schema).unwrap();
            let Some(actual) = &analysis_result.boundaries[0].interval else {
                panic!("The analysis result should contain non-empty intervals for all columns");
            };
            let expected = Interval::make(lower, upper).unwrap();
            assert_eq!(
                &expected, actual,
                "did not get correct interval for SQL expression: {expr:?}"
            );
        }
    }

    #[test]
    fn test_analyze_empty_set_boundary_exprs() {
        let schema = single_int32_schema();

        let test_cases: Vec<Expr> = vec![
            // a > 10 AND a < 10
            col("a").gt(lit(10)).and(col("a").lt(lit(10))),
            // a > 10 AND a < 20 AND a > 20 AND a < 30
            col("a")
                .gt(lit(10))
                .and(col("a").lt(lit(20)))
                .and(col("a").gt(lit(20)))
                .and(col("a").lt(lit(30))),
        ];

        for expr in test_cases {
            let analysis_result = analyze_unbounded(&expr, &schema).unwrap();

            // An unsatisfiable predicate must leave no interval behind.
            for boundary in analysis_result.boundaries {
                assert!(boundary.interval.is_none());
            }
        }
    }

    #[test]
    fn test_analyze_invalid_boundary_exprs() {
        let schema = single_int32_schema();
        // a < 10 OR a > 20
        let expr = col("a").lt(lit(10)).or(col("a").gt(lit(20)));
        let expected_error = "Interval arithmetic does not support the operator OR";

        let analysis_error = analyze_unbounded(&expr, &schema).unwrap_err();
        assert_contains!(analysis_error.to_string(), expected_error);
    }
}