datafusion_physical_expr/equivalence/properties/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod dependency; // Submodule containing DependencyMap and Dependencies
19mod joins; // Submodule containing join_equivalence_properties
20mod union; // Submodule containing calculate_union
21
22use dependency::{
23 construct_prefix_orderings, generate_dependency_orderings, referred_dependencies,
24 Dependencies, DependencyMap,
25};
26pub use joins::*;
27pub use union::*;
28
29use std::fmt::Display;
30use std::hash::{Hash, Hasher};
31use std::sync::Arc;
32use std::{fmt, mem};
33
34use crate::equivalence::class::{const_exprs_contains, AcrossPartitions};
35use crate::equivalence::{
36 EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping,
37};
38use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
39use crate::{
40 physical_exprs_contains, ConstExpr, LexOrdering, LexRequirement, PhysicalExpr,
41 PhysicalSortExpr, PhysicalSortRequirement,
42};
43
44use arrow::compute::SortOptions;
45use arrow::datatypes::SchemaRef;
46use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
47use datafusion_common::{plan_err, Constraint, Constraints, HashMap, Result};
48use datafusion_expr::interval_arithmetic::Interval;
49use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
50use datafusion_physical_expr_common::utils::ExprPropertiesNode;
51
52use indexmap::IndexSet;
53use itertools::Itertools;
54
/// An `EquivalenceProperties` object stores information known about the output
/// of a plan node that can be used to optimize the plan.
///
/// Currently, it keeps track of:
/// - Sort expressions (orderings)
/// - Equivalent expressions: expressions that are known to have the same value.
/// - Constant expressions: expressions that are known to contain a single
///   constant value.
63///
64/// # Example equivalent sort expressions
65///
66/// Consider table below:
67///
68/// ```text
69/// ┌-------┐
70/// | a | b |
71/// |---|---|
72/// | 1 | 9 |
73/// | 2 | 8 |
74/// | 3 | 7 |
75/// | 5 | 5 |
76/// └---┴---┘
77/// ```
78///
79/// In this case, both `a ASC` and `b DESC` can describe the table ordering.
/// `EquivalenceProperties` tracks these different valid sort expressions and
/// treats `a ASC` and `b DESC` on an equal footing. For example, if the query
/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be
/// avoided.
84///
85/// # Example equivalent expressions
86///
87/// Similarly, consider the table below:
88///
89/// ```text
90/// ┌-------┐
91/// | a | b |
92/// |---|---|
93/// | 1 | 1 |
94/// | 2 | 2 |
95/// | 3 | 3 |
96/// | 5 | 5 |
97/// └---┴---┘
98/// ```
99///
/// In this case, columns `a` and `b` always have the same value. `EquivalenceProperties`
/// keeps track of such equivalences inside this object. With this information,
/// DataFusion can optimize operations accordingly. For example, if the partition
/// requirement is `Hash(a)` and the output partitioning is `Hash(b)`, then DataFusion
/// avoids repartitioning the data as the existing partitioning satisfies the
/// requirement.
106///
107/// # Code Example
108/// ```
109/// # use std::sync::Arc;
110/// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
111/// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
112/// # use datafusion_physical_expr::expressions::col;
113/// use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
114/// # let schema: SchemaRef = Arc::new(Schema::new(vec![
115/// # Field::new("a", DataType::Int32, false),
116/// # Field::new("b", DataType::Int32, false),
117/// # Field::new("c", DataType::Int32, false),
118/// # ]));
119/// # let col_a = col("a", &schema).unwrap();
120/// # let col_b = col("b", &schema).unwrap();
121/// # let col_c = col("c", &schema).unwrap();
122/// // This object represents data that is sorted by a ASC, c DESC
123/// // with a single constant value of b
124/// let mut eq_properties = EquivalenceProperties::new(schema)
125/// .with_constants(vec![ConstExpr::from(col_b)]);
126/// eq_properties.add_new_ordering(LexOrdering::new(vec![
127/// PhysicalSortExpr::new_default(col_a).asc(),
128/// PhysicalSortExpr::new_default(col_c).desc(),
129/// ]));
130///
131/// assert_eq!(eq_properties.to_string(), "order: [[a@0 ASC, c@2 DESC]], const: [b@1(heterogeneous)]")
132/// ```
133#[derive(Debug, Clone)]
134pub struct EquivalenceProperties {
    /// Distinct equivalence classes (expressions known to have the same value)
136 eq_group: EquivalenceGroup,
137 /// Equivalent sort expressions
138 oeq_class: OrderingEquivalenceClass,
139 /// Expressions whose values are constant
140 ///
141 /// TODO: We do not need to track constants separately, they can be tracked
142 /// inside `eq_group` as `Literal` expressions.
143 constants: Vec<ConstExpr>,
144 /// Table constraints
145 constraints: Constraints,
146 /// Schema associated with this object.
147 schema: SchemaRef,
148}
149
150impl EquivalenceProperties {
151 /// Creates an empty `EquivalenceProperties` object.
152 pub fn new(schema: SchemaRef) -> Self {
153 Self {
154 eq_group: EquivalenceGroup::empty(),
155 oeq_class: OrderingEquivalenceClass::empty(),
156 constants: vec![],
157 constraints: Constraints::empty(),
158 schema,
159 }
160 }
161
162 /// Adds constraints to the properties.
163 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
164 self.constraints = constraints;
165 self
166 }
167
168 /// Creates a new `EquivalenceProperties` object with the given orderings.
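    ///
    /// # Example
    /// A minimal sketch; the two-column schema and the ordering below are
    /// illustrative:
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
    /// # use datafusion_physical_expr::EquivalenceProperties;
    /// # use datafusion_physical_expr::expressions::col;
    /// # use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    /// # let schema: SchemaRef = Arc::new(Schema::new(vec![
    /// #   Field::new("a", DataType::Int32, false),
    /// #   Field::new("b", DataType::Int32, false),
    /// # ]));
    /// # let col_a = col("a", &schema).unwrap();
    /// # let col_b = col("b", &schema).unwrap();
    /// // Data is known to be sorted by `a ASC, b ASC`:
    /// let ordering = LexOrdering::new(vec![
    ///     PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
    ///     PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
    /// ]);
    /// let eq_properties =
    ///     EquivalenceProperties::new_with_orderings(schema, &[ordering.clone()]);
    /// assert!(eq_properties.ordering_satisfy(&ordering));
    /// ```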
169 pub fn new_with_orderings(schema: SchemaRef, orderings: &[LexOrdering]) -> Self {
170 Self {
171 eq_group: EquivalenceGroup::empty(),
172 oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()),
173 constants: vec![],
174 constraints: Constraints::empty(),
175 schema,
176 }
177 }
178
179 /// Returns the associated schema.
180 pub fn schema(&self) -> &SchemaRef {
181 &self.schema
182 }
183
184 /// Returns a reference to the ordering equivalence class within.
185 pub fn oeq_class(&self) -> &OrderingEquivalenceClass {
186 &self.oeq_class
187 }
188
189 /// Return the inner OrderingEquivalenceClass, consuming self
190 pub fn into_oeq_class(self) -> OrderingEquivalenceClass {
191 self.oeq_class
192 }
193
194 /// Returns a reference to the equivalence group within.
195 pub fn eq_group(&self) -> &EquivalenceGroup {
196 &self.eq_group
197 }
198
199 /// Returns a reference to the constant expressions
200 pub fn constants(&self) -> &[ConstExpr] {
201 &self.constants
202 }
203
204 pub fn constraints(&self) -> &Constraints {
205 &self.constraints
206 }
207
208 /// Returns the output ordering of the properties.
209 pub fn output_ordering(&self) -> Option<LexOrdering> {
210 let constants = self.constants();
211 let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default();
212 // Prune out constant expressions
213 output_ordering
214 .retain(|sort_expr| !const_exprs_contains(constants, &sort_expr.expr));
215 (!output_ordering.is_empty()).then_some(output_ordering)
216 }
217
218 /// Returns the normalized version of the ordering equivalence class within.
    /// Normalization removes constants and duplicates, and standardizes
    /// expressions according to the equivalence group within.
221 pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass {
222 OrderingEquivalenceClass::new(
223 self.oeq_class
224 .iter()
225 .map(|ordering| self.normalize_sort_exprs(ordering))
226 .collect(),
227 )
228 }
229
230 /// Extends this `EquivalenceProperties` with the `other` object.
231 pub fn extend(mut self, other: Self) -> Self {
232 self.eq_group.extend(other.eq_group);
233 self.oeq_class.extend(other.oeq_class);
234 self.with_constants(other.constants)
235 }
236
237 /// Clears (empties) the ordering equivalence class within this object.
238 /// Call this method when existing orderings are invalidated.
239 pub fn clear_orderings(&mut self) {
240 self.oeq_class.clear();
241 }
242
243 /// Removes constant expressions that may change across partitions.
244 /// This method should be used when data from different partitions are merged.
245 pub fn clear_per_partition_constants(&mut self) {
246 self.constants.retain(|item| {
247 matches!(item.across_partitions(), AcrossPartitions::Uniform(_))
248 })
249 }
250
251 /// Extends this `EquivalenceProperties` by adding the orderings inside the
252 /// ordering equivalence class `other`.
253 pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) {
254 self.oeq_class.extend(other);
255 }
256
257 /// Adds new orderings into the existing ordering equivalence class.
258 pub fn add_new_orderings(
259 &mut self,
260 orderings: impl IntoIterator<Item = LexOrdering>,
261 ) {
262 self.oeq_class.add_new_orderings(orderings);
263 }
264
265 /// Adds a single ordering to the existing ordering equivalence class.
266 pub fn add_new_ordering(&mut self, ordering: LexOrdering) {
267 self.add_new_orderings([ordering]);
268 }
269
    /// Incorporates the given equivalence group into the existing
271 /// equivalence group within.
272 pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) {
273 self.eq_group.extend(other_eq_group);
274 }
275
276 /// Adds a new equality condition into the existing equivalence group.
277 /// If the given equality defines a new equivalence class, adds this new
278 /// equivalence class to the equivalence group.
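    ///
    /// # Example
    /// A minimal sketch of how constants propagate through a new equality; the
    /// schema and columns below are illustrative:
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
    /// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
    /// # use datafusion_physical_expr::expressions::col;
    /// # let schema: SchemaRef = Arc::new(Schema::new(vec![
    /// #   Field::new("a", DataType::Int32, false),
    /// #   Field::new("b", DataType::Int32, false),
    /// # ]));
    /// # let col_a = col("a", &schema).unwrap();
    /// # let col_b = col("b", &schema).unwrap();
    /// // `a` is known to be constant. After learning `a = b`, `b` is also
    /// // known to be constant:
    /// let mut eq_properties = EquivalenceProperties::new(schema)
    ///     .with_constants(vec![ConstExpr::from(Arc::clone(&col_a))]);
    /// eq_properties.add_equal_conditions(&col_a, &col_b).unwrap();
    /// assert!(eq_properties.is_expr_constant(&col_b));
    /// ```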
279 pub fn add_equal_conditions(
280 &mut self,
281 left: &Arc<dyn PhysicalExpr>,
282 right: &Arc<dyn PhysicalExpr>,
283 ) -> Result<()> {
        // Discover new constants in light of the new equality:
285 if self.is_expr_constant(left) {
286 // Left expression is constant, add right as constant
287 if !const_exprs_contains(&self.constants, right) {
288 let const_expr = ConstExpr::from(right)
289 .with_across_partitions(self.get_expr_constant_value(left));
290 self.constants.push(const_expr);
291 }
292 } else if self.is_expr_constant(right) {
293 // Right expression is constant, add left as constant
294 if !const_exprs_contains(&self.constants, left) {
295 let const_expr = ConstExpr::from(left)
296 .with_across_partitions(self.get_expr_constant_value(right));
297 self.constants.push(const_expr);
298 }
299 }
300
301 // Add equal expressions to the state
302 self.eq_group.add_equal_conditions(left, right);
303
304 // Discover any new orderings
305 self.discover_new_orderings(left)?;
306 Ok(())
307 }
308
309 /// Track/register physical expressions with constant values.
310 #[deprecated(since = "43.0.0", note = "Use [`with_constants`] instead")]
311 pub fn add_constants(self, constants: impl IntoIterator<Item = ConstExpr>) -> Self {
312 self.with_constants(constants)
313 }
314
315 /// Remove the specified constant
316 pub fn remove_constant(mut self, c: &ConstExpr) -> Self {
317 self.constants.retain(|existing| existing != c);
318 self
319 }
320
321 /// Track/register physical expressions with constant values.
322 pub fn with_constants(
323 mut self,
324 constants: impl IntoIterator<Item = ConstExpr>,
325 ) -> Self {
326 let normalized_constants = constants
327 .into_iter()
328 .filter_map(|c| {
329 let across_partitions = c.across_partitions();
330 let expr = c.owned_expr();
331 let normalized_expr = self.eq_group.normalize_expr(expr);
332
333 if const_exprs_contains(&self.constants, &normalized_expr) {
334 return None;
335 }
336
337 let const_expr = ConstExpr::from(normalized_expr)
338 .with_across_partitions(across_partitions);
339
340 Some(const_expr)
341 })
342 .collect::<Vec<_>>();
343
344 // Add all new normalized constants
345 self.constants.extend(normalized_constants);
346
347 // Discover any new orderings based on the constants
348 for ordering in self.normalized_oeq_class().iter() {
349 if let Err(e) = self.discover_new_orderings(&ordering[0].expr) {
350 log::debug!("error discovering new orderings: {e}");
351 }
352 }
353
354 self
355 }
356
357 // Discover new valid orderings in light of a new equality.
358 // Accepts a single argument (`expr`) which is used to determine
359 // which orderings should be updated.
360 // When constants or equivalence classes are changed, there may be new orderings
361 // that can be discovered with the new equivalence properties.
362 // For a discussion, see: https://github.com/apache/datafusion/issues/9812
363 fn discover_new_orderings(&mut self, expr: &Arc<dyn PhysicalExpr>) -> Result<()> {
364 let normalized_expr = self.eq_group().normalize_expr(Arc::clone(expr));
365 let eq_class = self
366 .eq_group
367 .iter()
368 .find_map(|class| {
369 class
370 .contains(&normalized_expr)
371 .then(|| class.clone().into_vec())
372 })
373 .unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]);
374
375 let mut new_orderings: Vec<LexOrdering> = vec![];
376 for ordering in self.normalized_oeq_class().iter() {
377 if !ordering[0].expr.eq(&normalized_expr) {
378 continue;
379 }
380
381 let leading_ordering_options = ordering[0].options;
382
383 for equivalent_expr in &eq_class {
384 let children = equivalent_expr.children();
385 if children.is_empty() {
386 continue;
387 }
388
389 // Check if all children match the next expressions in the ordering
390 let mut all_children_match = true;
391 let mut child_properties = vec![];
392
393 // Build properties for each child based on the next expressions
394 for (i, child) in children.iter().enumerate() {
395 if let Some(next) = ordering.get(i + 1) {
396 if !child.as_ref().eq(next.expr.as_ref()) {
397 all_children_match = false;
398 break;
399 }
400 child_properties.push(ExprProperties {
401 sort_properties: SortProperties::Ordered(next.options),
402 range: Interval::make_unbounded(
403 &child.data_type(&self.schema)?,
404 )?,
405 preserves_lex_ordering: true,
406 });
407 } else {
408 all_children_match = false;
409 break;
410 }
411 }
412
413 if all_children_match {
414 // Check if the expression is monotonic in all arguments
415 if let Ok(expr_properties) =
416 equivalent_expr.get_properties(&child_properties)
417 {
418 if expr_properties.preserves_lex_ordering
419 && SortProperties::Ordered(leading_ordering_options)
420 == expr_properties.sort_properties
421 {
                            // Assume the existing ordering is `[c ASC, a ASC, b ASC]`
                            // and the equality `c = f(a, b)` is given. If we know that
                            // the ordering `[f(a, b) ASC]` is valid whenever `[a ASC, b ASC]`
                            // holds, then we can deduce that `[a ASC, b ASC]` (i.e. the
                            // existing ordering with its leading expression removed) is
                            // also a valid ordering, and add it to the state.
427 new_orderings.push(LexOrdering::new(ordering[1..].to_vec()));
428 break;
429 }
430 }
431 }
432 }
433 }
434
435 self.oeq_class.add_new_orderings(new_orderings);
436 Ok(())
437 }
438
439 /// Updates the ordering equivalence group within assuming that the table
440 /// is re-sorted according to the argument `sort_exprs`. Note that constants
441 /// and equivalence classes are unchanged as they are unaffected by a re-sort.
442 /// If the given ordering is already satisfied, the function does nothing.
443 pub fn with_reorder(mut self, sort_exprs: LexOrdering) -> Self {
444 // Filter out constant expressions as they don't affect ordering
445 let filtered_exprs = LexOrdering::new(
446 sort_exprs
447 .into_iter()
448 .filter(|expr| !self.is_expr_constant(&expr.expr))
449 .collect(),
450 );
451
452 if filtered_exprs.is_empty() {
453 return self;
454 }
455
456 let mut new_orderings = vec![filtered_exprs.clone()];
457
458 // Preserve valid suffixes from existing orderings
459 let oeq_class = mem::take(&mut self.oeq_class);
460 for existing in oeq_class {
461 if self.is_prefix_of(&filtered_exprs, &existing) {
462 let mut extended = filtered_exprs.clone();
463 extended.extend(existing.into_iter().skip(filtered_exprs.len()));
464 new_orderings.push(extended);
465 }
466 }
467
468 self.oeq_class = OrderingEquivalenceClass::new(new_orderings);
469 self
470 }
471
472 /// Checks if the new ordering matches a prefix of the existing ordering
473 /// (considering expression equivalences)
474 fn is_prefix_of(&self, new_order: &LexOrdering, existing: &LexOrdering) -> bool {
475 // Check if new order is longer than existing - can't be a prefix
476 if new_order.len() > existing.len() {
477 return false;
478 }
479
480 // Check if new order matches existing prefix (considering equivalences)
481 new_order.iter().zip(existing).all(|(new, existing)| {
482 self.eq_group.exprs_equal(&new.expr, &existing.expr)
483 && new.options == existing.options
484 })
485 }
486
487 /// Normalizes the given sort expressions (i.e. `sort_exprs`) using the
488 /// equivalence group and the ordering equivalence class within.
489 ///
490 /// Assume that `self.eq_group` states column `a` and `b` are aliases.
491 /// Also assume that `self.oeq_class` states orderings `d ASC` and `a ASC, c ASC`
492 /// are equivalent (in the sense that both describe the ordering of the table).
493 /// If the `sort_exprs` argument were `vec![b ASC, c ASC, a ASC]`, then this
494 /// function would return `vec![a ASC, c ASC]`. Internally, it would first
495 /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result
496 /// after deduplication.
497 fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
498 // Convert sort expressions to sort requirements:
499 let sort_reqs = LexRequirement::from(sort_exprs.clone());
500 // Normalize the requirements:
501 let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
502 // Convert sort requirements back to sort expressions:
503 LexOrdering::from(normalized_sort_reqs)
504 }
505
506 /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
507 /// equivalence group and the ordering equivalence class within. It works by:
508 /// - Removing expressions that have a constant value from the given requirement.
509 /// - Replacing sections that belong to some equivalence class in the equivalence
510 /// group with the first entry in the matching equivalence class.
511 ///
512 /// Assume that `self.eq_group` states column `a` and `b` are aliases.
513 /// Also assume that `self.oeq_class` states orderings `d ASC` and `a ASC, c ASC`
514 /// are equivalent (in the sense that both describe the ordering of the table).
515 /// If the `sort_reqs` argument were `vec![b ASC, c ASC, a ASC]`, then this
516 /// function would return `vec![a ASC, c ASC]`. Internally, it would first
517 /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result
518 /// after deduplication.
519 fn normalize_sort_requirements(&self, sort_reqs: &LexRequirement) -> LexRequirement {
520 let normalized_sort_reqs = self.eq_group.normalize_sort_requirements(sort_reqs);
521 let mut constant_exprs = vec![];
522 constant_exprs.extend(
523 self.constants
524 .iter()
525 .map(|const_expr| Arc::clone(const_expr.expr())),
526 );
527 let constants_normalized = self.eq_group.normalize_exprs(constant_exprs);
528 // Prune redundant sections in the requirement:
529 normalized_sort_reqs
530 .iter()
531 .filter(|&order| !physical_exprs_contains(&constants_normalized, &order.expr))
532 .cloned()
533 .collect::<LexRequirement>()
534 .collapse()
535 }
536
537 /// Checks whether the given ordering is satisfied by any of the existing
538 /// orderings.
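    ///
    /// # Example
    /// A minimal sketch; `a` and `b` below are illustrative Int32 columns:
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
    /// # use datafusion_physical_expr::EquivalenceProperties;
    /// # use datafusion_physical_expr::expressions::col;
    /// # use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    /// # let schema: SchemaRef = Arc::new(Schema::new(vec![
    /// #   Field::new("a", DataType::Int32, false),
    /// #   Field::new("b", DataType::Int32, false),
    /// # ]));
    /// # let col_a = col("a", &schema).unwrap();
    /// # let col_b = col("b", &schema).unwrap();
    /// // The data is known to be sorted by `a ASC, b ASC`:
    /// let eq_properties = EquivalenceProperties::new_with_orderings(
    ///     schema,
    ///     &[LexOrdering::new(vec![
    ///         PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
    ///         PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
    ///     ])],
    /// );
    /// // A prefix of an existing ordering is satisfied...
    /// assert!(eq_properties.ordering_satisfy(&LexOrdering::new(vec![
    ///     PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
    /// ])));
    /// // ...but a different leading column is not.
    /// assert!(!eq_properties.ordering_satisfy(&LexOrdering::new(vec![
    ///     PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
    /// ])));
    /// ```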
539 pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool {
540 // Convert the given sort expressions to sort requirements:
541 let sort_requirements = LexRequirement::from(given.clone());
542 self.ordering_satisfy_requirement(&sort_requirements)
543 }
544
545 /// Checks whether the given sort requirements are satisfied by any of the
546 /// existing orderings.
547 pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
548 let mut eq_properties = self.clone();
549 // First, standardize the given requirement:
550 let normalized_reqs = eq_properties.normalize_sort_requirements(reqs);
551
552 // Check whether given ordering is satisfied by constraints first
553 if self.satisfied_by_constraints(&normalized_reqs) {
554 return true;
555 }
556
557 for normalized_req in normalized_reqs {
558 // Check whether given ordering is satisfied
559 if !eq_properties.ordering_satisfy_single(&normalized_req) {
560 return false;
561 }
562 // Treat satisfied keys as constants in subsequent iterations. We
563 // can do this because the "next" key only matters in a lexicographical
564 // ordering when the keys to its left have the same values.
565 //
566 // Note that these expressions are not properly "constants". This is just
567 // an implementation strategy confined to this function.
568 //
569 // For example, assume that the requirement is `[a ASC, (b + c) ASC]`,
570 // and existing equivalent orderings are `[a ASC, b ASC]` and `[c ASC]`.
571 // From the analysis above, we know that `[a ASC]` is satisfied. Then,
572 // we add column `a` as constant to the algorithm state. This enables us
573 // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
574 eq_properties = eq_properties
575 .with_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
576 }
577 true
578 }
579
580 /// Checks if the sort requirements are satisfied by any of the table constraints (primary key or unique).
581 /// Returns true if any constraint fully satisfies the requirements.
582 fn satisfied_by_constraints(
583 &self,
584 normalized_reqs: &[PhysicalSortRequirement],
585 ) -> bool {
586 self.constraints.iter().any(|constraint| match constraint {
587 Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => self
588 .satisfied_by_constraint(
589 normalized_reqs,
590 indices,
591 matches!(constraint, Constraint::Unique(_)),
592 ),
593 })
594 }
595
596 /// Checks if sort requirements are satisfied by a constraint (primary key or unique).
597 /// Returns true if the constraint indices form a valid prefix of an existing ordering
598 /// that matches the requirements. For unique constraints, also verifies nullable columns.
599 fn satisfied_by_constraint(
600 &self,
601 normalized_reqs: &[PhysicalSortRequirement],
602 indices: &[usize],
603 check_null: bool,
604 ) -> bool {
        // The requirements must be long enough to cover all constraint indices
606 if indices.len() > normalized_reqs.len() {
607 return false;
608 }
609
610 // Iterate over all orderings
611 self.oeq_class.iter().any(|ordering| {
612 if indices.len() > ordering.len() {
613 return false;
614 }
615
616 // Build a map of column positions in the ordering
617 let mut col_positions = HashMap::with_capacity(ordering.len());
618 for (pos, req) in ordering.iter().enumerate() {
619 if let Some(col) = req.expr.as_any().downcast_ref::<Column>() {
620 col_positions.insert(
621 col.index(),
622 (pos, col.nullable(&self.schema).unwrap_or(true)),
623 );
624 }
625 }
626
627 // Check if all constraint indices appear in valid positions
628 if !indices.iter().all(|&idx| {
629 col_positions
630 .get(&idx)
631 .map(|&(pos, nullable)| {
632 // For unique constraints, verify column is not nullable if it's first/last
633 !check_null
634 || (pos != 0 && pos != ordering.len() - 1)
635 || !nullable
636 })
637 .unwrap_or(false)
638 }) {
639 return false;
640 }
641
642 // Check if this ordering matches requirements prefix
643 let ordering_len = ordering.len();
644 normalized_reqs.len() >= ordering_len
645 && normalized_reqs[..ordering_len].iter().zip(ordering).all(
646 |(req, existing)| {
647 req.expr.eq(&existing.expr)
648 && req
649 .options
650 .is_none_or(|req_opts| req_opts == existing.options)
651 },
652 )
653 })
654 }
655
656 /// Determines whether the ordering specified by the given sort requirement
657 /// is satisfied based on the orderings within, equivalence classes, and
658 /// constant expressions.
659 ///
660 /// # Parameters
661 ///
662 /// - `req`: A reference to a `PhysicalSortRequirement` for which the ordering
663 /// satisfaction check will be done.
664 ///
665 /// # Returns
666 ///
667 /// Returns `true` if the specified ordering is satisfied, `false` otherwise.
668 fn ordering_satisfy_single(&self, req: &PhysicalSortRequirement) -> bool {
669 let ExprProperties {
670 sort_properties, ..
671 } = self.get_expr_properties(Arc::clone(&req.expr));
672 match sort_properties {
673 SortProperties::Ordered(options) => {
674 let sort_expr = PhysicalSortExpr {
675 expr: Arc::clone(&req.expr),
676 options,
677 };
678 sort_expr.satisfy(req, self.schema())
679 }
            // Singleton expressions satisfy any ordering.
681 SortProperties::Singleton => true,
682 SortProperties::Unordered => false,
683 }
684 }
685
686 /// Checks whether the `given` sort requirements are equal or more specific
687 /// than the `reference` sort requirements.
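    ///
    /// # Example
    /// A minimal sketch; the columns `a` and `b` are illustrative:
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
    /// # use datafusion_physical_expr::{EquivalenceProperties, LexRequirement};
    /// # use datafusion_physical_expr::expressions::col;
    /// # use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    /// # let schema: SchemaRef = Arc::new(Schema::new(vec![
    /// #   Field::new("a", DataType::Int32, false),
    /// #   Field::new("b", DataType::Int32, false),
    /// # ]));
    /// # let col_a = col("a", &schema).unwrap();
    /// # let col_b = col("b", &schema).unwrap();
    /// let eq_properties = EquivalenceProperties::new(schema);
    /// // Requiring `[a ASC, b ASC]` is at least as specific as requiring `[a ASC]`:
    /// let given = LexRequirement::from(LexOrdering::new(vec![
    ///     PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
    ///     PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
    /// ]));
    /// let reference = LexRequirement::from(LexOrdering::new(vec![
    ///     PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
    /// ]));
    /// assert!(eq_properties.requirements_compatible(&given, &reference));
    /// ```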
688 pub fn requirements_compatible(
689 &self,
690 given: &LexRequirement,
691 reference: &LexRequirement,
692 ) -> bool {
693 let normalized_given = self.normalize_sort_requirements(given);
694 let normalized_reference = self.normalize_sort_requirements(reference);
695
696 (normalized_reference.len() <= normalized_given.len())
697 && normalized_reference
698 .into_iter()
699 .zip(normalized_given)
700 .all(|(reference, given)| given.compatible(&reference))
701 }
702
703 /// Returns the finer ordering among the orderings `lhs` and `rhs`, breaking
704 /// any ties by choosing `lhs`.
705 ///
706 /// The finer ordering is the ordering that satisfies both of the orderings.
707 /// If the orderings are incomparable, returns `None`.
708 ///
709 /// For example, the finer ordering among `[a ASC]` and `[a ASC, b ASC]` is
710 /// the latter.
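    ///
    /// # Example
    /// A minimal sketch; the columns `a` and `b` are illustrative:
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
    /// # use datafusion_physical_expr::EquivalenceProperties;
    /// # use datafusion_physical_expr::expressions::col;
    /// # use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    /// # let schema: SchemaRef = Arc::new(Schema::new(vec![
    /// #   Field::new("a", DataType::Int32, false),
    /// #   Field::new("b", DataType::Int32, false),
    /// # ]));
    /// # let col_a = col("a", &schema).unwrap();
    /// # let col_b = col("b", &schema).unwrap();
    /// let eq_properties = EquivalenceProperties::new(schema);
    /// let lhs = LexOrdering::new(vec![
    ///     PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
    /// ]);
    /// let rhs = LexOrdering::new(vec![
    ///     PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
    ///     PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
    /// ]);
    /// // `[a ASC, b ASC]` satisfies both orderings, so it is the finer one:
    /// let finer = eq_properties.get_finer_ordering(&lhs, &rhs).unwrap();
    /// assert_eq!(finer.len(), 2);
    /// ```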
711 pub fn get_finer_ordering(
712 &self,
713 lhs: &LexOrdering,
714 rhs: &LexOrdering,
715 ) -> Option<LexOrdering> {
716 // Convert the given sort expressions to sort requirements:
717 let lhs = LexRequirement::from(lhs.clone());
718 let rhs = LexRequirement::from(rhs.clone());
719 let finer = self.get_finer_requirement(&lhs, &rhs);
720 // Convert the chosen sort requirements back to sort expressions:
721 finer.map(LexOrdering::from)
722 }
723
724 /// Returns the finer ordering among the requirements `lhs` and `rhs`,
725 /// breaking any ties by choosing `lhs`.
726 ///
727 /// The finer requirements are the ones that satisfy both of the given
728 /// requirements. If the requirements are incomparable, returns `None`.
729 ///
730 /// For example, the finer requirements among `[a ASC]` and `[a ASC, b ASC]`
731 /// is the latter.
732 pub fn get_finer_requirement(
733 &self,
734 req1: &LexRequirement,
735 req2: &LexRequirement,
736 ) -> Option<LexRequirement> {
737 let mut lhs = self.normalize_sort_requirements(req1);
738 let mut rhs = self.normalize_sort_requirements(req2);
739 lhs.inner
740 .iter_mut()
741 .zip(rhs.inner.iter_mut())
742 .all(|(lhs, rhs)| {
743 lhs.expr.eq(&rhs.expr)
744 && match (lhs.options, rhs.options) {
745 (Some(lhs_opt), Some(rhs_opt)) => lhs_opt == rhs_opt,
746 (Some(options), None) => {
747 rhs.options = Some(options);
748 true
749 }
750 (None, Some(options)) => {
751 lhs.options = Some(options);
752 true
753 }
754 (None, None) => true,
755 }
756 })
757 .then_some(if lhs.len() >= rhs.len() { lhs } else { rhs })
758 }
759
    /// Substitutes the ordering according to the input expression type. This is a
    /// simplified version: currently, we only perform the substitution when the
    /// expression has exactly one column and is a CAST expression.
    /// TODO: Add one-to-one analysis for monotonic ScalarFunctions.
    /// TODO: We could precompute all computable scenarios; for example, `atan(x + 1000)`
    /// should also be substituted if `x` is DESC or ASC.
    ///
    /// After substitution, we may generate more than one `LexOrdering`. As an example,
    /// `[a ASC, b ASC]` turns into `[a ASC, b ASC], [CAST(a) ASC, b ASC]` when the
    /// projection expressions `a, b, CAST(a)` are applied.
768 pub fn substitute_ordering_component(
769 &self,
770 mapping: &ProjectionMapping,
771 sort_expr: &LexOrdering,
772 ) -> Result<Vec<LexOrdering>> {
773 let new_orderings = sort_expr
774 .iter()
775 .map(|sort_expr| {
776 let referring_exprs: Vec<_> = mapping
777 .iter()
778 .map(|(source, _target)| source)
779 .filter(|source| expr_refers(source, &sort_expr.expr))
780 .cloned()
781 .collect();
782 let mut res = LexOrdering::new(vec![sort_expr.clone()]);
783 // TODO: Add one-to-ones analysis for ScalarFunctions.
784 for r_expr in referring_exprs {
785 // we check whether this expression is substitutable or not
786 if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastExpr>() {
787 // we need to know whether the Cast Expr matches or not
788 let expr_type = sort_expr.expr.data_type(&self.schema)?;
789 if cast_expr.expr.eq(&sort_expr.expr)
790 && cast_expr.is_bigger_cast(expr_type)
791 {
792 res.push(PhysicalSortExpr {
793 expr: Arc::clone(&r_expr),
794 options: sort_expr.options,
795 });
796 }
797 }
798 }
799 Ok(res)
800 })
801 .collect::<Result<Vec<_>>>()?;
802 // Generate all valid orderings, given substituted expressions.
803 let res = new_orderings
804 .into_iter()
805 .multi_cartesian_product()
806 .map(LexOrdering::new)
807 .collect::<Vec<_>>();
808 Ok(res)
809 }
810
    /// In a projection, suppose the input is ordered by `A DESC, B DESC` and the
    /// output shares the same expressions `A` and `B`; then we can keep using the
    /// original ordering. However, if `A` has been changed, for example to
    /// `Cast(A, Int64)` or any other form, it is invalid to continue using the
    /// original ordering, since that would cause bugs in dependency construction.
    /// Therefore, we substitute the input order to obtain a correct dependency
    /// map; see issue 8838: <https://github.com/apache/datafusion/issues/8838>
816 pub fn substitute_oeq_class(&mut self, mapping: &ProjectionMapping) -> Result<()> {
817 let new_order = self
818 .oeq_class
819 .iter()
820 .map(|order| self.substitute_ordering_component(mapping, order))
821 .collect::<Result<Vec<_>>>()?;
822 let new_order = new_order.into_iter().flatten().collect();
823 self.oeq_class = OrderingEquivalenceClass::new(new_order);
824 Ok(())
825 }
826 /// Projects argument `expr` according to `projection_mapping`, taking
827 /// equivalences into account.
828 ///
829 /// For example, assume that columns `a` and `c` are always equal, and that
830 /// `projection_mapping` encodes following mapping:
831 ///
832 /// ```text
833 /// a -> a1
834 /// b -> b1
835 /// ```
836 ///
837 /// Then, this function projects `a + b` to `Some(a1 + b1)`, `c + b` to
838 /// `Some(a1 + b1)` and `d` to `None`, meaning that it cannot be projected.
839 pub fn project_expr(
840 &self,
841 expr: &Arc<dyn PhysicalExpr>,
842 projection_mapping: &ProjectionMapping,
843 ) -> Option<Arc<dyn PhysicalExpr>> {
844 self.eq_group.project_expr(projection_mapping, expr)
845 }
846
847 /// Constructs a dependency map based on existing orderings referred to in
848 /// the projection.
849 ///
850 /// This function analyzes the orderings in the normalized order-equivalence
851 /// class and builds a dependency map. The dependency map captures relationships
852 /// between expressions within the orderings, helping to identify dependencies
853 /// and construct valid projected orderings during projection operations.
854 ///
855 /// # Parameters
856 ///
857 /// - `mapping`: A reference to the `ProjectionMapping` that defines the
858 /// relationship between source and target expressions.
859 ///
860 /// # Returns
861 ///
862 /// A [`DependencyMap`] representing the dependency map, where each
863 /// \[`DependencyNode`\] contains dependencies for the key [`PhysicalSortExpr`].
864 ///
865 /// # Example
866 ///
867 /// Assume we have two equivalent orderings: `[a ASC, b ASC]` and `[a ASC, c ASC]`,
868 /// and the projection mapping is `[a -> a_new, b -> b_new, b + c -> b + c]`.
869 /// Then, the dependency map will be:
870 ///
871 /// ```text
872 /// a ASC: Node {Some(a_new ASC), HashSet{}}
873 /// b ASC: Node {Some(b_new ASC), HashSet{a ASC}}
874 /// c ASC: Node {None, HashSet{a ASC}}
875 /// ```
876 fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
877 let mut dependency_map = DependencyMap::new();
878 for ordering in self.normalized_oeq_class().iter() {
879 for (idx, sort_expr) in ordering.iter().enumerate() {
880 let target_sort_expr =
881 self.project_expr(&sort_expr.expr, mapping).map(|expr| {
882 PhysicalSortExpr {
883 expr,
884 options: sort_expr.options,
885 }
886 });
887 let is_projected = target_sort_expr.is_some();
888 if is_projected
889 || mapping
890 .iter()
891 .any(|(source, _)| expr_refers(source, &sort_expr.expr))
892 {
                    // Previous ordering is a dependency. Note that there is no
894 // dependency for a leading ordering (i.e. the first sort
895 // expression).
896 let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
897 // Add sort expressions that can be projected or referred to
898 // by any of the projection expressions to the dependency map:
899 dependency_map.insert(
900 sort_expr,
901 target_sort_expr.as_ref(),
902 dependency,
903 );
904 }
905 if !is_projected {
906 // If we can not project, stop constructing the dependency
907 // map as remaining dependencies will be invalid after projection.
908 break;
909 }
910 }
911 }
912 dependency_map
913 }
914
915 /// Returns a new `ProjectionMapping` where source expressions are normalized.
916 ///
917 /// This normalization ensures that source expressions are transformed into a
918 /// consistent representation. This is beneficial for algorithms that rely on
919 /// exact equalities, as it allows for more precise and reliable comparisons.
920 ///
921 /// # Parameters
922 ///
923 /// - `mapping`: A reference to the original `ProjectionMapping` to be normalized.
924 ///
925 /// # Returns
926 ///
927 /// A new `ProjectionMapping` with normalized source expressions.
928 fn normalized_mapping(&self, mapping: &ProjectionMapping) -> ProjectionMapping {
        // Construct the mapping with normalized source expressions. This way,
        // the algorithms below can work on exact equalities.
931 ProjectionMapping {
932 map: mapping
933 .iter()
934 .map(|(source, target)| {
935 let normalized_source =
936 self.eq_group.normalize_expr(Arc::clone(source));
937 (normalized_source, Arc::clone(target))
938 })
939 .collect(),
940 }
941 }
942
943 /// Computes projected orderings based on a given projection mapping.
944 ///
945 /// This function takes a `ProjectionMapping` and computes the possible
946 /// orderings for the projected expressions. It considers dependencies
947 /// between expressions and generates valid orderings according to the
948 /// specified sort properties.
949 ///
950 /// # Parameters
951 ///
952 /// - `mapping`: A reference to the `ProjectionMapping` that defines the
953 /// relationship between source and target expressions.
954 ///
955 /// # Returns
956 ///
957 /// A vector of `LexOrdering` containing all valid orderings after projection.
958 fn projected_orderings(&self, mapping: &ProjectionMapping) -> Vec<LexOrdering> {
959 let mapping = self.normalized_mapping(mapping);
960
961 // Get dependency map for existing orderings:
962 let dependency_map = self.construct_dependency_map(&mapping);
963 let orderings = mapping.iter().flat_map(|(source, target)| {
964 referred_dependencies(&dependency_map, source)
965 .into_iter()
966 .filter_map(|relevant_deps| {
967 if let Ok(SortProperties::Ordered(options)) =
968 get_expr_properties(source, &relevant_deps, &self.schema)
969 .map(|prop| prop.sort_properties)
970 {
971 Some((options, relevant_deps))
972 } else {
973 // Do not consider unordered cases
974 None
975 }
976 })
977 .flat_map(|(options, relevant_deps)| {
978 let sort_expr = PhysicalSortExpr {
979 expr: Arc::clone(target),
980 options,
981 };
982 // Generate dependent orderings (i.e. prefixes for `sort_expr`):
983 let mut dependency_orderings =
984 generate_dependency_orderings(&relevant_deps, &dependency_map);
985 // Append `sort_expr` to the dependent orderings:
986 for ordering in dependency_orderings.iter_mut() {
987 ordering.push(sort_expr.clone());
988 }
989 dependency_orderings
990 })
991 });
992
993 // Add valid projected orderings. For example, if existing ordering is
994 // `a + b` and projection is `[a -> a_new, b -> b_new]`, we need to
995 // preserve `a_new + b_new` as ordered. Please note that `a_new` and
996 // `b_new` themselves need not be ordered. Such dependencies cannot be
997 // deduced via the pass above.
998 let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, node)| {
999 let mut prefixes = construct_prefix_orderings(sort_expr, &dependency_map);
1000 if prefixes.is_empty() {
1001 // If prefix is empty, there is no dependency. Insert
1002 // empty ordering:
1003 prefixes = vec![LexOrdering::default()];
1004 }
            // Append the current ordering on top of its dependencies:
1006 for ordering in prefixes.iter_mut() {
1007 if let Some(target) = &node.target_sort_expr {
1008 ordering.push(target.clone())
1009 }
1010 }
1011 prefixes
1012 });
1013
1014 // Simplify each ordering by removing redundant sections:
1015 orderings
1016 .chain(projected_orderings)
1017 .map(|lex_ordering| lex_ordering.collapse())
1018 .collect()
1019 }
1020
1021 /// Projects constants based on the provided `ProjectionMapping`.
1022 ///
1023 /// This function takes a `ProjectionMapping` and identifies/projects
1024 /// constants based on the existing constants and the mapping. It ensures
1025 /// that constants are appropriately propagated through the projection.
1026 ///
1027 /// # Parameters
1028 ///
1029 /// - `mapping`: A reference to a `ProjectionMapping` representing the
1030 /// mapping of source expressions to target expressions in the projection.
1031 ///
1032 /// # Returns
1033 ///
    /// Returns a `Vec<ConstExpr>` containing the projected constants.
1035 fn projected_constants(&self, mapping: &ProjectionMapping) -> Vec<ConstExpr> {
1036 // First, project existing constants. For example, assume that `a + b`
1037 // is known to be constant. If the projection were `a as a_new`, `b as b_new`,
1038 // then we would project constant `a + b` as `a_new + b_new`.
1039 let mut projected_constants = self
1040 .constants
1041 .iter()
1042 .flat_map(|const_expr| {
1043 const_expr
1044 .map(|expr| self.eq_group.project_expr(mapping, expr))
1045 .map(|projected_expr| {
1046 projected_expr
1047 .with_across_partitions(const_expr.across_partitions())
1048 })
1049 })
1050 .collect::<Vec<_>>();
1051
1052 // Add projection expressions that are known to be constant:
1053 for (source, target) in mapping.iter() {
1054 if self.is_expr_constant(source)
1055 && !const_exprs_contains(&projected_constants, target)
1056 {
1057 if self.is_expr_constant_across_partitions(source) {
1058 projected_constants.push(
1059 ConstExpr::from(target)
1060 .with_across_partitions(self.get_expr_constant_value(source)),
1061 )
1062 } else {
1063 projected_constants.push(
1064 ConstExpr::from(target)
1065 .with_across_partitions(AcrossPartitions::Heterogeneous),
1066 )
1067 }
1068 }
1069 }
1070 projected_constants
1071 }
1072
1073 /// Projects constraints according to the given projection mapping.
1074 ///
1075 /// This function takes a projection mapping and extracts the column indices of the target columns.
1076 /// It then projects the constraints to only include relationships between
1077 /// columns that exist in the projected output.
1078 ///
1079 /// # Arguments
1080 ///
1081 /// * `mapping` - A reference to `ProjectionMapping` that defines how expressions are mapped
1082 /// in the projection operation
1083 ///
1084 /// # Returns
1085 ///
1086 /// Returns a new `Constraints` object containing only the constraints
1087 /// that are valid for the projected columns.
1088 fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option<Constraints> {
1089 let indices = mapping
1090 .iter()
1091 .filter_map(|(_, target)| target.as_any().downcast_ref::<Column>())
1092 .map(|col| col.index())
1093 .collect::<Vec<_>>();
1094 debug_assert_eq!(mapping.map.len(), indices.len());
1095 self.constraints.project(&indices)
1096 }
1097
1098 /// Projects the equivalences within according to `mapping`
1099 /// and `output_schema`.
1100 pub fn project(&self, mapping: &ProjectionMapping, output_schema: SchemaRef) -> Self {
1101 let eq_group = self.eq_group.project(mapping);
1102 let oeq_class = OrderingEquivalenceClass::new(self.projected_orderings(mapping));
1103 let constants = self.projected_constants(mapping);
1104 let constraints = self
1105 .projected_constraints(mapping)
1106 .unwrap_or_else(Constraints::empty);
1107 Self {
1108 schema: output_schema,
1109 eq_group,
1110 oeq_class,
1111 constants,
1112 constraints,
1113 }
1114 }
1115
1116 /// Returns the longest (potentially partial) permutation satisfying the
1117 /// existing ordering. For example, if we have the equivalent orderings
1118 /// `[a ASC, b ASC]` and `[c DESC]`, with `exprs` containing `[c, b, a, d]`,
1119 /// then this function returns `([a ASC, b ASC, c DESC], [2, 1, 0])`.
1120 /// This means that the specification `[a ASC, b ASC, c DESC]` is satisfied
1121 /// by the existing ordering, and `[a, b, c]` resides at indices: `2, 1, 0`
1122 /// inside the argument `exprs` (respectively). For the mathematical
1123 /// definition of "partial permutation", see:
1124 ///
1125 /// <https://en.wikipedia.org/wiki/Permutation#k-permutations_of_n>
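    ///
    /// # Example
    /// A minimal sketch based on the description above. The exact order of the
    /// returned sort expressions may depend on internal iteration order, so only
    /// lengths are checked here; the schema is illustrative:
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
    /// # use datafusion_physical_expr::EquivalenceProperties;
    /// # use datafusion_physical_expr::expressions::col;
    /// # use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    /// # let schema: SchemaRef = Arc::new(Schema::new(vec![
    /// #   Field::new("a", DataType::Int32, false),
    /// #   Field::new("b", DataType::Int32, false),
    /// #   Field::new("c", DataType::Int32, false),
    /// #   Field::new("d", DataType::Int32, false),
    /// # ]));
    /// # let col_a = col("a", &schema).unwrap();
    /// # let col_b = col("b", &schema).unwrap();
    /// # let col_c = col("c", &schema).unwrap();
    /// # let col_d = col("d", &schema).unwrap();
    /// // Equivalent orderings `[a ASC, b ASC]` and `[c DESC]`:
    /// let eq_properties = EquivalenceProperties::new_with_orderings(
    ///     schema,
    ///     &[
    ///         LexOrdering::new(vec![
    ///             PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
    ///             PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(),
    ///         ]),
    ///         LexOrdering::new(vec![
    ///             PhysicalSortExpr::new_default(Arc::clone(&col_c)).desc(),
    ///         ]),
    ///     ],
    /// );
    /// let exprs = vec![col_c, col_b, col_a, col_d];
    /// let (ordering, indices) = eq_properties.find_longest_permutation(&exprs);
    /// // `a`, `b` and `c` are ordered; `d` (index 3) is not part of the permutation.
    /// assert_eq!(ordering.len(), 3);
    /// assert_eq!(indices.len(), 3);
    /// assert!(!indices.contains(&3));
    /// ```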
1126 pub fn find_longest_permutation(
1127 &self,
1128 exprs: &[Arc<dyn PhysicalExpr>],
1129 ) -> (LexOrdering, Vec<usize>) {
1130 let mut eq_properties = self.clone();
1131 let mut result = vec![];
1132 // The algorithm is as follows:
1133 // - Iterate over all the expressions and insert ordered expressions
1134 // into the result.
1135 // - Treat inserted expressions as constants (i.e. add them as constants
1136 // to the state).
1137 // - Continue the above procedure until no expression is inserted; i.e.
1138 // the algorithm reaches a fixed point.
1139 // This algorithm should reach a fixed point in at most `exprs.len()`
1140 // iterations.
1141 let mut search_indices = (0..exprs.len()).collect::<IndexSet<_>>();
1142 for _idx in 0..exprs.len() {
1143 // Get ordered expressions with their indices.
1144 let ordered_exprs = search_indices
1145 .iter()
1146 .flat_map(|&idx| {
1147 let ExprProperties {
1148 sort_properties, ..
1149 } = eq_properties.get_expr_properties(Arc::clone(&exprs[idx]));
1150 match sort_properties {
1151 SortProperties::Ordered(options) => Some((
1152 PhysicalSortExpr {
1153 expr: Arc::clone(&exprs[idx]),
1154 options,
1155 },
1156 idx,
1157 )),
1158 SortProperties::Singleton => {
1159 // Assign default ordering to constant expressions
1160 let options = SortOptions::default();
1161 Some((
1162 PhysicalSortExpr {
1163 expr: Arc::clone(&exprs[idx]),
1164 options,
1165 },
1166 idx,
1167 ))
1168 }
1169 SortProperties::Unordered => None,
1170 }
1171 })
1172 .collect::<Vec<_>>();
1173 // We reached a fixed point, exit.
1174 if ordered_exprs.is_empty() {
1175 break;
1176 }
1177 // Remove indices that have an ordering from `search_indices`, and
1178 // treat ordered expressions as constants in subsequent iterations.
1179 // We can do this because the "next" key only matters in a lexicographical
1180 // ordering when the keys to its left have the same values.
1181 //
1182 // Note that these expressions are not properly "constants". This is just
1183 // an implementation strategy confined to this function.
1184 for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
1185 eq_properties =
1186 eq_properties.with_constants(std::iter::once(ConstExpr::from(expr)));
1187 search_indices.shift_remove(idx);
1188 }
1189 // Add new ordered section to the state.
1190 result.extend(ordered_exprs);
1191 }
1192 let (left, right) = result.into_iter().unzip();
1193 (LexOrdering::new(left), right)
1194 }
1195
1196 /// This function determines whether the provided expression is constant
1197 /// based on the known constants.
1198 ///
1199 /// # Parameters
1200 ///
1201 /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1202 /// expression to be checked.
1203 ///
1204 /// # Returns
1205 ///
1206 /// Returns `true` if the expression is constant according to equivalence
1207 /// group, `false` otherwise.
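    ///
    /// # Example
    /// A minimal sketch mirroring the comment in the implementation: with `a`
    /// and `b` known to be constant, `a + b` is constant too, while `c` is not.
    /// The schema and columns below are illustrative:
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
    /// # use datafusion_expr::Operator;
    /// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties, PhysicalExpr};
    /// # use datafusion_physical_expr::expressions::{col, BinaryExpr};
    /// # let schema: SchemaRef = Arc::new(Schema::new(vec![
    /// #   Field::new("a", DataType::Int32, false),
    /// #   Field::new("b", DataType::Int32, false),
    /// #   Field::new("c", DataType::Int32, false),
    /// # ]));
    /// # let col_a = col("a", &schema).unwrap();
    /// # let col_b = col("b", &schema).unwrap();
    /// # let col_c = col("c", &schema).unwrap();
    /// let eq_properties = EquivalenceProperties::new(schema).with_constants(vec![
    ///     ConstExpr::from(Arc::clone(&col_a)),
    ///     ConstExpr::from(Arc::clone(&col_b)),
    /// ]);
    /// let a_plus_b = Arc::new(BinaryExpr::new(col_a, Operator::Plus, col_b))
    ///     as Arc<dyn PhysicalExpr>;
    /// assert!(eq_properties.is_expr_constant(&a_plus_b));
    /// assert!(!eq_properties.is_expr_constant(&col_c));
    /// ```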
1208 pub fn is_expr_constant(&self, expr: &Arc<dyn PhysicalExpr>) -> bool {
1209 // As an example, assume that we know columns `a` and `b` are constant.
1210 // Then, `a`, `b` and `a + b` will all return `true` whereas `c` will
1211 // return `false`.
1212 let const_exprs = self
1213 .constants
1214 .iter()
1215 .map(|const_expr| Arc::clone(const_expr.expr()));
1216 let normalized_constants = self.eq_group.normalize_exprs(const_exprs);
1217 let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
1218 is_constant_recurse(&normalized_constants, &normalized_expr)
1219 }
1220
1221 /// This function determines whether the provided expression is constant
1222 /// across partitions based on the known constants.
1223 ///
1224 /// # Parameters
1225 ///
1226 /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1227 /// expression to be checked.
1228 ///
1229 /// # Returns
1230 ///
1231 /// Returns `true` if the expression is constant across all partitions according
1232 /// to equivalence group, `false` otherwise
1233 #[deprecated(
1234 since = "45.0.0",
1235 note = "Use [`is_expr_constant_across_partitions`] instead"
1236 )]
1237 pub fn is_expr_constant_accross_partitions(
1238 &self,
1239 expr: &Arc<dyn PhysicalExpr>,
1240 ) -> bool {
1241 self.is_expr_constant_across_partitions(expr)
1242 }
1243
1244 /// This function determines whether the provided expression is constant
1245 /// across partitions based on the known constants.
1246 ///
1247 /// # Parameters
1248 ///
1249 /// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the
1250 /// expression to be checked.
1251 ///
1252 /// # Returns
1253 ///
1254 /// Returns `true` if the expression is constant across all partitions according
1255 /// to equivalence group, `false` otherwise.
1256 pub fn is_expr_constant_across_partitions(
1257 &self,
1258 expr: &Arc<dyn PhysicalExpr>,
1259 ) -> bool {
1260 // As an example, assume that we know columns `a` and `b` are constant.
1261 // Then, `a`, `b` and `a + b` will all return `true` whereas `c` will
1262 // return `false`.
1263 let const_exprs = self
1264 .constants
1265 .iter()
1266 .filter_map(|const_expr| {
1267 if matches!(
1268 const_expr.across_partitions(),
1269 AcrossPartitions::Uniform { .. }
1270 ) {
1271 Some(Arc::clone(const_expr.expr()))
1272 } else {
1273 None
1274 }
1275 })
1276 .collect::<Vec<_>>();
1277 let normalized_constants = self.eq_group.normalize_exprs(const_exprs);
1278 let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
1279 is_constant_recurse(&normalized_constants, &normalized_expr)
1280 }
1281
1282 /// Retrieves the constant value of a given physical expression, if it exists.
1283 ///
1284 /// Normalizes the input expression and checks if it matches any known constants
1285 /// in the current context. Returns whether the expression has a uniform value,
1286 /// varies across partitions, or is not constant.
1287 ///
1288 /// # Parameters
1289 /// - `expr`: A reference to the physical expression to evaluate.
1290 ///
1291 /// # Returns
1292 /// - `AcrossPartitions::Uniform(value)`: If the expression has the same value across partitions.
    /// - `AcrossPartitions::Heterogeneous`: If the expression varies across partitions
    ///   or is not recognized as a constant.
1295 pub fn get_expr_constant_value(
1296 &self,
1297 expr: &Arc<dyn PhysicalExpr>,
1298 ) -> AcrossPartitions {
1299 let normalized_expr = self.eq_group.normalize_expr(Arc::clone(expr));
1300
1301 if let Some(lit) = normalized_expr.as_any().downcast_ref::<Literal>() {
1302 return AcrossPartitions::Uniform(Some(lit.value().clone()));
1303 }
1304
1305 for const_expr in self.constants.iter() {
1306 if normalized_expr.eq(const_expr.expr()) {
1307 return const_expr.across_partitions();
1308 }
1309 }
1310
1311 AcrossPartitions::Heterogeneous
1312 }
1313
1314 /// Retrieves the properties for a given physical expression.
1315 ///
1316 /// This function constructs an [`ExprProperties`] object for the given
1317 /// expression, which encapsulates information about the expression's
1318 /// properties, including its [`SortProperties`] and [`Interval`].
1319 ///
1320 /// # Parameters
1321 ///
1322 /// - `expr`: An `Arc<dyn PhysicalExpr>` representing the physical expression
1323 /// for which ordering information is sought.
1324 ///
1325 /// # Returns
1326 ///
1327 /// Returns an [`ExprProperties`] object containing the ordering and range
1328 /// information for the given expression.
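    ///
    /// # Example
    /// A minimal sketch; the column `a` and its ordering are illustrative:
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{Schema, Field, DataType, SchemaRef};
    /// # use datafusion_expr::sort_properties::SortProperties;
    /// # use datafusion_physical_expr::EquivalenceProperties;
    /// # use datafusion_physical_expr::expressions::col;
    /// # use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    /// # let schema: SchemaRef = Arc::new(Schema::new(vec![
    /// #   Field::new("a", DataType::Int32, false),
    /// # ]));
    /// # let col_a = col("a", &schema).unwrap();
    /// let eq_properties = EquivalenceProperties::new_with_orderings(
    ///     schema,
    ///     &[LexOrdering::new(vec![
    ///         PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(),
    ///     ])],
    /// );
    /// let props = eq_properties.get_expr_properties(col_a);
    /// assert!(matches!(props.sort_properties, SortProperties::Ordered(_)));
    /// ```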
1329 pub fn get_expr_properties(&self, expr: Arc<dyn PhysicalExpr>) -> ExprProperties {
1330 ExprPropertiesNode::new_unknown(expr)
1331 .transform_up(|expr| update_properties(expr, self))
1332 .data()
1333 .map(|node| node.data)
1334 .unwrap_or(ExprProperties::new_unknown())
1335 }
1336
1337 /// Transforms this `EquivalenceProperties` into a new `EquivalenceProperties`
1338 /// by mapping columns in the original schema to columns in the new schema
1339 /// by index.
1340 pub fn with_new_schema(self, schema: SchemaRef) -> Result<Self> {
        // The new schema and the original schema are aligned when they have the
1342 // same number of columns, and fields at the same index have the same
1343 // type in both schemas.
1344 let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
1345 && self
1346 .schema
1347 .fields
1348 .iter()
1349 .zip(schema.fields.iter())
1350 .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
1351 if !schemas_aligned {
1352 // Rewriting equivalence properties in terms of new schema is not
1353 // safe when schemas are not aligned:
1354 return plan_err!(
1355 "Cannot rewrite old_schema:{:?} with new schema: {:?}",
1356 self.schema,
1357 schema
1358 );
1359 }
1360 // Rewrite constants according to new schema:
1361 let new_constants = self
1362 .constants
1363 .into_iter()
1364 .map(|const_expr| {
1365 let across_partitions = const_expr.across_partitions();
1366 let new_const_expr = with_new_schema(const_expr.owned_expr(), &schema)?;
1367 Ok(ConstExpr::new(new_const_expr)
1368 .with_across_partitions(across_partitions))
1369 })
1370 .collect::<Result<Vec<_>>>()?;
1371
1372 // Rewrite orderings according to new schema:
1373 let mut new_orderings = vec![];
1374 for ordering in self.oeq_class {
1375 let new_ordering = ordering
1376 .into_iter()
1377 .map(|mut sort_expr| {
1378 sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?;
1379 Ok(sort_expr)
1380 })
1381 .collect::<Result<_>>()?;
1382 new_orderings.push(new_ordering);
1383 }
1384
1385 // Rewrite equivalence classes according to the new schema:
1386 let mut eq_classes = vec![];
1387 for eq_class in self.eq_group {
1388 let new_eq_exprs = eq_class
1389 .into_vec()
1390 .into_iter()
1391 .map(|expr| with_new_schema(expr, &schema))
1392 .collect::<Result<_>>()?;
1393 eq_classes.push(EquivalenceClass::new(new_eq_exprs));
1394 }
1395
1396 // Construct the resulting equivalence properties:
1397 let mut result = EquivalenceProperties::new(schema);
1398 result.constants = new_constants;
1399 result.add_new_orderings(new_orderings);
1400 result.add_equivalence_group(EquivalenceGroup::new(eq_classes));
1401
1402 Ok(result)
1403 }
1404}
1405
1406/// More readable display version of the `EquivalenceProperties`.
1407///
1408/// Format:
1409/// ```text
1410/// order: [[a ASC, b ASC], [a ASC, c ASC]], eq: [[a = b], [a = c]], const: [a = 1]
1411/// ```
1412impl Display for EquivalenceProperties {
1413 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1414 if self.eq_group.is_empty()
1415 && self.oeq_class.is_empty()
1416 && self.constants.is_empty()
1417 {
1418 return write!(f, "No properties");
1419 }
1420 if !self.oeq_class.is_empty() {
1421 write!(f, "order: {}", self.oeq_class)?;
1422 }
1423 if !self.eq_group.is_empty() {
1424 write!(f, ", eq: {}", self.eq_group)?;
1425 }
1426 if !self.constants.is_empty() {
1427 write!(f, ", const: [{}]", ConstExpr::format_list(&self.constants))?;
1428 }
1429 Ok(())
1430 }
1431}
1432
1433/// Calculates the properties of a given [`ExprPropertiesNode`].
1434///
1435/// Order information can be retrieved as:
1436/// - If it is a leaf node, we directly find the order of the node by looking
1437/// at the given sort expression and equivalence properties if it is a `Column`
1438/// leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark
1439/// it as singleton so that it can cooperate with all ordered columns.
1440/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1441/// and operator has its own rules on how to propagate the children orderings.
1442/// However, before we engage in recursion, we check whether this intermediate
1443/// node directly matches with the sort expression. If there is a match, the
1444/// sort expression emerges at that node immediately, discarding the recursive
1445/// result coming from its children.
1446///
1447/// Range information is calculated as:
1448/// - If it is a `Literal` node, we set the range as a point value. If it is a
1449/// `Column` node, we set the datatype of the range, but cannot give an interval
1450/// for the range, yet.
1451/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr`
1452/// and operator has its own rules on how to propagate the children range.
1453fn update_properties(
1454 mut node: ExprPropertiesNode,
1455 eq_properties: &EquivalenceProperties,
1456) -> Result<Transformed<ExprPropertiesNode>> {
1457 // First, try to gather the information from the children:
1458 if !node.expr.children().is_empty() {
1459 // We have an intermediate (non-leaf) node, account for its children:
1460 let children_props = node.children.iter().map(|c| c.data.clone()).collect_vec();
1461 node.data = node.expr.get_properties(&children_props)?;
1462 } else if node.expr.as_any().is::<Literal>() {
1463 // We have a Literal, which is one of the two possible leaf node types:
1464 node.data = node.expr.get_properties(&[])?;
1465 } else if node.expr.as_any().is::<Column>() {
1466 // We have a Column, which is the other possible leaf node type:
1467 node.data.range =
1468 Interval::make_unbounded(&node.expr.data_type(eq_properties.schema())?)?
1469 }
1470 // Now, check what we know about orderings:
1471 let normalized_expr = eq_properties
1472 .eq_group
1473 .normalize_expr(Arc::clone(&node.expr));
1474 let oeq_class = eq_properties.normalized_oeq_class();
1475 if eq_properties.is_expr_constant(&normalized_expr)
1476 || oeq_class.is_expr_partial_const(&normalized_expr)
1477 {
1478 node.data.sort_properties = SortProperties::Singleton;
1479 } else if let Some(options) = oeq_class.get_options(&normalized_expr) {
1480 node.data.sort_properties = SortProperties::Ordered(options);
1481 }
1482 Ok(Transformed::yes(node))
1483}
1484
1485/// This function determines whether the provided expression is constant
1486/// based on the known constants.
1487///
1488/// # Parameters
1489///
1490/// - `constants`: A `&[Arc<dyn PhysicalExpr>]` containing expressions known to
1491/// be a constant.
1492/// - `expr`: A reference to a `Arc<dyn PhysicalExpr>` representing the expression
1493/// to check.
1494///
1495/// # Returns
1496///
1497/// Returns `true` if the expression is constant according to equivalence
1498/// group, `false` otherwise.
1499fn is_constant_recurse(
1500 constants: &[Arc<dyn PhysicalExpr>],
1501 expr: &Arc<dyn PhysicalExpr>,
1502) -> bool {
1503 if physical_exprs_contains(constants, expr) || expr.as_any().is::<Literal>() {
1504 return true;
1505 }
1506 let children = expr.children();
1507 !children.is_empty() && children.iter().all(|c| is_constant_recurse(constants, c))
1508}
1509
1510/// This function examines whether a referring expression directly refers to a
1511/// given referred expression or if any of its children in the expression tree
1512/// refer to the specified expression.
1513///
1514/// # Parameters
1515///
1516/// - `referring_expr`: A reference to the referring expression (`Arc<dyn PhysicalExpr>`).
1517/// - `referred_expr`: A reference to the referred expression (`Arc<dyn PhysicalExpr>`)
1518///
1519/// # Returns
1520///
/// A boolean value indicating whether `referring_expr` refers to `referred_expr`
/// (i.e. needs it to evaluate its result) or not.
1523fn expr_refers(
1524 referring_expr: &Arc<dyn PhysicalExpr>,
1525 referred_expr: &Arc<dyn PhysicalExpr>,
1526) -> bool {
1527 referring_expr.eq(referred_expr)
1528 || referring_expr
1529 .children()
1530 .iter()
1531 .any(|child| expr_refers(child, referred_expr))
1532}
1533
1534/// This function examines the given expression and its properties to determine
1535/// the ordering properties of the expression. The range knowledge is not utilized
1536/// yet in the scope of this function.
1537///
1538/// # Parameters
1539///
1540/// - `expr`: A reference to the source expression (`Arc<dyn PhysicalExpr>`) for
1541/// which ordering properties need to be determined.
1542/// - `dependencies`: A reference to `Dependencies`, containing sort expressions
1543/// referred to by `expr`.
/// - `schema`: A reference to the schema to which the columns in `expr` refer.
1545///
1546/// # Returns
1547///
/// An `ExprProperties` object containing the ordering (and range) information of
/// the given expression.
1549fn get_expr_properties(
1550 expr: &Arc<dyn PhysicalExpr>,
1551 dependencies: &Dependencies,
1552 schema: &SchemaRef,
1553) -> Result<ExprProperties> {
1554 if let Some(column_order) = dependencies.iter().find(|&order| expr.eq(&order.expr)) {
1555 // If exact match is found, return its ordering.
1556 Ok(ExprProperties {
1557 sort_properties: SortProperties::Ordered(column_order.options),
1558 range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1559 preserves_lex_ordering: false,
1560 })
1561 } else if expr.as_any().downcast_ref::<Column>().is_some() {
1562 Ok(ExprProperties {
1563 sort_properties: SortProperties::Unordered,
1564 range: Interval::make_unbounded(&expr.data_type(schema)?)?,
1565 preserves_lex_ordering: false,
1566 })
1567 } else if let Some(literal) = expr.as_any().downcast_ref::<Literal>() {
1568 Ok(ExprProperties {
1569 sort_properties: SortProperties::Singleton,
1570 range: Interval::try_new(literal.value().clone(), literal.value().clone())?,
1571 preserves_lex_ordering: true,
1572 })
1573 } else {
1574 // Find orderings of its children
1575 let child_states = expr
1576 .children()
1577 .iter()
1578 .map(|child| get_expr_properties(child, dependencies, schema))
1579 .collect::<Result<Vec<_>>>()?;
1580 // Calculate expression ordering using ordering of its children.
1581 expr.get_properties(&child_states)
1582 }
1583}
1584
1585/// Wrapper struct for `Arc<dyn PhysicalExpr>` to use them as keys in a hash map.
1586#[derive(Debug, Clone)]
1587struct ExprWrapper(Arc<dyn PhysicalExpr>);
1588
1589impl PartialEq<Self> for ExprWrapper {
1590 fn eq(&self, other: &Self) -> bool {
1591 self.0.eq(&other.0)
1592 }
1593}
1594
1595impl Eq for ExprWrapper {}
1596
1597impl Hash for ExprWrapper {
1598 fn hash<H: Hasher>(&self, state: &mut H) {
1599 self.0.hash(state);
1600 }
1601}
1602
1603#[cfg(test)]
1604mod tests {
1605
1606 use super::*;
1607 use crate::expressions::{col, BinaryExpr};
1608
1609 use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
1610 use datafusion_expr::Operator;
1611
1612 #[test]
1613 fn test_expr_consists_of_constants() -> Result<()> {
1614 let schema = Arc::new(Schema::new(vec![
1615 Field::new("a", DataType::Int32, true),
1616 Field::new("b", DataType::Int32, true),
1617 Field::new("c", DataType::Int32, true),
1618 Field::new("d", DataType::Int32, true),
1619 Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1620 ]));
1621 let col_a = col("a", &schema)?;
1622 let col_b = col("b", &schema)?;
1623 let col_d = col("d", &schema)?;
1624 let b_plus_d = Arc::new(BinaryExpr::new(
1625 Arc::clone(&col_b),
1626 Operator::Plus,
1627 Arc::clone(&col_d),
1628 )) as Arc<dyn PhysicalExpr>;
1629
1630 let constants = vec![Arc::clone(&col_a), Arc::clone(&col_b)];
1631 let expr = Arc::clone(&b_plus_d);
1632 assert!(!is_constant_recurse(&constants, &expr));
1633
1634 let constants = vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_d)];
1635 let expr = Arc::clone(&b_plus_d);
1636 assert!(is_constant_recurse(&constants, &expr));
1637 Ok(())
1638 }
1639}