1mod required_indices;
21
22use crate::optimizer::ApplyOrder;
23use crate::{OptimizerConfig, OptimizerRule};
24use std::collections::HashSet;
25use std::sync::Arc;
26
27use datafusion_common::{
28 get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column,
29 HashMap, JoinType, Result,
30};
31use datafusion_expr::expr::Alias;
32use datafusion_expr::Unnest;
33use datafusion_expr::{
34 logical_plan::LogicalPlan, projection_schema, Aggregate, Distinct, Expr, Projection,
35 TableScan, Window,
36};
37
38use crate::optimize_projections::required_indices::RequiredIndices;
39use crate::utils::NamePreserver;
40use datafusion_common::tree_node::{
41 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
42};
43
/// Stateless logical optimizer rule. Its `OptimizerRule` implementation
/// reports the name `"optimize_projections"`.
#[derive(Default, Debug)]
pub struct OptimizeProjections {}

impl OptimizeProjections {
    /// Creates the rule. The struct carries no state, so this is the same
    /// value that [`Default::default`] produces.
    pub fn new() -> Self {
        Self::default()
    }
}
68
69impl OptimizerRule for OptimizeProjections {
70 fn name(&self) -> &str {
71 "optimize_projections"
72 }
73
74 fn apply_order(&self) -> Option<ApplyOrder> {
75 None
76 }
77
78 fn supports_rewrite(&self) -> bool {
79 true
80 }
81
82 fn rewrite(
83 &self,
84 plan: LogicalPlan,
85 config: &dyn OptimizerConfig,
86 ) -> Result<Transformed<LogicalPlan>> {
87 let indices = RequiredIndices::new_for_all_exprs(&plan);
89 optimize_projections(plan, config, indices)
90 }
91}
92
/// Rewrites `plan` (and, recursively, its inputs) so that only the columns
/// selected by `indices` plus whatever the node's own expressions reference
/// are requested from the inputs.
///
/// # Arguments
/// * `plan` - node to rewrite; consumed.
/// * `config` - optimizer configuration, forwarded unchanged to every
///   recursive call.
/// * `indices` - requirements of the parent, expressed as positions in
///   `plan`'s output.
///
/// # Returns
/// The rewritten plan wrapped in [`Transformed`]. When a child of a
/// generically handled node was changed, the node's schema is recomputed.
///
/// # Errors
/// Propagates errors from plan construction and schema lookups, and returns
/// an internal error when an extension node reports a number of child
/// requirements that differs from its number of inputs.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn optimize_projections(
    plan: LogicalPlan,
    config: &dyn OptimizerConfig,
    indices: RequiredIndices,
) -> Result<Transformed<LogicalPlan>> {
    // Node types that need the plan by value (their fields are moved out)
    // are handled first; each of these arms returns directly.
    match plan {
        LogicalPlan::Projection(proj) => {
            // First try to fold this projection into a projection directly
            // below it, then prune the (possibly merged) projection.
            return merge_consecutive_projections(proj)?.transform_data(|proj| {
                rewrite_projection_given_requirements(proj, config, &indices)
            })
        }
        LogicalPlan::Aggregate(aggregate) => {
            // Split the parent requirements into a GROUP BY part and an
            // aggregate-expression part at `n_group_exprs`.
            let n_group_exprs = aggregate.group_expr_len()?;
            let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs);

            // Schema names of the existing GROUP BY expressions, as expected
            // by `get_required_group_by_exprs_indices`.
            let group_by_expr_existing = aggregate
                .group_expr
                .iter()
                .map(|group_by_expr| group_by_expr.schema_name().to_string())
                .collect::<Vec<_>>();

            let new_group_bys = if let Some(simplest_groupby_indices) =
                get_required_group_by_exprs_indices(
                    aggregate.input.schema(),
                    &group_by_expr_existing,
                ) {
                // Keep the GROUP BY expressions the helper reports as
                // required, plus those the parent asked for.
                group_by_reqs
                    .append(&simplest_groupby_indices)
                    .get_at_indices(&aggregate.group_expr)
            } else {
                // No reduced set available: keep every GROUP BY expression.
                aggregate.group_expr
            };

            // Keep only the aggregate expressions the parent requires.
            let mut new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);

            // If nothing at all would remain, keep the first original
            // aggregate expression so the node does not end up with neither
            // group nor aggregate expressions.
            if new_aggr_expr.is_empty()
                && new_group_bys.is_empty()
                && !aggregate.aggr_expr.is_empty()
            {
                new_aggr_expr = aggregate.aggr_expr;
                // The vector is non-empty (checked above), so resizing to 1
                // only truncates and the fill closure is never invoked.
                new_aggr_expr.resize_with(1, || unreachable!());
            }

            // Columns of the aggregate's input that the surviving
            // expressions reference.
            let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
            let schema = aggregate.input.schema();
            let necessary_indices =
                RequiredIndices::new().with_exprs(schema, all_exprs_iter);
            let necessary_exprs = necessary_indices.get_required_exprs(schema);

            return optimize_projections(
                Arc::unwrap_or_clone(aggregate.input),
                config,
                necessary_indices,
            )?
            .transform_data(|aggregate_input| {
                // Narrow the input with a projection when that reduces its
                // column count.
                add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)
            })?
            .map_data(|aggregate_input| {
                // Rebuild the aggregate over the new input with the pruned
                // expression lists.
                Aggregate::try_new(
                    Arc::new(aggregate_input),
                    new_group_bys,
                    new_aggr_expr,
                )
                .map(LogicalPlan::Aggregate)
            });
        }
        LogicalPlan::Window(window) => {
            let input_schema = Arc::clone(window.input.schema());
            // Split the parent requirements at the input's field count: the
            // first part addresses input columns, the second part the
            // window expressions.
            let n_input_fields = input_schema.fields().len();
            let (child_reqs, window_reqs) = indices.split_off(n_input_fields);

            // Keep only the window expressions the parent requires.
            let new_window_expr = window_reqs.get_at_indices(&window.window_expr);

            // Input columns needed by the parent or by the kept window
            // expressions.
            let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr);

            return optimize_projections(
                Arc::unwrap_or_clone(window.input),
                config,
                // Cloned because `required_indices` is used again in the
                // closure below.
                required_indices.clone(),
            )?
            .transform_data(|window_child| {
                if new_window_expr.is_empty() {
                    // No window expression is needed: drop the window node
                    // and use its input directly.
                    Ok(Transformed::no(window_child))
                } else {
                    // `required_indices` refers to `input_schema`, so that
                    // schema is used to turn the indices into expressions.
                    let required_exprs =
                        required_indices.get_required_exprs(&input_schema);
                    let window_child =
                        add_projection_on_top_if_helpful(window_child, required_exprs)?
                            .data;
                    Window::try_new(new_window_expr, Arc::new(window_child))
                        .map(LogicalPlan::Window)
                        .map(Transformed::yes)
                }
            });
        }
        LogicalPlan::TableScan(table_scan) => {
            let TableScan {
                table_name,
                source,
                projection,
                filters,
                fetch,
                projected_schema: _,
            } = table_scan;

            // When the scan already has a projection, the required indices
            // are positions within it and are mapped back through it;
            // otherwise they are used as they are.
            let projection = match &projection {
                Some(projection) => indices.into_mapped_indices(|idx| projection[idx]),
                None => indices.into_inner(),
            };
            // The scan is always rebuilt and reported as transformed.
            return TableScan::try_new(
                table_name,
                source,
                Some(projection),
                filters,
                fetch,
            )
            .map(LogicalPlan::TableScan)
            .map(Transformed::yes);
        }
        // Every other node type is handled generically below.
        _ => {}
    };

    // For the remaining node types, compute one `RequiredIndices` per input,
    // in input order.
    let mut child_required_indices: Vec<RequiredIndices> = match &plan {
        LogicalPlan::Sort(_)
        | LogicalPlan::Filter(_)
        | LogicalPlan::Repartition(_)
        | LogicalPlan::Union(_)
        | LogicalPlan::SubqueryAlias(_)
        | LogicalPlan::Distinct(Distinct::On(_)) => {
            // Forward the parent requirements plus the columns used by this
            // node's own expressions, and mark a projection above each
            // input as beneficial.
            plan.inputs()
                .into_iter()
                .map(|input| {
                    indices
                        .clone()
                        .with_projection_beneficial()
                        .with_plan_exprs(&plan, input.schema())
                })
                .collect::<Result<_>>()?
        }
        LogicalPlan::Limit(_) => {
            // Same as above, but without marking a projection as beneficial.
            plan.inputs()
                .into_iter()
                .map(|input| indices.clone().with_plan_exprs(&plan, input.schema()))
                .collect::<Result<_>>()?
        }
        LogicalPlan::Copy(_)
        | LogicalPlan::Ddl(_)
        | LogicalPlan::Dml(_)
        | LogicalPlan::Explain(_)
        | LogicalPlan::Analyze(_)
        | LogicalPlan::Subquery(_)
        | LogicalPlan::Statement(_)
        | LogicalPlan::Distinct(Distinct::All(_)) => {
            // The parent requirements are ignored here; the requirements of
            // each input are derived from the input itself.
            plan.inputs()
                .into_iter()
                .map(RequiredIndices::new_for_all_exprs)
                .collect()
        }
        LogicalPlan::Extension(extension) => {
            // Ask the user-defined node which columns of each child it needs
            // for the requested output columns; `None` leaves the plan
            // untouched.
            let Some(necessary_children_indices) =
                extension.node.necessary_children_exprs(indices.indices())
            else {
                return Ok(Transformed::no(plan));
            };
            let children = extension.node.inputs();
            if children.len() != necessary_children_indices.len() {
                return internal_err!("Inconsistent length between children and necessary children indices. \
                Make sure `.necessary_children_exprs` implementation of the `UserDefinedLogicalNode` is \
                consistent with actual children length for the node.");
            }
            children
                .into_iter()
                .zip(necessary_children_indices)
                .map(|(child, necessary_indices)| {
                    RequiredIndices::new_from_indices(necessary_indices)
                        .with_plan_exprs(&plan, child.schema())
                })
                .collect::<Result<Vec<_>>>()?
        }
        LogicalPlan::EmptyRelation(_)
        | LogicalPlan::RecursiveQuery(_)
        | LogicalPlan::Values(_)
        | LogicalPlan::DescribeTable(_) => {
            // These node types are returned unchanged.
            return Ok(Transformed::no(plan));
        }
        LogicalPlan::Join(join) => {
            // Route the parent requirements to the left and right inputs,
            // then add the columns used by the join's own expressions.
            let left_len = join.left.schema().fields().len();
            let (left_req_indices, right_req_indices) =
                split_join_requirements(left_len, indices, &join.join_type);
            let left_indices =
                left_req_indices.with_plan_exprs(&plan, join.left.schema())?;
            let right_indices =
                right_req_indices.with_plan_exprs(&plan, join.right.schema())?;
            // Both join inputs are marked as benefiting from a projection.
            vec![
                left_indices.with_projection_beneficial(),
                right_indices.with_projection_beneficial(),
            ]
        }
        LogicalPlan::Projection(_)
        | LogicalPlan::Aggregate(_)
        | LogicalPlan::Window(_)
        | LogicalPlan::TableScan(_) => {
            // These variants return from the first `match`; reaching this
            // arm indicates an internal inconsistency.
            return internal_err!(
                "OptimizeProjection: should have handled in the match statement above"
            );
        }
        LogicalPlan::Unnest(Unnest {
            dependency_indices, ..
        }) => {
            // The single input's requirements are the node's recorded
            // dependency indices.
            vec![RequiredIndices::new_from_indices(
                dependency_indices.clone(),
            )]
        }
    };

    // The requirements are ordered (child0, child1, ...), but the closure
    // below takes them with `pop`, i.e. from the back, hence the reversal.
    child_required_indices.reverse();
    if child_required_indices.len() != plan.inputs().len() {
        return internal_err!(
            "OptimizeProjection: child_required_indices length mismatch with plan inputs"
        );
    }

    // Rewrite every child with its own requirements.
    let transformed_plan = plan.map_children(|child| {
        let required_indices = child_required_indices.pop().ok_or_else(|| {
            internal_datafusion_err!(
                "Unexpected number of required_indices in OptimizeProjections rule"
            )
        })?;

        // Both values are captured before `child` and `required_indices`
        // are moved into the recursive call.
        let projection_beneficial = required_indices.projection_beneficial();
        let project_exprs = required_indices.get_required_exprs(child.schema());

        optimize_projections(child, config, required_indices)?.transform_data(
            |new_input| {
                if projection_beneficial {
                    add_projection_on_top_if_helpful(new_input, project_exprs)
                } else {
                    Ok(Transformed::no(new_input))
                }
            },
        )
    })?;

    // A changed child may have a different schema, so this node's schema is
    // recomputed in that case.
    if transformed_plan.transformed {
        transformed_plan.map_data(|plan| plan.recompute_schema())
    } else {
        Ok(transformed_plan)
    }
}
426
427fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Projection>> {
448 let Projection {
449 expr,
450 input,
451 schema,
452 ..
453 } = proj;
454 let LogicalPlan::Projection(prev_projection) = input.as_ref() else {
455 return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
456 };
457
458 let mut column_referral_map = HashMap::<&Column, usize>::new();
460 expr.iter()
461 .for_each(|expr| expr.add_column_ref_counts(&mut column_referral_map));
462
463 if column_referral_map.into_iter().any(|(col, usage)| {
467 usage > 1
468 && !is_expr_trivial(
469 &prev_projection.expr
470 [prev_projection.schema.index_of_column(col).unwrap()],
471 )
472 }) {
473 return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
475 }
476
477 let LogicalPlan::Projection(prev_projection) = Arc::unwrap_or_clone(input) else {
478 unreachable!();
480 };
481
482 let name_preserver = NamePreserver::new_for_projection();
485 let mut original_names = vec![];
486 let new_exprs = expr.map_elements(|expr| {
487 original_names.push(name_preserver.save(&expr));
488
489 match expr {
491 Expr::Alias(Alias {
492 expr,
493 relation,
494 name,
495 metadata,
496 }) => rewrite_expr(*expr, &prev_projection).map(|result| {
497 result.update_data(|expr| {
498 Expr::Alias(Alias::new(expr, relation, name).with_metadata(metadata))
499 })
500 }),
501 e => rewrite_expr(e, &prev_projection),
502 }
503 })?;
504
505 if new_exprs.transformed {
508 let new_exprs = new_exprs
510 .data
511 .into_iter()
512 .zip(original_names)
513 .map(|(expr, original_name)| original_name.restore(expr))
514 .collect::<Vec<_>>();
515 Projection::try_new(new_exprs, prev_projection.input).map(Transformed::yes)
516 } else {
517 let input = Arc::new(LogicalPlan::Projection(prev_projection));
519 Projection::try_new_with_schema(new_exprs.data, input, schema)
520 .map(Transformed::no)
521 }
522}
523
524fn is_expr_trivial(expr: &Expr) -> bool {
526 matches!(expr, Expr::Column(_) | Expr::Literal(_))
527}
528
529fn rewrite_expr(expr: Expr, input: &Projection) -> Result<Transformed<Expr>> {
574 expr.transform_up(|expr| {
575 match expr {
576 Expr::Alias(alias) => Ok(Transformed::yes(*alias.expr)),
578 Expr::Column(col) => {
579 let idx = input.schema.index_of_column(&col)?;
581 let input_expr = input.expr[idx].clone().unalias_nested().data;
589 Ok(Transformed::yes(input_expr))
590 }
591 _ => Ok(Transformed::no(expr)),
593 }
594 })
595}
596
597fn outer_columns<'a>(expr: &'a Expr, columns: &mut HashSet<&'a Column>) {
606 expr.apply(|expr| {
608 match expr {
609 Expr::OuterReferenceColumn(_, col) => {
610 columns.insert(col);
611 }
612 Expr::ScalarSubquery(subquery) => {
613 outer_columns_helper_multi(&subquery.outer_ref_columns, columns);
614 }
615 Expr::Exists(exists) => {
616 outer_columns_helper_multi(&exists.subquery.outer_ref_columns, columns);
617 }
618 Expr::InSubquery(insubquery) => {
619 outer_columns_helper_multi(
620 &insubquery.subquery.outer_ref_columns,
621 columns,
622 );
623 }
624 _ => {}
625 };
626 Ok(TreeNodeRecursion::Continue)
627 })
628 .unwrap();
630}
631
632fn outer_columns_helper_multi<'a, 'b>(
641 exprs: impl IntoIterator<Item = &'a Expr>,
642 columns: &'b mut HashSet<&'a Column>,
643) {
644 exprs.into_iter().for_each(|e| outer_columns(e, columns));
645}
646
647fn split_join_requirements(
677 left_len: usize,
678 indices: RequiredIndices,
679 join_type: &JoinType,
680) -> (RequiredIndices, RequiredIndices) {
681 match join_type {
682 JoinType::Inner
684 | JoinType::Left
685 | JoinType::Right
686 | JoinType::Full
687 | JoinType::LeftMark => {
688 indices.split_off(left_len)
691 }
692 JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()),
694 JoinType::RightSemi | JoinType::RightAnti => (RequiredIndices::new(), indices),
697 }
698}
699
700fn add_projection_on_top_if_helpful(
718 plan: LogicalPlan,
719 project_exprs: Vec<Expr>,
720) -> Result<Transformed<LogicalPlan>> {
721 if project_exprs.len() >= plan.schema().fields().len() {
723 Ok(Transformed::no(plan))
724 } else {
725 Projection::try_new(project_exprs, Arc::new(plan))
726 .map(LogicalPlan::Projection)
727 .map(Transformed::yes)
728 }
729}
730
731fn rewrite_projection_given_requirements(
749 proj: Projection,
750 config: &dyn OptimizerConfig,
751 indices: &RequiredIndices,
752) -> Result<Transformed<LogicalPlan>> {
753 let Projection { expr, input, .. } = proj;
754
755 let exprs_used = indices.get_at_indices(&expr);
756
757 let required_indices =
758 RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter());
759
760 optimize_projections(Arc::unwrap_or_clone(input), config, required_indices)?
763 .transform_data(|input| {
764 if is_projection_unnecessary(&input, &exprs_used)? {
765 Ok(Transformed::yes(input))
766 } else {
767 Projection::try_new(exprs_used, Arc::new(input))
768 .map(LogicalPlan::Projection)
769 .map(Transformed::yes)
770 }
771 })
772}
773
774fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result<bool> {
778 let proj_schema = projection_schema(input, proj_exprs)?;
779 Ok(&proj_schema == input.schema() && proj_exprs.iter().all(is_expr_trivial))
780}
781
782#[cfg(test)]
783mod tests {
784 use std::cmp::Ordering;
785 use std::collections::HashMap;
786 use std::fmt::Formatter;
787 use std::ops::Add;
788 use std::sync::Arc;
789 use std::vec;
790
791 use crate::optimize_projections::OptimizeProjections;
792 use crate::optimizer::Optimizer;
793 use crate::test::{
794 assert_fields_eq, assert_optimized_plan_eq, scan_empty, test_table_scan,
795 test_table_scan_fields, test_table_scan_with_name,
796 };
797 use crate::{OptimizerContext, OptimizerRule};
798 use arrow::datatypes::{DataType, Field, Schema};
799 use datafusion_common::{
800 Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference,
801 };
802 use datafusion_expr::ExprFunctionExt;
803 use datafusion_expr::{
804 binary_expr, build_join_schema,
805 builder::table_scan_with_filters,
806 col,
807 expr::{self, Cast},
808 lit,
809 logical_plan::{builder::LogicalPlanBuilder, table_scan},
810 not, try_cast, when, BinaryExpr, Expr, Extension, Like, LogicalPlan, Operator,
811 Projection, UserDefinedLogicalNodeCore, WindowFunctionDefinition,
812 };
813
814 use datafusion_functions_aggregate::count::count_udaf;
815 use datafusion_functions_aggregate::expr_fn::{count, max, min};
816 use datafusion_functions_aggregate::min_max::max_udaf;
817
818 fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
819 assert_optimized_plan_eq(Arc::new(OptimizeProjections::new()), plan, expected)
820 }
821
822 #[derive(Debug, Hash, PartialEq, Eq)]
823 struct NoOpUserDefined {
824 exprs: Vec<Expr>,
825 schema: DFSchemaRef,
826 input: Arc<LogicalPlan>,
827 }
828
829 impl NoOpUserDefined {
830 fn new(schema: DFSchemaRef, input: Arc<LogicalPlan>) -> Self {
831 Self {
832 exprs: vec![],
833 schema,
834 input,
835 }
836 }
837
838 fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
839 self.exprs = exprs;
840 self
841 }
842 }
843
844 impl PartialOrd for NoOpUserDefined {
846 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
847 match self.exprs.partial_cmp(&other.exprs) {
848 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
849 cmp => cmp,
850 }
851 }
852 }
853
854 impl UserDefinedLogicalNodeCore for NoOpUserDefined {
855 fn name(&self) -> &str {
856 "NoOpUserDefined"
857 }
858
859 fn inputs(&self) -> Vec<&LogicalPlan> {
860 vec![&self.input]
861 }
862
863 fn schema(&self) -> &DFSchemaRef {
864 &self.schema
865 }
866
867 fn expressions(&self) -> Vec<Expr> {
868 self.exprs.clone()
869 }
870
871 fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
872 write!(f, "NoOpUserDefined")
873 }
874
875 fn with_exprs_and_inputs(
876 &self,
877 exprs: Vec<Expr>,
878 mut inputs: Vec<LogicalPlan>,
879 ) -> Result<Self> {
880 Ok(Self {
881 exprs,
882 input: Arc::new(inputs.swap_remove(0)),
883 schema: Arc::clone(&self.schema),
884 })
885 }
886
887 fn necessary_children_exprs(
888 &self,
889 output_columns: &[usize],
890 ) -> Option<Vec<Vec<usize>>> {
891 Some(vec![output_columns.to_vec()])
893 }
894
895 fn supports_limit_pushdown(&self) -> bool {
896 false }
898 }
899
900 #[derive(Debug, Hash, PartialEq, Eq)]
901 struct UserDefinedCrossJoin {
902 exprs: Vec<Expr>,
903 schema: DFSchemaRef,
904 left_child: Arc<LogicalPlan>,
905 right_child: Arc<LogicalPlan>,
906 }
907
908 impl UserDefinedCrossJoin {
909 fn new(left_child: Arc<LogicalPlan>, right_child: Arc<LogicalPlan>) -> Self {
910 let left_schema = left_child.schema();
911 let right_schema = right_child.schema();
912 let schema = Arc::new(
913 build_join_schema(left_schema, right_schema, &JoinType::Inner).unwrap(),
914 );
915 Self {
916 exprs: vec![],
917 schema,
918 left_child,
919 right_child,
920 }
921 }
922 }
923
924 impl PartialOrd for UserDefinedCrossJoin {
926 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
927 match self.exprs.partial_cmp(&other.exprs) {
928 Some(Ordering::Equal) => {
929 match self.left_child.partial_cmp(&other.left_child) {
930 Some(Ordering::Equal) => {
931 self.right_child.partial_cmp(&other.right_child)
932 }
933 cmp => cmp,
934 }
935 }
936 cmp => cmp,
937 }
938 }
939 }
940
941 impl UserDefinedLogicalNodeCore for UserDefinedCrossJoin {
942 fn name(&self) -> &str {
943 "UserDefinedCrossJoin"
944 }
945
946 fn inputs(&self) -> Vec<&LogicalPlan> {
947 vec![&self.left_child, &self.right_child]
948 }
949
950 fn schema(&self) -> &DFSchemaRef {
951 &self.schema
952 }
953
954 fn expressions(&self) -> Vec<Expr> {
955 self.exprs.clone()
956 }
957
958 fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
959 write!(f, "UserDefinedCrossJoin")
960 }
961
962 fn with_exprs_and_inputs(
963 &self,
964 exprs: Vec<Expr>,
965 mut inputs: Vec<LogicalPlan>,
966 ) -> Result<Self> {
967 assert_eq!(inputs.len(), 2);
968 Ok(Self {
969 exprs,
970 left_child: Arc::new(inputs.remove(0)),
971 right_child: Arc::new(inputs.remove(0)),
972 schema: Arc::clone(&self.schema),
973 })
974 }
975
976 fn necessary_children_exprs(
977 &self,
978 output_columns: &[usize],
979 ) -> Option<Vec<Vec<usize>>> {
980 let left_child_len = self.left_child.schema().fields().len();
981 let mut left_reqs = vec![];
982 let mut right_reqs = vec![];
983 for &out_idx in output_columns {
984 if out_idx < left_child_len {
985 left_reqs.push(out_idx);
986 } else {
987 right_reqs.push(out_idx - left_child_len)
990 }
991 }
992 Some(vec![left_reqs, right_reqs])
993 }
994
995 fn supports_limit_pushdown(&self) -> bool {
996 false }
998 }
999
/// Two stacked projections collapse into one over the scan.
#[test]
fn merge_two_projection() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a")])?
        .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: Int32(1) + test.a\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1012
