1use std::cmp::Ordering;
21use std::collections::{BTreeMap, HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::dml::CopyTo;
27use super::invariants::{
28 assert_always_invariants_at_current_node, assert_executable_invariants,
29 InvariantLevel,
30};
31use super::DdlStatement;
32use crate::builder::{change_redundant_column, unnest_with_options};
33use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams};
34use crate::expr_rewriter::{
35 create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
36};
37use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
38use crate::logical_plan::extension::UserDefinedLogicalNode;
39use crate::logical_plan::{DmlStatement, Statement};
40use crate::utils::{
41 enumerate_grouping_sets, exprlist_len, exprlist_to_fields, find_base_plan,
42 find_out_reference_exprs, grouping_set_expr_count, grouping_set_to_exprlist,
43 split_conjunction,
44};
45use crate::{
46 build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
47 Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare,
48 TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
49};
50
51use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
52use datafusion_common::cse::{NormalizeEq, Normalizeable};
53use datafusion_common::tree_node::{
54 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
55};
56use datafusion_common::{
57 aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
58 DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
59 FunctionalDependencies, ParamValues, Result, ScalarValue, TableReference,
60 UnnestOptions,
61};
62use indexmap::IndexSet;
63
64use crate::display::PgJsonVisitor;
66pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
67pub use datafusion_common::{JoinConstraint, JoinType};
68
/// A node in a tree of relational operators (a logical query plan).
///
/// Each variant wraps a struct holding that operator's state; helpers such
/// as [`LogicalPlan::schema`] and [`LogicalPlan::inputs`] dispatch over
/// these variants.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates a list of expressions against its input (e.g. a `SELECT` list).
    Projection(Projection),
    /// Filters rows from its input by a boolean predicate.
    Filter(Filter),
    /// Evaluates window expressions over its input.
    Window(Window),
    /// Groups its input and evaluates aggregate expressions.
    Aggregate(Aggregate),
    /// Sorts its input by a list of sort expressions, optionally limited by `fetch`.
    Sort(Sort),
    /// Joins two inputs on equi-join keys and/or a filter expression.
    Join(Join),
    /// Repartitions its input according to a partitioning scheme.
    Repartition(Repartition),
    /// Concatenates the rows of multiple inputs sharing a compatible schema.
    Union(Union),
    /// Scans rows from a table source, possibly projected and filtered.
    TableScan(TableScan),
    /// A relation with a known schema but no inputs (zero or one empty row).
    EmptyRelation(EmptyRelation),
    /// A subquery, together with the outer columns it references.
    Subquery(Subquery),
    /// Aliases its input relation under a new name.
    SubqueryAlias(SubqueryAlias),
    /// Skips and/or limits the number of rows from its input.
    Limit(Limit),
    /// A SQL statement such as `PREPARE` or `EXECUTE`.
    Statement(Statement),
    /// A constant relation built from literal rows (`VALUES` lists).
    Values(Values),
    /// Produces a textual explanation of a plan instead of running it.
    Explain(Explain),
    /// Runs a plan and reports its execution metrics.
    Analyze(Analyze),
    /// A user-defined logical operator (extension point).
    Extension(Extension),
    /// Removes duplicate rows, optionally keyed on specific expressions.
    Distinct(Distinct),
    /// A data-modification statement (insert/update/delete style).
    Dml(DmlStatement),
    /// A data-definition statement (e.g. `CREATE TABLE` / `CREATE VIEW`).
    Ddl(DdlStatement),
    /// Copies the rows of its input to an output destination.
    Copy(CopyTo),
    /// Describes the schema of a table (`DESCRIBE`).
    DescribeTable(DescribeTable),
    /// Unnests (flattens) columns of its input.
    Unnest(Unnest),
    /// A recursive query with a static (seed) term and a recursive term.
    RecursiveQuery(RecursiveQuery),
}
291
292impl Default for LogicalPlan {
293 fn default() -> Self {
294 LogicalPlan::EmptyRelation(EmptyRelation {
295 produce_one_row: false,
296 schema: Arc::new(DFSchema::empty()),
297 })
298 }
299}
300
impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
    /// Applies `f` to this plan itself — a plan is a single-element container.
    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
        &'a self,
        mut f: F,
    ) -> Result<TreeNodeRecursion> {
        f(self)
    }

    /// Maps this plan through `f`, returning the (possibly) transformed plan.
    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {
        f(self)
    }
}
316
317impl LogicalPlan {
    /// Returns the schema describing the output of this plan node.
    ///
    /// Operators that do not alter their input's columns (`Filter`, `Sort`,
    /// `Repartition`, `Limit`, `Copy`) delegate to their input's schema;
    /// the rest return the schema they carry or compute.
    pub fn schema(&self) -> &DFSchemaRef {
        match self {
            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
            LogicalPlan::Values(Values { schema, .. }) => schema,
            LogicalPlan::TableScan(TableScan {
                projected_schema, ..
            }) => projected_schema,
            LogicalPlan::Projection(Projection { schema, .. }) => schema,
            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
            LogicalPlan::Window(Window { schema, .. }) => schema,
            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
            LogicalPlan::Join(Join { schema, .. }) => schema,
            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
            LogicalPlan::Statement(statement) => statement.schema(),
            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
            LogicalPlan::Explain(explain) => &explain.schema,
            LogicalPlan::Analyze(analyze) => &analyze.schema,
            LogicalPlan::Extension(extension) => extension.node.schema(),
            LogicalPlan::Union(Union { schema, .. }) => schema,
            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
                output_schema
            }
            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
            LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
            LogicalPlan::Ddl(ddl) => ddl.schema(),
            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                // a recursive query's schema is that of its static (seed) term
                static_term.schema()
            }
        }
    }
356
357 pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
360 match self {
361 LogicalPlan::Window(_)
362 | LogicalPlan::Projection(_)
363 | LogicalPlan::Aggregate(_)
364 | LogicalPlan::Unnest(_)
365 | LogicalPlan::Join(_) => self
366 .inputs()
367 .iter()
368 .map(|input| input.schema().as_ref())
369 .collect(),
370 _ => vec![],
371 }
372 }
373
374 pub fn explain_schema() -> SchemaRef {
376 SchemaRef::new(Schema::new(vec![
377 Field::new("plan_type", DataType::Utf8, false),
378 Field::new("plan", DataType::Utf8, false),
379 ]))
380 }
381
382 pub fn describe_schema() -> Schema {
384 Schema::new(vec![
385 Field::new("column_name", DataType::Utf8, false),
386 Field::new("data_type", DataType::Utf8, false),
387 Field::new("is_nullable", DataType::Utf8, false),
388 ])
389 }
390
391 pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
408 let mut exprs = vec![];
409 self.apply_expressions(|e| {
410 exprs.push(e.clone());
411 Ok(TreeNodeRecursion::Continue)
412 })
413 .unwrap();
415 exprs
416 }
417
    /// Returns every outer-reference expression used anywhere in this plan
    /// tree, deduplicated while preserving first-seen order.
    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
        let mut exprs = vec![];
        self.apply_expressions(|e| {
            find_out_reference_exprs(e).into_iter().for_each(|e| {
                // linear-scan dedup keeps the first-occurrence ordering
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
            Ok(TreeNodeRecursion::Continue)
        })
        // the closure is infallible, so this cannot error
        .unwrap();
        // recurse into children and merge their outer references, still deduped
        self.inputs()
            .into_iter()
            .flat_map(|child| child.all_out_ref_exprs())
            .for_each(|e| {
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
        exprs
    }
442
443 pub fn inputs(&self) -> Vec<&LogicalPlan> {
447 match self {
448 LogicalPlan::Projection(Projection { input, .. }) => vec![input],
449 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
450 LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
451 LogicalPlan::Window(Window { input, .. }) => vec![input],
452 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
453 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
454 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
455 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
456 LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
457 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
458 LogicalPlan::Extension(extension) => extension.node.inputs(),
459 LogicalPlan::Union(Union { inputs, .. }) => {
460 inputs.iter().map(|arc| arc.as_ref()).collect()
461 }
462 LogicalPlan::Distinct(
463 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
464 ) => vec![input],
465 LogicalPlan::Explain(explain) => vec![&explain.plan],
466 LogicalPlan::Analyze(analyze) => vec![&analyze.input],
467 LogicalPlan::Dml(write) => vec![&write.input],
468 LogicalPlan::Copy(copy) => vec![©.input],
469 LogicalPlan::Ddl(ddl) => ddl.inputs(),
470 LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
471 LogicalPlan::RecursiveQuery(RecursiveQuery {
472 static_term,
473 recursive_term,
474 ..
475 }) => vec![static_term, recursive_term],
476 LogicalPlan::Statement(stmt) => stmt.inputs(),
477 LogicalPlan::TableScan { .. }
479 | LogicalPlan::EmptyRelation { .. }
480 | LogicalPlan::Values { .. }
481 | LogicalPlan::DescribeTable(_) => vec![],
482 }
483 }
484
    /// Returns the sets of columns joined via `USING` clauses anywhere in
    /// this plan tree (including subqueries).
    ///
    /// Each `USING` join contributes one `HashSet` containing both sides'
    /// join columns.
    ///
    /// # Errors
    ///
    /// Returns an internal error if a `USING` join key is not a column
    /// expression.
    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
        let mut using_columns: Vec<HashSet<Column>> = vec![];

        self.apply_with_subqueries(|plan| {
            if let LogicalPlan::Join(Join {
                join_constraint: JoinConstraint::Using,
                on,
                ..
            }) = plan
            {
                // both sides of every equi-pair must resolve to plain columns
                let columns =
                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
                        let Some(l) = l.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {l:?}"
                            );
                        };
                        let Some(r) = r.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {r:?}"
                            );
                        };
                        accumu.insert(l.to_owned());
                        accumu.insert(r.to_owned());
                        Result::<_, DataFusionError>::Ok(accumu)
                    })?;
                using_columns.push(columns);
            }
            Ok(TreeNodeRecursion::Continue)
        })?;

        Ok(using_columns)
    }
520
    /// Returns the first output expression of this plan node, if one can be
    /// determined.
    ///
    /// Plans with no determinable head output (subqueries, DDL/DML,
    /// `EXPLAIN`, etc.) return `Ok(None)`.
    ///
    /// NOTE(review): the `[0]` indexing assumes `Projection`, `Aggregate`
    /// and `DistinctOn` always carry at least one expression — panics
    /// otherwise; confirm invariant with constructors.
    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
        match self {
            LogicalPlan::Projection(projection) => {
                Ok(Some(projection.expr.as_slice()[0].clone()))
            }
            LogicalPlan::Aggregate(agg) => {
                // group expressions come first in the output, if any
                if agg.group_expr.is_empty() {
                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
                } else {
                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
                }
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
                Ok(Some(select_expr[0].clone()))
            }
            // pass-through nodes: delegate to the input
            LogicalPlan::Filter(Filter { input, .. })
            | LogicalPlan::Distinct(Distinct::All(input))
            | LogicalPlan::Sort(Sort { input, .. })
            | LogicalPlan::Limit(Limit { input, .. })
            | LogicalPlan::Repartition(Repartition { input, .. })
            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                    // if the left side produces no columns, the head comes
                    // from the right side
                    if left.schema().fields().is_empty() {
                        right.head_output_expr()
                    } else {
                        left.head_output_expr()
                    }
                }
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.head_output_expr()
                }
                JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
            },
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.head_output_expr()
            }
            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
                union.schema.qualified_field(0),
            )))),
            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
                table.projected_schema.qualified_field(0),
            )))),
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                // re-qualify the inner head expression under the alias
                let expr_opt = subquery_alias.input.head_output_expr()?;
                expr_opt
                    .map(|expr| {
                        Ok(Expr::Column(create_col_from_scalar_expr(
                            &expr,
                            subquery_alias.alias.to_string(),
                        )?))
                    })
                    .map_or(Ok(None), |v| v.map(Some))
            }
            LogicalPlan::Subquery(_) => Ok(None),
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Unnest(_) => Ok(None),
        }
    }
595
    /// Recomputes the schemas cached in this plan node from its current
    /// inputs, returning the updated plan.
    ///
    /// Nodes whose schema is derived purely from their input (`Sort`,
    /// `Limit`, `Repartition`, ...) are returned unchanged; nodes that
    /// cache a schema rebuild it through their `try_new` constructors.
    pub fn recompute_schema(self) -> Result<Self> {
        match self {
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema: _,
            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
            LogicalPlan::Dml(_) => Ok(self),
            LogicalPlan::Copy(_) => Ok(self),
            LogicalPlan::Values(Values { schema, values }) => {
                Ok(LogicalPlan::Values(Values { schema, values }))
            }
            LogicalPlan::Filter(Filter {
                predicate,
                input,
                having,
            }) => Filter::try_new_internal(predicate, input, having)
                .map(LogicalPlan::Filter),
            LogicalPlan::Repartition(_) => Ok(self),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema: _,
            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema: _,
            }) => Aggregate::try_new(input, group_expr, aggr_expr)
                .map(LogicalPlan::Aggregate),
            LogicalPlan::Sort(_) => Ok(self),
            LogicalPlan::Join(Join {
                left,
                right,
                filter,
                join_type,
                join_constraint,
                on,
                schema: _,
                null_equals_null,
            }) => {
                let schema =
                    build_join_schema(left.schema(), right.schema(), &join_type)?;

                // strip aliases from the equi-join keys before rebuilding
                let new_on: Vec<_> = on
                    .into_iter()
                    .map(|equi_expr| {
                        (equi_expr.0.unalias(), equi_expr.1.unalias())
                    })
                    .collect();

                Ok(LogicalPlan::Join(Join {
                    left,
                    right,
                    join_type,
                    join_constraint,
                    on: new_on,
                    filter,
                    schema: DFSchemaRef::new(schema),
                    null_equals_null,
                }))
            }
            LogicalPlan::Subquery(_) => Ok(self),
            LogicalPlan::SubqueryAlias(SubqueryAlias {
                input,
                alias,
                schema: _,
            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
            LogicalPlan::Limit(_) => Ok(self),
            LogicalPlan::Ddl(_) => Ok(self),
            LogicalPlan::Extension(Extension { node }) => {
                // rebuild the user-defined node from its own exprs/inputs
                let expr = node.expressions();
                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
                Ok(LogicalPlan::Extension(Extension {
                    node: node.with_exprs_and_inputs(expr, inputs)?,
                }))
            }
            LogicalPlan::Union(Union { inputs, schema }) => {
                let first_input_schema = inputs[0].schema();
                if schema.fields().len() == first_input_schema.fields().len() {
                    // field counts match: keep the stored schema
                    Ok(LogicalPlan::Union(Union { inputs, schema }))
                } else {
                    // field counts diverged: derive a fresh union schema
                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
                }
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(input) => Distinct::All(input),
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema: _,
                    }) => Distinct::On(DistinctOn::try_new(
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                    )?),
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(_) => Ok(self),
            LogicalPlan::Analyze(_) => Ok(self),
            LogicalPlan::Explain(_) => Ok(self),
            LogicalPlan::TableScan(_) => Ok(self),
            LogicalPlan::EmptyRelation(_) => Ok(self),
            LogicalPlan::Statement(_) => Ok(self),
            LogicalPlan::DescribeTable(_) => Ok(self),
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                options,
                ..
            }) => {
                // rebuild through the builder helper so the unnest schema is
                // recomputed from the (possibly changed) input
                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
            }
        }
    }
