datafusion_common/functional_dependencies.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! FunctionalDependencies keeps track of functional dependencies
19//! inside DFSchema.
20
21use std::fmt::{Display, Formatter};
22use std::ops::Deref;
23use std::vec::IntoIter;
24
25use crate::utils::{merge_and_order_indices, set_difference};
26use crate::{DFSchema, HashSet, JoinType};
27
/// A single table-level constraint, expressed in terms of column indices.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd)]
pub enum Constraint {
    /// The listed columns jointly form a (possibly composite) primary key:
    /// their combined values are unique and none of them may be NULL.
    PrimaryKey(Vec<usize>),
    /// The listed columns jointly form a (possibly composite) unique key.
    Unique(Vec<usize>),
}
37
38/// This object encapsulates a list of functional constraints:
39#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
40pub struct Constraints {
41 inner: Vec<Constraint>,
42}
43
44impl Constraints {
45 /// Create empty constraints
46 pub fn empty() -> Self {
47 Constraints::new_unverified(vec![])
48 }
49
50 /// Create a new `Constraints` object from the given `constraints`.
51 /// Users should use the `empty` or `new_from_table_constraints` functions
52 /// for constructing `Constraints`. This constructor is for internal
53 /// purposes only and does not check whether the argument is valid. The user
54 /// is responsible for supplying a valid vector of `Constraint` objects.
55 pub fn new_unverified(constraints: Vec<Constraint>) -> Self {
56 Self { inner: constraints }
57 }
58
59 /// Check whether constraints is empty
60 pub fn is_empty(&self) -> bool {
61 self.inner.is_empty()
62 }
63
64 /// Projects constraints using the given projection indices.
65 /// Returns None if any of the constraint columns are not included in the projection.
66 pub fn project(&self, proj_indices: &[usize]) -> Option<Self> {
67 let projected = self
68 .inner
69 .iter()
70 .filter_map(|constraint| {
71 match constraint {
72 Constraint::PrimaryKey(indices) => {
73 let new_indices =
74 update_elements_with_matching_indices(indices, proj_indices);
75 // Only keep constraint if all columns are preserved
76 (new_indices.len() == indices.len())
77 .then_some(Constraint::PrimaryKey(new_indices))
78 }
79 Constraint::Unique(indices) => {
80 let new_indices =
81 update_elements_with_matching_indices(indices, proj_indices);
82 // Only keep constraint if all columns are preserved
83 (new_indices.len() == indices.len())
84 .then_some(Constraint::Unique(new_indices))
85 }
86 }
87 })
88 .collect::<Vec<_>>();
89
90 (!projected.is_empty()).then_some(Constraints::new_unverified(projected))
91 }
92}
93
94impl Default for Constraints {
95 fn default() -> Self {
96 Constraints::empty()
97 }
98}
99
100impl IntoIterator for Constraints {
101 type Item = Constraint;
102 type IntoIter = IntoIter<Constraint>;
103
104 fn into_iter(self) -> Self::IntoIter {
105 self.inner.into_iter()
106 }
107}
108
109impl Display for Constraints {
110 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
111 let pk = self
112 .inner
113 .iter()
114 .map(|c| format!("{:?}", c))
115 .collect::<Vec<_>>();
116 let pk = pk.join(", ");
117 write!(f, "constraints=[{pk}]")
118 }
119}
120
121impl Deref for Constraints {
122 type Target = [Constraint];
123
124 fn deref(&self) -> &Self::Target {
125 self.inner.as_slice()
126 }
127}
128
129/// This object defines a functional dependence in the schema. A functional
130/// dependence defines a relationship between determinant keys and dependent
131/// columns. A determinant key is a column, or a set of columns, whose value
132/// uniquely determines values of some other (dependent) columns. If two rows
133/// have the same determinant key, dependent columns in these rows are
134/// necessarily the same. If the determinant key is unique, the set of
135/// dependent columns is equal to the entire schema and the determinant key can
136/// serve as a primary key. Note that a primary key may "downgrade" into a
137/// determinant key due to an operation such as a join, and this object is
138/// used to track dependence relationships in such cases. For more information
139/// on functional dependencies, see:
140/// <https://www.scaler.com/topics/dbms/functional-dependency-in-dbms/>
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct FunctionalDependence {
143 // Column indices of the (possibly composite) determinant key:
144 pub source_indices: Vec<usize>,
145 // Column indices of dependent column(s):
146 pub target_indices: Vec<usize>,
147 /// Flag indicating whether one of the `source_indices` can receive NULL values.
148 /// For a data source, if the constraint in question is `Constraint::Unique`,
149 /// this flag is `true`. If the constraint in question is `Constraint::PrimaryKey`,
150 /// this flag is `false`.
151 /// Note that as the schema changes between different stages in a plan,
152 /// such as after LEFT JOIN or RIGHT JOIN operations, this property may
153 /// change.
154 pub nullable: bool,
155 // The functional dependency mode:
156 pub mode: Dependency,
157}
158
/// Describes the functional dependency mode, i.e. whether a given value of the
/// determinant key may appear in one row or in several rows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Dependency {
    /// A determinant key may occur only once.
    Single,
    /// A determinant key may occur multiple times (in multiple rows).
    Multi,
}
165
166impl FunctionalDependence {
167 // Creates a new functional dependence.
168 pub fn new(
169 source_indices: Vec<usize>,
170 target_indices: Vec<usize>,
171 nullable: bool,
172 ) -> Self {
173 Self {
174 source_indices,
175 target_indices,
176 nullable,
177 // Start with the least restrictive mode by default:
178 mode: Dependency::Multi,
179 }
180 }
181
182 pub fn with_mode(mut self, mode: Dependency) -> Self {
183 self.mode = mode;
184 self
185 }
186}
187
188/// This object encapsulates all functional dependencies in a given relation.
189#[derive(Debug, Clone, PartialEq, Eq)]
190pub struct FunctionalDependencies {
191 deps: Vec<FunctionalDependence>,
192}
193
194impl FunctionalDependencies {
195 /// Creates an empty `FunctionalDependencies` object.
196 pub fn empty() -> Self {
197 Self { deps: vec![] }
198 }
199
200 /// Creates a new `FunctionalDependencies` object from a vector of
201 /// `FunctionalDependence` objects.
202 pub fn new(dependencies: Vec<FunctionalDependence>) -> Self {
203 Self { deps: dependencies }
204 }
205
206 /// Creates a new `FunctionalDependencies` object from the given constraints.
207 pub fn new_from_constraints(
208 constraints: Option<&Constraints>,
209 n_field: usize,
210 ) -> Self {
211 if let Some(Constraints { inner: constraints }) = constraints {
212 // Construct dependency objects based on each individual constraint:
213 let dependencies = constraints
214 .iter()
215 .map(|constraint| {
216 // All the field indices are associated with the whole table
217 // since we are dealing with table level constraints:
218 let dependency = match constraint {
219 Constraint::PrimaryKey(indices) => FunctionalDependence::new(
220 indices.to_vec(),
221 (0..n_field).collect::<Vec<_>>(),
222 false,
223 ),
224 Constraint::Unique(indices) => FunctionalDependence::new(
225 indices.to_vec(),
226 (0..n_field).collect::<Vec<_>>(),
227 true,
228 ),
229 };
230 // As primary keys are guaranteed to be unique, set the
231 // functional dependency mode to `Dependency::Single`:
232 dependency.with_mode(Dependency::Single)
233 })
234 .collect::<Vec<_>>();
235 Self::new(dependencies)
236 } else {
237 // There is no constraint, return an empty object:
238 Self::empty()
239 }
240 }
241
242 pub fn with_dependency(mut self, mode: Dependency) -> Self {
243 self.deps.iter_mut().for_each(|item| item.mode = mode);
244 self
245 }
246
247 /// Merges the given functional dependencies with these.
248 pub fn extend(&mut self, other: FunctionalDependencies) {
249 self.deps.extend(other.deps);
250 }
251
252 /// Sanity checks if functional dependencies are valid. For example, if
253 /// there are 10 fields, we cannot receive any index further than 9.
254 pub fn is_valid(&self, n_field: usize) -> bool {
255 self.deps.iter().all(
256 |FunctionalDependence {
257 source_indices,
258 target_indices,
259 ..
260 }| {
261 source_indices
262 .iter()
263 .max()
264 .map(|&max_index| max_index < n_field)
265 .unwrap_or(true)
266 && target_indices
267 .iter()
268 .max()
269 .map(|&max_index| max_index < n_field)
270 .unwrap_or(true)
271 },
272 )
273 }
274
275 /// Adds the `offset` value to `source_indices` and `target_indices` for
276 /// each functional dependency.
277 pub fn add_offset(&mut self, offset: usize) {
278 self.deps.iter_mut().for_each(
279 |FunctionalDependence {
280 source_indices,
281 target_indices,
282 ..
283 }| {
284 *source_indices = add_offset_to_vec(source_indices, offset);
285 *target_indices = add_offset_to_vec(target_indices, offset);
286 },
287 )
288 }
289
290 /// Updates `source_indices` and `target_indices` of each functional
291 /// dependence using the index mapping given in `proj_indices`.
292 ///
293 /// Assume that `proj_indices` is \[2, 5, 8\] and we have a functional
294 /// dependence \[5\] (`source_indices`) -> \[5, 8\] (`target_indices`).
295 /// In the updated schema, fields at indices \[2, 5, 8\] will transform
296 /// to \[0, 1, 2\]. Therefore, the resulting functional dependence will
297 /// be \[1\] -> \[1, 2\].
298 pub fn project_functional_dependencies(
299 &self,
300 proj_indices: &[usize],
301 // The argument `n_out` denotes the schema field length, which is needed
302 // to correctly associate a `Single`-mode dependence with the whole table.
303 n_out: usize,
304 ) -> FunctionalDependencies {
305 let mut projected_func_dependencies = vec![];
306 for FunctionalDependence {
307 source_indices,
308 target_indices,
309 nullable,
310 mode,
311 } in &self.deps
312 {
313 let new_source_indices =
314 update_elements_with_matching_indices(source_indices, proj_indices);
315 let new_target_indices = if *mode == Dependency::Single {
316 // Associate with all of the fields in the schema:
317 (0..n_out).collect()
318 } else {
319 // Update associations according to projection:
320 update_elements_with_matching_indices(target_indices, proj_indices)
321 };
322 // All of the composite indices should still be valid after projection;
323 // otherwise, functional dependency cannot be propagated.
324 if new_source_indices.len() == source_indices.len() {
325 let new_func_dependence = FunctionalDependence::new(
326 new_source_indices,
327 new_target_indices,
328 *nullable,
329 )
330 .with_mode(*mode);
331 projected_func_dependencies.push(new_func_dependence);
332 }
333 }
334 FunctionalDependencies::new(projected_func_dependencies)
335 }
336
337 /// This function joins this set of functional dependencies with the `other`
338 /// according to the given `join_type`.
339 pub fn join(
340 &self,
341 other: &FunctionalDependencies,
342 join_type: &JoinType,
343 left_cols_len: usize,
344 ) -> FunctionalDependencies {
345 // Get mutable copies of left and right side dependencies:
346 let mut right_func_dependencies = other.clone();
347 let mut left_func_dependencies = self.clone();
348
349 match join_type {
350 JoinType::Inner | JoinType::Left | JoinType::Right => {
351 // Add offset to right schema:
352 right_func_dependencies.add_offset(left_cols_len);
353
354 // Result may have multiple values, update the dependency mode:
355 left_func_dependencies =
356 left_func_dependencies.with_dependency(Dependency::Multi);
357 right_func_dependencies =
358 right_func_dependencies.with_dependency(Dependency::Multi);
359
360 if *join_type == JoinType::Left {
361 // Downgrade the right side, since it may have additional NULL values:
362 right_func_dependencies.downgrade_dependencies();
363 } else if *join_type == JoinType::Right {
364 // Downgrade the left side, since it may have additional NULL values:
365 left_func_dependencies.downgrade_dependencies();
366 }
367 // Combine left and right functional dependencies:
368 left_func_dependencies.extend(right_func_dependencies);
369 left_func_dependencies
370 }
371 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
372 // These joins preserve functional dependencies of the left side:
373 left_func_dependencies
374 }
375 JoinType::RightSemi | JoinType::RightAnti => {
376 // These joins preserve functional dependencies of the right side:
377 right_func_dependencies
378 }
379 JoinType::Full => {
380 // All of the functional dependencies are lost in a FULL join:
381 FunctionalDependencies::empty()
382 }
383 }
384 }
385
386 /// This function downgrades a functional dependency when nullability becomes
387 /// a possibility:
388 /// - If the dependency in question is UNIQUE (i.e. nullable), a new null value
389 /// invalidates the dependency.
390 /// - If the dependency in question is PRIMARY KEY (i.e. not nullable), a new
391 /// null value turns it into UNIQUE mode.
392 fn downgrade_dependencies(&mut self) {
393 // Delete nullable dependencies, since they are no longer valid:
394 self.deps.retain(|item| !item.nullable);
395 self.deps.iter_mut().for_each(|item| item.nullable = true);
396 }
397
398 /// This function ensures that functional dependencies involving uniquely
399 /// occurring determinant keys cover their entire table in terms of
400 /// dependent columns.
401 pub fn extend_target_indices(&mut self, n_out: usize) {
402 self.deps.iter_mut().for_each(
403 |FunctionalDependence {
404 mode,
405 target_indices,
406 ..
407 }| {
408 // If unique, cover the whole table:
409 if *mode == Dependency::Single {
410 *target_indices = (0..n_out).collect::<Vec<_>>();
411 }
412 },
413 )
414 }
415}
416
417impl Deref for FunctionalDependencies {
418 type Target = [FunctionalDependence];
419
420 fn deref(&self) -> &Self::Target {
421 self.deps.as_slice()
422 }
423}
424
425/// Calculates functional dependencies for aggregate output, when there is a GROUP BY expression.
426pub fn aggregate_functional_dependencies(
427 aggr_input_schema: &DFSchema,
428 group_by_expr_names: &[String],
429 aggr_schema: &DFSchema,
430) -> FunctionalDependencies {
431 let mut aggregate_func_dependencies = vec![];
432 let aggr_input_fields = aggr_input_schema.field_names();
433 let aggr_fields = aggr_schema.fields();
434 // Association covers the whole table:
435 let target_indices = (0..aggr_schema.fields().len()).collect::<Vec<_>>();
436 // Get functional dependencies of the schema:
437 let func_dependencies = aggr_input_schema.functional_dependencies();
438 for FunctionalDependence {
439 source_indices,
440 nullable,
441 mode,
442 ..
443 } in &func_dependencies.deps
444 {
445 // Keep source indices in a `HashSet` to prevent duplicate entries:
446 let mut new_source_indices = vec![];
447 let mut new_source_field_names = vec![];
448 let source_field_names = source_indices
449 .iter()
450 .map(|&idx| &aggr_input_fields[idx])
451 .collect::<Vec<_>>();
452
453 for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() {
454 // When one of the input determinant expressions matches with
455 // the GROUP BY expression, add the index of the GROUP BY
456 // expression as a new determinant key:
457 if source_field_names.contains(&group_by_expr_name) {
458 new_source_indices.push(idx);
459 new_source_field_names.push(group_by_expr_name.clone());
460 }
461 }
462 let existing_target_indices =
463 get_target_functional_dependencies(aggr_input_schema, group_by_expr_names);
464 let new_target_indices = get_target_functional_dependencies(
465 aggr_input_schema,
466 &new_source_field_names,
467 );
468 let mode = if existing_target_indices == new_target_indices
469 && new_target_indices.is_some()
470 {
471 // If dependency covers all GROUP BY expressions, mode will be `Single`:
472 Dependency::Single
473 } else {
474 // Otherwise, existing mode is preserved:
475 *mode
476 };
477 // All of the composite indices occur in the GROUP BY expression:
478 if new_source_indices.len() == source_indices.len() {
479 aggregate_func_dependencies.push(
480 FunctionalDependence::new(
481 new_source_indices,
482 target_indices.clone(),
483 *nullable,
484 )
485 .with_mode(mode),
486 );
487 }
488 }
489
490 // When we have a GROUP BY key, we can guarantee uniqueness after
491 // aggregation:
492 if !group_by_expr_names.is_empty() {
493 let count = group_by_expr_names.len();
494 let source_indices = (0..count).collect::<Vec<_>>();
495 let nullable = source_indices
496 .iter()
497 .any(|idx| aggr_fields[*idx].is_nullable());
498 // If GROUP BY expressions do not already act as a determinant:
499 if !aggregate_func_dependencies.iter().any(|item| {
500 // If `item.source_indices` is a subset of GROUP BY expressions, we shouldn't add
501 // them since `item.source_indices` defines this relation already.
502
503 // The following simple comparison is working well because
504 // GROUP BY expressions come here as a prefix.
505 item.source_indices.iter().all(|idx| idx < &count)
506 }) {
507 // Add a new functional dependency associated with the whole table:
508 // Use nullable property of the GROUP BY expression:
509 aggregate_func_dependencies.push(
510 // Use nullable property of the GROUP BY expression:
511 FunctionalDependence::new(source_indices, target_indices, nullable)
512 .with_mode(Dependency::Single),
513 );
514 }
515 }
516 FunctionalDependencies::new(aggregate_func_dependencies)
517}
518
519/// Returns target indices, for the determinant keys that are inside
520/// group by expressions.
521pub fn get_target_functional_dependencies(
522 schema: &DFSchema,
523 group_by_expr_names: &[String],
524) -> Option<Vec<usize>> {
525 let mut combined_target_indices = HashSet::new();
526 let dependencies = schema.functional_dependencies();
527 let field_names = schema.field_names();
528 for FunctionalDependence {
529 source_indices,
530 target_indices,
531 ..
532 } in &dependencies.deps
533 {
534 let source_key_names = source_indices
535 .iter()
536 .map(|id_key_idx| &field_names[*id_key_idx])
537 .collect::<Vec<_>>();
538 // If the GROUP BY expression contains a determinant key, we can use
539 // the associated fields after aggregation even if they are not part
540 // of the GROUP BY expression.
541 if source_key_names
542 .iter()
543 .all(|source_key_name| group_by_expr_names.contains(source_key_name))
544 {
545 combined_target_indices.extend(target_indices.iter());
546 }
547 }
548 (!combined_target_indices.is_empty()).then_some({
549 let mut result = combined_target_indices.into_iter().collect::<Vec<_>>();
550 result.sort();
551 result
552 })
553}
554
555/// Returns indices for the minimal subset of GROUP BY expressions that are
556/// functionally equivalent to the original set of GROUP BY expressions.
557pub fn get_required_group_by_exprs_indices(
558 schema: &DFSchema,
559 group_by_expr_names: &[String],
560) -> Option<Vec<usize>> {
561 let dependencies = schema.functional_dependencies();
562 let field_names = schema.field_names();
563 let mut groupby_expr_indices = group_by_expr_names
564 .iter()
565 .map(|group_by_expr_name| {
566 field_names
567 .iter()
568 .position(|field_name| field_name == group_by_expr_name)
569 })
570 .collect::<Option<Vec<_>>>()?;
571
572 groupby_expr_indices.sort();
573 for FunctionalDependence {
574 source_indices,
575 target_indices,
576 ..
577 } in &dependencies.deps
578 {
579 if source_indices
580 .iter()
581 .all(|source_idx| groupby_expr_indices.contains(source_idx))
582 {
583 // If all source indices are among GROUP BY expression indices, we
584 // can remove target indices from GROUP BY expression indices and
585 // use source indices instead.
586 groupby_expr_indices = set_difference(&groupby_expr_indices, target_indices);
587 groupby_expr_indices =
588 merge_and_order_indices(groupby_expr_indices, source_indices);
589 }
590 }
591 groupby_expr_indices
592 .iter()
593 .map(|idx| {
594 group_by_expr_names
595 .iter()
596 .position(|name| &field_names[*idx] == name)
597 })
598 .collect()
599}
600
/// Re-indexes `entries` through the projection `proj_indices`.
///
/// Each entry is replaced by the position of its first occurrence in
/// `proj_indices`. Entries that do not occur there are dropped, and the
/// surviving entries keep their relative order.
fn update_elements_with_matching_indices(
    entries: &[usize],
    proj_indices: &[usize],
) -> Vec<usize> {
    let mut remapped = Vec::with_capacity(entries.len());
    for entry in entries {
        if let Some(new_idx) = proj_indices.iter().position(|proj| proj == entry) {
            remapped.push(new_idx);
        }
    }
    remapped
}
612
/// Returns a new vector in which every element of `in_data` has had `offset`
/// added to it (using `T`'s `+` operator). The input slice is left untouched.
fn add_offset_to_vec<T: Copy + std::ops::Add<Output = T>>(
    in_data: &[T],
    offset: T,
) -> Vec<T> {
    in_data.iter().copied().map(|value| value + offset).collect()
}
620
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn constraints_iter() {
        let constraints = Constraints::new_unverified(vec![
            Constraint::PrimaryKey(vec![10]),
            Constraint::Unique(vec![20]),
        ]);
        // `Constraints` derefs to a slice, so `iter` yields references in order.
        let collected: Vec<&Constraint> = constraints.iter().collect();
        assert_eq!(
            collected,
            vec![&Constraint::PrimaryKey(vec![10]), &Constraint::Unique(vec![20])]
        );
    }

    #[test]
    fn test_project_constraints() {
        let constraints = Constraints::new_unverified(vec![
            Constraint::PrimaryKey(vec![1, 2]),
            Constraint::Unique(vec![0, 3]),
        ]);

        // Keeping columns 1, 2 and 3 preserves the primary key (re-indexed to
        // [0, 1]) but drops the unique key, because column 0 is projected away.
        let expected =
            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0, 1])]);
        assert_eq!(constraints.project(&[1, 2, 3]), Some(expected));

        // Keeping only column 0 preserves no constraint at all.
        assert_eq!(constraints.project(&[0]), None);
    }

    #[test]
    fn test_get_updated_id_keys() {
        let func_dependencies =
            FunctionalDependencies::new(vec![FunctionalDependence::new(
                vec![1],
                vec![0, 1, 2],
                true,
            )]);
        // The dependence is in `Multi` mode, so its targets are re-indexed
        // through the projection rather than widened to all output columns.
        let projected = func_dependencies.project_functional_dependencies(&[1, 2], 2);
        let expected = FunctionalDependencies::new(vec![FunctionalDependence::new(
            vec![0],
            vec![0, 1],
            true,
        )]);
        assert_eq!(projected, expected);
    }
}