datafusion_common/
functional_dependencies.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! FunctionalDependencies keeps track of functional dependencies
19//! inside DFSchema.
20
21use std::fmt::{Display, Formatter};
22use std::ops::Deref;
23use std::vec::IntoIter;
24
25use crate::utils::{merge_and_order_indices, set_difference};
26use crate::{DFSchema, HashSet, JoinType};
27
/// A single table-level constraint, expressed in terms of column indices.
///
/// Variant order is significant for the derived `PartialOrd`: `PrimaryKey`
/// compares less than `Unique`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Constraint {
    /// The referenced columns jointly form a composite primary key: their
    /// combined values are unique and none of them may be NULL.
    PrimaryKey(Vec<usize>),
    /// The referenced columns jointly form a composite unique key.
    Unique(Vec<usize>),
}
37
38/// This object encapsulates a list of functional constraints:
39#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
40pub struct Constraints {
41    inner: Vec<Constraint>,
42}
43
44impl Constraints {
45    /// Create empty constraints
46    pub fn empty() -> Self {
47        Constraints::new_unverified(vec![])
48    }
49
50    /// Create a new `Constraints` object from the given `constraints`.
51    /// Users should use the `empty` or `new_from_table_constraints` functions
52    /// for constructing `Constraints`. This constructor is for internal
53    /// purposes only and does not check whether the argument is valid. The user
54    /// is responsible for supplying a valid vector of `Constraint` objects.
55    pub fn new_unverified(constraints: Vec<Constraint>) -> Self {
56        Self { inner: constraints }
57    }
58
59    /// Check whether constraints is empty
60    pub fn is_empty(&self) -> bool {
61        self.inner.is_empty()
62    }
63
64    /// Projects constraints using the given projection indices.
65    /// Returns None if any of the constraint columns are not included in the projection.
66    pub fn project(&self, proj_indices: &[usize]) -> Option<Self> {
67        let projected = self
68            .inner
69            .iter()
70            .filter_map(|constraint| {
71                match constraint {
72                    Constraint::PrimaryKey(indices) => {
73                        let new_indices =
74                            update_elements_with_matching_indices(indices, proj_indices);
75                        // Only keep constraint if all columns are preserved
76                        (new_indices.len() == indices.len())
77                            .then_some(Constraint::PrimaryKey(new_indices))
78                    }
79                    Constraint::Unique(indices) => {
80                        let new_indices =
81                            update_elements_with_matching_indices(indices, proj_indices);
82                        // Only keep constraint if all columns are preserved
83                        (new_indices.len() == indices.len())
84                            .then_some(Constraint::Unique(new_indices))
85                    }
86                }
87            })
88            .collect::<Vec<_>>();
89
90        (!projected.is_empty()).then_some(Constraints::new_unverified(projected))
91    }
92}
93
94impl Default for Constraints {
95    fn default() -> Self {
96        Constraints::empty()
97    }
98}
99
100impl IntoIterator for Constraints {
101    type Item = Constraint;
102    type IntoIter = IntoIter<Constraint>;
103
104    fn into_iter(self) -> Self::IntoIter {
105        self.inner.into_iter()
106    }
107}
108
109impl Display for Constraints {
110    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
111        let pk = self
112            .inner
113            .iter()
114            .map(|c| format!("{:?}", c))
115            .collect::<Vec<_>>();
116        let pk = pk.join(", ");
117        write!(f, "constraints=[{pk}]")
118    }
119}
120
121impl Deref for Constraints {
122    type Target = [Constraint];
123
124    fn deref(&self) -> &Self::Target {
125        self.inner.as_slice()
126    }
127}
128
129/// This object defines a functional dependence in the schema. A functional
130/// dependence defines a relationship between determinant keys and dependent
131/// columns. A determinant key is a column, or a set of columns, whose value
132/// uniquely determines values of some other (dependent) columns. If two rows
133/// have the same determinant key, dependent columns in these rows are
134/// necessarily the same. If the determinant key is unique, the set of
135/// dependent columns is equal to the entire schema and the determinant key can
136/// serve as a primary key. Note that a primary key may "downgrade" into a
137/// determinant key due to an operation such as a join, and this object is
138/// used to track dependence relationships in such cases. For more information
139/// on functional dependencies, see:
140/// <https://www.scaler.com/topics/dbms/functional-dependency-in-dbms/>
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct FunctionalDependence {
143    // Column indices of the (possibly composite) determinant key:
144    pub source_indices: Vec<usize>,
145    // Column indices of dependent column(s):
146    pub target_indices: Vec<usize>,
147    /// Flag indicating whether one of the `source_indices` can receive NULL values.
148    /// For a data source, if the constraint in question is `Constraint::Unique`,
149    /// this flag is `true`. If the constraint in question is `Constraint::PrimaryKey`,
150    /// this flag is `false`.
151    /// Note that as the schema changes between different stages in a plan,
152    /// such as after LEFT JOIN or RIGHT JOIN operations, this property may
153    /// change.
154    pub nullable: bool,
155    // The functional dependency mode:
156    pub mode: Dependency,
157}
158
/// Describes functional dependency mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Dependency {
    /// A determinant key may occur only once.
    Single,
    /// A determinant key may occur multiple times (in multiple rows).
    Multi,
}
165
166impl FunctionalDependence {
167    // Creates a new functional dependence.
168    pub fn new(
169        source_indices: Vec<usize>,
170        target_indices: Vec<usize>,
171        nullable: bool,
172    ) -> Self {
173        Self {
174            source_indices,
175            target_indices,
176            nullable,
177            // Start with the least restrictive mode by default:
178            mode: Dependency::Multi,
179        }
180    }
181
182    pub fn with_mode(mut self, mode: Dependency) -> Self {
183        self.mode = mode;
184        self
185    }
186}
187
188/// This object encapsulates all functional dependencies in a given relation.
189#[derive(Debug, Clone, PartialEq, Eq)]
190pub struct FunctionalDependencies {
191    deps: Vec<FunctionalDependence>,
192}
193
194impl FunctionalDependencies {
195    /// Creates an empty `FunctionalDependencies` object.
196    pub fn empty() -> Self {
197        Self { deps: vec![] }
198    }
199
200    /// Creates a new `FunctionalDependencies` object from a vector of
201    /// `FunctionalDependence` objects.
202    pub fn new(dependencies: Vec<FunctionalDependence>) -> Self {
203        Self { deps: dependencies }
204    }
205
206    /// Creates a new `FunctionalDependencies` object from the given constraints.
207    pub fn new_from_constraints(
208        constraints: Option<&Constraints>,
209        n_field: usize,
210    ) -> Self {
211        if let Some(Constraints { inner: constraints }) = constraints {
212            // Construct dependency objects based on each individual constraint:
213            let dependencies = constraints
214                .iter()
215                .map(|constraint| {
216                    // All the field indices are associated with the whole table
217                    // since we are dealing with table level constraints:
218                    let dependency = match constraint {
219                        Constraint::PrimaryKey(indices) => FunctionalDependence::new(
220                            indices.to_vec(),
221                            (0..n_field).collect::<Vec<_>>(),
222                            false,
223                        ),
224                        Constraint::Unique(indices) => FunctionalDependence::new(
225                            indices.to_vec(),
226                            (0..n_field).collect::<Vec<_>>(),
227                            true,
228                        ),
229                    };
230                    // As primary keys are guaranteed to be unique, set the
231                    // functional dependency mode to `Dependency::Single`:
232                    dependency.with_mode(Dependency::Single)
233                })
234                .collect::<Vec<_>>();
235            Self::new(dependencies)
236        } else {
237            // There is no constraint, return an empty object:
238            Self::empty()
239        }
240    }
241
242    pub fn with_dependency(mut self, mode: Dependency) -> Self {
243        self.deps.iter_mut().for_each(|item| item.mode = mode);
244        self
245    }
246
247    /// Merges the given functional dependencies with these.
248    pub fn extend(&mut self, other: FunctionalDependencies) {
249        self.deps.extend(other.deps);
250    }
251
252    /// Sanity checks if functional dependencies are valid. For example, if
253    /// there are 10 fields, we cannot receive any index further than 9.
254    pub fn is_valid(&self, n_field: usize) -> bool {
255        self.deps.iter().all(
256            |FunctionalDependence {
257                 source_indices,
258                 target_indices,
259                 ..
260             }| {
261                source_indices
262                    .iter()
263                    .max()
264                    .map(|&max_index| max_index < n_field)
265                    .unwrap_or(true)
266                    && target_indices
267                        .iter()
268                        .max()
269                        .map(|&max_index| max_index < n_field)
270                        .unwrap_or(true)
271            },
272        )
273    }
274
275    /// Adds the `offset` value to `source_indices` and `target_indices` for
276    /// each functional dependency.
277    pub fn add_offset(&mut self, offset: usize) {
278        self.deps.iter_mut().for_each(
279            |FunctionalDependence {
280                 source_indices,
281                 target_indices,
282                 ..
283             }| {
284                *source_indices = add_offset_to_vec(source_indices, offset);
285                *target_indices = add_offset_to_vec(target_indices, offset);
286            },
287        )
288    }
289
290    /// Updates `source_indices` and `target_indices` of each functional
291    /// dependence using the index mapping given in `proj_indices`.
292    ///
293    /// Assume that `proj_indices` is \[2, 5, 8\] and we have a functional
294    /// dependence \[5\] (`source_indices`) -> \[5, 8\] (`target_indices`).
295    /// In the updated schema, fields at indices \[2, 5, 8\] will transform
296    /// to \[0, 1, 2\]. Therefore, the resulting functional dependence will
297    /// be \[1\] -> \[1, 2\].
298    pub fn project_functional_dependencies(
299        &self,
300        proj_indices: &[usize],
301        // The argument `n_out` denotes the schema field length, which is needed
302        // to correctly associate a `Single`-mode dependence with the whole table.
303        n_out: usize,
304    ) -> FunctionalDependencies {
305        let mut projected_func_dependencies = vec![];
306        for FunctionalDependence {
307            source_indices,
308            target_indices,
309            nullable,
310            mode,
311        } in &self.deps
312        {
313            let new_source_indices =
314                update_elements_with_matching_indices(source_indices, proj_indices);
315            let new_target_indices = if *mode == Dependency::Single {
316                // Associate with all of the fields in the schema:
317                (0..n_out).collect()
318            } else {
319                // Update associations according to projection:
320                update_elements_with_matching_indices(target_indices, proj_indices)
321            };
322            // All of the composite indices should still be valid after projection;
323            // otherwise, functional dependency cannot be propagated.
324            if new_source_indices.len() == source_indices.len() {
325                let new_func_dependence = FunctionalDependence::new(
326                    new_source_indices,
327                    new_target_indices,
328                    *nullable,
329                )
330                .with_mode(*mode);
331                projected_func_dependencies.push(new_func_dependence);
332            }
333        }
334        FunctionalDependencies::new(projected_func_dependencies)
335    }
336
337    /// This function joins this set of functional dependencies with the `other`
338    /// according to the given `join_type`.
339    pub fn join(
340        &self,
341        other: &FunctionalDependencies,
342        join_type: &JoinType,
343        left_cols_len: usize,
344    ) -> FunctionalDependencies {
345        // Get mutable copies of left and right side dependencies:
346        let mut right_func_dependencies = other.clone();
347        let mut left_func_dependencies = self.clone();
348
349        match join_type {
350            JoinType::Inner | JoinType::Left | JoinType::Right => {
351                // Add offset to right schema:
352                right_func_dependencies.add_offset(left_cols_len);
353
354                // Result may have multiple values, update the dependency mode:
355                left_func_dependencies =
356                    left_func_dependencies.with_dependency(Dependency::Multi);
357                right_func_dependencies =
358                    right_func_dependencies.with_dependency(Dependency::Multi);
359
360                if *join_type == JoinType::Left {
361                    // Downgrade the right side, since it may have additional NULL values:
362                    right_func_dependencies.downgrade_dependencies();
363                } else if *join_type == JoinType::Right {
364                    // Downgrade the left side, since it may have additional NULL values:
365                    left_func_dependencies.downgrade_dependencies();
366                }
367                // Combine left and right functional dependencies:
368                left_func_dependencies.extend(right_func_dependencies);
369                left_func_dependencies
370            }
371            JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
372                // These joins preserve functional dependencies of the left side:
373                left_func_dependencies
374            }
375            JoinType::RightSemi | JoinType::RightAnti => {
376                // These joins preserve functional dependencies of the right side:
377                right_func_dependencies
378            }
379            JoinType::Full => {
380                // All of the functional dependencies are lost in a FULL join:
381                FunctionalDependencies::empty()
382            }
383        }
384    }
385
386    /// This function downgrades a functional dependency when nullability becomes
387    /// a possibility:
388    /// - If the dependency in question is UNIQUE (i.e. nullable), a new null value
389    ///   invalidates the dependency.
390    /// - If the dependency in question is PRIMARY KEY (i.e. not nullable), a new
391    ///   null value turns it into UNIQUE mode.
392    fn downgrade_dependencies(&mut self) {
393        // Delete nullable dependencies, since they are no longer valid:
394        self.deps.retain(|item| !item.nullable);
395        self.deps.iter_mut().for_each(|item| item.nullable = true);
396    }
397
398    /// This function ensures that functional dependencies involving uniquely
399    /// occurring determinant keys cover their entire table in terms of
400    /// dependent columns.
401    pub fn extend_target_indices(&mut self, n_out: usize) {
402        self.deps.iter_mut().for_each(
403            |FunctionalDependence {
404                 mode,
405                 target_indices,
406                 ..
407             }| {
408                // If unique, cover the whole table:
409                if *mode == Dependency::Single {
410                    *target_indices = (0..n_out).collect::<Vec<_>>();
411                }
412            },
413        )
414    }
415}
416
417impl Deref for FunctionalDependencies {
418    type Target = [FunctionalDependence];
419
420    fn deref(&self) -> &Self::Target {
421        self.deps.as_slice()
422    }
423}
424
425/// Calculates functional dependencies for aggregate output, when there is a GROUP BY expression.
426pub fn aggregate_functional_dependencies(
427    aggr_input_schema: &DFSchema,
428    group_by_expr_names: &[String],
429    aggr_schema: &DFSchema,
430) -> FunctionalDependencies {
431    let mut aggregate_func_dependencies = vec![];
432    let aggr_input_fields = aggr_input_schema.field_names();
433    let aggr_fields = aggr_schema.fields();
434    // Association covers the whole table:
435    let target_indices = (0..aggr_schema.fields().len()).collect::<Vec<_>>();
436    // Get functional dependencies of the schema:
437    let func_dependencies = aggr_input_schema.functional_dependencies();
438    for FunctionalDependence {
439        source_indices,
440        nullable,
441        mode,
442        ..
443    } in &func_dependencies.deps
444    {
445        // Keep source indices in a `HashSet` to prevent duplicate entries:
446        let mut new_source_indices = vec![];
447        let mut new_source_field_names = vec![];
448        let source_field_names = source_indices
449            .iter()
450            .map(|&idx| &aggr_input_fields[idx])
451            .collect::<Vec<_>>();
452
453        for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() {
454            // When one of the input determinant expressions matches with
455            // the GROUP BY expression, add the index of the GROUP BY
456            // expression as a new determinant key:
457            if source_field_names.contains(&group_by_expr_name) {
458                new_source_indices.push(idx);
459                new_source_field_names.push(group_by_expr_name.clone());
460            }
461        }
462        let existing_target_indices =
463            get_target_functional_dependencies(aggr_input_schema, group_by_expr_names);
464        let new_target_indices = get_target_functional_dependencies(
465            aggr_input_schema,
466            &new_source_field_names,
467        );
468        let mode = if existing_target_indices == new_target_indices
469            && new_target_indices.is_some()
470        {
471            // If dependency covers all GROUP BY expressions, mode will be `Single`:
472            Dependency::Single
473        } else {
474            // Otherwise, existing mode is preserved:
475            *mode
476        };
477        // All of the composite indices occur in the GROUP BY expression:
478        if new_source_indices.len() == source_indices.len() {
479            aggregate_func_dependencies.push(
480                FunctionalDependence::new(
481                    new_source_indices,
482                    target_indices.clone(),
483                    *nullable,
484                )
485                .with_mode(mode),
486            );
487        }
488    }
489
490    // When we have a GROUP BY key, we can guarantee uniqueness after
491    // aggregation:
492    if !group_by_expr_names.is_empty() {
493        let count = group_by_expr_names.len();
494        let source_indices = (0..count).collect::<Vec<_>>();
495        let nullable = source_indices
496            .iter()
497            .any(|idx| aggr_fields[*idx].is_nullable());
498        // If GROUP BY expressions do not already act as a determinant:
499        if !aggregate_func_dependencies.iter().any(|item| {
500            // If `item.source_indices` is a subset of GROUP BY expressions, we shouldn't add
501            // them since `item.source_indices` defines this relation already.
502
503            // The following simple comparison is working well because
504            // GROUP BY expressions come here as a prefix.
505            item.source_indices.iter().all(|idx| idx < &count)
506        }) {
507            // Add a new functional dependency associated with the whole table:
508            // Use nullable property of the GROUP BY expression:
509            aggregate_func_dependencies.push(
510                // Use nullable property of the GROUP BY expression:
511                FunctionalDependence::new(source_indices, target_indices, nullable)
512                    .with_mode(Dependency::Single),
513            );
514        }
515    }
516    FunctionalDependencies::new(aggregate_func_dependencies)
517}
518
519/// Returns target indices, for the determinant keys that are inside
520/// group by expressions.
521pub fn get_target_functional_dependencies(
522    schema: &DFSchema,
523    group_by_expr_names: &[String],
524) -> Option<Vec<usize>> {
525    let mut combined_target_indices = HashSet::new();
526    let dependencies = schema.functional_dependencies();
527    let field_names = schema.field_names();
528    for FunctionalDependence {
529        source_indices,
530        target_indices,
531        ..
532    } in &dependencies.deps
533    {
534        let source_key_names = source_indices
535            .iter()
536            .map(|id_key_idx| &field_names[*id_key_idx])
537            .collect::<Vec<_>>();
538        // If the GROUP BY expression contains a determinant key, we can use
539        // the associated fields after aggregation even if they are not part
540        // of the GROUP BY expression.
541        if source_key_names
542            .iter()
543            .all(|source_key_name| group_by_expr_names.contains(source_key_name))
544        {
545            combined_target_indices.extend(target_indices.iter());
546        }
547    }
548    (!combined_target_indices.is_empty()).then_some({
549        let mut result = combined_target_indices.into_iter().collect::<Vec<_>>();
550        result.sort();
551        result
552    })
553}
554
555/// Returns indices for the minimal subset of GROUP BY expressions that are
556/// functionally equivalent to the original set of GROUP BY expressions.
557pub fn get_required_group_by_exprs_indices(
558    schema: &DFSchema,
559    group_by_expr_names: &[String],
560) -> Option<Vec<usize>> {
561    let dependencies = schema.functional_dependencies();
562    let field_names = schema.field_names();
563    let mut groupby_expr_indices = group_by_expr_names
564        .iter()
565        .map(|group_by_expr_name| {
566            field_names
567                .iter()
568                .position(|field_name| field_name == group_by_expr_name)
569        })
570        .collect::<Option<Vec<_>>>()?;
571
572    groupby_expr_indices.sort();
573    for FunctionalDependence {
574        source_indices,
575        target_indices,
576        ..
577    } in &dependencies.deps
578    {
579        if source_indices
580            .iter()
581            .all(|source_idx| groupby_expr_indices.contains(source_idx))
582        {
583            // If all source indices are among GROUP BY expression indices, we
584            // can remove target indices from GROUP BY expression indices and
585            // use source indices instead.
586            groupby_expr_indices = set_difference(&groupby_expr_indices, target_indices);
587            groupby_expr_indices =
588                merge_and_order_indices(groupby_expr_indices, source_indices);
589        }
590    }
591    groupby_expr_indices
592        .iter()
593        .map(|idx| {
594            group_by_expr_names
595                .iter()
596                .position(|name| &field_names[*idx] == name)
597        })
598        .collect()
599}
600
/// Remaps each value in `entries` to its position inside `proj_indices`.
///
/// Values absent from `proj_indices` are skipped, so the result can be
/// shorter than `entries`. When a value occurs more than once in
/// `proj_indices`, its first position is used.
fn update_elements_with_matching_indices(
    entries: &[usize],
    proj_indices: &[usize],
) -> Vec<usize> {
    let mut remapped = Vec::new();
    for entry in entries {
        if let Some(position) = proj_indices.iter().position(|candidate| candidate == entry)
        {
            remapped.push(position);
        }
    }
    remapped
}
612
/// Returns a new vector containing every element of `in_data` shifted by
/// `offset`, in the original order.
fn add_offset_to_vec<T: Copy + std::ops::Add<Output = T>>(
    in_data: &[T],
    offset: T,
) -> Vec<T> {
    let mut shifted = Vec::with_capacity(in_data.len());
    for &value in in_data {
        shifted.push(value + offset);
    }
    shifted
}
620
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `Single`-mode functional dependence.
    fn single(
        source: Vec<usize>,
        target: Vec<usize>,
        nullable: bool,
    ) -> FunctionalDependence {
        FunctionalDependence::new(source, target, nullable).with_mode(Dependency::Single)
    }

    #[test]
    fn constraints_iter() {
        let constraints = Constraints::new_unverified(vec![
            Constraint::PrimaryKey(vec![10]),
            Constraint::Unique(vec![20]),
        ]);
        let mut iter = constraints.iter();
        assert_eq!(iter.next(), Some(&Constraint::PrimaryKey(vec![10])));
        assert_eq!(iter.next(), Some(&Constraint::Unique(vec![20])));
        assert_eq!(iter.next(), None);
    }

    #[test]
    fn constraints_display() {
        let constraints = Constraints::new_unverified(vec![
            Constraint::PrimaryKey(vec![1, 2]),
            Constraint::Unique(vec![3]),
        ]);
        assert_eq!(
            constraints.to_string(),
            "constraints=[PrimaryKey([1, 2]), Unique([3])]"
        );
        assert_eq!(Constraints::empty().to_string(), "constraints=[]");
    }

    #[test]
    fn test_project_constraints() {
        let constraints = Constraints::new_unverified(vec![
            Constraint::PrimaryKey(vec![1, 2]),
            Constraint::Unique(vec![0, 3]),
        ]);

        // Project keeping columns 1,2,3
        let projected = constraints.project(&[1, 2, 3]).unwrap();
        assert_eq!(
            projected,
            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0, 1])])
        );

        // Project keeping only column 0 - should return None as no constraints are preserved
        assert!(constraints.project(&[0]).is_none());
    }

    #[test]
    fn test_get_updated_id_keys() {
        let func_dependencies =
            FunctionalDependencies::new(vec![FunctionalDependence::new(
                vec![1],
                vec![0, 1, 2],
                true,
            )]);
        let res = func_dependencies.project_functional_dependencies(&[1, 2], 2);
        let expected = FunctionalDependencies::new(vec![FunctionalDependence::new(
            vec![0],
            vec![0, 1],
            true,
        )]);
        assert_eq!(res, expected);
    }

    #[test]
    fn test_project_single_mode_covers_output() {
        let deps = FunctionalDependencies::new(vec![single(vec![2], vec![0, 1, 2], false)]);
        // A `Single`-mode dependence is re-associated with every output column:
        let res = deps.project_functional_dependencies(&[2, 0], 2);
        assert_eq!(
            res,
            FunctionalDependencies::new(vec![single(vec![0], vec![0, 1], false)])
        );
        // Projecting away the determinant drops the dependence entirely:
        assert_eq!(
            deps.project_functional_dependencies(&[0, 1], 2),
            FunctionalDependencies::empty()
        );
    }

    #[test]
    fn test_is_valid() {
        let deps = FunctionalDependencies::new(vec![FunctionalDependence::new(
            vec![1],
            vec![0, 1, 2],
            true,
        )]);
        assert!(deps.is_valid(3));
        assert!(!deps.is_valid(2));
        assert!(FunctionalDependencies::empty().is_valid(0));
    }

    #[test]
    fn test_add_offset() {
        let mut deps = FunctionalDependencies::new(vec![FunctionalDependence::new(
            vec![0],
            vec![0, 1],
            false,
        )]);
        deps.add_offset(3);
        assert_eq!(
            deps,
            FunctionalDependencies::new(vec![FunctionalDependence::new(
                vec![3],
                vec![3, 4],
                false,
            )])
        );
    }

    #[test]
    fn test_join_downgrades_nullable_side() {
        // Left side: primary key on column 0; right side: unique key on column 0.
        let left = FunctionalDependencies::new(vec![single(vec![0], vec![0, 1], false)]);
        let right = FunctionalDependencies::new(vec![single(vec![0], vec![0, 1], true)]);

        // LEFT JOIN: the right side's unique key is invalidated by padded NULLs,
        // and the left side's key may now repeat (`Multi` mode).
        let res = left.join(&right, &JoinType::Left, 2);
        let expected = FunctionalDependencies::new(vec![FunctionalDependence::new(
            vec![0],
            vec![0, 1],
            false,
        )]);
        assert_eq!(res, expected);

        // RIGHT JOIN: the left side's primary key survives but becomes nullable;
        // the right side's indices are shifted by the left column count.
        let res = left.join(&right, &JoinType::Right, 2);
        let expected = FunctionalDependencies::new(vec![
            FunctionalDependence::new(vec![0], vec![0, 1], true),
            FunctionalDependence::new(vec![2], vec![2, 3], true),
        ]);
        assert_eq!(res, expected);

        // FULL JOIN loses every dependency.
        assert_eq!(
            left.join(&right, &JoinType::Full, 2),
            FunctionalDependencies::empty()
        );
    }
}