754
    /// Returns a new `LogicalPlan` based on `self`, with its expressions
    /// replaced by `expr` and its inputs replaced by `inputs`.
    ///
    /// `expr` and `inputs` must match the counts and order produced by
    /// `self.expressions()` / `self.inputs()`; node-specific metadata is
    /// cloned from `self`.
    pub fn with_new_exprs(
        &self,
        mut expr: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { .. }) => {
                let input = self.only_input(inputs)?;
                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
            }
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Dml(DmlStatement::new(
                    table_name.clone(),
                    Arc::clone(target),
                    op.clone(),
                    Arc::new(input),
                )))
            }
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                options,
                partition_by,
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Copy(CopyTo {
                    input: Arc::new(input),
                    output_url: output_url.clone(),
                    file_type: Arc::clone(file_type),
                    options: options.clone(),
                    partition_by: partition_by.clone(),
                }))
            }
            LogicalPlan::Values(Values { schema, .. }) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Values(Values {
                    schema: Arc::clone(schema),
                    // the flat expr list is re-chunked into rows of
                    // schema-width
                    values: expr
                        .chunks_exact(schema.fields().len())
                        .map(|s| s.to_vec())
                        .collect(),
                }))
            }
            LogicalPlan::Filter { .. } => {
                let predicate = self.only_expr(expr)?;
                let input = self.only_input(inputs)?;

                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                Partitioning::RoundRobinBatch(n) => {
                    self.assert_no_expressions(expr)?;
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
                        input: Arc::new(input),
                    }))
                }
                // hash partitioning keeps the partition count but adopts the
                // new key expressions
                Partitioning::Hash(_, n) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::Hash(expr, *n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::DistributeBy(_) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::DistributeBy(expr),
                        input: Arc::new(input),
                    }))
                }
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                assert_eq!(window_expr.len(), expr.len());
                let input = self.only_input(inputs)?;
                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
            }
            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                let input = self.only_input(inputs)?;
                // expr layout: [group exprs..., aggregate exprs...]
                let agg_expr = expr.split_off(group_expr.len());

                Aggregate::try_new(Arc::new(input), expr, agg_expr)
                    .map(LogicalPlan::Aggregate)
            }
            LogicalPlan::Sort(Sort {
                expr: sort_expr,
                fetch,
                ..
            }) => {
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Sort(Sort {
                    // keep each sort option (asc/nulls-first), swapping only
                    // the inner expression
                    expr: expr
                        .into_iter()
                        .zip(sort_expr.iter())
                        .map(|(expr, sort)| sort.with_expr(expr))
                        .collect(),
                    input: Arc::new(input),
                    fetch: *fetch,
                }))
            }
            LogicalPlan::Join(Join {
                join_type,
                join_constraint,
                on,
                null_equals_null,
                ..
            }) => {
                let (left, right) = self.only_two_inputs(inputs)?;
                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;

                let equi_expr_count = on.len();
                assert!(expr.len() >= equi_expr_count);

                // an optional filter expression sits after the equi-join
                // pairs at the end of `expr`
                let filter_expr = if expr.len() > equi_expr_count {
                    expr.pop()
                } else {
                    None
                };

                assert_eq!(expr.len(), equi_expr_count);
                // each remaining expression must be an `l = r` equality,
                // which is split back into an (l, r) equi-join pair
                let new_on = expr.into_iter().map(|equi_expr| {
                    let unalias_expr = equi_expr.clone().unalias();
                    if let Expr::BinaryExpr(BinaryExpr { left, op: Operator::Eq, right }) = unalias_expr {
                        Ok((*left, *right))
                    } else {
                        internal_err!(
                            "The front part expressions should be an binary equality expression, actual:{equi_expr}"
                        )
                    }
                }).collect::<Result<Vec<(Expr, Expr)>>>()?;

                Ok(LogicalPlan::Join(Join {
                    left: Arc::new(left),
                    right: Arc::new(right),
                    join_type: *join_type,
                    join_constraint: *join_constraint,
                    on: new_on,
                    filter: filter_expr,
                    schema: DFSchemaRef::new(schema),
                    null_equals_null: *null_equals_null,
                }))
            }
            LogicalPlan::Subquery(Subquery {
                outer_ref_columns, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let subquery = LogicalPlanBuilder::from(input).build()?;
                Ok(LogicalPlan::Subquery(Subquery {
                    subquery: Arc::new(subquery),
                    outer_ref_columns: outer_ref_columns.clone(),
                }))
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                SubqueryAlias::try_new(Arc::new(input), alias.clone())
                    .map(LogicalPlan::SubqueryAlias)
            }
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                let old_expr_len = skip.iter().chain(fetch.iter()).count();
                if old_expr_len != expr.len() {
                    return internal_err!(
                        "Invalid number of new Limit expressions: expected {}, got {}",
                        old_expr_len,
                        expr.len()
                    );
                }
                // pop from the back: fetch follows skip in the expr list
                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
                let new_skip = skip.as_ref().and_then(|_| expr.pop());
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Limit(Limit {
                    skip: new_skip.map(Box::new),
                    fetch: new_fetch.map(Box::new),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                name,
                if_not_exists,
                or_replace,
                column_defaults,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        input: Arc::new(input),
                        // NOTE: constraints are reset rather than cloned here
                        constraints: Constraints::empty(),
                        name: name.clone(),
                        if_not_exists: *if_not_exists,
                        or_replace: *or_replace,
                        column_defaults: column_defaults.clone(),
                        temporary: *temporary,
                    },
                )))
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                or_replace,
                definition,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    input: Arc::new(input),
                    name: name.clone(),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone(),
                })))
            }
            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
                node: e.node.with_exprs_and_inputs(expr, inputs)?,
            })),
            LogicalPlan::Union(Union { schema, .. }) => {
                self.assert_no_expressions(expr)?;
                let input_schema = inputs[0].schema();
                // if the field count still matches, keep the stored schema;
                // otherwise adopt the first input's schema
                let schema = if schema.fields().len() == input_schema.fields().len() {
                    Arc::clone(schema)
                } else {
                    Arc::clone(input_schema)
                };
                Ok(LogicalPlan::Union(Union {
                    inputs: inputs.into_iter().map(Arc::new).collect(),
                    schema,
                }))
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(_) => {
                        self.assert_no_expressions(expr)?;
                        let input = self.only_input(inputs)?;
                        Distinct::All(Arc::new(input))
                    }
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        ..
                    }) => {
                        let input = self.only_input(inputs)?;
                        // expr layout: [on_expr..., select_expr..., sort_expr...]
                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
                        let select_expr = expr.split_off(on_expr.len());
                        assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions");
                        Distinct::On(DistinctOn::try_new(
                            expr,
                            select_expr,
                            None, Arc::new(input),
                        )?)
                    }
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: *is_distinct,
                }))
            }
            LogicalPlan::Analyze(a) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Analyze(Analyze {
                    verbose: a.verbose,
                    schema: Arc::clone(&a.schema),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Explain(e) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Explain(Explain {
                    verbose: e.verbose,
                    plan: Arc::new(input),
                    stringified_plans: e.stringified_plans.clone(),
                    schema: Arc::clone(&e.schema),
                    logical_optimization_succeeded: e.logical_optimization_succeeded,
                }))
            }
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name,
                data_types,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
                    name: name.clone(),
                    data_types: data_types.clone(),
                    input: Arc::new(input),
                })))
            }
            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
                    name: name.clone(),
                    parameters: expr,
                })))
            }
            LogicalPlan::TableScan(ts) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::TableScan(TableScan {
                    filters: expr,
                    ..ts.clone()
                }))
            }
            // leaf-like nodes take neither expressions nor inputs
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::DescribeTable(_) => {
                self.assert_no_expressions(expr)?;
                self.assert_no_inputs(inputs)?;
                Ok(self.clone())
            }
            LogicalPlan::Unnest(Unnest {
                exec_columns: columns,
                options,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                // rebuild via the builder helper so the unnest schema is
                // recomputed from the new input
                let new_plan =
                    unnest_with_options(input, columns.clone(), options.clone())?;
                Ok(new_plan)
            }
        }
    }
1143
    /// Checks that this plan node satisfies the invariants required at the
    /// given level (`Always` for every plan, `Executable` for plans that
    /// are about to run).
    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
        match check {
            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
            InvariantLevel::Executable => assert_executable_invariants(self),
        }
    }
1151
1152 #[inline]
1154 #[allow(clippy::needless_pass_by_value)] fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1156 if !expr.is_empty() {
1157 return internal_err!("{self:?} should have no exprs, got {:?}", expr);
1158 }
1159 Ok(())
1160 }
1161
1162 #[inline]
1164 #[allow(clippy::needless_pass_by_value)] fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1166 if !inputs.is_empty() {
1167 return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
1168 }
1169 Ok(())
1170 }
1171
1172 #[inline]
1174 fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1175 if expr.len() != 1 {
1176 return internal_err!(
1177 "{self:?} should have exactly one expr, got {:?}",
1178 expr
1179 );
1180 }
1181 Ok(expr.remove(0))
1182 }
1183
1184 #[inline]
1186 fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1187 if inputs.len() != 1 {
1188 return internal_err!(
1189 "{self:?} should have exactly one input, got {:?}",
1190 inputs
1191 );
1192 }
1193 Ok(inputs.remove(0))
1194 }
1195
1196 #[inline]
1198 fn only_two_inputs(
1199 &self,
1200 mut inputs: Vec<LogicalPlan>,
1201 ) -> Result<(LogicalPlan, LogicalPlan)> {
1202 if inputs.len() != 2 {
1203 return internal_err!(
1204 "{self:?} should have exactly two inputs, got {:?}",
1205 inputs
1206 );
1207 }
1208 let right = inputs.remove(1);
1209 let left = inputs.remove(0);
1210 Ok((left, right))
1211 }
1212
1213 pub fn with_param_values(
1267 self,
1268 param_values: impl Into<ParamValues>,
1269 ) -> Result<LogicalPlan> {
1270 let param_values = param_values.into();
1271 let plan_with_values = self.replace_params_with_values(¶m_values)?;
1272
1273 Ok(
1275 if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1276 plan_with_values
1277 {
1278 param_values.verify(&prepare_lp.data_types)?;
1279 Arc::unwrap_or_clone(prepare_lp.input)
1281 } else {
1282 plan_with_values
1283 },
1284 )
1285 }
1286
1287 pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1292 match self {
1293 LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1294 LogicalPlan::Filter(filter) => {
1295 if filter.is_scalar() {
1296 Some(1)
1297 } else {
1298 filter.input.max_rows()
1299 }
1300 }
1301 LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1302 LogicalPlan::Aggregate(Aggregate {
1303 input, group_expr, ..
1304 }) => {
1305 if group_expr
1307 .iter()
1308 .all(|expr| matches!(expr, Expr::Literal(_)))
1309 {
1310 Some(1)
1311 } else {
1312 input.max_rows()
1313 }
1314 }
1315 LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1316 match (fetch, input.max_rows()) {
1317 (Some(fetch_limit), Some(input_max)) => {
1318 Some(input_max.min(*fetch_limit))
1319 }
1320 (Some(fetch_limit), None) => Some(*fetch_limit),
1321 (None, Some(input_max)) => Some(input_max),
1322 (None, None) => None,
1323 }
1324 }
1325 LogicalPlan::Join(Join {
1326 left,
1327 right,
1328 join_type,
1329 ..
1330 }) => match join_type {
1331 JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1332 JoinType::Left | JoinType::Right | JoinType::Full => {
1333 match (left.max_rows()?, right.max_rows()?, join_type) {
1334 (0, 0, _) => Some(0),
1335 (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1336 (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1337 (left_max, right_max, _) => Some(left_max * right_max),
1338 }
1339 }
1340 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1341 left.max_rows()
1342 }
1343 JoinType::RightSemi | JoinType::RightAnti => right.max_rows(),
1344 },
1345 LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1346 LogicalPlan::Union(Union { inputs, .. }) => {
1347 inputs.iter().try_fold(0usize, |mut acc, plan| {
1348 acc += plan.max_rows()?;
1349 Some(acc)
1350 })
1351 }
1352 LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1353 LogicalPlan::EmptyRelation(_) => Some(0),
1354 LogicalPlan::RecursiveQuery(_) => None,
1355 LogicalPlan::Subquery(_) => None,
1356 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1357 LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1358 Ok(FetchType::Literal(s)) => s,
1359 _ => None,
1360 },
1361 LogicalPlan::Distinct(
1362 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1363 ) => input.max_rows(),
1364 LogicalPlan::Values(v) => Some(v.values.len()),
1365 LogicalPlan::Unnest(_) => None,
1366 LogicalPlan::Ddl(_)
1367 | LogicalPlan::Explain(_)
1368 | LogicalPlan::Analyze(_)
1369 | LogicalPlan::Dml(_)
1370 | LogicalPlan::Copy(_)
1371 | LogicalPlan::DescribeTable(_)
1372 | LogicalPlan::Statement(_)
1373 | LogicalPlan::Extension(_) => None,
1374 }
1375 }
1376
1377 pub fn contains_outer_reference(&self) -> bool {
1379 let mut contains = false;
1380 self.apply_expressions(|expr| {
1381 Ok(if expr.contains_outer() {
1382 contains = true;
1383 TreeNodeRecursion::Stop
1384 } else {
1385 TreeNodeRecursion::Continue
1386 })
1387 })
1388 .unwrap();
1389 contains
1390 }
1391
    /// Returns pairs of (output expression, output column) for this node.
    ///
    /// Only `Aggregate` and `Window` produce columnized outputs here; every
    /// other plan returns an empty vector.
    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
        match self {
            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
                .output_expressions()?
                .into_iter()
                .zip(self.schema().columns())
                .collect()),
            LogicalPlan::Window(Window {
                window_expr,
                input,
                schema,
            }) => {
                // a window's output is the input's columnized outputs
                // followed by one column per window expression
                let mut output_exprs = input.columnized_output_exprs()?;
                let input_len = input.schema().fields().len();
                output_exprs.extend(
                    window_expr
                        .iter()
                        .zip(schema.columns().into_iter().skip(input_len)),
                );
                Ok(output_exprs)
            }
            _ => Ok(vec![]),
        }
    }
