// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Interval and selectivity in [`AnalysisContext`]

use std::fmt::Debug;
use std::sync::Arc;

use crate::expressions::Column;
use crate::intervals::cp_solver::{ExprIntervalGraph, PropagationResult};
use crate::utils::collect_columns;
use crate::PhysicalExpr;

use arrow::datatypes::Schema;
use datafusion_common::stats::Precision;
use datafusion_common::{
    internal_datafusion_err, internal_err, ColumnStatistics, Result, ScalarValue,
};
use datafusion_expr::interval_arithmetic::{cardinality_ratio, Interval};

/// The shared context used during the analysis of an expression. Includes
/// the boundaries for all known columns.
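///
/// An `AnalysisContext` is typically built from column statistics via
/// [`AnalysisContext::try_from_statistics`], or with unknown bounds via
/// [`ExprBoundaries::try_new_unbounded`] (see the example on [`analyze`]).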
#[derive(Clone, Debug, PartialEq)]
pub struct AnalysisContext {
    /// A list of known column boundaries, ordered by the index
    /// of the column in the current schema.
    pub boundaries: Vec<ExprBoundaries>,
    /// The estimated percentage of rows that this expression would select, if
    /// it were to be used as a boolean predicate on a filter. The value will be
    /// between 0.0 (selects nothing) and 1.0 (selects everything).
    pub selectivity: Option<f64>,
}

impl AnalysisContext {
    pub fn new(boundaries: Vec<ExprBoundaries>) -> Self {
        Self {
            boundaries,
            selectivity: None,
        }
    }

    pub fn with_selectivity(mut self, selectivity: f64) -> Self {
        self.selectivity = Some(selectivity);
        self
    }

    /// Create a new analysis context from column statistics.
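    ///
    /// # Example
    ///
    /// A minimal sketch, not compiled as a doc test; the statistics values are
    /// illustrative and `ColumnStatistics::new_unknown` fills the remaining
    /// fields:
    ///
    /// ```ignore
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use datafusion_common::{stats::Precision, ColumnStatistics, ScalarValue};
    /// use datafusion_physical_expr::AnalysisContext;
    ///
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let stats = vec![ColumnStatistics {
    ///     min_value: Precision::Exact(ScalarValue::from(1i32)),
    ///     max_value: Precision::Exact(ScalarValue::from(100i32)),
    ///     ..ColumnStatistics::new_unknown()
    /// }];
    /// // The boundaries for column `a` start out as the interval [1, 100].
    /// let context = AnalysisContext::try_from_statistics(&schema, &stats)?;
    /// ```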
    pub fn try_from_statistics(
        input_schema: &Schema,
        statistics: &[ColumnStatistics],
    ) -> Result<Self> {
        statistics
            .iter()
            .enumerate()
            .map(|(idx, stats)| ExprBoundaries::try_from_column(input_schema, stats, idx))
            .collect::<Result<Vec<_>>>()
            .map(Self::new)
    }
}

/// Represents the boundaries (e.g. min and max values) of a particular column
///
/// This is used in range analysis of expressions, to determine if the expression
/// limits the value of particular columns (e.g. analyzing an expression such as
/// `time < 50` would result in a boundary interval for `time` having a max
/// value of `50`).
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBoundaries {
    /// The column these boundaries apply to.
    pub column: Column,
    /// Minimum and maximum values this expression can have. A `None` value
    /// indicates that evaluating the given column results in an empty set.
    /// For example, if the column `a` has values in the range [10, 20],
    /// and there is a filter asserting that `a > 50`, then the resulting interval
    /// range of `a` will be `None`.
    pub interval: Option<Interval>,
    /// Maximum number of distinct values this expression can produce, if known.
    pub distinct_count: Precision<usize>,
}

impl ExprBoundaries {
    /// Create a new `ExprBoundaries` object from column level statistics.
    pub fn try_from_column(
        schema: &Schema,
        col_stats: &ColumnStatistics,
        col_index: usize,
    ) -> Result<Self> {
        let field = schema.fields().get(col_index).ok_or_else(|| {
            internal_datafusion_err!(
                "Could not create `ExprBoundaries` in `try_from_column`: `col_index` {col_index} is out of bounds, the schema has {} columns",
                schema.fields.len()
            )
        })?;
        // Fall back to a typed null scalar when a min/max statistic is absent.
        let empty_field =
            ScalarValue::try_from(field.data_type()).unwrap_or(ScalarValue::Null);
        let interval = Interval::try_new(
            col_stats
                .min_value
                .get_value()
                .cloned()
                .unwrap_or(empty_field.clone()),
            col_stats
                .max_value
                .get_value()
                .cloned()
                .unwrap_or(empty_field),
        )?;
        let column = Column::new(field.name(), col_index);
        Ok(ExprBoundaries {
            column,
            interval: Some(interval),
            distinct_count: col_stats.distinct_count,
        })
    }

    /// Create `ExprBoundaries` that represent no known bounds for all the
    /// columns in `schema`
    pub fn try_new_unbounded(schema: &Schema) -> Result<Vec<Self>> {
        schema
            .fields()
            .iter()
            .enumerate()
            .map(|(i, field)| {
                Ok(Self {
                    column: Column::new(field.name(), i),
                    interval: Some(Interval::make_unbounded(field.data_type())?),
                    distinct_count: Precision::Absent,
                })
            })
            .collect()
    }
}

/// Attempts to refine column boundaries and compute a selectivity value.
///
/// The function accepts boundaries of the input columns in the `context` parameter.
/// It then tries to tighten these boundaries based on the provided `expr`.
/// The resulting selectivity value is calculated by comparing the initial and final boundaries.
/// The computation assumes that the data within the column is uniformly distributed and not sorted.
///
/// # Arguments
///
/// * `context` - The context holding input column boundaries.
/// * `expr` - The expression used to shrink the column boundaries.
///
/// # Returns
///
/// * `AnalysisContext` constructed from the pruned boundaries and a selectivity value.
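///
/// # Example
///
/// A minimal sketch, not compiled as a doc test, mirroring the tests at the
/// bottom of this file; the `datafusion_physical_expr::analysis` import path
/// is assumed:
///
/// ```ignore
/// use std::sync::Arc;
///
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_common::DFSchema;
/// use datafusion_expr::{col, execution_props::ExecutionProps, lit};
/// use datafusion_physical_expr::analysis::{analyze, AnalysisContext, ExprBoundaries};
/// use datafusion_physical_expr::create_physical_expr;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// // Start with unknown (unbounded) ranges for every column.
/// let boundaries = ExprBoundaries::try_new_unbounded(&schema)?;
/// let df_schema = DFSchema::try_from(Arc::clone(&schema))?;
/// let predicate = create_physical_expr(
///     &col("a").gt(lit(10)),
///     &df_schema,
///     &ExecutionProps::new(),
/// )?;
/// let result = analyze(&predicate, AnalysisContext::new(boundaries), df_schema.as_ref())?;
/// // The boundary for column `a` has been tightened to [11, ∞).
/// ```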
pub fn analyze(
    expr: &Arc<dyn PhysicalExpr>,
    context: AnalysisContext,
    schema: &Schema,
) -> Result<AnalysisContext> {
    let initial_boundaries = &context.boundaries;
    if initial_boundaries
        .iter()
        .all(|bound| bound.interval.is_none())
    {
        if initial_boundaries
            .iter()
            .any(|bound| bound.distinct_count != Precision::Exact(0))
        {
            return internal_err!(
                "ExprBoundaries has a non-zero distinct count although it represents an empty table"
            );
        }
        if context.selectivity != Some(0.0) {
            return internal_err!(
                "AnalysisContext has a non-zero selectivity although it represents an empty table"
            );
        }
        Ok(context)
    } else if initial_boundaries
        .iter()
        .any(|bound| bound.interval.is_none())
    {
        internal_err!(
            "AnalysisContext is in an inconsistent state. Some columns represent an empty table while others don't"
        )
    } else {
        let mut target_boundaries = context.boundaries;
        let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?;
        let columns = collect_columns(expr)
            .into_iter()
            .map(|c| Arc::new(c) as _)
            .collect::<Vec<_>>();

        let mut target_indices_and_boundaries = vec![];
        let target_expr_and_indices = graph.gather_node_indices(columns.as_slice());

        for (expr, index) in &target_expr_and_indices {
            if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                if let Some(bound) =
                    target_boundaries.iter().find(|b| b.column == *column)
                {
                    // All intervals are `Some` in this branch (verified above),
                    // so this unwrap is safe.
                    target_indices_and_boundaries
                        .push((*index, bound.interval.as_ref().unwrap().clone()));
                }
            }
        }

        match graph
            .update_ranges(&mut target_indices_and_boundaries, Interval::CERTAINLY_TRUE)?
        {
            PropagationResult::Success => {
                shrink_boundaries(graph, target_boundaries, target_expr_and_indices)
            }
            PropagationResult::Infeasible => {
                // If propagation is infeasible, set all intervals to `None` to
                // signal that the predicate selects no rows.
                target_boundaries
                    .iter_mut()
                    .for_each(|bound| bound.interval = None);
                Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
            }
            PropagationResult::CannotPropagate => {
                Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))
            }
        }
    }
}

/// If the `PropagationResult` indicates success, this function calculates the
/// selectivity value by comparing the initial and final column boundaries.
/// Following this, it constructs and returns a new `AnalysisContext` with the
/// updated parameters.
fn shrink_boundaries(
    graph: ExprIntervalGraph,
    mut target_boundaries: Vec<ExprBoundaries>,
    target_expr_and_indices: Vec<(Arc<dyn PhysicalExpr>, usize)>,
) -> Result<AnalysisContext> {
    let initial_boundaries = target_boundaries.clone();
    target_expr_and_indices.iter().for_each(|(expr, i)| {
        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
            if let Some(bound) = target_boundaries
                .iter_mut()
                .find(|bound| bound.column.eq(column))
            {
                bound.interval = Some(graph.get_interval(*i));
            };
        }
    });

    let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries)?;

    if !(0.0..=1.0).contains(&selectivity) {
        return internal_err!("Selectivity is out of the valid range [0, 1]: {}", selectivity);
    }

    Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity))
}

/// This function calculates the filter predicate's selectivity by comparing
/// the initial and pruned column boundaries. Selectivity is defined as the
/// ratio of rows in a table that satisfy the filter's predicate.
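///
/// For example, if pruning shrinks one column's interval from `[0, 100]` to
/// `[0, 50]` (a cardinality ratio of roughly `0.5`) and an independent second
/// column's interval shrinks by a factor of `0.2`, the combined selectivity
/// estimate is `0.5 * 0.2 = 0.1`.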
fn calculate_selectivity(
    target_boundaries: &[ExprBoundaries],
    initial_boundaries: &[ExprBoundaries],
) -> Result<f64> {
    // Since the intervals are assumed uniform and the values
    // are not correlated, we need to multiply the selectivities
    // of multiple columns to get the overall selectivity.
    if target_boundaries.len() != initial_boundaries.len() {
        return internal_err!(
            "The number of columns in the initial and target boundaries should be the same"
        );
    }
    let mut acc: f64 = 1.0;
    for (initial, target) in initial_boundaries.iter().zip(target_boundaries) {
        match (initial.interval.as_ref(), target.interval.as_ref()) {
            (Some(initial), Some(target)) => {
                acc *= cardinality_ratio(initial, target);
            }
            (None, Some(_)) => {
                return internal_err!(
                    "Initial boundary cannot be None while having a Some() target boundary"
                );
            }
            _ => return Ok(0.0),
        }
    }

    Ok(acc)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::{assert_contains, DFSchema};
    use datafusion_expr::{
        col, execution_props::ExecutionProps, interval_arithmetic::Interval, lit, Expr,
    };

    use crate::{create_physical_expr, AnalysisContext};

    use super::{analyze, ExprBoundaries};

    fn make_field(name: &str, data_type: DataType) -> Field {
        let nullable = false;
        Field::new(name, data_type, nullable)
    }

    #[test]
    fn test_analyze_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));

        /// Test case containing (expression tree, lower bound, upper bound)
        type TestCase = (Expr, Option<i32>, Option<i32>);

        let test_cases: Vec<TestCase> = vec![
            // a > 10
            (col("a").gt(lit(10)), Some(11), None),
            // a < 20
            (col("a").lt(lit(20)), None, Some(19)),
            // a > 10 AND a < 20
            (
                col("a").gt(lit(10)).and(col("a").lt(lit(20))),
                Some(11),
                Some(19),
            ),
            // a >= 10
            (col("a").gt_eq(lit(10)), Some(10), None),
            // a <= 20
            (col("a").lt_eq(lit(20)), None, Some(20)),
            // a >= 10 AND a <= 20
            (
                col("a").gt_eq(lit(10)).and(col("a").lt_eq(lit(20))),
                Some(10),
                Some(20),
            ),
            // a > 10 AND a < 20 AND a < 15
            (
                col("a")
                    .gt(lit(10))
                    .and(col("a").lt(lit(20)))
                    .and(col("a").lt(lit(15))),
                Some(11),
                Some(14),
            ),
            // (a > 10 AND a < 20) AND (a > 15 AND a < 25)
            (
                col("a")
                    .gt(lit(10))
                    .and(col("a").lt(lit(20)))
                    .and(col("a").gt(lit(15)))
                    .and(col("a").lt(lit(25))),
                Some(16),
                Some(19),
            ),
        ];
        for (expr, lower, upper) in test_cases {
            let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
            let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
            let physical_expr =
                create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
            let analysis_result = analyze(
                &physical_expr,
                AnalysisContext::new(boundaries),
                df_schema.as_ref(),
            )
            .unwrap();
            let Some(actual) = &analysis_result.boundaries[0].interval else {
                panic!("The analysis result should contain non-empty intervals for all columns");
            };
            let expected = Interval::make(lower, upper).unwrap();
            assert_eq!(
                &expected, actual,
                "did not get correct interval for SQL expression: {expr:?}"
            );
        }
    }

    #[test]
    fn test_analyze_empty_set_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));

        let test_cases: Vec<Expr> = vec![
            // a > 10 AND a < 10
            col("a").gt(lit(10)).and(col("a").lt(lit(10))),
            // (a > 10 AND a < 20) AND (a > 20 AND a < 30)
            col("a")
                .gt(lit(10))
                .and(col("a").lt(lit(20)))
                .and(col("a").gt(lit(20)))
                .and(col("a").lt(lit(30))),
        ];

        for expr in test_cases {
            let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
            let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
            let physical_expr =
                create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
            let analysis_result = analyze(
                &physical_expr,
                AnalysisContext::new(boundaries),
                df_schema.as_ref(),
            )
            .unwrap();

            for boundary in analysis_result.boundaries {
                assert!(boundary.interval.is_none());
            }
        }
    }

    #[test]
    fn test_analyze_invalid_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));
        let expr = col("a").lt(lit(10)).or(col("a").gt(lit(20)));
        let expected_error = "Interval arithmetic does not support the operator OR";
        let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
        let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
        let physical_expr =
            create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
        let analysis_error = analyze(
            &physical_expr,
            AnalysisContext::new(boundaries),
            df_schema.as_ref(),
        )
        .unwrap_err();
        assert_contains!(analysis_error.to_string(), expected_error);
    }
}
441}