datafusion_physical_expr/equivalence/properties/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod dependency; // Submodule containing DependencyMap and Dependencies
19mod joins; // Submodule containing join_equivalence_properties
20mod union; // Submodule containing calculate_union
21
22use dependency::{
23    construct_prefix_orderings, generate_dependency_orderings, referred_dependencies,
24    Dependencies, DependencyMap,
25};
26pub use joins::*;
27pub use union::*;
28
29use std::fmt::Display;
30use std::hash::{Hash, Hasher};
31use std::sync::Arc;
32use std::{fmt, mem};
33
34use crate::equivalence::class::{const_exprs_contains, AcrossPartitions};
35use crate::equivalence::{
36    EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
37};
38use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
39use crate::{
40    physical_exprs_contains, ConstExpr, LexOrdering, LexRequirement, PhysicalExpr,
41    PhysicalSortExpr, PhysicalSortRequirement,
42};
43
44use arrow::compute::SortOptions;
45use arrow::datatypes::SchemaRef;
46use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
47use datafusion_common::{plan_err, Constraint, Constraints, HashMap, Result};
48use datafusion_expr::interval_arithmetic::Interval;
49use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
50use datafusion_physical_expr_common::utils::ExprPropertiesNode;
51
52use indexmap::IndexSet;
53use itertools::Itertools;
54
55/// A `EquivalenceProperties` object stores information known about the output
56/// of a plan node, that can be used to optimize the plan.
57///
58/// Currently, it keeps track of:
59/// - Sort expressions (orderings)
60/// - Equivalent expressions: expressions that are known to have same value.
61/// - Constants expressions: expressions that are known to contain a single
62///   constant value.
63///
64/// # Example equivalent sort expressions
65///
66/// Consider table below:
67///
68/// ```text
69/// ┌-------┐
70/// | a | b |
71/// |---|---|
72/// | 1 | 9 |
73/// | 2 | 8 |
74/// | 3 | 7 |
75/// | 5 | 5 |
76/// └---┴---┘
77/// ```
78///
79/// In this case, both `a ASC` and `b DESC` can describe the table ordering.
/// `EquivalenceProperties` tracks these different valid sort expressions and
/// treats `a ASC` and `b DESC` on an equal footing. For example if the query
82/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be
83/// avoided.
84///
85/// # Example equivalent expressions
86///
87/// Similarly, consider the table below:
88///
89/// ```text
90/// ┌-------┐
91/// | a | b |
92/// |---|---|
93/// | 1 | 1 |
94/// | 2 | 2 |
95/// | 3 | 3 |
96/// | 5 | 5 |
97/// └---┴---┘
98/// ```
99///
/// In this case, columns `a` and `b` always have the same value. This object
/// keeps track of such equivalences. With this information, DataFusion can
/// optimize operations such as repartitioning. For example, if the partition
/// requirement is `Hash(a)` and output partitioning is `Hash(b)`, then
/// DataFusion avoids repartitioning the data as the existing partitioning
/// satisfies the requirement.
106///
107/// # Code Example
108/// ```
109/// # use std::sync::Arc;
110/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
111/// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
112/// # use datafusion_physical_expr::expressions::col;
113/// use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
114/// # let schema: SchemaRef = Arc::new(Schema::new(vec![
115/// #   Field::new("a", DataType::Int32, false),
116/// #   Field::new("b", DataType::Int32, false),
117/// #   Field::new("c", DataType::Int32, false),
118/// # ]));
119/// # let col_a = col("a", &schema).unwrap();
120/// # let col_b = col("b", &schema).unwrap();
121/// # let col_c = col("c", &schema).unwrap();
122/// // This object represents data that is sorted by a ASC, c DESC
123/// // with a single constant value of b
124/// let mut eq_properties = EquivalenceProperties::new(schema)
125///   .with_constants(vec![ConstExpr::from(col_b)]);
126/// eq_properties.add_new_ordering(LexOrdering::new(vec![
127///   PhysicalSortExpr::new_default(col_a).asc(),
128///   PhysicalSortExpr::new_default(col_c).desc(),
129/// ]));
130///
131/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], const: [b@1(heterogeneous)]")
132/// ```
#[derive(Debug, Clone)]
pub struct EquivalenceProperties {
    /// Distinct equivalence classes (groups of expressions known to have the
    /// same value)
    eq_group: EquivalenceGroup,
    /// Equivalent sort expressions, i.e. the orderings that each describe how
    /// the data is sorted
    oeq_class: OrderingEquivalenceClass,
    /// Expressions whose values are constant
    ///
    /// TODO: We do not need to track constants separately, they can be tracked
    ///       inside `eq_group` as `Literal` expressions.
    constants: Vec<ConstExpr>,
    /// Table constraints (primary key / unique), consulted when checking
    /// whether an ordering requirement is satisfied
    constraints: Constraints,
    /// Schema associated with this object.
    schema: SchemaRef,
}
149
150impl EquivalenceProperties {
151    /// Creates an empty `EquivalenceProperties` object.
152    pub fn new(schema: SchemaRef) -> Self {
153        Self {
154            eq_group: EquivalenceGroup::empty(),
155            oeq_class: OrderingEquivalenceClass::empty(),
156            constants: vec![],
157            constraints: Constraints::empty(),
158            schema,
159        }
160    }
161
162    /// Adds constraints to the properties.
163    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
164        self.constraints = constraints;
165        self
166    }
167
168    /// Creates a new `EquivalenceProperties` object with the given orderings.
169    pub fn new_with_orderings(schema: SchemaRef, orderings: &[LexOrdering]) -> Self {
170        Self {
171            eq_group: EquivalenceGroup::empty(),
172            oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()),
173            constants: vec![],
174            constraints: Constraints::empty(),
175            schema,
176        }
177    }
178
    /// Returns a reference to the schema associated with these properties.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
183
    /// Returns a reference to the ordering equivalence class within, exactly
    /// as stored (no normalization is applied here; see
    /// `normalized_oeq_class` for the normalized version).
    pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
        &self.oeq_class
    }
188
189    /// Return the inner OrderingEquivalenceClass, consuming self
190    pub fn into_oeq_class(self) -> OrderingEquivalenceClass {
191        self.oeq_class
192    }
193
    /// Returns a reference to the equivalence group within, i.e. the set of
    /// equivalence classes tracked by this object.
    pub fn eq_group(&self) -> &EquivalenceGroup {
        &self.eq_group
    }
198
199    /// Returns a reference to the constant expressions
200    pub fn constants(&self) -> &[ConstExpr] {
201        &self.constants
202    }
203
    /// Returns a reference to the table constraints stored in this object.
    pub fn constraints(&self) -> &Constraints {
        &self.constraints
    }
207
208    /// Returns the output ordering of the properties.
209    pub fn output_ordering(&self) -> Option<LexOrdering> {
210        let constants = self.constants();
211        let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default();
212        // Prune out constant expressions
213        output_ordering
214            .retain(|sort_expr| !const_exprs_contains(constants, &sort_expr.expr));
215        (!output_ordering.is_empty()).then_some(output_ordering)
216    }
217
218    /// Returns the normalized version of the ordering equivalence class within.
219    /// Normalization removes constants and duplicates as well as standardizing
220    /// expressions according to the equivalence group within.
221    pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
222        OrderingEquivalenceClass::new(
223            self.oeq_class
224                .iter()
225                .map(|ordering| self.normalize_sort_exprs(ordering))
226                .collect(),
227        )
228    }
229
230    /// Extends this `EquivalenceProperties` with the `other` object.
231    pub fn extend(mut self, other: Self) -> Self {
232        self.eq_group.extend(other.eq_group);
233        self.oeq_class.extend(other.oeq_class);
234        self.with_constants(other.constants)
235    }
236
    /// Clears (empties) the ordering equivalence class within this object.
    /// Call this method when existing orderings are invalidated. The
    /// equivalence group, constants and constraints are left untouched.
    pub fn clear_orderings(&mut self) {
        self.oeq_class.clear();
    }