1430}
1431
1432impl LogicalPlan {
    /// Walks the plan (including subqueries) bottom-up and replaces every
    /// `Expr::Placeholder` with the corresponding literal from
    /// `param_values`, preserving each rewritten expression's display name.
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        self.transform_up_with_subqueries(|plan| {
            let schema = Arc::clone(plan.schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                // infer placeholder data types from the schema first, so the
                // substituted literal is resolved consistently
                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
                if !has_placeholder {
                    // fast path: nothing to substitute in this expression
                    Ok(Transformed::no(e))
                } else {
                    let original_name = name_preserver.save(&e);
                    let transformed_expr = e.transform_up(|e| {
                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
                            let value = param_values.get_placeholders_with_values(&id)?;
                            Ok(Transformed::yes(Expr::Literal(value)))
                        } else {
                            Ok(Transformed::no(e))
                        }
                    })?;
                    // keep the expression's name stable after the rewrite
                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
                }
            })
        })
        .map(|res| res.data)
    }
1470
1471 pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1473 let mut param_names = HashSet::new();
1474 self.apply_with_subqueries(|plan| {
1475 plan.apply_expressions(|expr| {
1476 expr.apply(|expr| {
1477 if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1478 param_names.insert(id.clone());
1479 }
1480 Ok(TreeNodeRecursion::Continue)
1481 })
1482 })
1483 })
1484 .map(|_| param_names)
1485 }
1486
1487 pub fn get_parameter_types(
1489 &self,
1490 ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1491 let mut param_types: HashMap<String, Option<DataType>> = HashMap::new();
1492
1493 self.apply_with_subqueries(|plan| {
1494 plan.apply_expressions(|expr| {
1495 expr.apply(|expr| {
1496 if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
1497 let prev = param_types.get(id);
1498 match (prev, data_type) {
1499 (Some(Some(prev)), Some(dt)) => {
1500 if prev != dt {
1501 plan_err!("Conflicting types for {id}")?;
1502 }
1503 }
1504 (_, Some(dt)) => {
1505 param_types.insert(id.clone(), Some(dt.clone()));
1506 }
1507 _ => {
1508 param_types.insert(id.clone(), None);
1509 }
1510 }
1511 }
1512 Ok(TreeNodeRecursion::Continue)
1513 })
1514 })
1515 })
1516 .map(|_| param_types)
1517 }
1518
1519 pub fn display_indent(&self) -> impl Display + '_ {
1551 struct Wrapper<'a>(&'a LogicalPlan);
1554 impl Display for Wrapper<'_> {
1555 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1556 let with_schema = false;
1557 let mut visitor = IndentVisitor::new(f, with_schema);
1558 match self.0.visit_with_subqueries(&mut visitor) {
1559 Ok(_) => Ok(()),
1560 Err(_) => Err(fmt::Error),
1561 }
1562 }
1563 }
1564 Wrapper(self)
1565 }
1566
1567 pub fn display_indent_schema(&self) -> impl Display + '_ {
1594 struct Wrapper<'a>(&'a LogicalPlan);
1597 impl Display for Wrapper<'_> {
1598 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1599 let with_schema = true;
1600 let mut visitor = IndentVisitor::new(f, with_schema);
1601 match self.0.visit_with_subqueries(&mut visitor) {
1602 Ok(_) => Ok(()),
1603 Err(_) => Err(fmt::Error),
1604 }
1605 }
1606 }
1607 Wrapper(self)
1608 }
1609
1610 pub fn display_pg_json(&self) -> impl Display + '_ {
1614 struct Wrapper<'a>(&'a LogicalPlan);
1617 impl Display for Wrapper<'_> {
1618 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1619 let mut visitor = PgJsonVisitor::new(f);
1620 visitor.with_schema(true);
1621 match self.0.visit_with_subqueries(&mut visitor) {
1622 Ok(_) => Ok(()),
1623 Err(_) => Err(fmt::Error),
1624 }
1625 }
1626 }
1627 Wrapper(self)
1628 }
1629
1630 pub fn display_graphviz(&self) -> impl Display + '_ {
1660 struct Wrapper<'a>(&'a LogicalPlan);
1663 impl Display for Wrapper<'_> {
1664 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1665 let mut visitor = GraphvizVisitor::new(f);
1666
1667 visitor.start_graph()?;
1668
1669 visitor.pre_visit_plan("LogicalPlan")?;
1670 self.0
1671 .visit_with_subqueries(&mut visitor)
1672 .map_err(|_| fmt::Error)?;
1673 visitor.post_visit_plan()?;
1674
1675 visitor.set_with_schema(true);
1676 visitor.pre_visit_plan("Detailed LogicalPlan")?;
1677 self.0
1678 .visit_with_subqueries(&mut visitor)
1679 .map_err(|_| fmt::Error)?;
1680 visitor.post_visit_plan()?;
1681
1682 visitor.end_graph()?;
1683 Ok(())
1684 }
1685 }
1686 Wrapper(self)
1687 }
1688
1689 pub fn display(&self) -> impl Display + '_ {
1711 struct Wrapper<'a>(&'a LogicalPlan);
1714 impl Display for Wrapper<'_> {
1715 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1716 match self.0 {
1717 LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
1718 LogicalPlan::RecursiveQuery(RecursiveQuery {
1719 is_distinct, ..
1720 }) => {
1721 write!(f, "RecursiveQuery: is_distinct={}", is_distinct)
1722 }
1723 LogicalPlan::Values(Values { ref values, .. }) => {
1724 let str_values: Vec<_> = values
1725 .iter()
1726 .take(5)
1728 .map(|row| {
1729 let item = row
1730 .iter()
1731 .map(|expr| expr.to_string())
1732 .collect::<Vec<_>>()
1733 .join(", ");
1734 format!("({item})")
1735 })
1736 .collect();
1737
1738 let eclipse = if values.len() > 5 { "..." } else { "" };
1739 write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1740 }
1741
1742 LogicalPlan::TableScan(TableScan {
1743 ref source,
1744 ref table_name,
1745 ref projection,
1746 ref filters,
1747 ref fetch,
1748 ..
1749 }) => {
1750 let projected_fields = match projection {
1751 Some(indices) => {
1752 let schema = source.schema();
1753 let names: Vec<&str> = indices
1754 .iter()
1755 .map(|i| schema.field(*i).name().as_str())
1756 .collect();
1757 format!(" projection=[{}]", names.join(", "))
1758 }
1759 _ => "".to_string(),
1760 };
1761
1762 write!(f, "TableScan: {table_name}{projected_fields}")?;
1763
1764 if !filters.is_empty() {
1765 let mut full_filter = vec![];
1766 let mut partial_filter = vec![];
1767 let mut unsupported_filters = vec![];
1768 let filters: Vec<&Expr> = filters.iter().collect();
1769
1770 if let Ok(results) =
1771 source.supports_filters_pushdown(&filters)
1772 {
1773 filters.iter().zip(results.iter()).for_each(
1774 |(x, res)| match res {
1775 TableProviderFilterPushDown::Exact => {
1776 full_filter.push(x)
1777 }
1778 TableProviderFilterPushDown::Inexact => {
1779 partial_filter.push(x)
1780 }
1781 TableProviderFilterPushDown::Unsupported => {
1782 unsupported_filters.push(x)
1783 }
1784 },
1785 );
1786 }
1787
1788 if !full_filter.is_empty() {
1789 write!(
1790 f,
1791 ", full_filters=[{}]",
1792 expr_vec_fmt!(full_filter)
1793 )?;
1794 };
1795 if !partial_filter.is_empty() {
1796 write!(
1797 f,
1798 ", partial_filters=[{}]",
1799 expr_vec_fmt!(partial_filter)
1800 )?;
1801 }
1802 if !unsupported_filters.is_empty() {
1803 write!(
1804 f,
1805 ", unsupported_filters=[{}]",
1806 expr_vec_fmt!(unsupported_filters)
1807 )?;
1808 }
1809 }
1810
1811 if let Some(n) = fetch {
1812 write!(f, ", fetch={n}")?;
1813 }
1814
1815 Ok(())
1816 }
1817 LogicalPlan::Projection(Projection { ref expr, .. }) => {
1818 write!(f, "Projection: ")?;
1819 for (i, expr_item) in expr.iter().enumerate() {
1820 if i > 0 {
1821 write!(f, ", ")?;
1822 }
1823 write!(f, "{expr_item}")?;
1824 }
1825 Ok(())
1826 }
1827 LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1828 write!(f, "Dml: op=[{op}] table=[{table_name}]")
1829 }
1830 LogicalPlan::Copy(CopyTo {
1831 input: _,
1832 output_url,
1833 file_type,
1834 options,
1835 ..
1836 }) => {
1837 let op_str = options
1838 .iter()
1839 .map(|(k, v)| format!("{k} {v}"))
1840 .collect::<Vec<String>>()
1841 .join(", ");
1842
1843 write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
1844 }
1845 LogicalPlan::Ddl(ddl) => {
1846 write!(f, "{}", ddl.display())
1847 }
1848 LogicalPlan::Filter(Filter {
1849 predicate: ref expr,
1850 ..
1851 }) => write!(f, "Filter: {expr}"),
1852 LogicalPlan::Window(Window {
1853 ref window_expr, ..
1854 }) => {
1855 write!(
1856 f,
1857 "WindowAggr: windowExpr=[[{}]]",
1858 expr_vec_fmt!(window_expr)
1859 )
1860 }
1861 LogicalPlan::Aggregate(Aggregate {
1862 ref group_expr,
1863 ref aggr_expr,
1864 ..
1865 }) => write!(
1866 f,
1867 "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
1868 expr_vec_fmt!(group_expr),
1869 expr_vec_fmt!(aggr_expr)
1870 ),
1871 LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
1872 write!(f, "Sort: ")?;
1873 for (i, expr_item) in expr.iter().enumerate() {
1874 if i > 0 {
1875 write!(f, ", ")?;
1876 }
1877 write!(f, "{expr_item}")?;
1878 }
1879 if let Some(a) = fetch {
1880 write!(f, ", fetch={a}")?;
1881 }
1882
1883 Ok(())
1884 }
1885 LogicalPlan::Join(Join {
1886 on: ref keys,
1887 filter,
1888 join_constraint,
1889 join_type,
1890 ..
1891 }) => {
1892 let join_expr: Vec<String> =
1893 keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
1894 let filter_expr = filter
1895 .as_ref()
1896 .map(|expr| format!(" Filter: {expr}"))
1897 .unwrap_or_else(|| "".to_string());
1898 let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
1899 "Cross".to_string()
1900 } else {
1901 join_type.to_string()
1902 };
1903 match join_constraint {
1904 JoinConstraint::On => {
1905 write!(
1906 f,
1907 "{} Join: {}{}",
1908 join_type,
1909 join_expr.join(", "),
1910 filter_expr
1911 )
1912 }
1913 JoinConstraint::Using => {
1914 write!(
1915 f,
1916 "{} Join: Using {}{}",
1917 join_type,
1918 join_expr.join(", "),
1919 filter_expr,
1920 )
1921 }
1922 }
1923 }
1924 LogicalPlan::Repartition(Repartition {
1925 partitioning_scheme,
1926 ..
1927 }) => match partitioning_scheme {
1928 Partitioning::RoundRobinBatch(n) => {
1929 write!(f, "Repartition: RoundRobinBatch partition_count={n}")
1930 }
1931 Partitioning::Hash(expr, n) => {
1932 let hash_expr: Vec<String> =
1933 expr.iter().map(|e| format!("{e}")).collect();
1934 write!(
1935 f,
1936 "Repartition: Hash({}) partition_count={}",
1937 hash_expr.join(", "),
1938 n
1939 )
1940 }
1941 Partitioning::DistributeBy(expr) => {
1942 let dist_by_expr: Vec<String> =
1943 expr.iter().map(|e| format!("{e}")).collect();
1944 write!(
1945 f,
1946 "Repartition: DistributeBy({})",
1947 dist_by_expr.join(", "),
1948 )
1949 }
1950 },
1951 LogicalPlan::Limit(limit) => {
1952 let skip_str = match limit.get_skip_type() {
1954 Ok(SkipType::Literal(n)) => n.to_string(),
1955 _ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
1956 };
1957 let fetch_str = match limit.get_fetch_type() {
1958 Ok(FetchType::Literal(Some(n))) => n.to_string(),
1959 Ok(FetchType::Literal(None)) => "None".to_string(),
1960 _ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
1961 };
1962 write!(
1963 f,
1964 "Limit: skip={}, fetch={}", skip_str,fetch_str,
1965 )
1966 }
1967 LogicalPlan::Subquery(Subquery { .. }) => {
1968 write!(f, "Subquery:")
1969 }
1970 LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
1971 write!(f, "SubqueryAlias: {alias}")
1972 }
1973 LogicalPlan::Statement(statement) => {
1974 write!(f, "{}", statement.display())
1975 }
1976 LogicalPlan::Distinct(distinct) => match distinct {
1977 Distinct::All(_) => write!(f, "Distinct:"),
1978 Distinct::On(DistinctOn {
1979 on_expr,
1980 select_expr,
1981 sort_expr,
1982 ..
1983 }) => write!(
1984 f,
1985 "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
1986 expr_vec_fmt!(on_expr),
1987 expr_vec_fmt!(select_expr),
1988 if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
1989 ),
1990 },
1991 LogicalPlan::Explain { .. } => write!(f, "Explain"),
1992 LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
1993 LogicalPlan::Union(_) => write!(f, "Union"),
1994 LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
1995 LogicalPlan::DescribeTable(DescribeTable { .. }) => {
1996 write!(f, "DescribeTable")
1997 }
1998 LogicalPlan::Unnest(Unnest {
1999 input: plan,
2000 list_type_columns: list_col_indices,
2001 struct_type_columns: struct_col_indices, .. }) => {
2002 let input_columns = plan.schema().columns();
2003 let list_type_columns = list_col_indices
2004 .iter()
2005 .map(|(i,unnest_info)|
2006 format!("{}|depth={}", &input_columns[*i].to_string(),
2007 unnest_info.depth))
2008 .collect::<Vec<String>>();
2009 let struct_type_columns = struct_col_indices
2010 .iter()
2011 .map(|i| &input_columns[*i])
2012 .collect::<Vec<&Column>>();
2013 write!(f, "Unnest: lists[{}] structs[{}]",
2015 expr_vec_fmt!(list_type_columns),
2016 expr_vec_fmt!(struct_type_columns))
2017 }
2018 }
2019 }
2020 }
2021 Wrapper(self)
2022 }
2023}
2024
2025impl Display for LogicalPlan {
2026 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2027 self.display_indent().fmt(f)
2028 }
2029}
2030
2031impl ToStringifiedPlan for LogicalPlan {
2032 fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2033 StringifiedPlan::new(plan_type, self.display_indent().to_string())
2034 }
2035}
2036
/// A plan node representing a relation with a known schema but no input:
/// it produces either zero rows or a single row, per `produce_one_row`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// If true, the relation emits exactly one row; if false, no rows.
    pub produce_one_row: bool,
    /// The schema of the (possibly empty) output.
    pub schema: DFSchemaRef,
}
2045
2046impl PartialOrd for EmptyRelation {
2048 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2049 self.produce_one_row.partial_cmp(&other.produce_one_row)
2050 }
2051}
2052
/// A plan node for a recursive query (e.g. a recursive CTE), combining a
/// static (base) term with a recursive term under a given name.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// The name by which the recursive term refers back to this query.
    pub name: String,
    /// The non-recursive base case of the query.
    pub static_term: Arc<LogicalPlan>,
    /// The recursive part, which may reference `name`.
    pub recursive_term: Arc<LogicalPlan>,
    /// Whether duplicate rows are eliminated between iterations
    /// (UNION vs. UNION ALL semantics).
    pub is_distinct: bool,
}
2088
/// A plan node holding literal rows (e.g. `VALUES (1, 'a'), (2, 'b')`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The schema describing the row layout.
    pub schema: DFSchemaRef,
    /// The rows themselves: one inner `Vec<Expr>` per row.
    pub values: Vec<Vec<Expr>>,
}
2099
2100impl PartialOrd for Values {
2102 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2103 self.values.partial_cmp(&other.values)
2104 }
2105}
2106
/// A plan node that evaluates a list of expressions over its input,
/// producing one output column per expression.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// Construct via `try_new`/`try_new_with_schema` so invariants are checked.
#[non_exhaustive]
pub struct Projection {
    /// The expressions to evaluate, one per output column.
    pub expr: Vec<Expr>,
    /// The plan supplying the input rows.
    pub input: Arc<LogicalPlan>,
    /// The schema of the projected output.
    pub schema: DFSchemaRef,
}
2120
2121impl PartialOrd for Projection {
2123 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2124 match self.expr.partial_cmp(&other.expr) {
2125 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2126 cmp => cmp,
2127 }
2128 }
2129}
2130
impl Projection {
    /// Creates a projection over `input`, deriving the output schema from
    /// the expressions via [`projection_schema`].
    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        let projection_schema = projection_schema(&input, &expr)?;
        Self::try_new_with_schema(expr, input, projection_schema)
    }

    /// Creates a projection with a caller-supplied output schema.
    ///
    /// Errors if the number of expressions does not match the number of
    /// schema fields (unless a deprecated wildcard expression is present,
    /// which can expand to multiple fields).
    pub fn try_new_with_schema(
        expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        // `Expr::Wildcard` is deprecated but must still be tolerated here.
        #[expect(deprecated)]
        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
            && expr.len() != schema.fields().len()
        {
            return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
        }
        Ok(Self {
            expr,
            input,
            schema,
        })
    }

    /// Creates an identity projection that selects every column of `schema`.
    ///
    /// Note: does not validate that `schema` matches `input`'s columns.
    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
        Self {
            expr,
            input,
            schema,
        }
    }
}
2167
2168pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2182 let metadata = input.schema().metadata().clone();
2183
2184 let schema =
2185 DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2186 .with_functional_dependencies(calc_func_dependencies_for_project(
2187 exprs, input,
2188 )?)?;
2189
2190 Ok(Arc::new(schema))
2191}
2192
/// A plan node that assigns an alias (qualifier) to its input's columns,
/// e.g. `SELECT ... FROM (subquery) AS t`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// Construct via `try_new` so the re-qualified schema is derived correctly.
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The plan being aliased.
    pub input: Arc<LogicalPlan>,
    /// The alias applied as a qualifier to every output column.
    pub alias: TableReference,
    /// The input schema re-qualified under `alias`.
    pub schema: DFSchemaRef,
}
2205
impl SubqueryAlias {
    /// Creates an alias node over `plan`, rebuilding the schema so every
    /// column is qualified by `alias` while preserving metadata and
    /// functional dependencies.
    pub fn try_new(
        plan: Arc<LogicalPlan>,
        alias: impl Into<TableReference>,
    ) -> Result<Self> {
        let alias = alias.into();
        // De-duplicate any redundant column names before re-qualifying.
        let fields = change_redundant_column(plan.schema().fields());
        let meta_data = plan.schema().as_ref().metadata().clone();
        // Round-trip through an unqualified arrow `Schema` so the old
        // qualifiers are dropped before the new alias is applied.
        let schema: Schema =
            DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into();
        let func_dependencies = plan.schema().functional_dependencies().clone();
        let schema = DFSchemaRef::new(
            DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
                .with_functional_dependencies(func_dependencies)?,
        );
        Ok(SubqueryAlias {
            input: plan,
            alias,
            schema,
        })
    }
}
2230
2231impl PartialOrd for SubqueryAlias {
2233 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2234 match self.input.partial_cmp(&other.input) {
2235 Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2236 cmp => cmp,
2237 }
2238 }
2239}
2240
/// A plan node that keeps only the input rows for which `predicate`
/// evaluates to true.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
// Construct via `try_new`/`try_new_with_having` so the predicate type is
// validated.
#[non_exhaustive]
pub struct Filter {
    /// The boolean predicate applied to each input row.
    pub predicate: Expr,
    /// The plan supplying the rows to filter.
    pub input: Arc<LogicalPlan>,
    /// True when this filter originated from a SQL `HAVING` clause.
    pub having: bool,
}
2262
impl Filter {
    /// Creates a filter (WHERE-style) over `input` with the given predicate.
    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input, false)
    }

    /// Creates a filter flagged as originating from a `HAVING` clause.
    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input, true)
    }

    /// Returns true if `data_type` is acceptable as a filter predicate type:
    /// boolean, null, or a dictionary whose value type is itself acceptable.
    fn is_allowed_filter_type(data_type: &DataType) -> bool {
        match data_type {
            DataType::Boolean | DataType::Null => true,
            // Recurse into dictionary value types.
            DataType::Dictionary(_, value_type) => {
                Filter::is_allowed_filter_type(value_type.as_ref())
            }
            _ => false,
        }
    }

    /// Shared constructor: validates the predicate's type (when it can be
    /// determined) and strips nested aliases from the predicate.
    fn try_new_internal(
        predicate: Expr,
        input: Arc<LogicalPlan>,
        having: bool,
    ) -> Result<Self> {
        // If the type cannot be computed (e.g. unresolved columns) the check
        // is skipped rather than failing here.
        if let Ok(predicate_type) = predicate.get_type(input.schema()) {
            if !Filter::is_allowed_filter_type(&predicate_type) {
                return plan_err!(
                    "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
                );
            }
        }

        Ok(Self {
            // Aliases inside a predicate carry no meaning; remove them.
            predicate: predicate.unalias_nested().data,
            input,
            having,
        })
    }

    /// Returns true if this filter is guaranteed to produce at most one row:
    /// the predicate pins every column of some non-nullable unique key to a
    /// constant via equality.
    fn is_scalar(&self) -> bool {
        let schema = self.input.schema();

        // Candidate keys: single-mode, non-nullable dependencies that
        // determine every field of the schema.
        let functional_dependencies = self.input.schema().functional_dependencies();
        let unique_keys = functional_dependencies.iter().filter(|dep| {
            let nullable = dep.nullable
                && dep
                    .source_indices
                    .iter()
                    .any(|&source| schema.field(source).is_nullable());
            !nullable
                && dep.mode == Dependency::Single
                && dep.target_indices.len() == schema.fields().len()
        });

        // Collect the schema indices of columns constrained by `col = const`
        // conjuncts in the predicate.
        let exprs = split_conjunction(&self.predicate);
        let eq_pred_cols: HashSet<_> = exprs
            .iter()
            .filter_map(|expr| {
                let Expr::BinaryExpr(BinaryExpr {
                    left,
                    op: Operator::Eq,
                    right,
                }) = expr
                else {
                    return None;
                };
                // A trivially-true `x = x` conjunct constrains nothing.
                if left == right {
                    return None;
                }

                match (left.as_ref(), right.as_ref()) {
                    // `col = col` does not pin either side to a constant.
                    (Expr::Column(_), Expr::Column(_)) => None,
                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
                        Some(schema.index_of_column(c).unwrap())
                    }
                    _ => None,
                }
            })
            .collect();

        // If any unique key is fully covered by the equality columns, at
        // most one row can satisfy the predicate.
        for key in unique_keys {
            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
                return true;
            }
        }
        false
    }
}
2380
/// A plan node that evaluates window functions over its input, appending one
/// output column per window expression after the input's columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The plan supplying the input rows.
    pub input: Arc<LogicalPlan>,
    /// The window function expressions to evaluate.
    pub window_expr: Vec<Expr>,
    /// The output schema: input fields followed by the window columns.
    pub schema: DFSchemaRef,
}
2404
impl Window {
    /// Creates a window node over `input`, deriving the output schema
    /// (input fields + one field per window expression) and its functional
    /// dependencies.
    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        // Start from the input's qualified fields...
        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
            .schema()
            .iter()
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .collect();
        let input_len = fields.len();
        let mut window_fields = fields;
        // ...then append one field per window expression.
        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
        window_fields.extend_from_slice(expr_fields.as_slice());
        let metadata = input.schema().metadata().clone();

