1use std::any::Any;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::iter::once;
24use std::sync::Arc;
25
26use crate::dml::CopyTo;
27use crate::expr::{Alias, Sort as SortExpr};
28use crate::expr_rewriter::{
29 coerce_plan_expr_for_schema, normalize_col,
30 normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
31 rewrite_sort_cols_by_aggs,
32};
33use crate::logical_plan::{
34 Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
35 JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
36 Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
37 Window,
38};
39use crate::utils::{
40 can_hash, columnize_expr, compare_sort_expr, expr_to_columns,
41 find_valid_equijoin_key_pair, group_window_expr_by_sort_keys,
42};
43use crate::{
44 and, binary_expr, lit, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery,
45 Statement, TableProviderFilterPushDown, TableSource, WriteOp,
46};
47
48use super::dml::InsertOp;
49use super::plan::ColumnUnnestList;
50use arrow::compute::can_cast_types;
51use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
52use datafusion_common::display::ToStringifiedPlan;
53use datafusion_common::file_options::file_type::FileType;
54use datafusion_common::{
55 exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
56 plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef,
57 DataFusionError, Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
58};
59use datafusion_expr_common::type_coercion::binary::type_union_resolution;
60
61use indexmap::IndexSet;
62
/// Placeholder table name, `"?table?"`.
// NOTE(review): presumably used when a table has no explicit name; the
// usages are not visible in this part of the file - confirm.
pub const UNNAMED_TABLE: &str = "?table?";
65
/// Options that tune how a [`LogicalPlanBuilder`] constructs plans.
#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
    /// When `true`, `LogicalPlanBuilder::aggregate` passes the GROUP BY
    /// expressions through `add_group_by_exprs_from_dependencies` before
    /// creating the aggregate node. Defaults to `false`.
    add_implicit_group_by_exprs: bool,
}