242
243    /// Removes constant expressions that may change across partitions.
244    /// This method should be used when data from different partitions are merged.
245    pub fn clear_per_partition_constants(&mut self) {
246        self.constants.retain(|item| {
247            matches!(item.across_partitions(), AcrossPartitions::Uniform(_))
248        })
249    }
250
    /// Extends this `EquivalenceProperties` by adding the orderings inside the
    /// ordering equivalence class `other`, which is consumed by the call.
    pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) {
        self.oeq_class.extend(other);
    }
256
    /// Adds new orderings into the existing ordering equivalence class by
    /// forwarding them to `OrderingEquivalenceClass::add_new_orderings`.
    pub fn add_new_orderings(
        &mut self,
        orderings: impl IntoIterator<Item = LexOrdering>,
    ) {
        self.oeq_class.add_new_orderings(orderings);
    }
264
265    /// Adds a single ordering to the existing ordering equivalence class.
266    pub fn add_new_ordering(&mut self, ordering: LexOrdering) {
267        self.add_new_orderings([ordering]);
268    }
269
    /// Incorporates the given equivalence group into the existing
    /// equivalence group within.
    pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
        self.eq_group.extend(other_eq_group);
    }
275
276    /// Adds a new equality condition into the existing equivalence group.
277    /// If the given equality defines a new equivalence class, adds this new
278    /// equivalence class to the equivalence group.
279    pub fn add_equal_conditions(
280        &mut self,
281        left: &Arc<dyn PhysicalExpr>,
282        right: &Arc<dyn PhysicalExpr>,
283    ) -> Result<()> {
284        // Discover new constants in light of new the equality:
285        if self.is_expr_constant(left) {
286            // Left expression is constant, add right as constant
287            if !const_exprs_contains(&self.constants, right) {
288                let const_expr = ConstExpr::from(right)
289                    .with_across_partitions(self.get_expr_constant_value(left));
290                self.constants.push(const_expr);
291            }
292        } else if self.is_expr_constant(right) {
293            // Right expression is constant, add left as constant
294            if !const_exprs_contains(&self.constants, left) {
295                let const_expr = ConstExpr::from(left)
296                    .with_across_partitions(self.get_expr_constant_value(right));
297                self.constants.push(const_expr);
298            }
299        }
300
301        // Add equal expressions to the state
302        self.eq_group.add_equal_conditions(left, right);
303
304        // Discover any new orderings
305        self.discover_new_orderings(left)?;
306        Ok(())
307    }
308
    /// Track/register physical expressions with constant values.
    ///
    /// Deprecated: this simply forwards to `with_constants`.
    #[deprecated(since = "43.0.0", note = "Use [`with_constants`] instead")]
    pub fn add_constants(self, constants: impl IntoIterator<Item = ConstExpr>) -> Self {
        self.with_constants(constants)
    }
314
315    /// Remove the specified constant
316    pub fn remove_constant(mut self, c: &ConstExpr) -> Self {
317        self.constants.retain(|existing| existing != c);
318        self
319    }
320
321    /// Track/register physical expressions with constant values.
322    pub fn with_constants(
323        mut self,
324        constants: impl IntoIterator<Item = ConstExpr>,
325    ) -> Self {
326        let normalized_constants = constants
327            .into_iter()
328            .filter_map(|c| {
329                let across_partitions = c.across_partitions();
330                let expr = c.owned_expr();
331                let normalized_expr = self.eq_group.normalize_expr(expr);
332
333                if const_exprs_contains(&self.constants, &normalized_expr) {
334                    return None;
335                }
336
337                let const_expr = ConstExpr::from(normalized_expr)
338                    .with_across_partitions(across_partitions);
339
340                Some(const_expr)
341            })
342            .collect::<Vec<_>>();
343
344        // Add all new normalized constants
345        self.constants.extend(normalized_constants);
346
347        // Discover any new orderings based on the constants
348        for ordering in self.normalized_oeq_class().iter() {
349            if let Err(e) = self.discover_new_orderings(&ordering[0].expr) {
350                log::debug!("error discovering new orderings: {e}");
351            }
352        }
353
354        self
355    }
356
    /// Discover new valid orderings in light of a new equality.
    ///
    /// Accepts a single argument (`expr`) which is used to determine
    /// which orderings should be updated: only normalized orderings whose
    /// leading expression equals the normalized `expr` are considered.
    /// When constants or equivalence classes are changed, there may be new orderings
    /// that can be discovered with the new equivalence properties.
    /// For a discussion, see: https://github.com/apache/datafusion/issues/9812
    ///
    /// # Errors
    ///
    /// Returns an error if the data type of a child expression cannot be
    /// resolved against the schema, or if an unbounded interval cannot be
    /// built for it. Failures of `get_properties` are not errors; the
    /// candidate expression is simply skipped.
    fn discover_new_orderings(&mut self, expr: &Arc<dyn PhysicalExpr>) -> Result<()> {
        let normalized_expr = self.eq_group().normalize_expr(Arc::clone(expr));
        // Collect all expressions equivalent to `normalized_expr`; when it
        // belongs to no class, it is its own (singleton) class.
        let eq_class = self
            .eq_group
            .iter()
            .find_map(|class| {
                class
                    .contains(&normalized_expr)
                    .then(|| class.clone().into_vec())
            })
            .unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]);

        let mut new_orderings: Vec<LexOrdering> = vec![];
        for ordering in self.normalized_oeq_class().iter() {
            // Only orderings led by the expression of interest are relevant.
            if !ordering[0].expr.eq(&normalized_expr) {
                continue;
            }

            let leading_ordering_options = ordering[0].options;

            for equivalent_expr in &eq_class {
                // Leaf expressions have no arguments to derive an ordering from.
                let children = equivalent_expr.children();
                if children.is_empty() {
                    continue;
                }

                // Check if all children match the next expressions in the ordering
                let mut all_children_match = true;
                let mut child_properties = vec![];

                // Build properties for each child based on the next expressions
                for (i, child) in children.iter().enumerate() {
                    // The i-th child must line up with the (i + 1)-th sort key,
                    // i.e. the keys that directly follow the leading one.
                    if let Some(next) = ordering.get(i + 1) {
                        if !child.as_ref().eq(next.expr.as_ref()) {
                            all_children_match = false;
                            break;
                        }
                        child_properties.push(ExprProperties {
                            sort_properties: SortProperties::Ordered(next.options),
                            range: Interval::make_unbounded(
                                &child.data_type(&self.schema)?,
                            )?,
                            preserves_lex_ordering: true,
                        });
                    } else {
                        // The ordering has fewer trailing keys than children.
                        all_children_match = false;
                        break;
                    }
                }

                if all_children_match {
                    // Check if the expression is monotonic in all arguments
                    if let Ok(expr_properties) =
                        equivalent_expr.get_properties(&child_properties)
                    {
                        if expr_properties.preserves_lex_ordering
                            && SortProperties::Ordered(leading_ordering_options)
                                == expr_properties.sort_properties
                        {
                            // Assume existing ordering is [c ASC, a ASC, b ASC]
                            // When equality c = f(a,b) is given, if we know that given ordering `[a ASC, b ASC]`,
                            // ordering `[f(a,b) ASC]` is valid, then we can deduce that ordering `[a ASC, b ASC]` is also valid.
                            // Hence, ordering `[a ASC, b ASC]` can be added to the state as a valid ordering.
                            // (e.g. existing ordering where leading ordering is removed)
                            new_orderings.push(LexOrdering::new(ordering[1..].to_vec()));
                            // One derived ordering per existing ordering is enough.
                            break;
                        }
                    }
                }
            }
        }

        self.oeq_class.add_new_orderings(new_orderings);
        Ok(())
    }