        // Existing dependencies remain valid; widen their target range to
        // cover the appended window columns.
        let mut window_func_dependencies =
            input.schema().functional_dependencies().clone();
        window_func_dependencies.extend_target_indices(window_fields.len());

        // An unpartitioned `row_number` produces a unique value per row, so
        // its output column becomes a new single-mode dependency source.
        let mut new_dependencies = window_expr
            .iter()
            .enumerate()
            .filter_map(|(idx, expr)| {
                if let Expr::WindowFunction(WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(udwf),
                    params: WindowFunctionParams { partition_by, .. },
                }) = expr
                {
                    if udwf.name() == "row_number" && partition_by.is_empty() {
                        // Offset by input_len: window columns come after the
                        // input's fields.
                        return Some(idx + input_len);
                    }
                }
                None
            })
            .map(|idx| {
                FunctionalDependence::new(vec![idx], vec![], false)
                    .with_mode(Dependency::Single)
            })
            .collect::<Vec<_>>();

        if !new_dependencies.is_empty() {
            // Each new unique column determines every output column.
            for dependence in new_dependencies.iter_mut() {
                dependence.target_indices = (0..window_fields.len()).collect();
            }
            let new_deps = FunctionalDependencies::new(new_dependencies);
            window_func_dependencies.extend(new_deps);
        }

        Self::try_new_with_schema(
            window_expr,
            input,
            Arc::new(
                DFSchema::new_with_metadata(window_fields, metadata)?
                    .with_functional_dependencies(window_func_dependencies)?,
            ),
        )
    }

    /// Creates a window node with a caller-supplied schema, validating that
    /// the schema has exactly one extra field per window expression.
    pub fn try_new_with_schema(
        window_expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if window_expr.len() != schema.fields().len() - input.schema().fields().len() {
            return plan_err!(
                "Window has mismatch between number of expressions ({}) and number of fields in schema ({})",
                window_expr.len(),
                schema.fields().len() - input.schema().fields().len()
            );
        }

        Ok(Window {
            input,
            window_expr,
            schema,
        })
    }
}
2489
2490impl PartialOrd for Window {
2492 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2493 match self.input.partial_cmp(&other.input) {
2494 Some(Ordering::Equal) => self.window_expr.partial_cmp(&other.window_expr),
2495 cmp => cmp,
2496 }
2497 }
2498}
2499
/// A plan node that reads rows from a [`TableSource`], optionally applying a
/// projection, pushed-down filters, and a row limit.
#[derive(Clone)]
pub struct TableScan {
    /// The (possibly qualified) name of the table being scanned.
    pub table_name: TableReference,
    /// The provider of the table's data and schema.
    pub source: Arc<dyn TableSource>,
    /// Optional indices of the source columns to read.
    pub projection: Option<Vec<usize>>,
    /// The schema after applying `projection` (qualified by `table_name`).
    pub projected_schema: DFSchemaRef,
    /// Filters pushed down to the scan; pushdown support varies by source.
    pub filters: Vec<Expr>,
    /// Optional maximum number of rows to produce.
    pub fetch: Option<usize>,
}
2516
2517impl Debug for TableScan {
2518 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2519 f.debug_struct("TableScan")
2520 .field("table_name", &self.table_name)
2521 .field("source", &"...")
2522 .field("projection", &self.projection)
2523 .field("projected_schema", &self.projected_schema)
2524 .field("filters", &self.filters)
2525 .field("fetch", &self.fetch)
2526 .finish_non_exhaustive()
2527 }
2528}
2529
2530impl PartialEq for TableScan {
2531 fn eq(&self, other: &Self) -> bool {
2532 self.table_name == other.table_name
2533 && self.projection == other.projection
2534 && self.projected_schema == other.projected_schema
2535 && self.filters == other.filters
2536 && self.fetch == other.fetch
2537 }
2538}
2539
2540impl Eq for TableScan {}
2541
2542impl PartialOrd for TableScan {
2545 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2546 #[derive(PartialEq, PartialOrd)]
2547 struct ComparableTableScan<'a> {
2548 pub table_name: &'a TableReference,
2550 pub projection: &'a Option<Vec<usize>>,
2552 pub filters: &'a Vec<Expr>,
2554 pub fetch: &'a Option<usize>,
2556 }
2557 let comparable_self = ComparableTableScan {
2558 table_name: &self.table_name,
2559 projection: &self.projection,
2560 filters: &self.filters,
2561 fetch: &self.fetch,
2562 };
2563 let comparable_other = ComparableTableScan {
2564 table_name: &other.table_name,
2565 projection: &other.projection,
2566 filters: &other.filters,
2567 fetch: &other.fetch,
2568 };
2569 comparable_self.partial_cmp(&comparable_other)
2570 }
2571}
2572
2573impl Hash for TableScan {
2574 fn hash<H: Hasher>(&self, state: &mut H) {
2575 self.table_name.hash(state);
2576 self.projection.hash(state);
2577 self.projected_schema.hash(state);
2578 self.filters.hash(state);
2579 self.fetch.hash(state);
2580 }
2581}
2582
impl TableScan {
    /// Creates a scan of `table_source` named `table_name`, computing the
    /// projected, qualified output schema and its functional dependencies.
    ///
    /// Errors if `table_name` is empty or the schema cannot be built.
    pub fn try_new(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_name = table_name.into();

        if table_name.table().is_empty() {
            return plan_err!("table_name cannot be empty");
        }
        let schema = table_source.schema();
        // Dependencies implied by the source's constraints (e.g. keys).
        let func_dependencies = FunctionalDependencies::new_from_constraints(
            table_source.constraints(),
            schema.fields.len(),
        );
        let projected_schema = projection
            .as_ref()
            .map(|p| {
                // Narrow the dependencies to the projected columns...
                let projected_func_dependencies =
                    func_dependencies.project_functional_dependencies(p, p.len());

                // ...and build a qualified schema from just those fields.
                let df_schema = DFSchema::new_with_metadata(
                    p.iter()
                        .map(|i| {
                            (Some(table_name.clone()), Arc::new(schema.field(*i).clone()))
                        })
                        .collect(),
                    schema.metadata.clone(),
                )?;
                df_schema.with_functional_dependencies(projected_func_dependencies)
            })
            .unwrap_or_else(|| {
                // No projection: qualify the full source schema.
                let df_schema =
                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
                df_schema.with_functional_dependencies(func_dependencies)
            })?;
        let projected_schema = Arc::new(projected_schema);

        Ok(Self {
            table_name,
            source: table_source,
            projection,
            projected_schema,
            filters,
            fetch,
        })
    }
}
2636
/// A plan node that redistributes its input's rows according to a
/// [`Partitioning`] scheme.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The plan whose output is repartitioned.
    pub input: Arc<LogicalPlan>,
    /// How the rows are redistributed (round-robin, hash, or distribute-by).
    pub partitioning_scheme: Partitioning,
}
2645
/// A plan node that concatenates the rows of two or more inputs sharing a
/// compatible schema (UNION ALL semantics at this level).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// The input plans; at least two are required.
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// The schema of the combined output.
    pub schema: DFSchemaRef,
}
2654
impl Union {
    /// Creates a union requiring exact type equality across inputs, matching
    /// columns by position.
    fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
        Ok(Union { inputs, schema })
    }

    /// Creates a union that tolerates differing input types (the first
    /// input's types win), matching columns by position.
    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
        Ok(Union { inputs, schema })
    }

    /// Creates a union that matches columns by NAME rather than position
    /// (`UNION BY NAME` semantics), padding missing columns with nulls.
    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
        // Inputs may need projections inserted to line up with the schema.
        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;

        Ok(Union { inputs, schema })
    }

    /// Wraps any input whose column count differs from the union schema in a
    /// projection that selects matching columns by name and fills the rest
    /// with null literals aliased to the missing column names.
    fn rewrite_inputs_from_schema(
        schema: &DFSchema,
        inputs: Vec<Arc<LogicalPlan>>,
    ) -> Result<Vec<Arc<LogicalPlan>>> {
        let schema_width = schema.iter().count();
        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
        for input in inputs {
            // Width already matches: no projection needed.
            if input.schema().iter().count() == schema_width {
                wrapped_inputs.push(input);
                continue;
            }

            let mut expr = Vec::with_capacity(schema_width);
            for column in schema.columns() {
                if input
                    .schema()
                    .has_column_with_unqualified_name(column.name())
                {
                    expr.push(Expr::Column(column));
                } else {
                    // Missing in this input: substitute NULL under the name.
                    expr.push(Expr::Literal(ScalarValue::Null).alias(column.name()));
                }
            }
            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(Projection::try_new(
                expr, input,
            )?)));
        }

        Ok(wrapped_inputs)
    }

    /// Derives the union's output schema from its inputs.
    ///
    /// `loose_types` allows differing field types (no coercion happens
    /// here); `by_name` matches columns by name instead of by position.
    /// Errors if fewer than two inputs are given.
    fn derive_schema_from_inputs(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
        by_name: bool,
    ) -> Result<DFSchemaRef> {
        if inputs.len() < 2 {
            return plan_err!("UNION requires at least two inputs");
        }

        if by_name {
            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
        } else {
            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
        }
    }

    /// Name-based schema derivation: collects the union of all field names
    /// (in sorted order, via `BTreeMap`), merging nullability and
    /// intersecting per-field metadata.
    fn derive_schema_from_inputs_by_name(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        // (data type, nullability, metadata maps collected per input)
        type FieldData<'a> = (&'a DataType, bool, Vec<&'a HashMap<String, String>>);
        let mut cols: BTreeMap<&str, FieldData> = BTreeMap::new();
        for input in inputs.iter() {
            for field in input.schema().fields() {
                match cols.entry(field.name()) {
                    std::collections::btree_map::Entry::Occupied(mut occupied) => {
                        let (data_type, is_nullable, metadata) = occupied.get_mut();
                        if !loose_types && *data_type != field.data_type() {
                            return plan_err!(
                                "Found different types for field {}",
                                field.name()
                            );
                        }

                        metadata.push(field.metadata());
                        // Nullable if nullable in any input.
                        *is_nullable |= field.is_nullable();
                    }
                    std::collections::btree_map::Entry::Vacant(vacant) => {
                        vacant.insert((
                            field.data_type(),
                            field.is_nullable(),
                            vec![field.metadata()],
                        ));
                    }
                }
            }
        }

        // Output fields are unqualified; metadata keeps only entries common
        // to every input that had the field.
        let union_fields = cols
            .into_iter()
            .map(|(name, (data_type, is_nullable, unmerged_metadata))| {
                let mut field = Field::new(name, data_type.clone(), is_nullable);
                field.set_metadata(intersect_maps(unmerged_metadata));

                (None, Arc::new(field))
            })
            .collect::<Vec<(Option<TableReference>, _)>>();

        let union_schema_metadata =
            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));

        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }

    /// Position-based schema derivation: all inputs must have the same
    /// column count; names and qualifiers come from the first input.
    fn derive_schema_from_inputs_by_position(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        let first_schema = inputs[0].schema();
        let fields_count = first_schema.fields().len();
        for input in inputs.iter().skip(1) {
            if fields_count != input.schema().fields().len() {
                return plan_err!(
                    "UNION queries have different number of columns: \
                    left has {} columns whereas right has {} columns",
                    fields_count,
                    input.schema().fields().len()
                );
            }
        }

        let union_fields = (0..fields_count)
            .map(|i| {
                let fields = inputs
                    .iter()
                    .map(|input| input.schema().field(i))
                    .collect::<Vec<_>>();
                let first_field = fields[0];
                let name = first_field.name();
                let data_type = if loose_types {
                    // Loose mode: the first input's type is taken as-is;
                    // no coercion is performed here.
                    first_field.data_type()
                } else {
                    // Strict mode: every input must match the first's type.
                    fields.iter().skip(1).try_fold(
                        first_field.data_type(),
                        |acc, field| {
                            if acc != field.data_type() {
                                return plan_err!(
                                    "UNION field {i} have different type in inputs: \
                                    left has {} whereas right has {}",
                                    first_field.data_type(),
                                    field.data_type()
                                );
                            }
                            Ok(acc)
                        },
                    )?
                };
                // Nullable if nullable in any input.
                let nullable = fields.iter().any(|field| field.is_nullable());
                let mut field = Field::new(name, data_type.clone(), nullable);
                let field_metadata =
                    intersect_maps(fields.iter().map(|field| field.metadata()));
                field.set_metadata(field_metadata);
                let table_reference = first_schema.qualified_field(i).0.cloned();
                Ok((table_reference, Arc::new(field)))
            })
            .collect::<Result<_>>()?;
        let union_schema_metadata =
            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));

        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }
}
2866
/// Computes the intersection of a sequence of string maps: the result keeps
/// only the `(key, value)` pairs present, with identical values, in every
/// input map. An empty input sequence yields an empty map.
fn intersect_maps<'a>(
    inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
) -> HashMap<String, String> {
    let mut iter = inputs.into_iter();
    // Seed with a copy of the first map (or empty), then prune against each
    // remaining map in turn.
    let mut intersection: HashMap<String, String> =
        iter.next().cloned().unwrap_or_default();
    for map in iter {
        intersection
            .retain(|key, value| map.get(key).is_some_and(|candidate| candidate == value));
    }
    intersection
}
2881
2882impl PartialOrd for Union {
2884 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2885 self.inputs.partial_cmp(&other.inputs)
2886 }
2887}
2888
/// A plan node for `DESCRIBE <table>`: reports information about a table's
/// schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// The schema of the table being described.
    pub schema: Arc<Schema>,
    /// The schema of the rows this node itself produces.
    pub output_schema: DFSchemaRef,
}
2918
impl PartialOrd for DescribeTable {
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        // Both fields are schemas, which have no ordering, so two
        // DescribeTable nodes are always incomparable.
        None
    }
}
2927
/// A plan node for `EXPLAIN`: carries the plan being explained together with
/// its textual renderings at various planning stages.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Whether verbose output was requested.
    pub verbose: bool,
    /// The plan being explained.
    pub plan: Arc<LogicalPlan>,
    /// Renderings of the plan collected so far, one per stage.
    pub stringified_plans: Vec<StringifiedPlan>,
    /// The schema of the rows this EXPLAIN node itself produces.
    pub schema: DFSchemaRef,
    /// Whether logical optimization of the explained plan succeeded.
    pub logical_optimization_succeeded: bool,
}
2943
2944impl PartialOrd for Explain {
2946 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2947 #[derive(PartialEq, PartialOrd)]
2948 struct ComparableExplain<'a> {
2949 pub verbose: &'a bool,
2951 pub plan: &'a Arc<LogicalPlan>,
2953 pub stringified_plans: &'a Vec<StringifiedPlan>,
2955 pub logical_optimization_succeeded: &'a bool,
2957 }
2958 let comparable_self = ComparableExplain {
2959 verbose: &self.verbose,
2960 plan: &self.plan,
2961 stringified_plans: &self.stringified_plans,
2962 logical_optimization_succeeded: &self.logical_optimization_succeeded,
2963 };
2964 let comparable_other = ComparableExplain {
2965 verbose: &other.verbose,
2966 plan: &other.plan,
2967 stringified_plans: &other.stringified_plans,
2968 logical_optimization_succeeded: &other.logical_optimization_succeeded,
2969 };
2970 comparable_self.partial_cmp(&comparable_other)
2971 }
2972}
2973
/// Runs the input plan and reports execution metrics (the `ANALYZE` statement).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Whether to render extra (verbose) detail
    pub verbose: bool,
    /// The plan to execute and analyze
    pub input: Arc<LogicalPlan>,
    /// The output schema of the ANALYZE result
    pub schema: DFSchemaRef,
}
2985
2986impl PartialOrd for Analyze {
2988 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2989 match self.verbose.partial_cmp(&other.verbose) {
2990 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2991 cmp => cmp,
2992 }
2993 }
2994}
2995
/// A user-defined logical plan node.
// `Hash` is derived while `PartialEq` is written by hand (it delegates to the
// boxed node), hence the allow.
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The user-defined node implementation
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3006
impl PartialEq for Extension {
    // Delegate equality to the user-defined node implementation.
    fn eq(&self, other: &Self) -> bool {
        self.node.eq(&other.node)
    }
}
3015
impl PartialOrd for Extension {
    // Delegate ordering to the user-defined node implementation.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3021
/// Skips and/or limits the number of rows produced by its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before emitting (`OFFSET`), if any
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to emit (`LIMIT`), if any
    pub fetch: Option<Box<Expr>>,
    /// The plan being limited
    pub input: Arc<LogicalPlan>,
}
3033
/// The resolved value of a [`Limit`]'s `skip` expression.
pub enum SkipType {
    /// A known, non-negative row count
    Literal(usize),
    /// The skip expression is not a literal this code can interpret
    UnsupportedExpr,
}
3041
/// The resolved value of a [`Limit`]'s `fetch` expression.
pub enum FetchType {
    /// A known fetch count; `None` means no limit was specified
    Literal(Option<usize>),
    /// The fetch expression is not a literal this code can interpret
    UnsupportedExpr,
}
3050
3051impl Limit {
3052 pub fn get_skip_type(&self) -> Result<SkipType> {
3054 match self.skip.as_deref() {
3055 Some(expr) => match *expr {
3056 Expr::Literal(ScalarValue::Int64(s)) => {
3057 let s = s.unwrap_or(0);
3059 if s >= 0 {
3060 Ok(SkipType::Literal(s as usize))
3061 } else {
3062 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3063 }
3064 }
3065 _ => Ok(SkipType::UnsupportedExpr),
3066 },
3067 None => Ok(SkipType::Literal(0)),
3069 }
3070 }
3071
3072 pub fn get_fetch_type(&self) -> Result<FetchType> {
3074 match self.fetch.as_deref() {
3075 Some(expr) => match *expr {
3076 Expr::Literal(ScalarValue::Int64(Some(s))) => {
3077 if s >= 0 {
3078 Ok(FetchType::Literal(Some(s as usize)))
3079 } else {
3080 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3081 }
3082 }
3083 Expr::Literal(ScalarValue::Int64(None)) => Ok(FetchType::Literal(None)),
3084 _ => Ok(FetchType::UnsupportedExpr),
3085 },
3086 None => Ok(FetchType::Literal(None)),
3087 }
3088 }
3089}
3090
/// Removes duplicate rows from the input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `SELECT DISTINCT`: deduplicate across the entire row
    All(Arc<LogicalPlan>),
    /// `SELECT DISTINCT ON (..)`: deduplicate on specific expressions
    On(DistinctOn),
}
3099
3100impl Distinct {
3101 pub fn input(&self) -> &Arc<LogicalPlan> {
3103 match self {
3104 Distinct::All(input) => input,
3105 Distinct::On(DistinctOn { input, .. }) => input,
3106 }
3107 }
3108}
3109
/// The `SELECT DISTINCT ON (..)` operator.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// Expressions that determine row uniqueness (the `ON` list)
    pub on_expr: Vec<Expr>,
    /// Expressions produced in the output
    pub select_expr: Vec<Expr>,
    /// Optional ordering; must start with the `ON` expressions
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The input plan
    pub input: Arc<LogicalPlan>,
    /// Output schema, derived from `select_expr`
    pub schema: DFSchemaRef,
}
3126
3127impl DistinctOn {
3128 pub fn try_new(
3130 on_expr: Vec<Expr>,
3131 select_expr: Vec<Expr>,
3132 sort_expr: Option<Vec<SortExpr>>,
3133 input: Arc<LogicalPlan>,
3134 ) -> Result<Self> {
3135 if on_expr.is_empty() {
3136 return plan_err!("No `ON` expressions provided");
3137 }
3138
3139 let on_expr = normalize_cols(on_expr, input.as_ref())?;
3140 let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3141 .into_iter()
3142 .collect();
3143
3144 let dfschema = DFSchema::new_with_metadata(
3145 qualified_fields,
3146 input.schema().metadata().clone(),
3147 )?;
3148
3149 let mut distinct_on = DistinctOn {
3150 on_expr,
3151 select_expr,
3152 sort_expr: None,
3153 input,
3154 schema: Arc::new(dfschema),
3155 };
3156
3157 if let Some(sort_expr) = sort_expr {
3158 distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3159 }
3160
3161 Ok(distinct_on)
3162 }
3163
3164 pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3168 let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3169
3170 let mut matched = true;
3172 for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3173 if on != &sort.expr {
3174 matched = false;
3175 break;
3176 }
3177 }
3178
3179 if self.on_expr.len() > sort_expr.len() || !matched {
3180 return plan_err!(
3181 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3182 );
3183 }
3184
3185 self.sort_expr = Some(sort_expr);
3186 Ok(self)
3187 }
3188}
3189
3190impl PartialOrd for DistinctOn {
3192 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3193 #[derive(PartialEq, PartialOrd)]
3194 struct ComparableDistinctOn<'a> {
3195 pub on_expr: &'a Vec<Expr>,
3197 pub select_expr: &'a Vec<Expr>,
3199 pub sort_expr: &'a Option<Vec<SortExpr>>,
3203 pub input: &'a Arc<LogicalPlan>,
3205 }
3206 let comparable_self = ComparableDistinctOn {
3207 on_expr: &self.on_expr,
3208 select_expr: &self.select_expr,
3209 sort_expr: &self.sort_expr,
3210 input: &self.input,
3211 };
3212 let comparable_other = ComparableDistinctOn {
3213 on_expr: &other.on_expr,
3214 select_expr: &other.select_expr,
3215 sort_expr: &other.sort_expr,
3216 input: &other.input,
3217 };
3218 comparable_self.partial_cmp(&comparable_other)
3219 }
3220}
3221
/// Groups the input by `group_expr` and evaluates `aggr_expr` per group.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// Construct via `try_new`/`try_new_with_schema`; fields may grow over time.
#[non_exhaustive]
pub struct Aggregate {
    /// The incoming plan
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions (possibly a single grouping set)
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions
    pub aggr_expr: Vec<Expr>,
    /// Output schema: group fields first, then aggregate fields
    pub schema: DFSchemaRef,
}
3247
impl Aggregate {
    /// Create a new aggregate operator, deriving the output schema (group
    /// fields, an internal grouping-id field for grouping sets, then aggregate
    /// fields) from the input plan and the given expressions.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<Self> {
        // Expand grouping shorthand into explicit grouping sets.
        let group_expr = enumerate_grouping_sets(group_expr)?;