impl LogicalPlanBuilderOptions {
    /// Creates options with every setting at its default value.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the options with `add_implicit_group_by_exprs` set to `add`.
    pub fn with_add_implicit_group_by_exprs(self, add: bool) -> Self {
        Self {
            add_implicit_group_by_exprs: add,
        }
    }
}
85
86#[derive(Debug, Clone)]
124pub struct LogicalPlanBuilder {
125 plan: Arc<LogicalPlan>,
126 options: LogicalPlanBuilderOptions,
127}
128
129impl LogicalPlanBuilder {
130 pub fn new(plan: LogicalPlan) -> Self {
132 Self {
133 plan: Arc::new(plan),
134 options: LogicalPlanBuilderOptions::default(),
135 }
136 }
137
138 pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
140 Self {
141 plan,
142 options: LogicalPlanBuilderOptions::default(),
143 }
144 }
145
146 pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
147 self.options = options;
148 self
149 }
150
151 pub fn schema(&self) -> &DFSchemaRef {
153 self.plan.schema()
154 }
155
156 pub fn plan(&self) -> &LogicalPlan {
158 &self.plan
159 }
160
161 pub fn empty(produce_one_row: bool) -> Self {
165 Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
166 produce_one_row,
167 schema: DFSchemaRef::new(DFSchema::empty()),
168 }))
169 }
170
171 pub fn to_recursive_query(
174 self,
175 name: String,
176 recursive_term: LogicalPlan,
177 is_distinct: bool,
178 ) -> Result<Self> {
179 if is_distinct {
181 return not_impl_err!(
182 "Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) is not supported"
183 );
184 }
185 let static_fields_len = self.plan.schema().fields().len();
187 let recursive_fields_len = recursive_term.schema().fields().len();
188 if static_fields_len != recursive_fields_len {
189 return plan_err!(
190 "Non-recursive term and recursive term must have the same number of columns ({} != {})",
191 static_fields_len, recursive_fields_len
192 );
193 }
194 let coerced_recursive_term =
196 coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
197 Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
198 name,
199 static_term: self.plan,
200 recursive_term: Arc::new(coerced_recursive_term),
201 is_distinct,
202 })))
203 }
204
205 pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
213 if values.is_empty() {
214 return plan_err!("Values list cannot be empty");
215 }
216 let n_cols = values[0].len();
217 if n_cols == 0 {
218 return plan_err!("Values list cannot be zero length");
219 }
220 for (i, row) in values.iter().enumerate() {
221 if row.len() != n_cols {
222 return plan_err!(
223 "Inconsistent data length across values list: got {} values in row {} but expected {}",
224 row.len(),
225 i,
226 n_cols
227 );
228 }
229 }
230
231 Self::infer_data(values)
233 }
234
235 pub fn values_with_schema(
245 values: Vec<Vec<Expr>>,
246 schema: &DFSchemaRef,
247 ) -> Result<Self> {
248 if values.is_empty() {
249 return plan_err!("Values list cannot be empty");
250 }
251 let n_cols = schema.fields().len();
252 if n_cols == 0 {
253 return plan_err!("Values list cannot be zero length");
254 }
255 for (i, row) in values.iter().enumerate() {
256 if row.len() != n_cols {
257 return plan_err!(
258 "Inconsistent data length across values list: got {} values in row {} but expected {}",
259 row.len(),
260 i,
261 n_cols
262 );
263 }
264 }
265
266 Self::infer_values_from_schema(values, schema)
268 }
269
270 fn infer_values_from_schema(
271 values: Vec<Vec<Expr>>,
272 schema: &DFSchema,
273 ) -> Result<Self> {
274 let n_cols = values[0].len();
275 let mut fields = ValuesFields::new();
276 for j in 0..n_cols {
277 let field_type = schema.field(j).data_type();
278 let field_nullable = schema.field(j).is_nullable();
279 for row in values.iter() {
280 let value = &row[j];
281 let data_type = value.get_type(schema)?;
282
283 if !data_type.equals_datatype(field_type) {
284 if can_cast_types(&data_type, field_type) {
285 } else {
286 return exec_err!(
287 "type mismatch and can't cast to got {} and {}",
288 data_type,
289 field_type
290 );
291 }
292 }
293 }
294 fields.push(field_type.to_owned(), field_nullable);
295 }
296
297 Self::infer_inner(values, fields, schema)
298 }
299
300 fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
301 let n_cols = values[0].len();
302 let schema = DFSchema::empty();
303 let mut fields = ValuesFields::new();
304
305 for j in 0..n_cols {
306 let mut common_type: Option<DataType> = None;
307 for (i, row) in values.iter().enumerate() {
308 let value = &row[j];
309 let data_type = value.get_type(&schema)?;
310 if data_type == DataType::Null {
311 continue;
312 }
313
314 if let Some(prev_type) = common_type {
315 let data_types = vec![prev_type.clone(), data_type.clone()];
317 let Some(new_type) = type_union_resolution(&data_types) else {
318 return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}");
319 };
320 common_type = Some(new_type);
321 } else {
322 common_type = Some(data_type);
323 }
324 }
325 fields.push(common_type.unwrap_or(DataType::Null), true);
328 }
329
330 Self::infer_inner(values, fields, &schema)
331 }
332
333 fn infer_inner(
334 mut values: Vec<Vec<Expr>>,
335 fields: ValuesFields,
336 schema: &DFSchema,
337 ) -> Result<Self> {
338 let fields = fields.into_fields();
339 for row in &mut values {
341 for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
342 if let Expr::Literal(ScalarValue::Null) = row[j] {
343 row[j] = Expr::Literal(ScalarValue::try_from(field_type)?);
344 } else {
345 row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
346 }
347 }
348 }
349
350 let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
351 let schema = DFSchemaRef::new(dfschema);
352
353 Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
354 }
355
356 pub fn scan(
389 table_name: impl Into<TableReference>,
390 table_source: Arc<dyn TableSource>,
391 projection: Option<Vec<usize>>,
392 ) -> Result<Self> {
393 Self::scan_with_filters(table_name, table_source, projection, vec![])
394 }
395
396 pub fn copy_to(
398 input: LogicalPlan,
399 output_url: String,
400 file_type: Arc<dyn FileType>,
401 options: HashMap<String, String>,
402 partition_by: Vec<String>,
403 ) -> Result<Self> {
404 Ok(Self::new(LogicalPlan::Copy(CopyTo {
405 input: Arc::new(input),
406 output_url,
407 partition_by,
408 file_type,
409 options,
410 })))
411 }
412
413 pub fn insert_into(
448 input: LogicalPlan,
449 table_name: impl Into<TableReference>,
450 target: Arc<dyn TableSource>,
451 insert_op: InsertOp,
452 ) -> Result<Self> {
453 Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
454 table_name.into(),
455 target,
456 WriteOp::Insert(insert_op),
457 Arc::new(input),
458 ))))
459 }
460
461 pub fn scan_with_filters(
463 table_name: impl Into<TableReference>,
464 table_source: Arc<dyn TableSource>,
465 projection: Option<Vec<usize>>,
466 filters: Vec<Expr>,
467 ) -> Result<Self> {
468 TableScan::try_new(table_name, table_source, projection, filters, None)
469 .map(LogicalPlan::TableScan)
470 .map(Self::new)
471 }
472
473 pub fn scan_with_filters_fetch(
475 table_name: impl Into<TableReference>,
476 table_source: Arc<dyn TableSource>,
477 projection: Option<Vec<usize>>,
478 filters: Vec<Expr>,
479 fetch: Option<usize>,
480 ) -> Result<Self> {
481 TableScan::try_new(table_name, table_source, projection, filters, fetch)
482 .map(LogicalPlan::TableScan)
483 .map(Self::new)
484 }
485
486 pub fn window_plan(
488 input: LogicalPlan,
489 window_exprs: Vec<Expr>,
490 ) -> Result<LogicalPlan> {
491 let mut plan = input;
492 let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
493 groups.sort_by(|(key_a, _), (key_b, _)| {
499 for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
500 let key_ordering = compare_sort_expr(first, second, plan.schema());
501 match key_ordering {
502 Ordering::Less => {
503 return Ordering::Less;
504 }
505 Ordering::Greater => {
506 return Ordering::Greater;
507 }
508 Ordering::Equal => {}
509 }
510 }
511 key_b.len().cmp(&key_a.len())
512 });
513 for (_, exprs) in groups {
514 let window_exprs = exprs.into_iter().collect::<Vec<_>>();
515 plan = LogicalPlanBuilder::from(plan)
518 .window(window_exprs)?
519 .build()?;
520 }
521 Ok(plan)
522 }
523 pub fn project(
525 self,
526 expr: impl IntoIterator<Item = impl Into<Expr>>,
527 ) -> Result<Self> {
528 project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
529 }
530
531 pub fn project_with_validation(
534 self,
535 expr: Vec<(impl Into<Expr>, bool)>,
536 ) -> Result<Self> {
537 project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
538 }
539
540 pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
542 let exprs: Vec<_> = indices
543 .into_iter()
544 .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
545 .collect();
546 self.project(exprs)
547 }
548
549 pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
551 let expr = normalize_col(expr.into(), &self.plan)?;
552 Filter::try_new(expr, self.plan)
553 .map(LogicalPlan::Filter)
554 .map(Self::new)
555 }
556
557 pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
559 let expr = normalize_col(expr.into(), &self.plan)?;
560 Filter::try_new_with_having(expr, self.plan)
561 .map(LogicalPlan::Filter)
562 .map(Self::from)
563 }
564
565 pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
567 Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
568 Prepare {
569 name,
570 data_types,
571 input: self.plan,
572 },
573 ))))
574 }
575
576 pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
583 let skip_expr = if skip == 0 {
584 None
585 } else {
586 Some(lit(skip as i64))
587 };
588 let fetch_expr = fetch.map(|f| lit(f as i64));
589 self.limit_by_expr(skip_expr, fetch_expr)
590 }
591
592 pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
596 Ok(Self::new(LogicalPlan::Limit(Limit {
597 skip: skip.map(Box::new),
598 fetch: fetch.map(Box::new),
599 input: self.plan,
600 })))
601 }
602
603 pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
605 subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
606 }
607
608 fn add_missing_columns(
637 curr_plan: LogicalPlan,
638 missing_cols: &IndexSet<Column>,
639 is_distinct: bool,
640 ) -> Result<LogicalPlan> {
641 match curr_plan {
642 LogicalPlan::Projection(Projection {
643 input,
644 mut expr,
645 schema: _,
646 }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
647 let mut missing_exprs = missing_cols
648 .iter()
649 .map(|c| normalize_col(Expr::Column(c.clone()), &input))
650 .collect::<Result<Vec<_>>>()?;
651
652 missing_exprs.retain(|e| !expr.contains(e));
656 if is_distinct {
657 Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
658 }
659 expr.extend(missing_exprs);
660 project(Arc::unwrap_or_clone(input), expr)
661 }
662 _ => {
663 let is_distinct =
664 is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
665 let new_inputs = curr_plan
666 .inputs()
667 .into_iter()
668 .map(|input_plan| {
669 Self::add_missing_columns(
670 (*input_plan).clone(),
671 missing_cols,
672 is_distinct,
673 )
674 })
675 .collect::<Result<Vec<_>>>()?;
676 curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
677 }
678 }
679 }
680
681 fn ambiguous_distinct_check(
682 missing_exprs: &[Expr],
683 missing_cols: &IndexSet<Column>,
684 projection_exprs: &[Expr],
685 ) -> Result<()> {
686 if missing_exprs.is_empty() {
687 return Ok(());
688 }
689
690 let all_aliases = missing_exprs.iter().all(|e| {
698 projection_exprs.iter().any(|proj_expr| {
699 if let Expr::Alias(Alias { expr, .. }) = proj_expr {
700 e == expr.as_ref()
701 } else {
702 false
703 }
704 })
705 });
706 if all_aliases {
707 return Ok(());
708 }
709
710 let missing_col_names = missing_cols
711 .iter()
712 .map(|col| col.flat_name())
713 .collect::<String>();
714
715 plan_err!("For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list")
716 }
717
718 pub fn sort_by(
720 self,
721 expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
722 ) -> Result<Self> {
723 self.sort(
724 expr.into_iter()
725 .map(|e| e.into().sort(true, false))
726 .collect::<Vec<SortExpr>>(),
727 )
728 }
729
730 pub fn sort(
731 self,
732 sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
733 ) -> Result<Self> {
734 self.sort_with_limit(sorts, None)
735 }
736
737 pub fn sort_with_limit(
739 self,
740 sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
741 fetch: Option<usize>,
742 ) -> Result<Self> {
743 let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
744
745 let schema = self.plan.schema();
746
747 let mut missing_cols: IndexSet<Column> = IndexSet::new();
749 sorts.iter().try_for_each::<_, Result<()>>(|sort| {
750 let columns = sort.expr.column_refs();
751
752 missing_cols.extend(
753 columns
754 .into_iter()
755 .filter(|c| !schema.has_column(c))
756 .cloned(),
757 );
758
759 Ok(())
760 })?;
761
762 if missing_cols.is_empty() {
763 return Ok(Self::new(LogicalPlan::Sort(Sort {
764 expr: normalize_sorts(sorts, &self.plan)?,
765 input: self.plan,
766 fetch,
767 })));
768 }
769
770 let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
772
773 let is_distinct = false;
774 let plan = Self::add_missing_columns(
775 Arc::unwrap_or_clone(self.plan),
776 &missing_cols,
777 is_distinct,
778 )?;
779 let sort_plan = LogicalPlan::Sort(Sort {
780 expr: normalize_sorts(sorts, &plan)?,
781 input: Arc::new(plan),
782 fetch,
783 });
784
785 Projection::try_new(new_expr, Arc::new(sort_plan))
786 .map(LogicalPlan::Projection)
787 .map(Self::new)
788 }
789
790 pub fn union(self, plan: LogicalPlan) -> Result<Self> {
792 union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
793 }
794
795 pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
797 union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
798 }
799
800 pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
802 let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
803 let right_plan: LogicalPlan = plan;
804
805 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
806 union_by_name(left_plan, right_plan)?,
807 )))))
808 }
809
810 pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
812 let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
813 let right_plan: LogicalPlan = plan;
814
815 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
816 union(left_plan, right_plan)?,
817 )))))
818 }
819
820 pub fn distinct(self) -> Result<Self> {
822 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
823 }
824
825 pub fn distinct_on(
828 self,
829 on_expr: Vec<Expr>,
830 select_expr: Vec<Expr>,
831 sort_expr: Option<Vec<SortExpr>>,
832 ) -> Result<Self> {
833 Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
834 DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
835 ))))
836 }
837
838 pub fn join(
852 self,
853 right: LogicalPlan,
854 join_type: JoinType,
855 join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
856 filter: Option<Expr>,
857 ) -> Result<Self> {
858 self.join_detailed(right, join_type, join_keys, filter, false)
859 }
860
861 pub fn join_on(
902 self,
903 right: LogicalPlan,
904 join_type: JoinType,
905 on_exprs: impl IntoIterator<Item = Expr>,
906 ) -> Result<Self> {
907 let filter = on_exprs.into_iter().reduce(Expr::and);
908
909 self.join_detailed(
910 right,
911 join_type,
912 (Vec::<Column>::new(), Vec::<Column>::new()),
913 filter,
914 false,
915 )
916 }
917
918 pub(crate) fn normalize(
919 plan: &LogicalPlan,
920 column: impl Into<Column>,
921 ) -> Result<Column> {
922 let column = column.into();
923 if column.relation.is_some() {
924 return Ok(column);
926 }
927
928 let schema = plan.schema();
929 let fallback_schemas = plan.fallback_normalize_schemas();
930 let using_columns = plan.using_columns()?;
931 column.normalize_with_schemas_and_ambiguity_check(
932 &[&[schema], &fallback_schemas],
933 &using_columns,
934 )
935 }
936
937 pub fn join_detailed(
946 self,
947 right: LogicalPlan,
948 join_type: JoinType,
949 join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
950 filter: Option<Expr>,
951 null_equals_null: bool,
952 ) -> Result<Self> {
953 if join_keys.0.len() != join_keys.1.len() {
954 return plan_err!("left_keys and right_keys were not the same length");
955 }
956
957 let filter = if let Some(expr) = filter {
958 let filter = normalize_col_with_schemas_and_ambiguity_check(
959 expr,
960 &[&[self.schema(), right.schema()]],
961 &[],
962 )?;
963 Some(filter)
964 } else {
965 None
966 };
967
968 let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
969 join_keys
970 .0
971 .into_iter()
972 .zip(join_keys.1)
973 .map(|(l, r)| {
974 let l = l.into();
975 let r = r.into();
976
977 match (&l.relation, &r.relation) {
978 (Some(lr), Some(rr)) => {
979 let l_is_left =
980 self.plan.schema().field_with_qualified_name(lr, &l.name);
981 let l_is_right =
982 right.schema().field_with_qualified_name(lr, &l.name);
983 let r_is_left =
984 self.plan.schema().field_with_qualified_name(rr, &r.name);
985 let r_is_right =
986 right.schema().field_with_qualified_name(rr, &r.name);
987
988 match (l_is_left, l_is_right, r_is_left, r_is_right) {
989 (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
990 (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
991 _ => (
992 Self::normalize(&self.plan, l),
993 Self::normalize(&right, r),
994 ),
995 }
996 }
997 (Some(lr), None) => {
998 let l_is_left =
999 self.plan.schema().field_with_qualified_name(lr, &l.name);
1000 let l_is_right =
1001 right.schema().field_with_qualified_name(lr, &l.name);
1002
1003 match (l_is_left, l_is_right) {
1004 (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
1005 (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
1006 _ => (
1007 Self::normalize(&self.plan, l),
1008 Self::normalize(&right, r),
1009 ),
1010 }
1011 }
1012 (None, Some(rr)) => {
1013 let r_is_left =
1014 self.plan.schema().field_with_qualified_name(rr, &r.name);
1015 let r_is_right =
1016 right.schema().field_with_qualified_name(rr, &r.name);
1017
1018 match (r_is_left, r_is_right) {
1019 (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
1020 (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
1021 _ => (
1022 Self::normalize(&self.plan, l),
1023 Self::normalize(&right, r),
1024 ),
1025 }
1026 }
1027 (None, None) => {
1028 let mut swap = false;
1029 let left_key = Self::normalize(&self.plan, l.clone())
1030 .or_else(|_| {
1031 swap = true;
1032 Self::normalize(&right, l)
1033 });
1034 if swap {
1035 (Self::normalize(&self.plan, r), left_key)
1036 } else {
1037 (left_key, Self::normalize(&right, r))
1038 }
1039 }
1040 }
1041 })
1042 .unzip();
1043
1044 let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1045 let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1046
1047 let on = left_keys
1048 .into_iter()
1049 .zip(right_keys)
1050 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
1051 .collect();
1052 let join_schema =
1053 build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1054
1055 Ok(Self::new(LogicalPlan::Join(Join {
1056 left: self.plan,
1057 right: Arc::new(right),
1058 on,
1059 filter,
1060 join_type,
1061 join_constraint: JoinConstraint::On,
1062 schema: DFSchemaRef::new(join_schema),
1063 null_equals_null,
1064 })))
1065 }
1066
1067 pub fn join_using(
1069 self,
1070 right: LogicalPlan,
1071 join_type: JoinType,
1072 using_keys: Vec<impl Into<Column> + Clone>,
1073 ) -> Result<Self> {
1074 let left_keys: Vec<Column> = using_keys
1075 .clone()
1076 .into_iter()
1077 .map(|c| Self::normalize(&self.plan, c))
1078 .collect::<Result<_>>()?;
1079 let right_keys: Vec<Column> = using_keys
1080 .into_iter()
1081 .map(|c| Self::normalize(&right, c))
1082 .collect::<Result<_>>()?;
1083
1084 let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
1085 let join_schema =
1086 build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1087 let mut join_on: Vec<(Expr, Expr)> = vec![];
1088 let mut filters: Option<Expr> = None;
1089 for (l, r) in &on {
1090 if self.plan.schema().has_column(l)
1091 && right.schema().has_column(r)
1092 && can_hash(self.plan.schema().field_from_column(l)?.data_type())
1093 {
1094 join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
1095 } else if self.plan.schema().has_column(l)
1096 && right.schema().has_column(r)
1097 && can_hash(self.plan.schema().field_from_column(r)?.data_type())
1098 {
1099 join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
1100 } else {
1101 let expr = binary_expr(
1102 Expr::Column(l.clone()),
1103 Operator::Eq,
1104 Expr::Column(r.clone()),
1105 );
1106 match filters {
1107 None => filters = Some(expr),
1108 Some(filter_expr) => filters = Some(and(expr, filter_expr)),
1109 }
1110 }
1111 }
1112
1113 if join_on.is_empty() {
1114 let join = Self::from(self.plan).cross_join(right)?;
1115 join.filter(filters.ok_or_else(|| {
1116 DataFusionError::Internal("filters should not be None here".to_string())
1117 })?)
1118 } else {
1119 Ok(Self::new(LogicalPlan::Join(Join {
1120 left: self.plan,
1121 right: Arc::new(right),
1122 on: join_on,
1123 filter: filters,
1124 join_type,
1125 join_constraint: JoinConstraint::Using,
1126 schema: DFSchemaRef::new(join_schema),
1127 null_equals_null: false,
1128 })))
1129 }
1130 }
1131
1132 pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1134 let join_schema =
1135 build_join_schema(self.plan.schema(), right.schema(), &JoinType::Inner)?;
1136 Ok(Self::new(LogicalPlan::Join(Join {
1137 left: self.plan,
1138 right: Arc::new(right),
1139 on: vec![],
1140 filter: None,
1141 join_type: JoinType::Inner,
1142 join_constraint: JoinConstraint::On,
1143 null_equals_null: false,
1144 schema: DFSchemaRef::new(join_schema),
1145 })))
1146 }
1147
1148 pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1150 Ok(Self::new(LogicalPlan::Repartition(Repartition {
1151 input: self.plan,
1152 partitioning_scheme,
1153 })))
1154 }
1155
1156 pub fn window(
1158 self,
1159 window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1160 ) -> Result<Self> {
1161 let window_expr = normalize_cols(window_expr, &self.plan)?;
1162 validate_unique_names("Windows", &window_expr)?;
1163 Ok(Self::new(LogicalPlan::Window(Window::try_new(
1164 window_expr,
1165 self.plan,
1166 )?)))
1167 }
1168
1169 pub fn aggregate(
1173 self,
1174 group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1175 aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1176 ) -> Result<Self> {
1177 let group_expr = normalize_cols(group_expr, &self.plan)?;
1178 let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1179
1180 let group_expr = if self.options.add_implicit_group_by_exprs {
1181 add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1182 } else {
1183 group_expr
1184 };
1185
1186 Aggregate::try_new(self.plan, group_expr, aggr_expr)
1187 .map(LogicalPlan::Aggregate)
1188 .map(Self::new)
1189 }
1190
1191 pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1198 let schema = LogicalPlan::explain_schema();
1199 let schema = schema.to_dfschema_ref()?;
1200
1201 if analyze {
1202 Ok(Self::new(LogicalPlan::Analyze(Analyze {
1203 verbose,
1204 input: self.plan,
1205 schema,
1206 })))
1207 } else {
1208 let stringified_plans =
1209 vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1210
1211 Ok(Self::new(LogicalPlan::Explain(Explain {
1212 verbose,
1213 plan: self.plan,
1214 stringified_plans,
1215 schema,
1216 logical_optimization_succeeded: false,
1217 })))
1218 }
1219 }
1220
1221 pub fn intersect(
1223 left_plan: LogicalPlan,
1224 right_plan: LogicalPlan,
1225 is_all: bool,
1226 ) -> Result<LogicalPlan> {
1227 LogicalPlanBuilder::intersect_or_except(
1228 left_plan,
1229 right_plan,
1230 JoinType::LeftSemi,
1231 is_all,
1232 )
1233 }
1234
1235 pub fn except(
1237 left_plan: LogicalPlan,
1238 right_plan: LogicalPlan,
1239 is_all: bool,
1240 ) -> Result<LogicalPlan> {
1241 LogicalPlanBuilder::intersect_or_except(
1242 left_plan,
1243 right_plan,
1244 JoinType::LeftAnti,
1245 is_all,
1246 )
1247 }
1248
1249 fn intersect_or_except(
1251 left_plan: LogicalPlan,
1252 right_plan: LogicalPlan,
1253 join_type: JoinType,
1254 is_all: bool,
1255 ) -> Result<LogicalPlan> {
1256 let left_len = left_plan.schema().fields().len();
1257 let right_len = right_plan.schema().fields().len();
1258
1259 if left_len != right_len {
1260 return plan_err!(
1261 "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1262 );
1263 }
1264
1265 let join_keys = left_plan
1266 .schema()
1267 .fields()
1268 .iter()
1269 .zip(right_plan.schema().fields().iter())
1270 .map(|(left_field, right_field)| {
1271 (
1272 (Column::from_name(left_field.name())),
1273 (Column::from_name(right_field.name())),
1274 )
1275 })
1276 .unzip();
1277 if is_all {
1278 LogicalPlanBuilder::from(left_plan)
1279 .join_detailed(right_plan, join_type, join_keys, None, true)?
1280 .build()
1281 } else {
1282 LogicalPlanBuilder::from(left_plan)
1283 .distinct()?
1284 .join_detailed(right_plan, join_type, join_keys, None, true)?
1285 .build()
1286 }
1287 }
1288
1289 pub fn build(self) -> Result<LogicalPlan> {
1291 Ok(Arc::unwrap_or_clone(self.plan))
1292 }
1293
1294 pub fn join_with_expr_keys(
1309 self,
1310 right: LogicalPlan,
1311 join_type: JoinType,
1312 equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1313 filter: Option<Expr>,
1314 ) -> Result<Self> {
1315 if equi_exprs.0.len() != equi_exprs.1.len() {
1316 return plan_err!("left_keys and right_keys were not the same length");
1317 }
1318
1319 let join_key_pairs = equi_exprs
1320 .0
1321 .into_iter()
1322 .zip(equi_exprs.1)
1323 .map(|(l, r)| {
1324 let left_key = l.into();
1325 let right_key = r.into();
1326 let mut left_using_columns = HashSet::new();
1327 expr_to_columns(&left_key, &mut left_using_columns)?;
1328 let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1329 left_key,
1330 &[&[self.plan.schema()]],
1331 &[],
1332 )?;
1333
1334 let mut right_using_columns = HashSet::new();
1335 expr_to_columns(&right_key, &mut right_using_columns)?;
1336 let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1337 right_key,
1338 &[&[right.schema()]],
1339 &[],
1340 )?;
1341
1342 find_valid_equijoin_key_pair(
1344 &normalized_left_key,
1345 &normalized_right_key,
1346 self.plan.schema(),
1347 right.schema(),
1348 )?.ok_or_else(||
1349 plan_datafusion_err!(
1350 "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1351 ))
1352 })
1353 .collect::<Result<Vec<_>>>()?;
1354
1355 let join_schema =
1356 build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1357
1358 Ok(Self::new(LogicalPlan::Join(Join {
1359 left: self.plan,
1360 right: Arc::new(right),
1361 on: join_key_pairs,
1362 filter,
1363 join_type,
1364 join_constraint: JoinConstraint::On,
1365 schema: DFSchemaRef::new(join_schema),
1366 null_equals_null: false,
1367 })))
1368 }
1369
1370 pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1372 unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1373 }
1374
1375 pub fn unnest_column_with_options(
1377 self,
1378 column: impl Into<Column>,
1379 options: UnnestOptions,
1380 ) -> Result<Self> {
1381 unnest_with_options(
1382 Arc::unwrap_or_clone(self.plan),
1383 vec![column.into()],
1384 options,
1385 )
1386 .map(Self::new)
1387 }
1388
1389 pub fn unnest_columns_with_options(
1391 self,
1392 columns: Vec<Column>,
1393 options: UnnestOptions,
1394 ) -> Result<Self> {
1395 unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1396 .map(Self::new)
1397 }
1398}
1399
1400impl From<LogicalPlan> for LogicalPlanBuilder {
1401 fn from(plan: LogicalPlan) -> Self {
1402 LogicalPlanBuilder::new(plan)
1403 }
1404}
1405
1406impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1407 fn from(plan: Arc<LogicalPlan>) -> Self {
1408 LogicalPlanBuilder::new_from_arc(plan)
1409 }
1410}
1411
1412#[derive(Default)]
1414struct ValuesFields {
1415 inner: Vec<Field>,
1416}
1417
1418impl ValuesFields {
1419 pub fn new() -> Self {
1420 Self::default()
1421 }
1422
1423 pub fn push(&mut self, data_type: DataType, nullable: bool) {
1424 let name = format!("column{}", self.inner.len() + 1);
1427 self.inner.push(Field::new(name, data_type, nullable));
1428 }
1429
1430 pub fn into_fields(self) -> Fields {
1431 self.inner.into()
1432 }
1433}
1434
1435pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
1436 let mut name_map = HashMap::new();
1437 fields
1438 .into_iter()
1439 .map(|field| {
1440 let counter = name_map.entry(field.name().to_string()).or_insert(0);
1441 *counter += 1;
1442 if *counter > 1 {
1443 let new_name = format!("{}:{}", field.name(), *counter - 1);
1444 Field::new(new_name, field.data_type().clone(), field.is_nullable())
1445 } else {
1446 field.as_ref().clone()
1447 }
1448 })
1449 .collect()
1450}
1451
1452fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1453 let mut table_references = schema
1454 .iter()
1455 .filter_map(|(qualifier, _)| qualifier)
1456 .collect::<Vec<_>>();
1457 table_references.dedup();
1458 let table_reference = if table_references.len() == 1 {
1459 table_references.pop().cloned()
1460 } else {
1461 None
1462 };
1463
1464 (
1465 table_reference,
1466 Arc::new(Field::new("mark", DataType::Boolean, false)),
1467 )
1468}
1469
1470pub fn build_join_schema(
1473 left: &DFSchema,
1474 right: &DFSchema,
1475 join_type: &JoinType,
1476) -> Result<DFSchema> {
1477 fn nullify_fields<'a>(
1478 fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1479 ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1480 fields
1481 .map(|(q, f)| {
1482 let field = f.as_ref().clone().with_nullable(true);
1484 (q.cloned(), Arc::new(field))
1485 })
1486 .collect()
1487 }
1488
1489 let right_fields = right.iter();
1490 let left_fields = left.iter();
1491
1492 let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1493 JoinType::Inner => {
1494 let left_fields = left_fields
1496 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1497 .collect::<Vec<_>>();
1498 let right_fields = right_fields
1499 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1500 .collect::<Vec<_>>();
1501 left_fields.into_iter().chain(right_fields).collect()
1502 }
1503 JoinType::Left => {
1504 let left_fields = left_fields
1506 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1507 .collect::<Vec<_>>();
1508 left_fields
1509 .into_iter()
1510 .chain(nullify_fields(right_fields))
1511 .collect()
1512 }
1513 JoinType::Right => {
1514 let right_fields = right_fields
1516 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1517 .collect::<Vec<_>>();
1518 nullify_fields(left_fields)
1519 .into_iter()
1520 .chain(right_fields)
1521 .collect()
1522 }
1523 JoinType::Full => {
1524 nullify_fields(left_fields)
1526 .into_iter()
1527 .chain(nullify_fields(right_fields))
1528 .collect()
1529 }
1530 JoinType::LeftSemi | JoinType::LeftAnti => {
1531 left_fields
1533 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1534 .collect()
1535 }
1536 JoinType::LeftMark => left_fields
1537 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1538 .chain(once(mark_field(right)))
1539 .collect(),
1540 JoinType::RightSemi | JoinType::RightAnti => {
1541 right_fields
1543 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1544 .collect()
1545 }
1546 };
1547 let func_dependencies = left.functional_dependencies().join(
1548 right.functional_dependencies(),
1549 join_type,
1550 left.fields().len(),
1551 );
1552 let metadata = left
1553 .metadata()
1554 .clone()
1555 .into_iter()
1556 .chain(right.metadata().clone())
1557 .collect();
1558 let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1559 dfschema.with_functional_dependencies(func_dependencies)
1560}
1561
1562pub fn add_group_by_exprs_from_dependencies(
1572 mut group_expr: Vec<Expr>,
1573 schema: &DFSchemaRef,
1574) -> Result<Vec<Expr>> {
1575 let mut group_by_field_names = group_expr
1578 .iter()
1579 .map(|e| e.schema_name().to_string())
1580 .collect::<Vec<_>>();
1581
1582 if let Some(target_indices) =
1583 get_target_functional_dependencies(schema, &group_by_field_names)
1584 {
1585 for idx in target_indices {
1586 let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1587 let expr_name = expr.schema_name().to_string();
1588 if !group_by_field_names.contains(&expr_name) {
1589 group_by_field_names.push(expr_name);
1590 group_expr.push(expr);
1591 }
1592 }
1593 }
1594 Ok(group_expr)
1595}
1596
1597pub fn validate_unique_names<'a>(
1599 node_name: &str,
1600 expressions: impl IntoIterator<Item = &'a Expr>,
1601) -> Result<()> {
1602 let mut unique_names = HashMap::new();
1603
1604 expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1605 let name = expr.schema_name().to_string();
1606 match unique_names.get(&name) {
1607 None => {
1608 unique_names.insert(name, (position, expr));
1609 Ok(())
1610 },
1611 Some((existing_position, existing_expr)) => {
1612 plan_err!("{node_name} require unique expression names \
1613 but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1614 at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1615 )
1616 }
1617 }
1618 })
1619}
1620
1621pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1633 Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1634 Arc::new(left_plan),
1635 Arc::new(right_plan),
1636 ])?))
1637}
1638
1639pub fn union_by_name(
1642 left_plan: LogicalPlan,
1643 right_plan: LogicalPlan,
1644) -> Result<LogicalPlan> {
1645 Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1646 Arc::new(left_plan),
1647 Arc::new(right_plan),
1648 ])?))
1649}
1650
1651pub fn project(
1657 plan: LogicalPlan,
1658 expr: impl IntoIterator<Item = impl Into<Expr>>,
1659) -> Result<LogicalPlan> {
1660 project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1661}
1662
1663fn project_with_validation(
1671 plan: LogicalPlan,
1672 expr: impl IntoIterator<Item = (impl Into<Expr>, bool)>,
1673) -> Result<LogicalPlan> {
1674 let mut projected_expr = vec![];
1675 for (e, validate) in expr {
1676 let e = e.into();
1677 match e {
1678 #[expect(deprecated)]
1679 Expr::Wildcard { .. } => projected_expr.push(e),
1680 _ => {
1681 if validate {
1682 projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1683 } else {
1684 projected_expr.push(e)
1685 }
1686 }
1687 }
1688 }
1689 validate_unique_names("Projections", projected_expr.iter())?;
1690
1691 Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1692}
1693
1694pub fn subquery_alias(
1696 plan: LogicalPlan,
1697 alias: impl Into<TableReference>,
1698) -> Result<LogicalPlan> {
1699 SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
1700}
1701
1702pub fn table_scan(
1705 name: Option<impl Into<TableReference>>,
1706 table_schema: &Schema,
1707 projection: Option<Vec<usize>>,
1708) -> Result<LogicalPlanBuilder> {
1709 table_scan_with_filters(name, table_schema, projection, vec![])
1710}
1711
1712pub fn table_scan_with_filters(
1716 name: Option<impl Into<TableReference>>,
1717 table_schema: &Schema,
1718 projection: Option<Vec<usize>>,
1719 filters: Vec<Expr>,
1720) -> Result<LogicalPlanBuilder> {
1721 let table_source = table_source(table_schema);
1722 let name = name
1723 .map(|n| n.into())
1724 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1725 LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
1726}
1727
1728pub fn table_scan_with_filter_and_fetch(
1732 name: Option<impl Into<TableReference>>,
1733 table_schema: &Schema,
1734 projection: Option<Vec<usize>>,
1735 filters: Vec<Expr>,
1736 fetch: Option<usize>,
1737) -> Result<LogicalPlanBuilder> {
1738 let table_source = table_source(table_schema);
1739 let name = name
1740 .map(|n| n.into())
1741 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1742 LogicalPlanBuilder::scan_with_filters_fetch(
1743 name,
1744 table_source,
1745 projection,
1746 filters,
1747 fetch,
1748 )
1749}
1750
1751pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
1752 let table_schema = Arc::new(table_schema.clone());
1753 Arc::new(LogicalTableSource {
1754 table_schema,
1755 constraints: Default::default(),
1756 })
1757}
1758
1759pub fn table_source_with_constraints(
1760 table_schema: &Schema,
1761 constraints: Constraints,
1762) -> Arc<dyn TableSource> {
1763 let table_schema = Arc::new(table_schema.clone());
1764 Arc::new(LogicalTableSource {
1765 table_schema,
1766 constraints,
1767 })
1768}
1769
1770pub fn wrap_projection_for_join_if_necessary(
1772 join_keys: &[Expr],
1773 input: LogicalPlan,
1774) -> Result<(LogicalPlan, Vec<Column>, bool)> {
1775 let input_schema = input.schema();
1776 let alias_join_keys: Vec<Expr> = join_keys
1777 .iter()
1778 .map(|key| {
1779 if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
1788 let alias = format!("{key}");
1789 key.clone().alias(alias)
1790 } else {
1791 key.clone()
1792 }
1793 })
1794 .collect::<Vec<_>>();
1795
1796 let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
1797 let plan = if need_project {
1798 let mut projection = input_schema
1800 .columns()
1801 .into_iter()
1802 .map(Expr::Column)
1803 .collect::<Vec<_>>();
1804 let join_key_items = alias_join_keys
1805 .iter()
1806 .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
1807 .cloned()
1808 .collect::<HashSet<Expr>>();
1809 projection.extend(join_key_items);
1810
1811 LogicalPlanBuilder::from(input)
1812 .project(projection)?
1813 .build()?
1814 } else {
1815 input
1816 };
1817
1818 let join_on = alias_join_keys
1819 .into_iter()
1820 .map(|key| {
1821 if let Some(col) = key.try_as_col() {
1822 Ok(col.clone())
1823 } else {
1824 let name = key.schema_name().to_string();
1825 Ok(Column::from_name(name))
1826 }
1827 })
1828 .collect::<Result<Vec<_>>>()?;
1829
1830 Ok((plan, join_on, need_project))
1831}
1832
1833pub struct LogicalTableSource {
1837 table_schema: SchemaRef,
1838 constraints: Constraints,
1839}
1840
1841impl LogicalTableSource {
1842 pub fn new(table_schema: SchemaRef) -> Self {
1844 Self {
1845 table_schema,
1846 constraints: Constraints::default(),
1847 }
1848 }
1849
1850 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
1851 self.constraints = constraints;
1852 self
1853 }
1854}
1855
1856impl TableSource for LogicalTableSource {
1857 fn as_any(&self) -> &dyn Any {
1858 self
1859 }
1860
1861 fn schema(&self) -> SchemaRef {
1862 Arc::clone(&self.table_schema)
1863 }
1864
1865 fn constraints(&self) -> Option<&Constraints> {
1866 Some(&self.constraints)
1867 }
1868
1869 fn supports_filters_pushdown(
1870 &self,
1871 filters: &[&Expr],
1872 ) -> Result<Vec<TableProviderFilterPushDown>> {
1873 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
1874 }
1875}
1876
1877pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
1879 unnest_with_options(input, columns, UnnestOptions::default())
1880}
1881
1882fn get_unnested_list_datatype_recursive(
1885 data_type: &DataType,
1886 depth: usize,
1887) -> Result<DataType> {
1888 match data_type {
1889 DataType::List(field)
1890 | DataType::FixedSizeList(field, _)
1891 | DataType::LargeList(field) => {
1892 if depth == 1 {
1893 return Ok(field.data_type().clone());
1894 }
1895 return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
1896 }
1897 _ => {}
1898 };
1899
1900 internal_err!("trying to unnest on invalid data type {:?}", data_type)
1901}
1902
1903pub fn get_struct_unnested_columns(
1904 col_name: &String,
1905 inner_fields: &Fields,
1906) -> Vec<Column> {
1907 inner_fields
1908 .iter()
1909 .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
1910 .collect()
1911}
1912
1913pub fn get_unnested_columns(
1922 col_name: &String,
1923 data_type: &DataType,
1924 depth: usize,
1925) -> Result<Vec<(Column, Arc<Field>)>> {
1926 let mut qualified_columns = Vec::with_capacity(1);
1927
1928 match data_type {
1929 DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
1930 let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
1931 let new_field = Arc::new(Field::new(
1932 col_name, data_type,
1933 true,
1936 ));
1937 let column = Column::from_name(col_name);
1938 qualified_columns.push((column, new_field));
1940 }
1941 DataType::Struct(fields) => {
1942 qualified_columns.extend(fields.iter().map(|f| {
1943 let new_name = format!("{}.{}", col_name, f.name());
1944 let column = Column::from_name(&new_name);
1945 let new_field = f.as_ref().clone().with_name(new_name);
1946 (column, Arc::new(new_field))
1948 }))
1949 }
1950 _ => {
1951 return internal_err!(
1952 "trying to unnest on invalid data type {:?}",
1953 data_type
1954 );
1955 }
1956 };
1957 Ok(qualified_columns)
1958}
1959
1960pub fn unnest_with_options(
1990 input: LogicalPlan,
1991 columns_to_unnest: Vec<Column>,
1992 options: UnnestOptions,
1993) -> Result<LogicalPlan> {
1994 let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
1995 let mut struct_columns = vec![];
1996 let indices_to_unnest = columns_to_unnest
1997 .iter()
1998 .map(|c| Ok((input.schema().index_of_column(c)?, c)))
1999 .collect::<Result<HashMap<usize, &Column>>>()?;
2000
2001 let input_schema = input.schema();
2002
2003 let mut dependency_indices = vec![];
2004 let fields = input_schema
2020 .iter()
2021 .enumerate()
2022 .map(|(index, (original_qualifier, original_field))| {
2023 match indices_to_unnest.get(&index) {
2024 Some(column_to_unnest) => {
2025 let recursions_on_column = options
2026 .recursions
2027 .iter()
2028 .filter(|p| -> bool { &p.input_column == *column_to_unnest })
2029 .collect::<Vec<_>>();
2030 let mut transformed_columns = recursions_on_column
2031 .iter()
2032 .map(|r| {
2033 list_columns.push((
2034 index,
2035 ColumnUnnestList {
2036 output_column: r.output_column.clone(),
2037 depth: r.depth,
2038 },
2039 ));
2040 Ok(get_unnested_columns(
2041 &r.output_column.name,
2042 original_field.data_type(),
2043 r.depth,
2044 )?
2045 .into_iter()
2046 .next()
2047 .unwrap()) })
2049 .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
2050 if transformed_columns.is_empty() {
2051 transformed_columns = get_unnested_columns(
2052 &column_to_unnest.name,
2053 original_field.data_type(),
2054 1,
2055 )?;
2056 match original_field.data_type() {
2057 DataType::Struct(_) => {
2058 struct_columns.push(index);
2059 }
2060 DataType::List(_)
2061 | DataType::FixedSizeList(_, _)
2062 | DataType::LargeList(_) => {
2063 list_columns.push((
2064 index,
2065 ColumnUnnestList {
2066 output_column: Column::from_name(
2067 &column_to_unnest.name,
2068 ),
2069 depth: 1,
2070 },
2071 ));
2072 }
2073 _ => {}
2074 };
2075 }
2076
2077 dependency_indices
2079 .extend(std::iter::repeat(index).take(transformed_columns.len()));
2080 Ok(transformed_columns
2081 .iter()
2082 .map(|(col, field)| (col.relation.to_owned(), field.to_owned()))
2083 .collect())
2084 }
2085 None => {
2086 dependency_indices.push(index);
2087 Ok(vec![(
2088 original_qualifier.cloned(),
2089 Arc::clone(original_field),
2090 )])
2091 }
2092 }
2093 })
2094 .collect::<Result<Vec<_>>>()?
2095 .into_iter()
2096 .flatten()
2097 .collect::<Vec<_>>();
2098
2099 let metadata = input_schema.metadata().clone();
2100 let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
2101 let deps = input_schema.functional_dependencies().clone();
2103 let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
2104
2105 Ok(LogicalPlan::Unnest(Unnest {
2106 input: Arc::new(input),
2107 exec_columns: columns_to_unnest,
2108 list_type_columns: list_columns,
2109 struct_type_columns: struct_columns,
2110 dependency_indices,
2111 schema,
2112 options,
2113 }))
2114}
2115
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logical_plan::StringifiedPlan;
    use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};

    use crate::test::function_stub::sum;
    use datafusion_common::{Constraint, RecursionUnnestOption, SchemaError};

    /// Scan, filter and project, checked against the plan's display form.
    #[test]
    fn plan_builder_simple() -> Result<()> {
        let scan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?;
        let plan = scan
            .filter(col("state").eq(lit("CO")))?
            .project(vec![col("id")])?
            .build()?;

        let expected = "Projection: employee_csv.id\
        \n Filter: employee_csv.state = Utf8(\"CO\")\
        \n TableScan: employee_csv projection=[id, state]";

        assert_eq!(expected, plan.to_string());

        Ok(())
    }

    /// The scan schema is the table schema qualified with the table name, for
    /// both a lower-case and an upper-case spelling of the name.
    #[test]
    fn plan_builder_schema() {
        let schema = employee_schema();
        let expected = DFSchema::try_from_qualified_schema(
            TableReference::bare("employee_csv"),
            &schema,
        )
        .unwrap();

        for table_name in ["employee_csv", "EMPLOYEE_CSV"] {
            let plan =
                LogicalPlanBuilder::scan(table_name, table_source(&schema), None)
                    .unwrap();
            assert_eq!(&expected, plan.schema().as_ref());
        }
    }

    /// Scanning with an empty table name is a planning error.
    #[test]
    fn plan_builder_empty_name() {
        let schema = employee_schema();
        let err =
            LogicalPlanBuilder::scan("", table_source(&schema), None).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: table_name cannot be empty"
        );
    }

    /// Sort keys are displayed with their direction and null ordering.
    #[test]
    fn plan_builder_sort() -> Result<()> {
        let sort_keys = vec![
            expr::Sort::new(col("state"), true, true),
            expr::Sort::new(col("salary"), false, false),
        ];
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
                .sort(sort_keys)?
                .build()?;

        let expected = "Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST\
        \n TableScan: employee_csv projection=[state, salary]";

        assert_eq!(expected, plan.to_string());

        Ok(())
    }

    /// Chained unions are displayed as nested `Union` nodes.
    #[test]
    fn plan_builder_union() -> Result<()> {
        let scan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

        let plan = scan
            .clone()
            .union(scan.clone().build()?)?
            .union(scan.clone().build()?)?
            .union(scan.build()?)?
            .build()?;

        let expected = "Union\
        \n Union\
        \n Union\
        \n TableScan: employee_csv projection=[state, salary]\
        \n TableScan: employee_csv projection=[state, salary]\
        \n TableScan: employee_csv projection=[state, salary]\
        \n TableScan: employee_csv projection=[state, salary]";

        assert_eq!(expected, plan.to_string());

        Ok(())
    }

    /// Each `union_distinct` wraps its `Union` in a `Distinct` node.
    #[test]
    fn plan_builder_union_distinct() -> Result<()> {
        let scan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

        let plan = scan
            .clone()
            .union_distinct(scan.clone().build()?)?
            .union_distinct(scan.clone().build()?)?
            .union_distinct(scan.build()?)?
            .build()?;

        let expected = "\
        Distinct:\
        \n Union\
        \n Distinct:\
        \n Union\
        \n Distinct:\
        \n Union\
        \n TableScan: employee_csv projection=[state, salary]\
        \n TableScan: employee_csv projection=[state, salary]\
        \n TableScan: employee_csv projection=[state, salary]\
        \n TableScan: employee_csv projection=[state, salary]";

        assert_eq!(expected, plan.to_string());

        Ok(())
    }

    /// `distinct` adds a `Distinct` node on top of the projection.
    #[test]
    fn plan_builder_simple_distinct() -> Result<()> {
        let scan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?;
        let plan = scan
            .filter(col("state").eq(lit("CO")))?
            .project(vec![col("id")])?
            .distinct()?
            .build()?;

        let expected = "\
        Distinct:\
        \n Projection: employee_csv.id\
        \n Filter: employee_csv.state = Utf8(\"CO\")\
        \n TableScan: employee_csv projection=[id, state]";

        assert_eq!(expected, plan.to_string());

        Ok(())
    }

    /// A filter using `EXISTS` over a subquery that references the outer table.
    #[test]
    fn exists_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("a")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![col("a")])?
            .filter(exists(Arc::new(subquery)))?
            .build()?;

        let expected = "Filter: EXISTS (<subquery>)\
        \n Subquery:\
        \n Filter: foo.a = bar.a\
        \n Projection: foo.a\
        \n TableScan: foo\
        \n Projection: bar.a\
        \n TableScan: bar";
        assert_eq!(expected, outer_query.to_string());

        Ok(())
    }

    /// A filter using `IN` over a subquery that references the outer table.
    #[test]
    fn filter_in_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("a")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![col("a")])?
            .filter(in_subquery(col("a"), Arc::new(subquery)))?
            .build()?;

        let expected = "Filter: bar.a IN (<subquery>)\
        \n Subquery:\
        \n Filter: foo.a = bar.a\
        \n Projection: foo.a\
        \n TableScan: foo\
        \n Projection: bar.a\
        \n TableScan: bar";
        assert_eq!(expected, outer_query.to_string());

        Ok(())
    }

    /// A projection whose only expression is a scalar subquery.
    #[test]
    fn select_scalar_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("b")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![scalar_subquery(Arc::new(subquery))])?
            .build()?;

        let expected = "Projection: (<subquery>)\
        \n Subquery:\
        \n Filter: foo.a = bar.a\
        \n Projection: foo.b\
        \n TableScan: foo\
        \n TableScan: bar";
        assert_eq!(expected, outer_query.to_string());

        Ok(())
    }

    /// Projecting two expressions that resolve to the same name must fail
    /// with an ambiguous-reference schema error.
    #[test]
    fn projection_non_unique_names() -> Result<()> {
        let plan = table_scan(
            Some("employee_csv"),
            &employee_schema(),
            Some(vec![0, 1]),
        )?
        .project(vec![col("id"), col("first_name").alias("id")]);

        let Err(DataFusionError::SchemaError(
            SchemaError::AmbiguousReference {
                field:
                    Column {
                        relation: Some(TableReference::Bare { table }),
                        name,
                        spans: _,
                    },
            },
            _,
        )) = plan
        else {
            return plan_err!(
                "Plan should have returned an DataFusionError::SchemaError"
            );
        };

        assert_eq!(*"employee_csv", *table);
        assert_eq!("id", &name);
        Ok(())
    }

    /// Five-column employee schema shared by the tests in this module.
    fn employee_schema() -> Schema {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, false),
            Field::new("last_name", DataType::Utf8, false),
            Field::new("state", DataType::Utf8, false),
            Field::new("salary", DataType::Int32, false),
        ];
        Schema::new(fields)
    }

    /// Every plan type is displayed in verbose mode; only the final logical
    /// and final physical plans are displayed otherwise.
    #[test]
    fn stringified_plan() {
        let cases = [
            (PlanType::InitialLogicalPlan, false),
            (PlanType::FinalLogicalPlan, true),
            (PlanType::InitialPhysicalPlan, false),
            (PlanType::FinalPhysicalPlan, true),
            (
                PlanType::OptimizedLogicalPlan {
                    optimizer_name: "random opt pass".into(),
                },
                false,
            ),
        ];

        for (plan_type, shown_when_not_verbose) in cases {
            let stringified_plan = StringifiedPlan::new(plan_type, "...the plan...");
            assert!(stringified_plan.should_display(true));
            assert_eq!(
                shown_when_not_verbose,
                stringified_plan.should_display(false)
            );
        }
    }

    /// Scan over a three-column (`a`, `b`, `c`) `UInt32` table called `name`.
    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
        let fields = vec![
            Field::new("a", DataType::UInt32, false),
            Field::new("b", DataType::UInt32, false),
            Field::new("c", DataType::UInt32, false),
        ];
        let schema = Schema::new(fields);
        table_scan(Some(name), &schema, None)?.build()
    }

    /// INTERSECT over inputs with different column counts is rejected.
    #[test]
    fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
        let one_column =
            table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
        let two_columns =
            table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;

        let expected = "Error during planning: INTERSECT/EXCEPT query must have the same number of columns. \
         Left is 1 and right is 2.";
        let err = LogicalPlanBuilder::intersect(
            one_column.build()?,
            two_columns.build()?,
            true,
        )
        .unwrap_err();

        assert_eq!(err.strip_backtrace(), expected);

        Ok(())
    }

    /// Exercises unnesting of scalar, list, struct and nested-list columns.
    #[test]
    fn plan_builder_unnest() -> Result<()> {
        // A scalar column cannot be unnested.
        let err = nested_table_scan("test_table")?
            .unnest_column("scalar")
            .unwrap_err();
        assert!(err
            .to_string()
            .starts_with("Internal error: trying to unnest on invalid data type UInt32"));

        // A list column is unnested into its element type.
        let plan = nested_table_scan("test_table")?
            .unnest_column("strings")?
            .build()?;

        let expected = "\
        Unnest: lists[test_table.strings|depth=1] structs[]\
        \n TableScan: test_table";
        assert_eq!(expected, plan.to_string());

        let field = plan.schema().field_with_name(None, "strings").unwrap();
        assert_eq!(&DataType::Utf8, field.data_type());

        // A struct column is unnested into one column per member.
        let plan = nested_table_scan("test_table")?
            .unnest_column("struct_singular")?
            .build()?;

        let expected = "\
        Unnest: lists[] structs[test_table.struct_singular]\
        \n TableScan: test_table";
        assert_eq!(expected, plan.to_string());

        for field_name in ["a", "b"] {
            let field = plan
                .schema()
                .field_with_name(None, &format!("struct_singular.{field_name}"))
                .unwrap();
            assert_eq!(&DataType::UInt32, field.data_type());
        }

        // Chained single-column unnests stack one node per call.
        let plan = nested_table_scan("test_table")?
            .unnest_column("strings")?
            .unnest_column("structs")?
            .unnest_column("struct_singular")?
            .build()?;

        let expected = "\
        Unnest: lists[] structs[test_table.struct_singular]\
        \n Unnest: lists[test_table.structs|depth=1] structs[]\
        \n Unnest: lists[test_table.strings|depth=1] structs[]\
        \n TableScan: test_table";
        assert_eq!(expected, plan.to_string());

        let field = plan.schema().field_with_name(None, "structs").unwrap();
        assert!(matches!(field.data_type(), DataType::Struct(_)));

        // Several columns unnested by a single node.
        let cols: Vec<Column> = ["strings", "structs", "struct_singular"]
            .into_iter()
            .map(Into::into)
            .collect();

        let plan = nested_table_scan("test_table")?
            .unnest_columns_with_options(cols, UnnestOptions::default())?
            .build()?;

        let expected = "\
        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]\
        \n TableScan: test_table";
        assert_eq!(expected, plan.to_string());

        // An unknown column is an error.
        let plan = nested_table_scan("test_table")?.unnest_column("missing");
        assert!(plan.is_err());

        // The same nested list unnested at two different depths.
        let recursive_options = UnnestOptions::default()
            .with_recursions(RecursionUnnestOption {
                input_column: "stringss".into(),
                output_column: "stringss_depth_1".into(),
                depth: 1,
            })
            .with_recursions(RecursionUnnestOption {
                input_column: "stringss".into(),
                output_column: "stringss_depth_2".into(),
                depth: 2,
            });
        let plan = nested_table_scan("test_table")?
            .unnest_columns_with_options(
                vec!["stringss".into(), "struct_singular".into()],
                recursive_options,
            )?
            .build()?;

        let expected = "\
        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]\
        \n TableScan: test_table";
        assert_eq!(expected, plan.to_string());

        let field = plan
            .schema()
            .field_with_name(None, "stringss_depth_1")
            .unwrap();
        assert_eq!(
            &DataType::new_list(DataType::Utf8, false),
            field.data_type()
        );
        let field = plan
            .schema()
            .field_with_name(None, "stringss_depth_2")
            .unwrap();
        assert_eq!(&DataType::Utf8, field.data_type());

        for field_name in ["a", "b"] {
            let field = plan
                .schema()
                .field_with_name(None, &format!("struct_singular.{field_name}"))
                .unwrap();
            assert_eq!(&DataType::UInt32, field.data_type());
        }

        Ok(())
    }

    /// Scan over a table holding a scalar, a list of strings, a list of
    /// structs, a struct and a list of lists of strings.
    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
        let struct_members = || {
            vec![
                Field::new("a", DataType::UInt32, false),
                Field::new("b", DataType::UInt32, false),
            ]
        };
        let struct_field_in_list = Field::new_struct("item", struct_members(), false);
        let string_field = Field::new_list_field(DataType::Utf8, false);
        let strings_field = Field::new_list("item", string_field.clone(), false);

        let schema = Schema::new(vec![
            Field::new("scalar", DataType::UInt32, false),
            Field::new_list("strings", string_field, false),
            Field::new_list("structs", struct_field_in_list, false),
            Field::new(
                "struct_singular",
                DataType::Struct(Fields::from(struct_members())),
                false,
            ),
            Field::new_list("stringss", strings_field, false),
        ]);

        table_scan(Some(table_name), &schema, None)
    }

    /// A union of a cross join with itself must build without error.
    #[test]
    fn test_union_after_join() -> Result<()> {
        let values = vec![vec![lit(1)]];

        let left = LogicalPlanBuilder::values(values.clone())?
            .alias("left")?
            .build()?;
        let right = LogicalPlanBuilder::values(values)?
            .alias("right")?
            .build()?;

        let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;

        let _ = LogicalPlanBuilder::from(join.clone())
            .union(join)?
            .build()?;

        Ok(())
    }

    /// Repeated names receive `:<k>` suffixes in order of appearance.
    #[test]
    fn test_change_redundant_column() -> Result<()> {
        let int_field = |name: &str| Field::new(name, DataType::Int32, false);

        let input: Vec<Field> = ["a", "a", "b", "b", "a"]
            .into_iter()
            .map(int_field)
            .collect();
        let remove_redundant = change_redundant_column(&Fields::from(input));

        let expected: Vec<Field> = ["a", "a:1", "b", "b:1", "a:2"]
            .into_iter()
            .map(int_field)
            .collect();
        assert_eq!(remove_redundant, expected);
        Ok(())
    }

    /// A builder created from an `Arc<LogicalPlan>` holds the same plan.
    #[test]
    fn plan_builder_from_logical_plan() -> Result<()> {
        let sort_keys = vec![
            expr::Sort::new(col("state"), true, true),
            expr::Sort::new(col("salary"), false, false),
        ];
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
                .sort(sort_keys)?
                .build()?;

        let plan_expected = plan.to_string();
        let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
        assert_eq!(plan_expected, plan_builder.plan.to_string());

        Ok(())
    }

    /// With default options, only the explicit group-by column is used even
    /// though the table declares a primary key.
    #[test]
    fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
        let constraints =
            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
        let source = table_source_with_constraints(&employee_schema(), constraints);

        let plan =
            LogicalPlanBuilder::scan("employee_csv", source, Some(vec![0, 3, 4]))?
                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
                .build()?;

        let expected =
            "Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]\
        \n TableScan: employee_csv projection=[id, state, salary]";
        assert_eq!(expected, plan.to_string());

        Ok(())
    }

    /// With implicit group-by expressions enabled, the remaining scanned
    /// columns are added to the GROUP BY list.
    #[test]
    fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
        let constraints =
            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
        let source = table_source_with_constraints(&employee_schema(), constraints);

        let options =
            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
        let plan =
            LogicalPlanBuilder::scan("employee_csv", source, Some(vec![0, 3, 4]))?
                .with_options(options)
                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
                .build()?;

        let expected =
            "Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]\
        \n TableScan: employee_csv projection=[id, state, salary]";
        assert_eq!(expected, plan.to_string());

        Ok(())
    }
}