438
439    /// Updates the ordering equivalence group within assuming that the table
440    /// is re-sorted according to the argument `sort_exprs`. Note that constants
441    /// and equivalence classes are unchanged as they are unaffected by a re-sort.
442    /// If the given ordering is already satisfied, the function does nothing.
443    pub fn with_reorder(mut self, sort_exprs: LexOrdering) -> Self {
444        // Filter out constant expressions as they don't affect ordering
445        let filtered_exprs = LexOrdering::new(
446            sort_exprs
447                .into_iter()
448                .filter(|expr| !self.is_expr_constant(&expr.expr))
449                .collect(),
450        );
451
452        if filtered_exprs.is_empty() {
453            return self;
454        }
455
456        let mut new_orderings = vec![filtered_exprs.clone()];
457
458        // Preserve valid suffixes from existing orderings
459        let oeq_class = mem::take(&mut self.oeq_class);
460        for existing in oeq_class {
461            if self.is_prefix_of(&filtered_exprs, &existing) {
462                let mut extended = filtered_exprs.clone();
463                extended.extend(existing.into_iter().skip(filtered_exprs.len()));
464                new_orderings.push(extended);
465            }
466        }
467
468        self.oeq_class = OrderingEquivalenceClass::new(new_orderings);
469        self
470    }
471
472    /// Checks if the new ordering matches a prefix of the existing ordering
473    /// (considering expression equivalences)
474    fn is_prefix_of(&self, new_order: &LexOrdering, existing: &LexOrdering) -> bool {
475        // Check if new order is longer than existing - can't be a prefix
476        if new_order.len() > existing.len() {
477            return false;
478        }
479
480        // Check if new order matches existing prefix (considering equivalences)
481        new_order.iter().zip(existing).all(|(new, existing)| {
482            self.eq_group.exprs_equal(&new.expr, &existing.expr)
483                && new.options == existing.options
484        })
485    }
486
487    /// Normalizes the given sort expressions (i.e. `sort_exprs`) using the
488    /// equivalence group and the ordering equivalence class within.
489    ///
490    /// Assume that `self.eq_group` states column `a` and `b` are aliases.
491    /// Also assume that `self.oeq_class` states orderings `d ASC` and `a ASC, c ASC`
492    /// are equivalent (in the sense that both describe the ordering of the table).
493    /// If the `sort_exprs` argument were `vec![b ASC, c ASC, a ASC]`, then this
494    /// function would return `vec![a ASC, c ASC]`. Internally, it would first
495    /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result
496    /// after deduplication.
497    fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
498        // Convert sort expressions to sort requirements:
499        let sort_reqs = LexRequirement::from(sort_exprs.clone());
500        // Normalize the requirements:
501        let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
502        // Convert sort requirements back to sort expressions:
503        LexOrdering::from(normalized_sort_reqs)
504    }
505
506    /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
507    /// equivalence group and the ordering equivalence class within. It works by:
508    /// - Removing expressions that have a constant value from the given requirement.
509    /// - Replacing sections that belong to some equivalence class in the equivalence
510    ///   group with the first entry in the matching equivalence class.
511    ///
512    /// Assume that `self.eq_group` states column `a` and `b` are aliases.
513    /// Also assume that `self.oeq_class` states orderings `d ASC` and `a ASC, c ASC`
514    /// are equivalent (in the sense that both describe the ordering of the table).
515    /// If the `sort_reqs` argument were `vec![b ASC, c ASC, a ASC]`, then this
516    /// function would return `vec![a ASC, c ASC]`. Internally, it would first
517    /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result
518    /// after deduplication.
519    fn normalize_sort_requirements(&self, sort_reqs: &LexRequirement) -> LexRequirement {
520        let normalized_sort_reqs = self.eq_group.normalize_sort_requirements(sort_reqs);
521        let mut constant_exprs = vec![];
522        constant_exprs.extend(
523            self.constants
524                .iter()
525                .map(|const_expr| Arc::clone(const_expr.expr())),
526        );
527        let constants_normalized = self.eq_group.normalize_exprs(constant_exprs);
528        // Prune redundant sections in the requirement:
529        normalized_sort_reqs
530            .iter()
531            .filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr))
532            .cloned()
533            .collect::<LexRequirement>()
534            .collapse()
535    }
536
537    /// Checks whether the given ordering is satisfied by any of the existing
538    /// orderings.
539    pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool {
540        // Convert the given sort expressions to sort requirements:
541        let sort_requirements = LexRequirement::from(given.clone());
542        self.ordering_satisfy_requirement(&sort_requirements)
543    }
544
545    /// Checks whether the given sort requirements are satisfied by any of the
546    /// existing orderings.
547    pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
548        let mut eq_properties = self.clone();
549        // First, standardize the given requirement:
550        let normalized_reqs = eq_properties.normalize_sort_requirements(reqs);
551
552        // Check whether given ordering is satisfied by constraints first
553        if self.satisfied_by_constraints(&normalized_reqs) {
554            return true;
555        }
556
557        for normalized_req in normalized_reqs {
558            // Check whether given ordering is satisfied
559            if !eq_properties.ordering_satisfy_single(&normalized_req) {
560                return false;
561            }
562            // Treat satisfied keys as constants in subsequent iterations. We
563            // can do this because the "next" key only matters in a lexicographical
564            // ordering when the keys to its left have the same values.
565            //
566            // Note that these expressions are not properly "constants". This is just
567            // an implementation strategy confined to this function.
568            //
569            // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
570            // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
571            // From the analysis above, we know that `[a ASC]` is satisfied. Then,
572            // we add column `a` as constant to the algorithm state. This enables us
573            // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
574            eq_properties = eq_properties
575                .with_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
576        }
577        true
578    }
579
580    /// Checks if the sort requirements are satisfied by any of the table constraints (primary key or unique).
581    /// Returns true if any constraint fully satisfies the requirements.
582    fn satisfied_by_constraints(
583        &self,
584        normalized_reqs: &[PhysicalSortRequirement],
585    ) -> bool {
586        self.constraints.iter().any(|constraint| match constraint {
587            Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => self
588                .satisfied_by_constraint(
589                    normalized_reqs,
590                    indices,
591                    matches!(constraint, Constraint::Unique(_)),
592                ),
593        })
594    }
595
    /// Checks if sort requirements are satisfied by a constraint (primary key or unique).
    ///
    /// Returns true if some existing ordering (a) contains, as plain columns,
    /// every column index of the constraint and (b) matches the leading
    /// requirements expression by expression, where a requirement without
    /// sort options matches any options.
    ///
    /// When `check_null` is set (unique constraints), a constrained column
    /// sitting at the first or last position of the ordering must also be
    /// non-nullable; a column whose nullability cannot be determined is
    /// treated as nullable.
    fn satisfied_by_constraint(
        &self,
        normalized_reqs: &[PhysicalSortRequirement],
        indices: &[usize],
        check_null: bool,
    ) -> bool {
        // Requirements must contain indices
        if indices.len() > normalized_reqs.len() {
            return false;
        }

        // Iterate over all orderings
        self.oeq_class.iter().any(|ordering| {
            // An ordering shorter than the constraint cannot cover all of its columns.
            if indices.len() > ordering.len() {
                return false;
            }

            // Build a map of column positions in the ordering
            // (column index -> (position in ordering, nullable)); non-column
            // expressions are skipped.
            let mut col_positions = HashMap::with_capacity(ordering.len());
            for (pos, req) in ordering.iter().enumerate() {
                if let Some(col) = req.expr.as_any().downcast_ref::<Column>() {
                    col_positions.insert(
                        col.index(),
                        (pos, col.nullable(&self.schema).unwrap_or(true)),
                    );
                }
            }

            // Check if all constraint indices appear in valid positions
            if !indices.iter().all(|&idx| {
                col_positions
                    .get(&idx)
                    .map(|&(pos, nullable)| {
                        // For unique constraints, verify column is not nullable if it's first/last
                        !check_null
                            || (pos != 0 && pos != ordering.len() - 1)
                            || !nullable
                    })
                    // A constraint column missing from the ordering disqualifies it.
                    .unwrap_or(false)
            }) {
                return false;
            }

            // Check if this ordering matches requirements prefix
            let ordering_len = ordering.len();
            normalized_reqs.len() >= ordering_len
                && normalized_reqs[..ordering_len].iter().zip(ordering).all(
                    |(req, existing)| {
                        req.expr.eq(&existing.expr)
                            && req
                                .options
                                .is_none_or(|req_opts| req_opts == existing.options)
                    },
                )
        })
    }