        // True when the grouping is exactly one `GroupingSet` expression.
        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);

        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;

        if is_grouping_set {
            // Under grouping sets a group column may be absent in some set, so
            // every group field becomes nullable ...
            qualified_fields = qualified_fields
                .into_iter()
                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
                .collect::<Vec<_>>();
            // ... and a hidden grouping-id field is appended. Note that
            // `qualified_fields.len()` is evaluated before the push, so the id
            // type is sized from the number of group fields.
            qualified_fields.push((
                None,
                Field::new(
                    Self::INTERNAL_GROUPING_ID,
                    Self::grouping_id_type(qualified_fields.len()),
                    false,
                )
                .into(),
            ));
        }

        // Aggregate output fields follow the group (and grouping-id) fields.
        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);

        let schema = DFSchema::new_with_metadata(
            qualified_fields,
            input.schema().metadata().clone(),
        )?;

        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
    }

    /// Create a new aggregate operator using the provided output schema rather
    /// than deriving one. Validates that the schema field count matches the
    /// expressions and attaches functional dependencies to the schema.
    pub fn try_new_with_schema(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if group_expr.is_empty() && aggr_expr.is_empty() {
            return plan_err!(
                "Aggregate requires at least one grouping or aggregate expression"
            );
        }
        let group_expr_count = grouping_set_expr_count(&group_expr)?;
        if schema.fields().len() != group_expr_count + aggr_expr.len() {
            return plan_err!(
                "Aggregate schema has wrong number of fields. Expected {} got {}",
                group_expr_count + aggr_expr.len(),
                schema.fields().len()
            );
        }

        // Recompute functional dependencies for the aggregate output and
        // store them on (a copy of) the schema.
        let aggregate_func_dependencies =
            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
        let new_schema = schema.as_ref().clone();
        let schema = Arc::new(
            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
        );
        Ok(Self {
            input,
            group_expr,
            aggr_expr,
            schema,
        })
    }

    /// True when the grouping is exactly one `GroupingSet` expression.
    fn is_grouping_set(&self) -> bool {
        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
    }

    /// The expressions in output-schema order: group expressions, the internal
    /// grouping-id column (grouping sets only), then aggregate expressions.
    fn output_expressions(&self) -> Result<Vec<&Expr>> {
        // Shared, lazily-built column expression for the hidden grouping id.
        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
        });
        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
        if self.is_grouping_set() {
            exprs.push(&INTERNAL_ID_EXPR);
        }
        exprs.extend(self.aggr_expr.iter());
        debug_assert!(exprs.len() == self.schema.fields().len());
        Ok(exprs)
    }

    /// Number of group expressions in the output, as computed by
    /// `grouping_set_expr_count`.
    pub fn group_expr_len(&self) -> Result<usize> {
        grouping_set_expr_count(&self.group_expr)
    }

    /// Smallest unsigned integer type used for the internal grouping id, given
    /// the number of group expressions (u8 up to 8, u16 up to 16, ...).
    pub fn grouping_id_type(group_exprs: usize) -> DataType {
        if group_exprs <= 8 {
            DataType::UInt8
        } else if group_exprs <= 16 {
            DataType::UInt16
        } else if group_exprs <= 32 {
            DataType::UInt32
        } else {
            DataType::UInt64
        }
    }

    /// Name of the hidden output column holding the grouping id when grouping
    /// sets are used.
    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
}
3389
3390impl PartialOrd for Aggregate {
3392 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3393 match self.input.partial_cmp(&other.input) {
3394 Some(Ordering::Equal) => {
3395 match self.group_expr.partial_cmp(&other.group_expr) {
3396 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3397 cmp => cmp,
3398 }
3399 }
3400 cmp => cmp,
3401 }
3402 }
3403}
3404
3405fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3407 group_expr
3408 .iter()
3409 .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3410}
3411
3412fn calc_func_dependencies_for_aggregate(
3414 group_expr: &[Expr],
3416 input: &LogicalPlan,
3418 aggr_schema: &DFSchema,
3420) -> Result<FunctionalDependencies> {
3421 if !contains_grouping_set(group_expr) {
3427 let group_by_expr_names = group_expr
3428 .iter()
3429 .map(|item| item.schema_name().to_string())
3430 .collect::<IndexSet<_>>()
3431 .into_iter()
3432 .collect::<Vec<_>>();
3433 let aggregate_func_dependencies = aggregate_functional_dependencies(
3434 input.schema(),
3435 &group_by_expr_names,
3436 aggr_schema,
3437 );
3438 Ok(aggregate_func_dependencies)
3439 } else {
3440 Ok(FunctionalDependencies::empty())
3441 }
3442}
3443
/// Calculate the functional dependencies of a projection's output by mapping
/// each projected expression back to the input field (if any) it forwards.
fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_fields = input.schema().field_names();
    // For each projected expression, collect the input-field indices it
    // corresponds to (empty when the expression has no matching input field).
    let proj_indices = exprs
        .iter()
        .map(|expr| match expr {
            #[expect(deprecated)]
            Expr::Wildcard { qualifier, options } => {
                // A wildcard expands to multiple input fields; resolve each
                // expanded field back to its index by flat name.
                let wildcard_fields = exprlist_to_fields(
                    vec![&Expr::Wildcard {
                        qualifier: qualifier.clone(),
                        options: options.clone(),
                    }],
                    input,
                )?;
                Ok::<_, DataFusionError>(
                    wildcard_fields
                        .into_iter()
                        .filter_map(|(qualifier, f)| {
                            let flat_name = qualifier
                                .map(|t| format!("{}.{}", t, f.name()))
                                .unwrap_or_else(|| f.name().clone());
                            input_fields.iter().position(|item| *item == flat_name)
                        })
                        .collect::<Vec<_>>(),
                )
            }
            Expr::Alias(alias) => {
                // Aliases resolve via the display name of the inner expression.
                let name = format!("{}", alias.expr);
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
            _ => {
                // Any other expression resolves via its own display name.
                let name = format!("{}", expr);
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    // Project the input's dependencies onto the selected indices, sized by the
    // projection's output length.
    let len = exprlist_len(exprs, input.schema(), Some(find_base_plan(input).schema()))?;
    Ok(input
        .schema()
        .functional_dependencies()
        .project_functional_dependencies(&proj_indices, len))
}
3504
/// Sorts its input according to the given sort expressions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// Sort expressions, in precedence order
    pub expr: Vec<SortExpr>,
    /// The plan being sorted
    pub input: Arc<LogicalPlan>,
    /// Optional cap on the number of rows produced after sorting
    pub fetch: Option<usize>,
}
3515
/// Combines rows from two inputs according to `join_type` and the join keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input plan
    pub left: Arc<LogicalPlan>,
    /// Right input plan
    pub right: Arc<LogicalPlan>,
    /// Equijoin key pairs (left expression, right expression)
    pub on: Vec<(Expr, Expr)>,
    /// Additional non-equijoin filter applied to joined rows
    pub filter: Option<Expr>,
    /// Kind of join (inner, left, ...)
    pub join_type: JoinType,
    /// Syntactic form the join was written with (`ON`/`USING`)
    pub join_constraint: JoinConstraint,
    /// Output schema built from both inputs
    pub schema: DFSchemaRef,
    /// If true, null join keys are treated as equal to each other
    pub null_equals_null: bool,
}
3536
3537impl Join {
3538 pub fn try_new_with_project_input(
3540 original: &LogicalPlan,
3541 left: Arc<LogicalPlan>,
3542 right: Arc<LogicalPlan>,
3543 column_on: (Vec<Column>, Vec<Column>),
3544 ) -> Result<Self> {
3545 let original_join = match original {
3546 LogicalPlan::Join(join) => join,
3547 _ => return plan_err!("Could not create join with project input"),
3548 };
3549
3550 let on: Vec<(Expr, Expr)> = column_on
3551 .0
3552 .into_iter()
3553 .zip(column_on.1)
3554 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3555 .collect();
3556 let join_schema =
3557 build_join_schema(left.schema(), right.schema(), &original_join.join_type)?;
3558
3559 Ok(Join {
3560 left,
3561 right,
3562 on,
3563 filter: original_join.filter.clone(),
3564 join_type: original_join.join_type,
3565 join_constraint: original_join.join_constraint,
3566 schema: Arc::new(join_schema),
3567 null_equals_null: original_join.null_equals_null,
3568 })
3569 }
3570}
3571
3572impl PartialOrd for Join {
3574 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3575 #[derive(PartialEq, PartialOrd)]
3576 struct ComparableJoin<'a> {
3577 pub left: &'a Arc<LogicalPlan>,
3579 pub right: &'a Arc<LogicalPlan>,
3581 pub on: &'a Vec<(Expr, Expr)>,
3583 pub filter: &'a Option<Expr>,
3585 pub join_type: &'a JoinType,
3587 pub join_constraint: &'a JoinConstraint,
3589 pub null_equals_null: &'a bool,
3591 }
3592 let comparable_self = ComparableJoin {
3593 left: &self.left,
3594 right: &self.right,
3595 on: &self.on,
3596 filter: &self.filter,
3597 join_type: &self.join_type,
3598 join_constraint: &self.join_constraint,
3599 null_equals_null: &self.null_equals_null,
3600 };
3601 let comparable_other = ComparableJoin {
3602 left: &other.left,
3603 right: &other.right,
3604 on: &other.on,
3605 filter: &other.filter,
3606 join_type: &other.join_type,
3607 join_constraint: &other.join_constraint,
3608 null_equals_null: &other.null_equals_null,
3609 };
3610 comparable_self.partial_cmp(&comparable_other)
3611 }
3612}
3613
/// A subquery referenced from within an expression.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The plan of the subquery itself
    pub subquery: Arc<LogicalPlan>,
    /// Expressions from the outer query that the subquery references
    pub outer_ref_columns: Vec<Expr>,
}
3622
impl Normalizeable for Subquery {
    // Subqueries are never normalized for common-subexpression elimination.
    fn can_normalize(&self) -> bool {
        false
    }
}
3628
3629impl NormalizeEq for Subquery {
3630 fn normalize_eq(&self, other: &Self) -> bool {
3631 *self.subquery == *other.subquery
3633 && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3634 && self
3635 .outer_ref_columns
3636 .iter()
3637 .zip(other.outer_ref_columns.iter())
3638 .all(|(a, b)| a.normalize_eq(b))
3639 }
3640}
3641
3642impl Subquery {
3643 pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3644 match plan {
3645 Expr::ScalarSubquery(it) => Ok(it),
3646 Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3647 _ => plan_err!("Could not coerce into ScalarSubquery!"),
3648 }
3649 }
3650
3651 pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3652 Subquery {
3653 subquery: plan,
3654 outer_ref_columns: self.outer_ref_columns.clone(),
3655 }
3656 }
3657}
3658
3659impl Debug for Subquery {
3660 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3661 write!(f, "<subquery>")
3662 }
3663}
3664
/// How to repartition a stream of rows.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Distribute batches round-robin across the given number of partitions
    RoundRobinBatch(usize),
    /// Distribute rows by hashing the expressions into the given partition count
    Hash(Vec<Expr>, usize),
    /// `DISTRIBUTE BY`-style partitioning on the given expressions
    DistributeBy(Vec<Expr>),
}
3680
/// An output column produced by unnesting a list column to a given depth.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The column as it appears in the output schema
    pub output_column: Column,
    /// How many levels of list nesting are unnested
    pub depth: usize,
}
3705
3706impl Display for ColumnUnnestList {
3707 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3708 write!(f, "{}|depth={}", self.output_column, self.depth)
3709 }
3710}
3711
/// Unnests list/struct columns of the input into multiple rows/columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The incoming plan
    pub input: Arc<LogicalPlan>,
    /// Columns the unnest is executed on
    pub exec_columns: Vec<Column>,
    /// For list columns: input field index paired with the produced output
    /// column and its unnest depth
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Input field indices of struct columns being unnested
    pub struct_type_columns: Vec<usize>,
    // NOTE(review): presumably indices mapping output rows/columns back to
    // dependent input columns — confirm against the unnest executor.
    pub dependency_indices: Vec<usize>,
    /// Output schema after unnesting
    pub schema: DFSchemaRef,
    /// Options controlling unnest behavior
    pub options: UnnestOptions,
}
3734
3735impl PartialOrd for Unnest {
3737 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3738 #[derive(PartialEq, PartialOrd)]
3739 struct ComparableUnnest<'a> {
3740 pub input: &'a Arc<LogicalPlan>,
3742 pub exec_columns: &'a Vec<Column>,
3744 pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
3747 pub struct_type_columns: &'a Vec<usize>,
3750 pub dependency_indices: &'a Vec<usize>,
3753 pub options: &'a UnnestOptions,
3755 }
3756 let comparable_self = ComparableUnnest {
3757 input: &self.input,
3758 exec_columns: &self.exec_columns,
3759 list_type_columns: &self.list_type_columns,
3760 struct_type_columns: &self.struct_type_columns,
3761 dependency_indices: &self.dependency_indices,
3762 options: &self.options,
3763 };
3764 let comparable_other = ComparableUnnest {
3765 input: &other.input,
3766 exec_columns: &other.exec_columns,
3767 list_type_columns: &other.list_type_columns,
3768 struct_type_columns: &other.struct_type_columns,
3769 dependency_indices: &other.dependency_indices,
3770 options: &other.options,
3771 };
3772 comparable_self.partial_cmp(&comparable_other)
3773 }
3774}
3775
3776#[cfg(test)]
3777mod tests {
3778
3779 use super::*;
3780 use crate::builder::LogicalTableSource;
3781 use crate::logical_plan::table_scan;
3782 use crate::{
3783 col, exists, in_subquery, lit, placeholder, scalar_subquery, GroupingSet,
3784 };
3785
3786 use datafusion_common::tree_node::{
3787 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
3788 };
3789 use datafusion_common::{not_impl_err, Constraint, ScalarValue};
3790
3791 use crate::test::function_stub::count;
3792
3793 fn employee_schema() -> Schema {
3794 Schema::new(vec![
3795 Field::new("id", DataType::Int32, false),
3796 Field::new("first_name", DataType::Utf8, false),
3797 Field::new("last_name", DataType::Utf8, false),
3798 Field::new("state", DataType::Utf8, false),
3799 Field::new("salary", DataType::Int32, false),
3800 ])
3801 }
3802
    /// Build a plan with a subquery (`state IN (<subquery>)`) used by the
    /// display-format tests below.
    fn display_plan() -> Result<LogicalPlan> {
        // Subquery: scan of column 3 ("state") only.
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;

        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
            .filter(in_subquery(col("state"), Arc::new(plan1)))?
            .project(vec![col("id")])?
            .build()
    }
3812
    /// `display_indent` renders one node per line, children indented.
    #[test]
    fn test_display_indent() -> Result<()> {
        let plan = display_plan()?;

        let expected = "Projection: employee_csv.id\
        \n  Filter: employee_csv.state IN (<subquery>)\
        \n    Subquery:\
        \n      TableScan: employee_csv projection=[state]\
        \n    TableScan: employee_csv projection=[id, state]";

        assert_eq!(expected, format!("{}", plan.display_indent()));
        Ok(())
    }
3826
    /// `display_indent_schema` appends each node's output schema to the line.
    #[test]
    fn test_display_indent_schema() -> Result<()> {
        let plan = display_plan()?;

        let expected = "Projection: employee_csv.id [id:Int32]\
        \n  Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]\
        \n    Subquery: [state:Utf8]\
        \n      TableScan: employee_csv projection=[state] [state:Utf8]\
        \n    TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]";

        assert_eq!(expected, format!("{}", plan.display_indent_schema()));
        Ok(())
    }
3840
    /// Subqueries inside aliased expressions still render as nested plans.
    #[test]
    fn test_display_subquery_alias() -> Result<()> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;
        let plan1 = Arc::new(plan1);

        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .project(vec![col("id"), exists(plan1).alias("exists")])?
                .build();

        let expected = "Projection: employee_csv.id, EXISTS (<subquery>) AS exists\
        \n  Subquery:\
        \n    TableScan: employee_csv projection=[state]\
        \n  TableScan: employee_csv projection=[id, state]";

        assert_eq!(expected, format!("{}", plan?.display_indent()));
        Ok(())
    }