/// Three stacked projections collapse into one over the scan.
#[test]
fn merge_three_projection() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a"), col("b")])?
        .project(vec![col("a")])?
        .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: Int32(1) + test.a\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1026
/// An aliasing projection merges with the projection below it and keeps
/// its alias.
#[test]
fn merge_alias() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a")])?
        .project(vec![col("a").alias("alias")])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a AS alias\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1039
/// Nested aliases in the lower projection disappear on merge; only the
/// outermost alias of the upper projection remains.
#[test]
fn merge_nested_alias() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a").alias("alias1").alias("alias2")])?
        .project(vec![col("alias2").alias("alias")])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a AS alias\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1052
/// A count over a count: the inner aggregate keeps its single aggregate
/// expression even though the outer one needs none of its columns.
#[test]
fn test_nested_count() -> Result<()> {
    let schema = Schema::new(vec![Field::new("foo", DataType::Int32, false)]);
    let no_groups: Vec<Expr> = vec![];

    // Errors are propagated with `?`, since the test returns `Result`.
    let plan = table_scan(TableReference::none(), &schema, None)?
        .aggregate(no_groups.clone(), vec![count(lit(1))])?
        .aggregate(no_groups, vec![count(lit(1))])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]\
    \n  Projection: \
    \n    Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]\
    \n      TableScan: ?table? projection=[]";
    assert_optimized_plan_equal(plan, expected)
}
1074
/// A negation only requires the negated column from the scan.
#[test]
fn test_neg_push_down() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![-col("a")])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: (- test.a)\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1086
/// `IS NULL` only requires its operand column from the scan.
#[test]
fn test_is_null() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a").is_null()])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a IS NULL\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1098
/// `IS NOT NULL` only requires its operand column from the scan.
#[test]
fn test_is_not_null() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a").is_not_null()])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a IS NOT NULL\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1110
/// `IS TRUE` only requires its operand column from the scan.
#[test]
fn test_is_true() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a").is_true()])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a IS TRUE\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1122
/// `IS NOT TRUE` only requires its operand column from the scan.
#[test]
fn test_is_not_true() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a").is_not_true()])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a IS NOT TRUE\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1134
/// `IS FALSE` only requires its operand column from the scan.
#[test]
fn test_is_false() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a").is_false()])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a IS FALSE\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1146
/// `IS NOT FALSE` only requires its operand column from the scan.
#[test]
fn test_is_not_false() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a").is_not_false()])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a IS NOT FALSE\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1158
/// `IS UNKNOWN` only requires its operand column from the scan.
#[test]
fn test_is_unknown() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a").is_unknown()])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a IS UNKNOWN\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1170
/// `IS NOT UNKNOWN` only requires its operand column from the scan.
#[test]
fn test_is_not_unknown() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a").is_not_unknown()])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a IS NOT UNKNOWN\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1182
/// `NOT` only requires its operand column from the scan.
#[test]
fn test_not() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![not(col("a"))])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: NOT test.a\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1194
/// `TRY_CAST` only requires its operand column from the scan.
#[test]
fn test_try_cast() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![try_cast(col("a"), DataType::Float64)])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: TRY_CAST(test.a AS Float64)\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1206
/// `SIMILAR TO` only requires its operand column from the scan.
#[test]
fn test_similar_to() -> Result<()> {
    let scan = test_table_scan()?;
    let similar_to_expr = Expr::SimilarTo(Like::new(
        false,
        Box::new(col("a")),
        Box::new(lit("[0-9]")),
        None,
        false,
    ));
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![similar_to_expr])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a SIMILAR TO Utf8(\"[0-9]\")\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1222
/// `BETWEEN` with literal bounds only requires its operand column from the
/// scan.
#[test]
fn test_between() -> Result<()> {
    let scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a").between(lit(1), lit(3))])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a BETWEEN Int32(1) AND Int32(3)\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1234
/// The lower projection only produces a column and a literal, both trivial,
/// so the projections merge even though `a` is referenced twice above.
#[test]
fn test_case_merged() -> Result<()> {
    let scan = test_table_scan()?;
    let case_on_a = when(col("a").eq(lit(1)), lit(10))
        .otherwise(col("d"))?
        .alias("d");
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a"), lit(0).alias("d")])?
        .project(vec![col("a"), case_on_a])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a, CASE WHEN test.a = Int32(1) THEN Int32(10) ELSE Int32(0) END AS d\
    \n  TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1253
/// The lower projection computes `a + 1` and the upper one references that
/// result twice, so the projections are not merged.
#[test]
fn test_derived_column() -> Result<()> {
    let scan = test_table_scan()?;
    let case_on_a = when(col("a").eq(lit(1)), lit(10))
        .otherwise(col("d"))?
        .alias("d");
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a").add(lit(1)).alias("a"), lit(0).alias("d")])?
        .project(vec![col("a"), case_on_a])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected =
        "Projection: a, CASE WHEN a = Int32(1) THEN Int32(10) ELSE d END AS d\
    \n  Projection: test.a + Int32(1) AS a, Int32(0) AS d\
    \n    TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1275
/// Requirements pass through a user-defined node without expressions: only
/// the column used above it is scanned.
#[test]
fn test_user_defined_logical_plan_node() -> Result<()> {
    let scan = test_table_scan()?;
    // Take the schema before the scan is moved into the node.
    let scan_schema = Arc::clone(scan.schema());
    let custom_plan = LogicalPlan::Extension(Extension {
        node: Arc::new(NoOpUserDefined::new(scan_schema, Arc::new(scan))),
    });
    let plan = LogicalPlanBuilder::from(custom_plan)
        .project(vec![col("a"), lit(0).alias("d")])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a, Int32(0) AS d\
    \n  NoOpUserDefined\
    \n    TableScan: test projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1296
/// The user-defined node itself references column `b`, so `b` is scanned in
/// addition to the column used above the node.
#[test]
fn test_user_defined_logical_plan_node2() -> Result<()> {
    let scan = test_table_scan()?;
    let node_exprs = vec![Expr::Column(Column::from_qualified_name("b"))];
    // Take the schema before the scan is moved into the node.
    let scan_schema = Arc::clone(scan.schema());
    let custom_plan = LogicalPlan::Extension(Extension {
        node: Arc::new(
            NoOpUserDefined::new(scan_schema, Arc::new(scan)).with_exprs(node_exprs),
        ),
    });
    let plan = LogicalPlanBuilder::from(custom_plan)
        .project(vec![col("a"), lit(0).alias("d")])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a, Int32(0) AS d\
    \n  NoOpUserDefined\
    \n    TableScan: test projection=[a, b]";
    assert_optimized_plan_equal(plan, expected)
}
1323
/// The user-defined node references `b + c`, so both columns are scanned in
/// addition to the column used above the node.
#[test]
fn test_user_defined_logical_plan_node3() -> Result<()> {
    let scan = test_table_scan()?;
    let b_plus_c = Expr::BinaryExpr(BinaryExpr::new(
        Box::new(Expr::Column(Column::from_qualified_name("b"))),
        Operator::Plus,
        Box::new(Expr::Column(Column::from_qualified_name("c"))),
    ));
    // Take the schema before the scan is moved into the node.
    let scan_schema = Arc::clone(scan.schema());
    let custom_plan = LogicalPlan::Extension(Extension {
        node: Arc::new(
            NoOpUserDefined::new(scan_schema, Arc::new(scan)).with_exprs(vec![b_plus_c]),
        ),
    });
    let plan = LogicalPlanBuilder::from(custom_plan)
        .project(vec![col("a"), lit(0).alias("d")])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Projection: test.a, Int32(0) AS d\
    \n  NoOpUserDefined\
    \n    TableScan: test projection=[a, b, c]";
    assert_optimized_plan_equal(plan, expected)
}
1358
/// A two-input user-defined node routes the requirements to the matching
/// child: `l.a`, `l.c` to the left scan and `r.a` to the right scan.
#[test]
fn test_user_defined_logical_plan_node4() -> Result<()> {
    let left_scan = test_table_scan_with_name("l")?;
    let right_scan = test_table_scan_with_name("r")?;
    let cross_join = UserDefinedCrossJoin::new(Arc::new(left_scan), Arc::new(right_scan));
    let custom_plan = LogicalPlan::Extension(Extension {
        node: Arc::new(cross_join),
    });
    let plan = LogicalPlanBuilder::from(custom_plan)
        .project(vec![col("l.a"), col("l.c"), col("r.a"), lit(0).alias("d")])?
        .build()?;

    // The indented plan display uses two spaces per depth level; both scans
    // are children of the join and share the same depth.
    let expected = "Projection: l.a, l.c, r.a, Int32(0) AS d\
    \n  UserDefinedCrossJoin\
    \n    TableScan: l projection=[a, c]\
    \n    TableScan: r projection=[a]";
    assert_optimized_plan_equal(plan, expected)
}
1383
/// An aggregate without GROUP BY only scans the aggregated column.
#[test]
fn aggregate_no_group_by() -> Result<()> {
    let scan = test_table_scan()?;

    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Aggregate: groupBy=[[]], aggr=[[max(test.b)]]\
    \n  TableScan: test projection=[b]";

    assert_optimized_plan_equal(plan, expected)
}
1397
/// An aggregate with GROUP BY scans the grouping and the aggregated
/// columns.
#[test]
fn aggregate_group_by() -> Result<()> {
    let scan = test_table_scan()?;

    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(vec![col("c")], vec![max(col("b"))])?
        .build()?;

    // The indented plan display uses two spaces per depth level.
    let expected = "Aggregate: groupBy=[[test.c]], aggr=[[max(test.b)]]\
    \n  TableScan: test projection=[b, c]";

    assert_optimized_plan_equal(plan, expected)
}
1411
1412 #[test]
1413 fn aggregate_group_by_with_table_alias() -> Result<()> {
1414 let table_scan = test_table_scan()?;
1415
1416 let plan = LogicalPlanBuilder::from(table_scan)
1417 .alias("a")?
1418 .aggregate(vec![col("c")], vec![max(col("b"))])?
1419 .build()?;
1420
1421 let expected = "Aggregate: groupBy=[[a.c]], aggr=[[max(a.b)]]\
1422 \n SubqueryAlias: a\
1423 \n TableScan: test projection=[b, c]";
1424
1425 assert_optimized_plan_equal(plan, expected)
1426 }
1427
1428 #[test]
1429 fn aggregate_no_group_by_with_filter() -> Result<()> {
1430 let table_scan = test_table_scan()?;
1431
1432 let plan = LogicalPlanBuilder::from(table_scan)
1433 .filter(col("c").gt(lit(1)))?
1434 .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
1435 .build()?;
1436
1437 let expected = "Aggregate: groupBy=[[]], aggr=[[max(test.b)]]\
1438 \n Projection: test.b\
1439 \n Filter: test.c > Int32(1)\
1440 \n TableScan: test projection=[b, c]";
1441
1442 assert_optimized_plan_equal(plan, expected)
1443 }
1444
1445 #[test]
1446 fn aggregate_with_periods() -> Result<()> {
1447 let schema = Schema::new(vec![Field::new("tag.one", DataType::Utf8, false)]);
1448
1449 let plan = table_scan(Some("m4"), &schema, None)?
1456 .aggregate(
1457 Vec::<Expr>::new(),
1458 vec![max(col(Column::new_unqualified("tag.one"))).alias("tag.one")],
1459 )?
1460 .project([col(Column::new_unqualified("tag.one"))])?
1461 .build()?;
1462
1463 let expected = "\
1464 Aggregate: groupBy=[[]], aggr=[[max(m4.tag.one) AS tag.one]]\
1465 \n TableScan: m4 projection=[tag.one]";
1466
1467 assert_optimized_plan_equal(plan, expected)
1468 }
1469
1470 #[test]
1471 fn redundant_project() -> Result<()> {
1472 let table_scan = test_table_scan()?;
1473
1474 let plan = LogicalPlanBuilder::from(table_scan)
1475 .project(vec![col("a"), col("b"), col("c")])?
1476 .project(vec![col("a"), col("c"), col("b")])?
1477 .build()?;
1478 let expected = "Projection: test.a, test.c, test.b\
1479 \n TableScan: test projection=[a, b, c]";
1480
1481 assert_optimized_plan_equal(plan, expected)
1482 }
1483
1484 #[test]
1485 fn reorder_scan() -> Result<()> {
1486 let schema = Schema::new(test_table_scan_fields());
1487
1488 let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?.build()?;
1489 let expected = "TableScan: test projection=[b, a, c]";
1490
1491 assert_optimized_plan_equal(plan, expected)
1492 }
1493
1494 #[test]
1495 fn reorder_scan_projection() -> Result<()> {
1496 let schema = Schema::new(test_table_scan_fields());
1497
1498 let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?
1499 .project(vec![col("a"), col("b")])?
1500 .build()?;
1501 let expected = "Projection: test.a, test.b\
1502 \n TableScan: test projection=[b, a]";
1503
1504 assert_optimized_plan_equal(plan, expected)
1505 }
1506
1507 #[test]
1508 fn reorder_projection() -> Result<()> {
1509 let table_scan = test_table_scan()?;
1510
1511 let plan = LogicalPlanBuilder::from(table_scan)
1512 .project(vec![col("c"), col("b"), col("a")])?
1513 .build()?;
1514 let expected = "Projection: test.c, test.b, test.a\
1515 \n TableScan: test projection=[a, b, c]";
1516
1517 assert_optimized_plan_equal(plan, expected)
1518 }
1519
1520 #[test]
1521 fn noncontinuous_redundant_projection() -> Result<()> {
1522 let table_scan = test_table_scan()?;
1523
1524 let plan = LogicalPlanBuilder::from(table_scan)
1525 .project(vec![col("c"), col("b"), col("a")])?
1526 .filter(col("c").gt(lit(1)))?
1527 .project(vec![col("c"), col("a"), col("b")])?
1528 .filter(col("b").gt(lit(1)))?
1529 .filter(col("a").gt(lit(1)))?
1530 .project(vec![col("a"), col("c"), col("b")])?
1531 .build()?;
1532 let expected = "Projection: test.a, test.c, test.b\
1533 \n Filter: test.a > Int32(1)\
1534 \n Filter: test.b > Int32(1)\
1535 \n Projection: test.c, test.a, test.b\
1536 \n Filter: test.c > Int32(1)\
1537 \n Projection: test.c, test.b, test.a\
1538 \n TableScan: test projection=[a, b, c]";
1539 assert_optimized_plan_equal(plan, expected)
1540 }
1541
1542 #[test]
1543 fn join_schema_trim_full_join_column_projection() -> Result<()> {
1544 let table_scan = test_table_scan()?;
1545
1546 let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
1547 let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
1548
1549 let plan = LogicalPlanBuilder::from(table_scan)
1550 .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
1551 .project(vec![col("a"), col("b"), col("c1")])?
1552 .build()?;
1553
1554 let expected = "Left Join: test.a = test2.c1\
1556 \n TableScan: test projection=[a, b]\
1557 \n TableScan: test2 projection=[c1]";
1558
1559 let optimized_plan = optimize(plan)?;
1560 let formatted_plan = format!("{optimized_plan}");
1561 assert_eq!(formatted_plan, expected);
1562
1563 let optimized_join = optimized_plan;
1565 assert_eq!(
1566 **optimized_join.schema(),
1567 DFSchema::new_with_metadata(
1568 vec![
1569 (
1570 Some("test".into()),
1571 Arc::new(Field::new("a", DataType::UInt32, false))
1572 ),
1573 (
1574 Some("test".into()),
1575 Arc::new(Field::new("b", DataType::UInt32, false))
1576 ),
1577 (
1578 Some("test2".into()),
1579 Arc::new(Field::new("c1", DataType::UInt32, true))
1580 ),
1581 ],
1582 HashMap::new()
1583 )?,
1584 );
1585
1586 Ok(())
1587 }
1588
1589 #[test]
1590 fn join_schema_trim_partial_join_column_projection() -> Result<()> {
1591 let table_scan = test_table_scan()?;
1594
1595 let schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
1596 let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
1597
1598 let plan = LogicalPlanBuilder::from(table_scan)
1599 .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
1600 .project(vec![col("a"), col("b")])?
1603 .build()?;
1604
1605 let expected = "Projection: test.a, test.b\
1607 \n Left Join: test.a = test2.c1\
1608 \n TableScan: test projection=[a, b]\
1609 \n TableScan: test2 projection=[c1]";
1610
1611 let optimized_plan = optimize(plan)?;
1612 let formatted_plan = format!("{optimized_plan}");
1613 assert_eq!(formatted_plan, expected);
1614
1615 let optimized_join = optimized_plan.inputs()[0];
1617 assert_eq!(
1618 **optimized_join.schema(),
1619 DFSchema::new_with_metadata(
1620 vec![
1621 (
1622 Some("test".into()),
1623 Arc::new(Field::new("a", DataType::UInt32, false))
1624 ),
1625 (
1626 Some("test".into()),
1627 Arc::new(Field::new("b", DataType::UInt32, false))
1628 ),
1629 (
1630 Some("test2".into()),
1631 Arc::new(Field::new("c1", DataType::UInt32, true))
1632 ),
1633 ],
1634 HashMap::new()
1635 )?,
1636 );
1637
1638 Ok(())
1639 }
1640
1641 #[test]
1642 fn join_schema_trim_using_join() -> Result<()> {
1643 let table_scan = test_table_scan()?;
1646
1647 let schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
1648 let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
1649
1650 let plan = LogicalPlanBuilder::from(table_scan)
1651 .join_using(table2_scan, JoinType::Left, vec!["a"])?
1652 .project(vec![col("a"), col("b")])?
1653 .build()?;
1654
1655 let expected = "Projection: test.a, test.b\
1657 \n Left Join: Using test.a = test2.a\
1658 \n TableScan: test projection=[a, b]\
1659 \n TableScan: test2 projection=[a]";
1660
1661 let optimized_plan = optimize(plan)?;
1662 let formatted_plan = format!("{optimized_plan}");
1663 assert_eq!(formatted_plan, expected);
1664
1665 let optimized_join = optimized_plan.inputs()[0];
1667 assert_eq!(
1668 **optimized_join.schema(),
1669 DFSchema::new_with_metadata(
1670 vec![
1671 (
1672 Some("test".into()),
1673 Arc::new(Field::new("a", DataType::UInt32, false))
1674 ),
1675 (
1676 Some("test".into()),
1677 Arc::new(Field::new("b", DataType::UInt32, false))
1678 ),
1679 (
1680 Some("test2".into()),
1681 Arc::new(Field::new("a", DataType::UInt32, true))
1682 ),
1683 ],
1684 HashMap::new()
1685 )?,
1686 );
1687
1688 Ok(())
1689 }
1690
1691 #[test]
1692 fn cast() -> Result<()> {
1693 let table_scan = test_table_scan()?;
1694
1695 let projection = LogicalPlanBuilder::from(table_scan)
1696 .project(vec![Expr::Cast(Cast::new(
1697 Box::new(col("c")),
1698 DataType::Float64,
1699 ))])?
1700 .build()?;
1701
1702 let expected = "Projection: CAST(test.c AS Float64)\
1703 \n TableScan: test projection=[c]";
1704
1705 assert_optimized_plan_equal(projection, expected)
1706 }
1707
1708 #[test]
1709 fn table_scan_projected_schema() -> Result<()> {
1710 let table_scan = test_table_scan()?;
1711 let plan = LogicalPlanBuilder::from(test_table_scan()?)
1712 .project(vec![col("a"), col("b")])?
1713 .build()?;
1714
1715 assert_eq!(3, table_scan.schema().fields().len());
1716 assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1717 assert_fields_eq(&plan, vec!["a", "b"]);
1718
1719 let expected = "TableScan: test projection=[a, b]";
1720
1721 assert_optimized_plan_equal(plan, expected)
1722 }
1723
1724 #[test]
1725 fn table_scan_projected_schema_non_qualified_relation() -> Result<()> {
1726 let table_scan = test_table_scan()?;
1727 let input_schema = table_scan.schema();
1728 assert_eq!(3, input_schema.fields().len());
1729 assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1730
1731 let expr = vec![col("test.a"), col("test.b")];
1735 let plan =
1736 LogicalPlan::Projection(Projection::try_new(expr, Arc::new(table_scan))?);
1737
1738 assert_fields_eq(&plan, vec!["a", "b"]);
1739
1740 let expected = "TableScan: test projection=[a, b]";
1741
1742 assert_optimized_plan_equal(plan, expected)
1743 }
1744
1745 #[test]
1746 fn table_limit() -> Result<()> {
1747 let table_scan = test_table_scan()?;
1748 assert_eq!(3, table_scan.schema().fields().len());
1749 assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1750
1751 let plan = LogicalPlanBuilder::from(table_scan)
1752 .project(vec![col("c"), col("a")])?
1753 .limit(0, Some(5))?
1754 .build()?;
1755
1756 assert_fields_eq(&plan, vec!["c", "a"]);
1757
1758 let expected = "Limit: skip=0, fetch=5\
1759 \n Projection: test.c, test.a\
1760 \n TableScan: test projection=[a, c]";
1761
1762 assert_optimized_plan_equal(plan, expected)
1763 }
1764
1765 #[test]
1766 fn table_scan_without_projection() -> Result<()> {
1767 let table_scan = test_table_scan()?;
1768 let plan = LogicalPlanBuilder::from(table_scan).build()?;
1769 let expected = "TableScan: test projection=[a, b, c]";
1771 assert_optimized_plan_equal(plan, expected)
1772 }
1773
1774 #[test]
1775 fn table_scan_with_literal_projection() -> Result<()> {
1776 let table_scan = test_table_scan()?;
1777 let plan = LogicalPlanBuilder::from(table_scan)
1778 .project(vec![lit(1_i64), lit(2_i64)])?
1779 .build()?;
1780 let expected = "Projection: Int64(1), Int64(2)\
1781 \n TableScan: test projection=[]";
1782 assert_optimized_plan_equal(plan, expected)
1783 }
1784
1785 #[test]
1787 fn table_unused_column() -> Result<()> {
1788 let table_scan = test_table_scan()?;
1789 assert_eq!(3, table_scan.schema().fields().len());
1790 assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1791
1792 let plan = LogicalPlanBuilder::from(table_scan)
1794 .project(vec![col("c"), col("a"), col("b")])?
1795 .filter(col("c").gt(lit(1)))?
1796 .aggregate(vec![col("c")], vec![max(col("a"))])?
1797 .build()?;
1798
1799 assert_fields_eq(&plan, vec!["c", "max(test.a)"]);
1800
1801 let plan = optimize(plan).expect("failed to optimize plan");
1802 let expected = "\
1803 Aggregate: groupBy=[[test.c]], aggr=[[max(test.a)]]\
1804 \n Filter: test.c > Int32(1)\
1805 \n Projection: test.c, test.a\
1806 \n TableScan: test projection=[a, c]";
1807
1808 assert_optimized_plan_equal(plan, expected)
1809 }
1810
1811 #[test]
1813 fn table_unused_projection() -> Result<()> {
1814 let table_scan = test_table_scan()?;
1815 assert_eq!(3, table_scan.schema().fields().len());
1816 assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1817
1818 let plan = LogicalPlanBuilder::from(table_scan)
1820 .project(vec![col("b")])?
1821 .project(vec![lit(1).alias("a")])?
1822 .build()?;
1823
1824 assert_fields_eq(&plan, vec!["a"]);
1825
1826 let expected = "\
1827 Projection: Int32(1) AS a\
1828 \n TableScan: test projection=[]";
1829
1830 assert_optimized_plan_equal(plan, expected)
1831 }
1832
1833 #[test]
1834 fn table_full_filter_pushdown() -> Result<()> {
1835 let schema = Schema::new(test_table_scan_fields());
1836
1837 let table_scan = table_scan_with_filters(
1838 Some("test"),
1839 &schema,
1840 None,
1841 vec![col("b").eq(lit(1))],
1842 )?
1843 .build()?;
1844 assert_eq!(3, table_scan.schema().fields().len());
1845 assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1846
1847 let plan = LogicalPlanBuilder::from(table_scan)
1849 .project(vec![col("b")])?
1850 .project(vec![lit(1).alias("a")])?
1851 .build()?;
1852
1853 assert_fields_eq(&plan, vec!["a"]);
1854
1855 let expected = "\
1856 Projection: Int32(1) AS a\
1857 \n TableScan: test projection=[], full_filters=[b = Int32(1)]";
1858
1859 assert_optimized_plan_equal(plan, expected)
1860 }
1861
1862 #[test]
1864 fn test_double_optimization() -> Result<()> {
1865 let table_scan = test_table_scan()?;
1866
1867 let plan = LogicalPlanBuilder::from(table_scan)
1868 .project(vec![col("b")])?
1869 .project(vec![lit(1).alias("a")])?
1870 .build()?;
1871
1872 let optimized_plan1 = optimize(plan).expect("failed to optimize plan");
1873 let optimized_plan2 =
1874 optimize(optimized_plan1.clone()).expect("failed to optimize plan");
1875
1876 let formatted_plan1 = format!("{optimized_plan1:?}");
1877 let formatted_plan2 = format!("{optimized_plan2:?}");
1878 assert_eq!(formatted_plan1, formatted_plan2);
1879 Ok(())
1880 }
1881
1882 #[test]
1884 fn table_unused_aggregate() -> Result<()> {
1885 let table_scan = test_table_scan()?;
1886 assert_eq!(3, table_scan.schema().fields().len());
1887 assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
1888
1889 let plan = LogicalPlanBuilder::from(table_scan)
1891 .aggregate(vec![col("a"), col("c")], vec![max(col("b")), min(col("b"))])?
1892 .filter(col("c").gt(lit(1)))?
1893 .project(vec![col("c"), col("a"), col("max(test.b)")])?
1894 .build()?;
1895
1896 assert_fields_eq(&plan, vec!["c", "a", "max(test.b)"]);
1897
1898 let expected = "Projection: test.c, test.a, max(test.b)\
1899 \n Filter: test.c > Int32(1)\
1900 \n Aggregate: groupBy=[[test.a, test.c]], aggr=[[max(test.b)]]\
1901 \n TableScan: test projection=[a, b, c]";
1902
1903 assert_optimized_plan_equal(plan, expected)
1904 }
1905
1906 #[test]
1907 fn aggregate_filter_pushdown() -> Result<()> {
1908 let table_scan = test_table_scan()?;
1909 let aggr_with_filter = count_udaf()
1910 .call(vec![col("b")])
1911 .filter(col("c").gt(lit(42)))
1912 .build()?;
1913 let plan = LogicalPlanBuilder::from(table_scan)
1914 .aggregate(
1915 vec![col("a")],
1916 vec![count(col("b")), aggr_with_filter.alias("count2")],
1917 )?
1918 .build()?;
1919
1920 let expected = "Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
1921 \n TableScan: test projection=[a, b, c]";
1922
1923 assert_optimized_plan_equal(plan, expected)
1924 }
1925
1926 #[test]
1927 fn pushdown_through_distinct() -> Result<()> {
1928 let table_scan = test_table_scan()?;
1929
1930 let plan = LogicalPlanBuilder::from(table_scan)
1931 .project(vec![col("a"), col("b")])?
1932 .distinct()?
1933 .project(vec![col("a")])?
1934 .build()?;
1935
1936 let expected = "Projection: test.a\
1937 \n Distinct:\
1938 \n TableScan: test projection=[a, b]";
1939
1940 assert_optimized_plan_equal(plan, expected)
1941 }
1942
1943 #[test]
1944 fn test_window() -> Result<()> {
1945 let table_scan = test_table_scan()?;
1946
1947 let max1 = Expr::WindowFunction(expr::WindowFunction::new(
1948 WindowFunctionDefinition::AggregateUDF(max_udaf()),
1949 vec![col("test.a")],
1950 ))
1951 .partition_by(vec![col("test.b")])
1952 .build()
1953 .unwrap();
1954
1955 let max2 = Expr::WindowFunction(expr::WindowFunction::new(
1956 WindowFunctionDefinition::AggregateUDF(max_udaf()),
1957 vec![col("test.b")],
1958 ));
1959 let col1 = col(max1.schema_name().to_string());
1960 let col2 = col(max2.schema_name().to_string());
1961
1962 let plan = LogicalPlanBuilder::from(table_scan)
1963 .window(vec![max1])?
1964 .window(vec![max2])?
1965 .project(vec![col1, col2])?
1966 .build()?;
1967
1968 let expected = "Projection: max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
1969 \n WindowAggr: windowExpr=[[max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
1970 \n Projection: test.b, max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
1971 \n WindowAggr: windowExpr=[[max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
1972 \n TableScan: test projection=[a, b]";
1973
1974 assert_optimized_plan_equal(plan, expected)
1975 }
1976
1977 fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
1978
1979 fn optimize(plan: LogicalPlan) -> Result<LogicalPlan> {
1980 let optimizer = Optimizer::with_rules(vec![Arc::new(OptimizeProjections::new())]);
1981 let optimized_plan =
1982 optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
1983 Ok(optimized_plan)
1984 }
1985}