655
656    /// Determines whether the ordering specified by the given sort requirement
657    /// is satisfied based on the orderings within, equivalence classes, and
658    /// constant expressions.
659    ///
660    /// # Parameters
661    ///
662    /// - `req`: A reference to a `PhysicalSortRequirement` for which the ordering
663    ///   satisfaction check will be done.
664    ///
665    /// # Returns
666    ///
667    /// Returns `true` if the specified ordering is satisfied, `false` otherwise.
668    fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool {
669        let ExprProperties {
670            sort_properties, ..
671        } = self.get_expr_properties(Arc::clone(&req.expr));
672        match sort_properties {
673            SortProperties::Ordered(options) => {
674                let sort_expr = PhysicalSortExpr {
675                    expr: Arc::clone(&req.expr),
676                    options,
677                };
678                sort_expr.satisfy(req, self.schema())
679            }
680            // Singleton expressions satisfies any ordering.
681            SortProperties::Singleton => true,
682            SortProperties::Unordered => false,
683        }
684    }
685
686    /// Checks whether the `given` sort requirements are equal or more specific
687    /// than the `reference` sort requirements.
688    pub fn requirements_compatible(
689        &self,
690        given: &LexRequirement,
691        reference: &LexRequirement,
692    ) -> bool {
693        let normalized_given = self.normalize_sort_requirements(given);
694        let normalized_reference = self.normalize_sort_requirements(reference);
695
696        (normalized_reference.len() <= normalized_given.len())
697            && normalized_reference
698                .into_iter()
699                .zip(normalized_given)
700                .all(|(reference, given)| given.compatible(&reference))
701    }
702
703    /// Returns the finer ordering among the orderings `lhs` and `rhs`, breaking
704    /// any ties by choosing `lhs`.
705    ///
706    /// The finer ordering is the ordering that satisfies both of the orderings.
707    /// If the orderings are incomparable, returns `None`.
708    ///
709    /// For example, the finer ordering among `[a ASC]` and `[a ASC, b ASC]` is
710    /// the latter.
711    pub fn get_finer_ordering(
712        &self,
713        lhs: &LexOrdering,
714        rhs: &LexOrdering,
715    ) -> Option<LexOrdering> {
716        // Convert the given sort expressions to sort requirements:
717        let lhs = LexRequirement::from(lhs.clone());
718        let rhs = LexRequirement::from(rhs.clone());
719        let finer = self.get_finer_requirement(&lhs, &rhs);
720        // Convert the chosen sort requirements back to sort expressions:
721        finer.map(LexOrdering::from)
722    }
723
724    /// Returns the finer ordering among the requirements `lhs` and `rhs`,
725    /// breaking any ties by choosing `lhs`.
726    ///
727    /// The finer requirements are the ones that satisfy both of the given
728    /// requirements. If the requirements are incomparable, returns `None`.
729    ///
730    /// For example, the finer requirements among `[a ASC]` and `[a ASC, b ASC]`
731    /// is the latter.
732    pub fn get_finer_requirement(
733        &self,
734        req1: &LexRequirement,
735        req2: &LexRequirement,
736    ) -> Option<LexRequirement> {
737        let mut lhs = self.normalize_sort_requirements(req1);
738        let mut rhs = self.normalize_sort_requirements(req2);
739        lhs.inner
740            .iter_mut()
741            .zip(rhs.inner.iter_mut())
742            .all(|(lhs, rhs)| {
743                lhs.expr.eq(&rhs.expr)
744                    && match (lhs.options, rhs.options) {
745                        (Some(lhs_opt), Some(rhs_opt)) => lhs_opt == rhs_opt,
746                        (Some(options), None) => {
747                            rhs.options = Some(options);
748                            true
749                        }
750                        (None, Some(options)) => {
751                            lhs.options = Some(options);
752                            true
753                        }
754                        (None, None) => true,
755                    }
756            })
757            .then_some(if lhs.len() >= rhs.len() { lhs } else { rhs })
758    }
759
760    /// we substitute the ordering according to input expression type, this is a simplified version
761    /// In this case, we just substitute when the expression satisfy the following condition:
762    /// I. just have one column and is a CAST expression
763    /// TODO: Add one-to-ones analysis for monotonic ScalarFunctions.
764    /// TODO: we could precompute all the scenario that is computable, for example: atan(x + 1000) should also be substituted if
765    ///  x is DESC or ASC
766    /// After substitution, we may generate more than 1 `LexOrdering`. As an example,
767    /// `[a ASC, b ASC]` will turn into `[a ASC, b ASC], [CAST(a) ASC, b ASC]` when projection expressions `a, b, CAST(a)` is applied.
768    pub fn substitute_ordering_component(
769        &self,
770        mapping: &ProjectionMapping,
771        sort_expr: &LexOrdering,
772    ) -> Result<Vec<LexOrdering>> {
773        let new_orderings = sort_expr
774            .iter()
775            .map(|sort_expr| {
776                let referring_exprs: Vec<_> = mapping
777                    .iter()
778                    .map(|(source, _target)| source)
779                    .filter(|source| expr_refers(source, &sort_expr.expr))
780                    .cloned()
781                    .collect();
782                let mut res = LexOrdering::new(vec![sort_expr.clone()]);
783                // TODO: Add one-to-ones analysis for ScalarFunctions.
784                for r_expr in referring_exprs {
785                    // we check whether this expression is substitutable or not
786                    if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastExpr>() {
787                        // we need to know whether the Cast Expr matches or not
788                        let expr_type = sort_expr.expr.data_type(&self.schema)?;
789                        if cast_expr.expr.eq(&sort_expr.expr)
790                            && cast_expr.is_bigger_cast(expr_type)
791                        {
792                            res.push(PhysicalSortExpr {
793                                expr: Arc::clone(&r_expr),
794                                options: sort_expr.options,
795                            });
796                        }
797                    }
798                }
799                Ok(res)
800            })
801            .collect::<Result<Vec<_>>>()?;
802        // Generate all valid orderings, given substituted expressions.
803        let res = new_orderings
804            .into_iter()
805            .multi_cartesian_product()
806            .map(LexOrdering::new)
807            .collect::<Vec<_>>();
808        Ok(res)
809    }
810
811    /// In projection, supposed we have a input function 'A DESC B DESC' and the output shares the same expression
812    /// with A and B, we could surely use the ordering of the original ordering, However, if the A has been changed,
813    /// for example, A-> Cast(A, Int64) or any other form, it is invalid if we continue using the original ordering
814    /// Since it would cause bug in dependency constructions, we should substitute the input order in order to get correct
815    /// dependency map, happen in issue 8838: <https://github.com/apache/datafusion/issues/8838>
816    pub fn substitute_oeq_class(&mut self, mapping: &ProjectionMapping) -> Result<()> {
817        let new_order = self
818            .oeq_class
819            .iter()
820            .map(|order| self.substitute_ordering_component(mapping, order))
821            .collect::<Result<Vec<_>>>()?;
822        let new_order = new_order.into_iter().flatten().collect();
823        self.oeq_class = OrderingEquivalenceClass::new(new_order);
824        Ok(())
825    }
826    /// Projects argument `expr` according to `projection_mapping`, taking
827    /// equivalences into account.
828    ///
829    /// For example, assume that columns `a` and `c` are always equal, and that
830    /// `projection_mapping` encodes following mapping:
831    ///
832    /// ```text
833    /// a -> a1
834    /// b -> b1
835    /// ```
836    ///
837    /// Then, this function projects `a + b` to `Some(a1 + b1)`, `c + b` to
838    /// `Some(a1 + b1)` and `d` to `None`, meaning that it  cannot be projected.
839    pub fn project_expr(
840        &self,
841        expr: &Arc<dyn PhysicalExpr>,
842        projection_mapping: &ProjectionMapping,
843    ) -> Option<Arc<dyn PhysicalExpr>> {
844        self.eq_group.project_expr(projection_mapping, expr)
845    }
846
847    /// Constructs a dependency map based on existing orderings referred to in
848    /// the projection.
849    ///
850    /// This function analyzes the orderings in the normalized order-equivalence
851    /// class and builds a dependency map. The dependency map captures relationships
852    /// between expressions within the orderings, helping to identify dependencies
853    /// and construct valid projected orderings during projection operations.
854    ///
855    /// # Parameters
856    ///
857    /// - `mapping`: A reference to the `ProjectionMapping` that defines the
858    ///   relationship between source and target expressions.
859    ///
860    /// # Returns
861    ///
862    /// A [`DependencyMap`] representing the dependency map, where each
863    /// \[`DependencyNode`\] contains dependencies for the key [`PhysicalSortExpr`].
864    ///
865    /// # Example
866    ///
867    /// Assume we have two equivalent orderings: `[a ASC, b ASC]` and `[a ASC, c ASC]`,
868    /// and the projection mapping is `[a -> a_new, b -> b_new, b + c -> b + c]`.
869    /// Then, the dependency map will be:
870    ///
871    /// ```text
872    /// a ASC: Node {Some(a_new ASC), HashSet{}}
873    /// b ASC: Node {Some(b_new ASC), HashSet{a ASC}}
874    /// c ASC: Node {None, HashSet{a ASC}}
875    /// ```
876    fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
877        let mut dependency_map = DependencyMap::new();
878        for ordering in self.normalized_oeq_class().iter() {
879            for (idx, sort_expr) in ordering.iter().enumerate() {
880                let target_sort_expr =
881                    self.project_expr(&sort_expr.expr, mapping).map(|expr| {
882                        PhysicalSortExpr {
883                            expr,
884                            options: sort_expr.options,
885                        }
886                    });
887                let is_projected = target_sort_expr.is_some();
888                if is_projected
889                    || mapping
890                        .iter()
891                        .any(|(source, _)| expr_refers(source, &sort_expr.expr))
892                {
893                    // Previous ordering is a dependency. Note that there is no,
894                    // dependency for a leading ordering (i.e. the first sort
895                    // expression).
896                    let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
897                    // Add sort expressions that can be projected or referred to
898                    // by any of the projection expressions to the dependency map:
899                    dependency_map.insert(
900                        sort_expr,
901                        target_sort_expr.as_ref(),
902                        dependency,
903                    );
904                }
905                if !is_projected {
906                    // If we can not project, stop constructing the dependency
907                    // map as remaining dependencies will be invalid after projection.
908                    break;
909                }
910            }
911        }
912        dependency_map
913    }
914
915    /// Returns a new `ProjectionMapping` where source expressions are normalized.
916    ///
917    /// This normalization ensures that source expressions are transformed into a
918    /// consistent representation. This is beneficial for algorithms that rely on
919    /// exact equalities, as it allows for more precise and reliable comparisons.
920    ///
921    /// # Parameters
922    ///
923    /// - `mapping`: A reference to the original `ProjectionMapping` to be normalized.
924    ///
925    /// # Returns
926    ///
927    /// A new `ProjectionMapping` with normalized source expressions.
928    fn normalized_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
929        // Construct the mapping where source expressions are normalized. In this way
930        // In the algorithms below we can work on exact equalities
931        ProjectionMapping {
932            map: mapping
933                .iter()
934                .map(|(source, target)| {
935                    let normalized_source =
936                        self.eq_group.normalize_expr(Arc::clone(source));
937                    (normalized_source, Arc::clone(target))
938                })
939                .collect(),
940        }
941    }
942
943    /// Computes projected orderings based on a given projection mapping.
944    ///
945    /// This function takes a `ProjectionMapping` and computes the possible
946    /// orderings for the projected expressions. It considers dependencies
947    /// between expressions and generates valid orderings according to the
948    /// specified sort properties.
949    ///
950    /// # Parameters
951    ///
952    /// - `mapping`: A reference to the `ProjectionMapping` that defines the
953    ///   relationship between source and target expressions.
954    ///
955    /// # Returns
956    ///
957    /// A vector of `LexOrdering` containing all valid orderings after projection.
958    fn projected_orderings(&self, mapping: &ProjectionMapping) -> Vec<LexOrdering> {
959        let mapping = self.normalized_mapping(mapping);
960
961        // Get dependency map for existing orderings:
962        let dependency_map = self.construct_dependency_map(&mapping);
963        let orderings = mapping.iter().flat_map(|(source, target)| {
964            referred_dependencies(&dependency_map, source)
965                .into_iter()
966                .filter_map(|relevant_deps| {
967                    if let Ok(SortProperties::Ordered(options)) =
968                        get_expr_properties(source, &relevant_deps, &self.schema)
969                            .map(|prop| prop.sort_properties)
970                    {
971                        Some((options, relevant_deps))
972                    } else {
973                        // Do not consider unordered cases
974                        None
975                    }
976                })
977                .flat_map(|(options, relevant_deps)| {
978                    let sort_expr = PhysicalSortExpr {
979                        expr: Arc::clone(target),
980                        options,
981                    };
982                    // Generate dependent orderings (i.e. prefixes for `sort_expr`):
983                    let mut dependency_orderings =
984                        generate_dependency_orderings(&relevant_deps, &dependency_map);
985                    // Append `sort_expr` to the dependent orderings:
986                    for ordering in dependency_orderings.iter_mut() {
987                        ordering.push(sort_expr.clone());
988                    }
989                    dependency_orderings
990                })
991        });
992
993        // Add valid projected orderings. For example, if existing ordering is
994        // `a + b` and projection is `[a -> a_new, b -> b_new]`, we need to
995        // preserve `a_new + b_new` as ordered. Please note that `a_new` and
996        // `b_new` themselves need not be ordered. Such dependencies cannot be
997        // deduced via the pass above.
998        let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
999            let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
1000            if prefixes.is_empty() {
1001                // If prefix is empty, there is no dependency. Insert
1002                // empty ordering:
1003                prefixes = vec![LexOrdering::default()];
1004            }
1005            // Append current ordering on top its dependencies:
1006            for ordering in prefixes.iter_mut() {
1007                if let Some(target) = &node.target_sort_expr {
1008                    ordering.push(target.clone())
1009                }
1010            }
1011            prefixes
1012        });
1013
1014        // Simplify each ordering by removing redundant sections:
1015        orderings
1016            .chain(projected_orderings)
1017            .map(|lex_ordering| lex_ordering.collapse())
1018            .collect()
1019    }
1020
1021    /// Projects constants based on the provided `ProjectionMapping`.
1022    ///
1023    /// This function takes a `ProjectionMapping` and identifies/projects
1024    /// constants based on the existing constants and the mapping. It ensures
1025    /// that constants are appropriately propagated through the projection.
1026    ///
1027    /// # Parameters
1028    ///
1029    /// - `mapping`: A reference to a `ProjectionMapping` representing the
1030    ///   mapping of source expressions to target expressions in the projection.
1031    ///
1032    /// # Returns
1033    ///
1034    /// Returns a `Vec<Arc<dyn PhysicalExpr>>` containing the projected constants.
1035    fn projected_constants(&self, mapping: &ProjectionMapping) -> Vec<ConstExpr> {
1036        // First, project existing constants. For example, assume that `a + b`
1037        // is known to be constant. If the projection were `a as a_new`, `b as b_new`,
1038        // then we would project constant `a + b` as `a_new + b_new`.
1039        let mut projected_constants = self
1040            .constants
1041            .iter()
1042            .flat_map(|const_expr| {
1043                const_expr
1044                    .map(|expr| self.eq_group.project_expr(mapping, expr))
1045                    .map(|projected_expr| {
1046                        projected_expr
1047                            .with_across_partitions(const_expr.across_partitions())
1048                    })
1049            })
1050            .collect::<Vec<_>>();
1051
1052        // Add projection expressions that are known to be constant:
1053        for (source, target) in mapping.iter() {
1054            if self.is_expr_constant(source)
1055                && !const_exprs_contains(&projected_constants, target)
1056            {
1057                if self.is_expr_constant_across_partitions(source) {
1058                    projected_constants.push(
1059                        ConstExpr::from(target)
1060                            .with_across_partitions(self.get_expr_constant_value(source)),
1061                    )
1062                } else {
1063                    projected_constants.push(
1064                        ConstExpr::from(target)
1065                            .with_across_partitions(AcrossPartitions::Heterogeneous),
1066                    )
1067                }
1068            }
1069        }
1070        projected_constants
1071    }
1072
1073    /// Projects constraints according to the given projection mapping.
1074    ///
1075    /// This function takes a projection mapping and extracts the column indices of the target columns.
1076    /// It then projects the constraints to only include relationships between
1077    /// columns that exist in the projected output.
1078    ///
1079    /// # Arguments
1080    ///
1081    /// * `mapping` - A reference to `ProjectionMapping` that defines how expressions are mapped
1082    ///               in the projection operation
1083    ///
1084    /// # Returns
1085    ///
1086    /// Returns a new `Constraints` object containing only the constraints
1087    /// that are valid for the projected columns.
1088    fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option<Constraints> {
1089        let indices = mapping
1090            .iter()
1091            .filter_map(|(_, target)| target.as_any().downcast_ref::<Column>())
1092            .map(|col| col.index())
1093            .collect::<Vec<_>>();
1094        debug_assert_eq!(mapping.map.len(), indices.len());
1095        self.constraints.project(&indices)
1096    }
1097
1098    /// Projects the equivalences within according to `mapping`
1099    /// and `output_schema`.
1100    pub fn project(&self, mapping: &ProjectionMapping, output_schema: SchemaRef) -> Self {
1101        let eq_group = self.eq_group.project(mapping);
1102        let oeq_class = OrderingEquivalenceClass::new(self.projected_orderings(mapping));
1103        let constants = self.projected_constants(mapping);
1104        let constraints = self
1105            .projected_constraints(mapping)
1106            .unwrap_or_else(Constraints::empty);
1107        Self {
1108            schema: output_schema,
1109            eq_group,
1110            oeq_class,
1111            constants,
1112            constraints,
1113        }
1114    }
1115
1116    /// Returns the longest (potentially partial) permutation satisfying the
1117    /// existing ordering. For example, if we have the equivalent orderings
1118    /// `[a ASC, b ASC]` and `[c DESC]`, with `exprs` containing `[c, b, a, d]`,
1119    /// then this function returns `([a ASC, b ASC, c DESC], [2, 1, 0])`.
1120    /// This means that the specification `[a ASC, b ASC, c DESC]` is satisfied
1121    /// by the existing ordering, and `[a, b, c]` resides at indices: `2, 1, 0`
1122    /// inside the argument `exprs` (respectively). For the mathematical
1123    /// definition of "partial permutation", see:
1124    ///
1125    /// <https://en.wikipedia.org/wiki/Permutation#k-permutations_of_n>
1126    pub fn find_longest_permutation(
1127        &self,
1128        exprs: &[Arc<dyn PhysicalExpr>],
1129    ) -> (LexOrdering, Vec<usize>) {
1130        let mut eq_properties = self.clone();
1131        let mut result = vec![];
1132        // The algorithm is as follows:
1133        // - Iterate over all the expressions and insert ordered expressions
1134        //   into the result.
1135        // - Treat inserted expressions as constants (i.e. add them as constants
1136        //   to the state).
1137        // - Continue the above procedure until no expression is inserted; i.e.
1138        //   the algorithm reaches a fixed point.
1139        // This algorithm should reach a fixed point in at most `exprs.len()`
1140        // iterations.
1141        let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
1142        for _idx in 0..exprs.len() {
1143            // Get ordered expressions with their indices.
1144            let ordered_exprs = search_indices
1145                .iter()
1146                .flat_map(|&idx| {
1147                    let ExprProperties {
1148                        sort_properties, ..
1149                    } = eq_properties.get_expr_properties(Arc::clone(&exprs[idx]));
1150                    match sort_properties {
1151                        SortProperties::Ordered(options) => Some((
1152                            PhysicalSortExpr {
1153                                expr: Arc::clone(&exprs[idx]),
1154                                options,
1155                            },
1156                            idx,
1157                        )),
1158                        SortProperties::Singleton => {
1159                            // Assign default ordering to constant expressions
1160                            let options = SortOptions::default();
1161                            Some((
1162                                PhysicalSortExpr {
1163                                    expr: Arc::clone(&exprs[idx]),
1164                                    options,
1165                                },
1166                                idx,
1167                            ))
1168                        }
1169                        SortProperties::Unordered => None,
1170                    }
1171                })
1172                .collect::<Vec<_>>();
1173            // We reached a fixed point, exit.
1174            if ordered_exprs.is_empty() {
1175                break;
1176            }
1177            // Remove indices that have an ordering from `search_indices`, and
1178            // treat ordered expressions as constants in subsequent iterations.
1179            // We can do this because the "next" key only matters in a lexicographical
1180            // ordering when the keys to its left have the same values.
1181            //
1182            // Note that these expressions are not properly "constants". This is just
1183            // an implementation strategy confined to this function.
1184            for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
1185                eq_properties =
1186                    eq_properties.with_constants(std::iter::once(ConstExpr::from(expr)));
1187                search_indices.shift_remove(idx);
1188            }
1189            // Add new ordered section to the state.
1190            result.extend(ordered_exprs);
1191        }
1192        let (left, right) = result.into_iter().unzip();
1193        (LexOrdering::new(left), right)
1194    }
1195
1196    /// This function determines whether the provided expression is constant
1197    /// based on the known constants.
1198    ///
1199    /// # Parameters
1200    ///
1201    /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1202    ///   expression to be checked.
1203    ///
1204    /// # Returns
1205    ///
1206    /// Returns `true` if the expression is constant according to equivalence
1207    /// group, `false` otherwise.
1208    pub fn is_expr_constant(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
1209        // As an example, assume that we know columns `a` and `b` are constant.
1210        // Then, `a`, `b` and `a + b` will all return `true` whereas `c` will
1211        // return `false`.
1212        let const_exprs = self
1213            .constants
1214            .iter()
1215            .map(|const_expr| Arc::clone(const_expr.expr()));
1216        let normalized_constants = self.eq_group.normalize_exprs(const_exprs);
1217        let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
1218        is_constant_recurse(&normalized_constants, &normalized_expr)
1219    }
1220
1221    /// This function determines whether the provided expression is constant
1222    /// across partitions based on the known constants.
1223    ///
1224    /// # Parameters
1225    ///
1226    /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1227    ///   expression to be checked.
1228    ///
1229    /// # Returns
1230    ///
1231    /// Returns `true` if the expression is constant across all partitions according
1232    /// to equivalence group, `false` otherwise
1233    #[deprecated(
1234        since = "45.0.0",
1235        note = "Use [`is_expr_constant_across_partitions`] instead"
1236    )]
1237    pub fn is_expr_constant_accross_partitions(
1238        &self,
1239        expr: &Arc<dyn PhysicalExpr>,
1240    ) -> bool {
1241        self.is_expr_constant_across_partitions(expr)
1242    }
1243
1244    /// This function determines whether the provided expression is constant
1245    /// across partitions based on the known constants.
1246    ///
1247    /// # Parameters
1248    ///
1249    /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1250    ///   expression to be checked.
1251    ///
1252    /// # Returns
1253    ///
1254    /// Returns `true` if the expression is constant across all partitions according
1255    /// to equivalence group, `false` otherwise.
1256    pub fn is_expr_constant_across_partitions(
1257        &self,
1258        expr: &Arc<dyn PhysicalExpr>,
1259    ) -> bool {
1260        // As an example, assume that we know columns `a` and `b` are constant.
1261        // Then, `a`, `b` and `a + b` will all return `true` whereas `c` will
1262        // return `false`.
1263        let const_exprs = self
1264            .constants
1265            .iter()
1266            .filter_map(|const_expr| {
1267                if matches!(
1268                    const_expr.across_partitions(),
1269                    AcrossPartitions::Uniform { .. }
1270                ) {
1271                    Some(Arc::clone(const_expr.expr()))
1272                } else {
1273                    None
1274                }
1275            })
1276            .collect::<Vec<_>>();
1277        let normalized_constants = self.eq_group.normalize_exprs(const_exprs);
1278        let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
1279        is_constant_recurse(&normalized_constants, &normalized_expr)
1280    }
1281
1282    /// Retrieves the constant value of a given physical expression, if it exists.
1283    ///
1284    /// Normalizes the input expression and checks if it matches any known constants
1285    /// in the current context. Returns whether the expression has a uniform value,
1286    /// varies across partitions, or is not constant.
1287    ///
1288    /// # Parameters
1289    /// - `expr`: A reference to the physical expression to evaluate.
1290    ///
1291    /// # Returns
1292    /// - `AcrossPartitions::Uniform(value)`: If the expression has the same value across partitions.
1293    /// - `AcrossPartitions::Heterogeneous`: If the expression varies across partitions.
1294    /// - `None`: If the expression is not recognized as constant.
1295    pub fn get_expr_constant_value(
1296        &self,
1297        expr: &Arc<dyn PhysicalExpr>,
1298    ) -> AcrossPartitions {
1299        let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
1300
1301        if let Some(lit) = normalized_expr.as_any().downcast_ref::<Literal>() {
1302            return AcrossPartitions::Uniform(Some(lit.value().clone()));
1303        }
1304
1305        for const_expr in self.constants.iter() {
1306            if normalized_expr.eq(const_expr.expr()) {
1307                return const_expr.across_partitions();
1308            }
1309        }
1310
1311        AcrossPartitions::Heterogeneous
1312    }
1313
1314    /// Retrieves the properties for a given physical expression.
1315    ///
1316    /// This function constructs an [`ExprProperties`] object for the given
1317    /// expression, which encapsulates information about the expression's
1318    /// properties, including its [`SortProperties`] and [`Interval`].
1319    ///
1320    /// # Parameters
1321    ///
1322    /// - `expr`: An `Arc<dyn PhysicalExpr>` representing the physical expression
1323    ///   for which ordering information is sought.
1324    ///
1325    /// # Returns
1326    ///
1327    /// Returns an [`ExprProperties`] object containing the ordering and range
1328    /// information for the given expression.
1329    pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> ExprProperties {
1330        ExprPropertiesNode::new_unknown(expr)
1331            .transform_up(|expr| update_properties(expr, self))
1332            .data()
1333            .map(|node| node.data)
1334            .unwrap_or(ExprProperties::new_unknown())
1335    }
1336
1337    /// Transforms this `EquivalenceProperties` into a new `EquivalenceProperties`
1338    /// by mapping columns in the original schema to columns in the new schema
1339    /// by index.
1340    pub fn with_new_schema(self, schema: SchemaRef) -> Result<Self> {
1341        // The new schema and the original schema is aligned when they have the
1342        // same number of columns, and fields at the same index have the same
1343        // type in both schemas.
1344        let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
1345            && self
1346                .schema
1347                .fields
1348                .iter()
1349                .zip(schema.fields.iter())
1350                .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
1351        if !schemas_aligned {
1352            // Rewriting equivalence properties in terms of new schema is not
1353            // safe when schemas are not aligned:
1354            return plan_err!(
1355                "Cannot rewrite old_schema:{:?} with new schema: {:?}",
1356                self.schema,
1357                schema
1358            );
1359        }
1360        // Rewrite constants according to new schema:
1361        let new_constants = self
1362            .constants
1363            .into_iter()
1364            .map(|const_expr| {
1365                let across_partitions = const_expr.across_partitions();
1366                let new_const_expr = with_new_schema(const_expr.owned_expr(), &schema)?;
1367                Ok(ConstExpr::new(new_const_expr)
1368                    .with_across_partitions(across_partitions))
1369            })
1370            .collect::<Result<Vec<_>>>()?;
1371
1372        // Rewrite orderings according to new schema:
1373        let mut new_orderings = vec![];
1374        for ordering in self.oeq_class {
1375            let new_ordering = ordering
1376                .into_iter()
1377                .map(|mut sort_expr| {
1378                    sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?;
1379                    Ok(sort_expr)
1380                })
1381                .collect::<Result<_>>()?;
1382            new_orderings.push(new_ordering);
1383        }
1384
1385        // Rewrite equivalence classes according to the new schema:
1386        let mut eq_classes = vec![];
1387        for eq_class in self.eq_group {
1388            let new_eq_exprs = eq_class
1389                .into_vec()
1390                .into_iter()
1391                .map(|expr| with_new_schema(expr, &schema))
1392                .collect::<Result<_>>()?;
1393            eq_classes.push(EquivalenceClass::new(new_eq_exprs));
1394        }
1395
1396        // Construct the resulting equivalence properties:
1397        let mut result = EquivalenceProperties::new(schema);
1398        result.constants = new_constants;
1399        result.add_new_orderings(new_orderings);
1400        result.add_equivalence_group(EquivalenceGroup::new(eq_classes));
1401
1402        Ok(result)
1403    }
1404}
1405
1406/// More readable display version of the `EquivalenceProperties`.
1407///
1408/// Format:
1409/// ```text
1410/// order: [[a ASC, b ASC], [a ASC, c ASC]], eq: [[a = b], [a = c]], const: [a = 1]
1411/// ```
1412impl Display for EquivalenceProperties {
1413    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1414        if self.eq_group.is_empty()
1415            && self.oeq_class.is_empty()
1416            && self.constants.is_empty()
1417        {
1418            return write!(f, "No properties");
1419        }
1420        if !self.oeq_class.is_empty() {
1421            write!(f, "order: {}", self.oeq_class)?;
1422        }
1423        if !self.eq_group.is_empty() {
1424            write!(f, ", eq: {}", self.eq_group)?;
1425        }
1426        if !self.constants.is_empty() {
1427            write!(f, ", const: [{}]", ConstExpr::format_list(&self.constants))?;
1428        }
1429        Ok(())
1430    }
1431}
1432
1433/// Calculates the properties of a given [`ExprPropertiesNode`].
1434///
1435/// Order information can be retrieved as:
1436/// - If it is a leaf node, we directly find the order of the node by looking
1437///   at the given sort expression and equivalence properties if it is a `Column`
1438///   leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark
1439///   it as singleton so that it can cooperate with all ordered columns.
1440/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1441///   and operator has its own rules on how to propagate the children orderings.
1442///   However, before we engage in recursion, we check whether this intermediate
1443///   node directly matches with the sort expression. If there is a match, the
1444///   sort expression emerges at that node immediately, discarding the recursive
1445///   result coming from its children.
1446///
1447/// Range information is calculated as:
1448/// - If it is a `Literal` node, we set the range as a point value. If it is a
1449///   `Column` node, we set the datatype of the range, but cannot give an interval
1450///   for the range, yet.
1451/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1452///   and operator has its own rules on how to propagate the children range.
1453fn update_properties(
1454    mut node: ExprPropertiesNode,
1455    eq_properties: &EquivalenceProperties,
1456) -> Result<Transformed<ExprPropertiesNode>> {
1457    // First, try to gather the information from the children:
1458    if !node.expr.children().is_empty() {
1459        // We have an intermediate (non-leaf) node, account for its children:
1460        let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec();
1461        node.data = node.expr.get_properties(&children_props)?;
1462    } else if node.expr.as_any().is::<Literal>() {
1463        // We have a Literal, which is one of the two possible leaf node types:
1464        node.data = node.expr.get_properties(&[])?;
1465    } else if node.expr.as_any().is::<Column>() {
1466        // We have a Column, which is the other possible leaf node type:
1467        node.data.range =
1468            Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
1469    }
1470    // Now, check what we know about orderings:
1471    let normalized_expr = eq_properties
1472        .eq_group
1473        .normalize_expr(Arc::clone(&node.expr));
1474    let oeq_class = eq_properties.normalized_oeq_class();
1475    if eq_properties.is_expr_constant(&normalized_expr)
1476        || oeq_class.is_expr_partial_const(&normalized_expr)
1477    {
1478        node.data.sort_properties = SortProperties::Singleton;
1479    } else if let Some(options) = oeq_class.get_options(&normalized_expr) {
1480        node.data.sort_properties = SortProperties::Ordered(options);
1481    }
1482    Ok(Transformed::yes(node))
1483}
1484
1485/// This function determines whether the provided expression is constant
1486/// based on the known constants.
1487///
1488/// # Parameters
1489///
1490/// - `constants`: A `&[Arc<dyn PhysicalExpr>]` containing expressions known to
1491///   be a constant.
1492/// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the expression
1493///   to check.
1494///
1495/// # Returns
1496///
1497/// Returns `true` if the expression is constant according to equivalence
1498/// group, `false` otherwise.
1499fn is_constant_recurse(
1500    constants: &[Arc<dyn PhysicalExpr>],
1501    expr: &Arc<dyn PhysicalExpr>,
1502) -> bool {
1503    if physical_exprs_contains(constants, expr) || expr.as_any().is::<Literal>() {
1504        return true;
1505    }
1506    let children = expr.children();
1507    !children.is_empty() && children.iter().all(|c| is_constant_recurse(constants, c))
1508}
1509
1510/// This function examines whether a referring expression directly refers to a
1511/// given referred expression or if any of its children in the expression tree
1512/// refer to the specified expression.
1513///
1514/// # Parameters
1515///
1516/// - `referring_expr`: A reference to the referring expression (`Arc<dyn PhysicalExpr>`).
1517/// - `referred_expr`: A reference to the referred expression (`Arc<dyn PhysicalExpr>`)
1518///
1519/// # Returns
1520///
1521/// A boolean value indicating whether `referring_expr` refers (needs it to evaluate its result)
1522/// `referred_expr` or not.
1523fn expr_refers(
1524    referring_expr: &Arc<dyn PhysicalExpr>,
1525    referred_expr: &Arc<dyn PhysicalExpr>,
1526) -> bool {
1527    referring_expr.eq(referred_expr)
1528        || referring_expr
1529            .children()
1530            .iter()
1531            .any(|child| expr_refers(child, referred_expr))
1532}
1533
1534/// This function examines the given expression and its properties to determine
1535/// the ordering properties of the expression. The range knowledge is not utilized
1536/// yet in the scope of this function.
1537///
1538/// # Parameters
1539///
1540/// - `expr`: A reference to the source expression (`Arc<dyn PhysicalExpr>`) for
1541///   which ordering properties need to be determined.
1542/// - `dependencies`: A reference to `Dependencies`, containing sort expressions
1543///   referred to by `expr`.
1544/// - `schema``: A reference to the schema which the `expr` columns refer.
1545///
1546/// # Returns
1547///
1548/// A `SortProperties` indicating the ordering information of the given expression.
1549fn get_expr_properties(
1550    expr: &Arc<dyn PhysicalExpr>,
1551    dependencies: &Dependencies,
1552    schema: &SchemaRef,
1553) -> Result<ExprProperties> {
1554    if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) {
1555        // If exact match is found, return its ordering.
1556        Ok(ExprProperties {
1557            sort_properties: SortProperties::Ordered(column_order.options),
1558            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1559            preserves_lex_ordering: false,
1560        })
1561    } else if expr.as_any().downcast_ref::<Column>().is_some() {
1562        Ok(ExprProperties {
1563            sort_properties: SortProperties::Unordered,
1564            range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1565            preserves_lex_ordering: false,
1566        })
1567    } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
1568        Ok(ExprProperties {
1569            sort_properties: SortProperties::Singleton,
1570            range: Interval::try_new(literal.value().clone(), literal.value().clone())?,
1571            preserves_lex_ordering: true,
1572        })
1573    } else {
1574        // Find orderings of its children
1575        let child_states = expr
1576            .children()
1577            .iter()
1578            .map(|child| get_expr_properties(child, dependencies, schema))
1579            .collect::<Result<Vec<_>>>()?;
1580        // Calculate expression ordering using ordering of its children.
1581        expr.get_properties(&child_states)
1582    }
1583}
1584
1585/// Wrapper struct for `Arc<dyn PhysicalExpr>` to use them as keys in a hash map.
1586#[derive(Debug, Clone)]
1587struct ExprWrapper(Arc<dyn PhysicalExpr>);
1588
1589impl PartialEq<Self> for ExprWrapper {
1590    fn eq(&self, other: &Self) -> bool {
1591        self.0.eq(&other.0)
1592    }
1593}
1594
1595impl Eq for ExprWrapper {}
1596
1597impl Hash for ExprWrapper {
1598    fn hash<H: Hasher>(&self, state: &mut H) {
1599        self.0.hash(state);
1600    }
1601}
1602
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expressions::{col, BinaryExpr};

    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion_expr::Operator;

    /// `b + d` must be reported as constant only when both of its operands
    /// are among the known constants.
    #[test]
    fn test_expr_consists_of_constants() -> Result<()> {
        // Schema: four nullable Int32 columns followed by a timestamp column.
        let mut fields = ["a", "b", "c", "d"]
            .iter()
            .map(|name| Field::new(*name, DataType::Int32, true))
            .collect::<Vec<_>>();
        fields.push(Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        ));
        let schema = Arc::new(Schema::new(fields));

        let a = col("a", &schema)?;
        let b = col("b", &schema)?;
        let d = col("d", &schema)?;
        let sum: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
            Arc::clone(&b),
            Operator::Plus,
            Arc::clone(&d),
        ));

        // Only `b` is known to be constant, so `b + d` is not constant.
        let without_d = [Arc::clone(&a), Arc::clone(&b)];
        assert!(!is_constant_recurse(&without_d, &sum));

        // Once `d` is constant as well, `b + d` becomes constant.
        let with_d = [a, b, d];
        assert!(is_constant_recurse(&with_d, &sum));
        Ok(())
    }
}