3860
    /// `display_graphviz` emits two clusters: a plain plan and a detailed
    /// (schema-annotated) plan, with parent->child edges.
    #[test]
    fn test_display_graphviz() -> Result<()> {
        let plan = display_plan()?;

        // Expected output is compared byte-for-byte, including the leading
        // newline and comment banner.
        let expected_graphviz = r#"
// Begin DataFusion GraphViz Plan,
// display it online here: https://dreampuf.github.io/GraphvizOnline

digraph {
  subgraph cluster_1
  {
    graph[label="LogicalPlan"]
    2[shape=box label="Projection: employee_csv.id"]
    3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
    2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
    4[shape=box label="Subquery:"]
    3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
    5[shape=box label="TableScan: employee_csv projection=[state]"]
    4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
    6[shape=box label="TableScan: employee_csv projection=[id, state]"]
    3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
  }
  subgraph cluster_7
  {
    graph[label="Detailed LogicalPlan"]
    8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
    9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
    8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
    10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
    9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
    11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
    10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
    12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
    9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
  }
}
// End DataFusion GraphViz Plan
"#;

        let graphviz = format!("{}", plan.display_graphviz());

        assert_eq!(expected_graphviz, graphviz);
        Ok(())
    }
3907
    /// `display_pg_json` emits a Postgres-`EXPLAIN (FORMAT JSON)`-style tree.
    #[test]
    fn test_display_pg_json() -> Result<()> {
        let plan = display_plan()?;

        // Expected output is compared byte-for-byte.
        let expected_pg_json = r#"[
  {
    "Plan": {
      "Expressions": [
        "employee_csv.id"
      ],
      "Node Type": "Projection",
      "Output": [
        "id"
      ],
      "Plans": [
        {
          "Condition": "employee_csv.state IN (<subquery>)",
          "Node Type": "Filter",
          "Output": [
            "id",
            "state"
          ],
          "Plans": [
            {
              "Node Type": "Subquery",
              "Output": [
                "state"
              ],
              "Plans": [
                {
                  "Node Type": "TableScan",
                  "Output": [
                    "state"
                  ],
                  "Plans": [],
                  "Relation Name": "employee_csv"
                }
              ]
            },
            {
              "Node Type": "TableScan",
              "Output": [
                "id",
                "state"
              ],
              "Plans": [],
              "Relation Name": "employee_csv"
            }
          ]
        }
      ]
    }
  }
]"#;

        let pg_json = format!("{}", plan.display_pg_json());

        assert_eq!(expected_pg_json, pg_json);
        Ok(())
    }
3968
    /// Test visitor that records the order in which plan nodes are visited.
    #[derive(Debug, Default)]
    struct OkVisitor {
        // Human-readable log of pre/post visits, in occurrence order.
        strings: Vec<String>,
    }
3974
3975 impl<'n> TreeNodeVisitor<'n> for OkVisitor {
3976 type Node = LogicalPlan;
3977
3978 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
3979 let s = match plan {
3980 LogicalPlan::Projection { .. } => "pre_visit Projection",
3981 LogicalPlan::Filter { .. } => "pre_visit Filter",
3982 LogicalPlan::TableScan { .. } => "pre_visit TableScan",
3983 _ => {
3984 return not_impl_err!("unknown plan type");
3985 }
3986 };
3987
3988 self.strings.push(s.into());
3989 Ok(TreeNodeRecursion::Continue)
3990 }
3991
3992 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
3993 let s = match plan {
3994 LogicalPlan::Projection { .. } => "post_visit Projection",
3995 LogicalPlan::Filter { .. } => "post_visit Filter",
3996 LogicalPlan::TableScan { .. } => "post_visit TableScan",
3997 _ => {
3998 return not_impl_err!("unknown plan type");
3999 }
4000 };
4001
4002 self.strings.push(s.into());
4003 Ok(TreeNodeRecursion::Continue)
4004 }
4005 }
4006
    /// Traversal visits parents pre-order and post-order around their children.
    #[test]
    fn visit_order() {
        let mut visitor = OkVisitor::default();
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_eq!(
            visitor.strings,
            vec![
                "pre_visit Projection",
                "pre_visit Filter",
                "pre_visit TableScan",
                "post_visit TableScan",
                "post_visit Filter",
                "post_visit Projection",
            ]
        );
    }
4026
    /// Countdown helper for the visitor tests: `dec` returns true once the
    /// configured number of calls has elapsed; a defaulted (`None`) counter
    /// never fires.
    #[derive(Debug, Default)]
    struct OptionalCounter {
        val: Option<usize>,
    }
4032
4033 impl OptionalCounter {
4034 fn new(val: usize) -> Self {
4035 Self { val: Some(val) }
4036 }
4037 fn dec(&mut self) -> bool {
4039 if Some(0) == self.val {
4040 true
4041 } else {
4042 self.val = self.val.take().map(|i| i - 1);
4043 false
4044 }
4045 }
4046 }
4047
    /// Visitor that stops traversal early after a configurable number of
    /// pre- or post-visits, while recording visits via `inner`.
    #[derive(Debug, Default)]
    struct StoppingVisitor {
        inner: OkVisitor,
        // When this counter fires in `f_down`, traversal stops.
        return_false_from_pre_in: OptionalCounter,
        // When this counter fires in `f_up`, traversal stops.
        return_false_from_post_in: OptionalCounter,
    }
4057
4058 impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4059 type Node = LogicalPlan;
4060
4061 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4062 if self.return_false_from_pre_in.dec() {
4063 return Ok(TreeNodeRecursion::Stop);
4064 }
4065 self.inner.f_down(plan)?;
4066
4067 Ok(TreeNodeRecursion::Continue)
4068 }
4069
4070 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4071 if self.return_false_from_post_in.dec() {
4072 return Ok(TreeNodeRecursion::Stop);
4073 }
4074
4075 self.inner.f_up(plan)
4076 }
4077 }
4078
    /// Stopping on the third pre-visit halts traversal before TableScan and
    /// skips every post-visit.
    #[test]
    fn early_stopping_pre_visit() {
        let mut visitor = StoppingVisitor {
            return_false_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_eq!(
            visitor.inner.strings,
            vec!["pre_visit Projection", "pre_visit Filter"]
        );
    }
4095
    /// Stopping on the second post-visit halts after the TableScan post-visit.
    #[test]
    fn early_stopping_post_visit() {
        let mut visitor = StoppingVisitor {
            return_false_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_eq!(
            visitor.inner.strings,
            vec![
                "pre_visit Projection",
                "pre_visit Filter",
                "pre_visit TableScan",
                "post_visit TableScan",
            ]
        );
    }
4116
    /// Visitor that returns an error after a configurable number of
    /// pre- or post-visits, while recording visits via `inner`.
    #[derive(Debug, Default)]
    struct ErrorVisitor {
        inner: OkVisitor,
        // When this counter fires in `f_down`, an error is returned.
        return_error_from_pre_in: OptionalCounter,
        // When this counter fires in `f_up`, an error is returned.
        return_error_from_post_in: OptionalCounter,
    }
4126
4127 impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4128 type Node = LogicalPlan;
4129
4130 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4131 if self.return_error_from_pre_in.dec() {
4132 return not_impl_err!("Error in pre_visit");
4133 }
4134
4135 self.inner.f_down(plan)
4136 }
4137
4138 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4139 if self.return_error_from_post_in.dec() {
4140 return not_impl_err!("Error in post_visit");
4141 }
4142
4143 self.inner.f_up(plan)
4144 }
4145 }
4146
    /// An error from the third pre-visit propagates and halts traversal.
    #[test]
    fn error_pre_visit() {
        let mut visitor = ErrorVisitor {
            return_error_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_eq!(
            "This feature is not implemented: Error in pre_visit",
            res.strip_backtrace()
        );
        assert_eq!(
            visitor.inner.strings,
            vec!["pre_visit Projection", "pre_visit Filter"]
        );
    }
4164
    /// An error from the second post-visit propagates and halts traversal.
    #[test]
    fn error_post_visit() {
        let mut visitor = ErrorVisitor {
            return_error_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_eq!(
            "This feature is not implemented: Error in post_visit",
            res.strip_backtrace()
        );
        assert_eq!(
            visitor.inner.strings,
            vec![
                "pre_visit Projection",
                "pre_visit Filter",
                "pre_visit TableScan",
                "post_visit TableScan",
            ]
        );
    }
4187
    /// `Projection::try_new_with_schema` rejects an expression count that does
    /// not match the provided schema's field count.
    #[test]
    fn projection_expr_schema_mismatch() -> Result<()> {
        let empty_schema = Arc::new(DFSchema::empty());
        let p = Projection::try_new_with_schema(
            vec![col("a")],
            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema: Arc::clone(&empty_schema),
            })),
            empty_schema,
        );
        assert_eq!(p.err().unwrap().strip_backtrace(), "Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
        Ok(())
    }
4202
    /// Build the Projection -> Filter -> TableScan plan used by the visitor
    /// tests above.
    fn test_plan() -> LogicalPlan {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("state", DataType::Utf8, false),
        ]);

        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
            .unwrap()
            .filter(col("state").eq(lit("CO")))
            .unwrap()
            .project(vec![col("id")])
            .unwrap()
            .build()
            .unwrap()
    }
4218
    /// Replacing parameters must fail for malformed placeholder ids:
    /// empty (``), zero (`$0`), and zero-prefixed (`$00`).
    #[test]
    fn test_replace_invalid_placeholder() {
        // Empty placeholder id.
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder("")))
            .unwrap()
            .build()
            .unwrap();

        let param_values = vec![ScalarValue::Int32(Some(42))];
        plan.replace_params_with_values(&param_values.clone().into())
            .expect_err("unexpectedly succeeded to replace an invalid placeholder");

        // Zero placeholder id ($ ids are 1-based).
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder("$0")))
            .unwrap()
            .build()
            .unwrap();

        plan.replace_params_with_values(&param_values.clone().into())
            .expect_err("unexpectedly succeeded to replace an invalid placeholder");

        // Zero-prefixed placeholder id.
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder("$00")))
            .unwrap()
            .build()
            .unwrap();

        plan.replace_params_with_values(&param_values.into())
            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
    }
4261
    /// Grouping sets make all group columns nullable in the output schema,
    /// even when the input columns are non-nullable.
    #[test]
    fn test_nullable_schema_after_grouping_set() {
        let schema = Schema::new(vec![
            Field::new("foo", DataType::Int32, false),
            Field::new("bar", DataType::Int32, false),
        ]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .aggregate(
                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
                    vec![col("foo")],
                    vec![col("bar")],
                ]))],
                vec![count(lit(true))],
            )
            .unwrap()
            .build()
            .unwrap();

        let output_schema = plan.schema();

        assert!(output_schema
            .field_with_name(None, "foo")
            .unwrap()
            .is_nullable(),);
        assert!(output_schema
            .field_with_name(None, "bar")
            .unwrap()
            .is_nullable());
    }
4293
    /// `Filter::is_scalar` is false for a plain scan, but true when an
    /// equality predicate pins a column that is unique per the schema's
    /// functional dependencies.
    #[test]
    fn test_filter_is_scalar() {
        // No unique constraint: filtering on `id` is not scalar.
        let schema =
            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let source = Arc::new(LogicalTableSource::new(schema));
        let schema = Arc::new(
            DFSchema::try_from_qualified_schema(
                TableReference::bare("tab"),
                &source.schema(),
            )
            .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source: Arc::clone(&source) as Arc<dyn TableSource>,
            projection: None,
            projected_schema: Arc::clone(&schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        let filter = Filter::try_new(
            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)))),
            scan,
        )
        .unwrap();
        assert!(!filter.is_scalar());

        // Same scan, but with a unique constraint on column 0: an equality
        // filter on that column yields at most one row, so it is scalar.
        let unique_schema = Arc::new(
            schema
                .as_ref()
                .clone()
                .with_functional_dependencies(
                    FunctionalDependencies::new_from_constraints(
                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
                            vec![0],
                        )])),
                        1,
                    ),
                )
                .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source,
            projection: None,
            projected_schema: Arc::clone(&unique_schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        let filter =
            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
        assert!(filter.is_scalar());
    }
4352
    #[test]
    fn test_transform_explain() {
        let schema = Schema::new(vec![
            Field::new("foo", DataType::Int32, false),
            Field::new("bar", DataType::Int32, false),
        ]);

        // Plan shape: Explain wrapping a bare TableScan.
        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .explain(false, false)
            .unwrap()
            .build()
            .unwrap();

        let external_filter = col("foo").eq(lit(true));

        // Rewrite every TableScan into a Filter-over-TableScan. The scan here
        // lives *inside* the Explain node, so this verifies that `transform`
        // recurses into an Explain's inner plan rather than stopping at it.
        let plan = plan
            .transform(|plan| match plan {
                LogicalPlan::TableScan(table) => {
                    let filter = Filter::try_new(
                        external_filter.clone(),
                        Arc::new(LogicalPlan::TableScan(table)),
                    )
                    .unwrap();
                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
                }
                // Leave every other node untouched.
                x => Ok(Transformed::no(x)),
            })
            .data()
            .unwrap();

        // The injected Filter must appear in the indented display between the
        // Explain node and the scan it wraps.
        let expected = "Explain\
        \n  Filter: foo = Boolean(true)\
        \n    TableScan: ?table?";
        let actual = format!("{}", plan.display_indent());
        assert_eq!(expected.to_string(), actual)
    }
4392
4393 #[test]
4394 fn test_plan_partial_ord() {
4395 let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
4396 produce_one_row: false,
4397 schema: Arc::new(DFSchema::empty()),
4398 });
4399
4400 let describe_table = LogicalPlan::DescribeTable(DescribeTable {
4401 schema: Arc::new(Schema::new(vec![Field::new(
4402 "foo",
4403 DataType::Int32,
4404 false,
4405 )])),
4406 output_schema: DFSchemaRef::new(DFSchema::empty()),
4407 });
4408
4409 let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
4410 schema: Arc::new(Schema::new(vec![Field::new(
4411 "foo",
4412 DataType::Int32,
4413 false,
4414 )])),
4415 output_schema: DFSchemaRef::new(DFSchema::empty()),
4416 });
4417
4418 assert_eq!(
4419 empty_relation.partial_cmp(&describe_table),
4420 Some(Ordering::Less)
4421 );
4422 assert_eq!(
4423 describe_table.partial_cmp(&empty_relation),
4424 Some(Ordering::Greater)
4425 );
4426 assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
4427 }
4428
4429 #[test]
4430 fn test_limit_with_new_children() {
4431 let input = Arc::new(LogicalPlan::Values(Values {
4432 schema: Arc::new(DFSchema::empty()),
4433 values: vec![vec![]],
4434 }));
4435 let cases = [
4436 LogicalPlan::Limit(Limit {
4437 skip: None,
4438 fetch: None,
4439 input: Arc::clone(&input),
4440 }),
4441 LogicalPlan::Limit(Limit {
4442 skip: None,
4443 fetch: Some(Box::new(Expr::Literal(
4444 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4445 ))),
4446 input: Arc::clone(&input),
4447 }),
4448 LogicalPlan::Limit(Limit {
4449 skip: Some(Box::new(Expr::Literal(
4450 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4451 ))),
4452 fetch: None,
4453 input: Arc::clone(&input),
4454 }),
4455 LogicalPlan::Limit(Limit {
4456 skip: Some(Box::new(Expr::Literal(
4457 ScalarValue::new_one(&DataType::UInt32).unwrap(),
4458 ))),
4459 fetch: Some(Box::new(Expr::Literal(
4460 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4461 ))),
4462 input,
4463 }),
4464 ];
4465
4466 for limit in cases {
4467 let new_limit = limit
4468 .with_new_exprs(
4469 limit.expressions(),
4470 limit.inputs().into_iter().cloned().collect(),
4471 )
4472 .unwrap();
4473 assert_eq!(limit, new_limit);
4474 }
4475 }
4476
    #[test]
    fn test_with_subqueries_jump() {
        // Plan under test:
        //   Projection: id, (<scalar subquery: Filter(sub_id = 0) -> TableScan>)
        //     Filter: id = 0
        //       TableScan
        //
        // Each traversal below returns `TreeNodeRecursion::Jump` when it meets
        // the Projection. For the `*_with_subqueries` APIs, Jump must skip the
        // Projection's entire subtree — including the scalar subquery — so no
        // Filter node (neither the outer one below the Projection nor the one
        // inside the subquery) may ever be observed.
        let subquery_schema =
            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);

        let subquery_plan =
            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
                .unwrap()
                .filter(col("sub_id").eq(lit(0)))
                .unwrap()
                .build()
                .unwrap();

        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
            .unwrap()
            .filter(col("id").eq(lit(0)))
            .unwrap()
            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
            .unwrap()
            .build()
            .unwrap();

        // 1) Closure-based apply: Jump at the Projection, expect no Filter.
        let mut filter_found = false;
        plan.apply_with_subqueries(|plan| {
            match plan {
                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                LogicalPlan::Filter(..) => filter_found = true,
                _ => {}
            }
            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();
        assert!(!filter_found);

        // Visitor that jumps at Projection and records whether any Filter
        // was reached on the way down.
        struct ProjectJumpVisitor {
            filter_found: bool,
        }

        impl ProjectJumpVisitor {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
                match node {
                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(TreeNodeRecursion::Continue)
            }
        }

        // 2) Visitor-based traversal: same expectation.
        let mut visitor = ProjectJumpVisitor::new();
        plan.visit_with_subqueries(&mut visitor).unwrap();
        assert!(!visitor.filter_found);

        // 3) Top-down transform: signal Jump via Transformed::new.
        let mut filter_found = false;
        plan.clone()
            .transform_down_with_subqueries(|plan| {
                match plan {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
                    }
                    LogicalPlan::Filter(..) => filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(plan))
            })
            .unwrap();
        assert!(!filter_found);

        // 4) Combined down+up transform: Jump in the down-closure must still
        // skip the subtree; the up-closure is a no-op.
        let mut filter_found = false;
        plan.clone()
            .transform_down_up_with_subqueries(
                |plan| {
                    match plan {
                        LogicalPlan::Projection(..) => {
                            return Ok(Transformed::new(
                                plan,
                                false,
                                TreeNodeRecursion::Jump,
                            ))
                        }
                        LogicalPlan::Filter(..) => filter_found = true,
                        _ => {}
                    }
                    Ok(Transformed::no(plan))
                },
                |plan| Ok(Transformed::no(plan)),
            )
            .unwrap();
        assert!(!filter_found);

        // Rewriter analogue of the visitor above, for the rewrite API.
        struct ProjectJumpRewriter {
            filter_found: bool,
        }

        impl ProjectJumpRewriter {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl TreeNodeRewriter for ProjectJumpRewriter {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump))
                    }
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(node))
            }
        }

        // 5) Rewriter-based traversal: same expectation.
        let mut rewriter = ProjectJumpRewriter::new();
        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
        assert!(!rewriter.filter_found);
    }
4614
4615 #[test]
4616 fn test_with_unresolved_placeholders() {
4617 let field_name = "id";
4618 let placeholder_value = "$1";
4619 let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
4620
4621 let plan = table_scan(TableReference::none(), &schema, None)
4622 .unwrap()
4623 .filter(col(field_name).eq(placeholder(placeholder_value)))
4624 .unwrap()
4625 .build()
4626 .unwrap();
4627
4628 let params = plan.get_parameter_types().unwrap();
4630 assert_eq!(params.len(), 1);
4631
4632 let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
4633 assert_eq!(parameter_type, None);
4634 }
4635}