1use crate::{
41 dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
42 Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
43 Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
44 Sort, Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest,
45 UserDefinedLogicalNode, Values, Window,
46};
47use datafusion_common::tree_node::TreeNodeRefContainer;
48
49use crate::expr::{Exists, InSubquery};
50use datafusion_common::tree_node::{
51 Transformed, TreeNode, TreeNodeContainer, TreeNodeIterator, TreeNodeRecursion,
52 TreeNodeRewriter, TreeNodeVisitor,
53};
54use datafusion_common::{internal_err, Result};
55
56impl TreeNode for LogicalPlan {
57 fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
58 &'n self,
59 f: F,
60 ) -> Result<TreeNodeRecursion> {
61 self.inputs().apply_ref_elements(f)
62 }
63
64 fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
73 self,
74 f: F,
75 ) -> Result<Transformed<Self>> {
76 Ok(match self {
77 LogicalPlan::Projection(Projection {
78 expr,
79 input,
80 schema,
81 }) => input.map_elements(f)?.update_data(|input| {
82 LogicalPlan::Projection(Projection {
83 expr,
84 input,
85 schema,
86 })
87 }),
88 LogicalPlan::Filter(Filter {
89 predicate,
90 input,
91 having,
92 }) => input.map_elements(f)?.update_data(|input| {
93 LogicalPlan::Filter(Filter {
94 predicate,
95 input,
96 having,
97 })
98 }),
99 LogicalPlan::Repartition(Repartition {
100 input,
101 partitioning_scheme,
102 }) => input.map_elements(f)?.update_data(|input| {
103 LogicalPlan::Repartition(Repartition {
104 input,
105 partitioning_scheme,
106 })
107 }),
108 LogicalPlan::Window(Window {
109 input,
110 window_expr,
111 schema,
112 }) => input.map_elements(f)?.update_data(|input| {
113 LogicalPlan::Window(Window {
114 input,
115 window_expr,
116 schema,
117 })
118 }),
119 LogicalPlan::Aggregate(Aggregate {
120 input,
121 group_expr,
122 aggr_expr,
123 schema,
124 }) => input.map_elements(f)?.update_data(|input| {
125 LogicalPlan::Aggregate(Aggregate {
126 input,
127 group_expr,
128 aggr_expr,
129 schema,
130 })
131 }),
132 LogicalPlan::Sort(Sort { expr, input, fetch }) => input
133 .map_elements(f)?
134 .update_data(|input| LogicalPlan::Sort(Sort { expr, input, fetch })),
135 LogicalPlan::Join(Join {
136 left,
137 right,
138 on,
139 filter,
140 join_type,
141 join_constraint,
142 schema,
143 null_equals_null,
144 }) => (left, right).map_elements(f)?.update_data(|(left, right)| {
145 LogicalPlan::Join(Join {
146 left,
147 right,
148 on,
149 filter,
150 join_type,
151 join_constraint,
152 schema,
153 null_equals_null,
154 })
155 }),
156 LogicalPlan::Limit(Limit { skip, fetch, input }) => input
157 .map_elements(f)?
158 .update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
159 LogicalPlan::Subquery(Subquery {
160 subquery,
161 outer_ref_columns,
162 }) => subquery.map_elements(f)?.update_data(|subquery| {
163 LogicalPlan::Subquery(Subquery {
164 subquery,
165 outer_ref_columns,
166 })
167 }),
168 LogicalPlan::SubqueryAlias(SubqueryAlias {
169 input,
170 alias,
171 schema,
172 }) => input.map_elements(f)?.update_data(|input| {
173 LogicalPlan::SubqueryAlias(SubqueryAlias {
174 input,
175 alias,
176 schema,
177 })
178 }),
179 LogicalPlan::Extension(extension) => rewrite_extension_inputs(extension, f)?
180 .update_data(LogicalPlan::Extension),
181 LogicalPlan::Union(Union { inputs, schema }) => inputs
182 .map_elements(f)?
183 .update_data(|inputs| LogicalPlan::Union(Union { inputs, schema })),
184 LogicalPlan::Distinct(distinct) => match distinct {
185 Distinct::All(input) => input.map_elements(f)?.update_data(Distinct::All),
186 Distinct::On(DistinctOn {
187 on_expr,
188 select_expr,
189 sort_expr,
190 input,
191 schema,
192 }) => input.map_elements(f)?.update_data(|input| {
193 Distinct::On(DistinctOn {
194 on_expr,
195 select_expr,
196 sort_expr,
197 input,
198 schema,
199 })
200 }),
201 }
202 .update_data(LogicalPlan::Distinct),
203 LogicalPlan::Explain(Explain {
204 verbose,
205 plan,
206 stringified_plans,
207 schema,
208 logical_optimization_succeeded,
209 }) => plan.map_elements(f)?.update_data(|plan| {
210 LogicalPlan::Explain(Explain {
211 verbose,
212 plan,
213 stringified_plans,
214 schema,
215 logical_optimization_succeeded,
216 })
217 }),
218 LogicalPlan::Analyze(Analyze {
219 verbose,
220 input,
221 schema,
222 }) => input.map_elements(f)?.update_data(|input| {
223 LogicalPlan::Analyze(Analyze {
224 verbose,
225 input,
226 schema,
227 })
228 }),
229 LogicalPlan::Dml(DmlStatement {
230 table_name,
231 target,
232 op,
233 input,
234 output_schema,
235 }) => input.map_elements(f)?.update_data(|input| {
236 LogicalPlan::Dml(DmlStatement {
237 table_name,
238 target,
239 op,
240 input,
241 output_schema,
242 })
243 }),
244 LogicalPlan::Copy(CopyTo {
245 input,
246 output_url,
247 partition_by,
248 file_type,
249 options,
250 }) => input.map_elements(f)?.update_data(|input| {
251 LogicalPlan::Copy(CopyTo {
252 input,
253 output_url,
254 partition_by,
255 file_type,
256 options,
257 })
258 }),
259 LogicalPlan::Ddl(ddl) => {
260 match ddl {
261 DdlStatement::CreateMemoryTable(CreateMemoryTable {
262 name,
263 constraints,
264 input,
265 if_not_exists,
266 or_replace,
267 column_defaults,
268 temporary,
269 }) => input.map_elements(f)?.update_data(|input| {
270 DdlStatement::CreateMemoryTable(CreateMemoryTable {
271 name,
272 constraints,
273 input,
274 if_not_exists,
275 or_replace,
276 column_defaults,
277 temporary,
278 })
279 }),
280 DdlStatement::CreateView(CreateView {
281 name,
282 input,
283 or_replace,
284 definition,
285 temporary,
286 }) => input.map_elements(f)?.update_data(|input| {
287 DdlStatement::CreateView(CreateView {
288 name,
289 input,
290 or_replace,
291 definition,
292 temporary,
293 })
294 }),
295 DdlStatement::CreateExternalTable(_)
297 | DdlStatement::CreateCatalogSchema(_)
298 | DdlStatement::CreateCatalog(_)
299 | DdlStatement::CreateIndex(_)
300 | DdlStatement::DropTable(_)
301 | DdlStatement::DropView(_)
302 | DdlStatement::DropCatalogSchema(_)
303 | DdlStatement::CreateFunction(_)
304 | DdlStatement::DropFunction(_) => Transformed::no(ddl),
305 }
306 .update_data(LogicalPlan::Ddl)
307 }
308 LogicalPlan::Unnest(Unnest {
309 input,
310 exec_columns: input_columns,
311 list_type_columns,
312 struct_type_columns,
313 dependency_indices,
314 schema,
315 options,
316 }) => input.map_elements(f)?.update_data(|input| {
317 LogicalPlan::Unnest(Unnest {
318 input,
319 exec_columns: input_columns,
320 dependency_indices,
321 list_type_columns,
322 struct_type_columns,
323 schema,
324 options,
325 })
326 }),
327 LogicalPlan::RecursiveQuery(RecursiveQuery {
328 name,
329 static_term,
330 recursive_term,
331 is_distinct,
332 }) => (static_term, recursive_term).map_elements(f)?.update_data(
333 |(static_term, recursive_term)| {
334 LogicalPlan::RecursiveQuery(RecursiveQuery {
335 name,
336 static_term,
337 recursive_term,
338 is_distinct,
339 })
340 },
341 ),
342 LogicalPlan::Statement(stmt) => match stmt {
343 Statement::Prepare(p) => p
344 .input
345 .map_elements(f)?
346 .update_data(|input| Statement::Prepare(Prepare { input, ..p })),
347 _ => Transformed::no(stmt),
348 }
349 .update_data(LogicalPlan::Statement),
350 LogicalPlan::TableScan { .. }
352 | LogicalPlan::EmptyRelation { .. }
353 | LogicalPlan::Values { .. }
354 | LogicalPlan::DescribeTable(_) => Transformed::no(self),
355 })
356 }
357}
358
359fn rewrite_extension_inputs<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>(
365 extension: Extension,
366 f: F,
367) -> Result<Transformed<Extension>> {
368 let Extension { node } = extension;
369
370 node.inputs()
371 .into_iter()
372 .cloned()
373 .map_until_stop_and_collect(f)?
374 .map_data(|new_inputs| {
375 let exprs = node.expressions();
376 Ok(Extension {
377 node: node.with_exprs_and_inputs(exprs, new_inputs)?,
378 })
379 })
380}
381
382macro_rules! handle_transform_recursion {
385 ($F_DOWN:expr, $F_CHILD:expr, $F_UP:expr) => {{
386 $F_DOWN?
387 .transform_children(|n| {
388 n.map_subqueries($F_CHILD)?
389 .transform_sibling(|n| n.map_children($F_CHILD))
390 })?
391 .transform_parent($F_UP)
392 }};
393}
394
395impl LogicalPlan {
396 pub fn apply_expressions<F: FnMut(&Expr) -> Result<TreeNodeRecursion>>(
403 &self,
404 mut f: F,
405 ) -> Result<TreeNodeRecursion> {
406 match self {
407 LogicalPlan::Projection(Projection { expr, .. }) => expr.apply_elements(f),
408 LogicalPlan::Values(Values { values, .. }) => values.apply_elements(f),
409 LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
410 LogicalPlan::Repartition(Repartition {
411 partitioning_scheme,
412 ..
413 }) => match partitioning_scheme {
414 Partitioning::Hash(expr, _) | Partitioning::DistributeBy(expr) => {
415 expr.apply_elements(f)
416 }
417 Partitioning::RoundRobinBatch(_) => Ok(TreeNodeRecursion::Continue),
418 },
419 LogicalPlan::Window(Window { window_expr, .. }) => {
420 window_expr.apply_elements(f)
421 }
422 LogicalPlan::Aggregate(Aggregate {
423 group_expr,
424 aggr_expr,
425 ..
426 }) => (group_expr, aggr_expr).apply_ref_elements(f),
427 LogicalPlan::Join(Join { on, filter, .. }) => {
431 (on, filter).apply_ref_elements(f)
432 }
433 LogicalPlan::Sort(Sort { expr, .. }) => expr.apply_elements(f),
434 LogicalPlan::Extension(extension) => {
435 extension.node.expressions().apply_elements(f)
438 }
439 LogicalPlan::TableScan(TableScan { filters, .. }) => {
440 filters.apply_elements(f)
441 }
442 LogicalPlan::Unnest(unnest) => {
443 let columns = unnest.exec_columns.clone();
444
445 let exprs = columns
446 .iter()
447 .map(|c| Expr::Column(c.clone()))
448 .collect::<Vec<_>>();
449 exprs.apply_elements(f)
450 }
451 LogicalPlan::Distinct(Distinct::On(DistinctOn {
452 on_expr,
453 select_expr,
454 sort_expr,
455 ..
456 })) => (on_expr, select_expr, sort_expr).apply_ref_elements(f),
457 LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
458 (skip, fetch).apply_ref_elements(f)
459 }
460 LogicalPlan::Statement(stmt) => match stmt {
461 Statement::Execute(Execute { parameters, .. }) => {
462 parameters.apply_elements(f)
463 }
464 _ => Ok(TreeNodeRecursion::Continue),
465 },
466 LogicalPlan::EmptyRelation(_)
468 | LogicalPlan::RecursiveQuery(_)
469 | LogicalPlan::Subquery(_)
470 | LogicalPlan::SubqueryAlias(_)
471 | LogicalPlan::Analyze(_)
472 | LogicalPlan::Explain(_)
473 | LogicalPlan::Union(_)
474 | LogicalPlan::Distinct(Distinct::All(_))
475 | LogicalPlan::Dml(_)
476 | LogicalPlan::Ddl(_)
477 | LogicalPlan::Copy(_)
478 | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue),
479 }
480 }
481
482 pub fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
490 self,
491 mut f: F,
492 ) -> Result<Transformed<Self>> {
493 Ok(match self {
494 LogicalPlan::Projection(Projection {
495 expr,
496 input,
497 schema,
498 }) => expr.map_elements(f)?.update_data(|expr| {
499 LogicalPlan::Projection(Projection {
500 expr,
501 input,
502 schema,
503 })
504 }),
505 LogicalPlan::Values(Values { schema, values }) => values
506 .map_elements(f)?
507 .update_data(|values| LogicalPlan::Values(Values { schema, values })),
508 LogicalPlan::Filter(Filter {
509 predicate,
510 input,
511 having,
512 }) => f(predicate)?.update_data(|predicate| {
513 LogicalPlan::Filter(Filter {
514 predicate,
515 input,
516 having,
517 })
518 }),
519 LogicalPlan::Repartition(Repartition {
520 input,
521 partitioning_scheme,
522 }) => match partitioning_scheme {
523 Partitioning::Hash(expr, usize) => expr
524 .map_elements(f)?
525 .update_data(|expr| Partitioning::Hash(expr, usize)),
526 Partitioning::DistributeBy(expr) => expr
527 .map_elements(f)?
528 .update_data(Partitioning::DistributeBy),
529 Partitioning::RoundRobinBatch(_) => Transformed::no(partitioning_scheme),
530 }
531 .update_data(|partitioning_scheme| {
532 LogicalPlan::Repartition(Repartition {
533 input,
534 partitioning_scheme,
535 })
536 }),
537 LogicalPlan::Window(Window {
538 input,
539 window_expr,
540 schema,
541 }) => window_expr.map_elements(f)?.update_data(|window_expr| {
542 LogicalPlan::Window(Window {
543 input,
544 window_expr,
545 schema,
546 })
547 }),
548 LogicalPlan::Aggregate(Aggregate {
549 input,
550 group_expr,
551 aggr_expr,
552 schema,
553 }) => (group_expr, aggr_expr).map_elements(f)?.update_data(
554 |(group_expr, aggr_expr)| {
555 LogicalPlan::Aggregate(Aggregate {
556 input,
557 group_expr,
558 aggr_expr,
559 schema,
560 })
561 },
562 ),
563
564 LogicalPlan::Join(Join {
568 left,
569 right,
570 on,
571 filter,
572 join_type,
573 join_constraint,
574 schema,
575 null_equals_null,
576 }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
577 LogicalPlan::Join(Join {
578 left,
579 right,
580 on,
581 filter,
582 join_type,
583 join_constraint,
584 schema,
585 null_equals_null,
586 })
587 }),
588 LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
589 .map_elements(f)?
590 .update_data(|expr| LogicalPlan::Sort(Sort { expr, input, fetch })),
591 LogicalPlan::Extension(Extension { node }) => {
592 let exprs = node.expressions().map_elements(f)?;
595 let plan = LogicalPlan::Extension(Extension {
596 node: UserDefinedLogicalNode::with_exprs_and_inputs(
597 node.as_ref(),
598 exprs.data,
599 node.inputs().into_iter().cloned().collect::<Vec<_>>(),
600 )?,
601 });
602 Transformed::new(plan, exprs.transformed, exprs.tnr)
603 }
604 LogicalPlan::TableScan(TableScan {
605 table_name,
606 source,
607 projection,
608 projected_schema,
609 filters,
610 fetch,
611 }) => filters.map_elements(f)?.update_data(|filters| {
612 LogicalPlan::TableScan(TableScan {
613 table_name,
614 source,
615 projection,
616 projected_schema,
617 filters,
618 fetch,
619 })
620 }),
621 LogicalPlan::Distinct(Distinct::On(DistinctOn {
622 on_expr,
623 select_expr,
624 sort_expr,
625 input,
626 schema,
627 })) => (on_expr, select_expr, sort_expr)
628 .map_elements(f)?
629 .update_data(|(on_expr, select_expr, sort_expr)| {
630 LogicalPlan::Distinct(Distinct::On(DistinctOn {
631 on_expr,
632 select_expr,
633 sort_expr,
634 input,
635 schema,
636 }))
637 }),
638 LogicalPlan::Limit(Limit { skip, fetch, input }) => {
639 (skip, fetch).map_elements(f)?.update_data(|(skip, fetch)| {
640 LogicalPlan::Limit(Limit { skip, fetch, input })
641 })
642 }
643 LogicalPlan::Statement(stmt) => match stmt {
644 Statement::Execute(e) => {
645 e.parameters.map_elements(f)?.update_data(|parameters| {
646 Statement::Execute(Execute { parameters, ..e })
647 })
648 }
649 _ => Transformed::no(stmt),
650 }
651 .update_data(LogicalPlan::Statement),
652 LogicalPlan::EmptyRelation(_)
654 | LogicalPlan::Unnest(_)
655 | LogicalPlan::RecursiveQuery(_)
656 | LogicalPlan::Subquery(_)
657 | LogicalPlan::SubqueryAlias(_)
658 | LogicalPlan::Analyze(_)
659 | LogicalPlan::Explain(_)
660 | LogicalPlan::Union(_)
661 | LogicalPlan::Distinct(Distinct::All(_))
662 | LogicalPlan::Dml(_)
663 | LogicalPlan::Ddl(_)
664 | LogicalPlan::Copy(_)
665 | LogicalPlan::DescribeTable(_) => Transformed::no(self),
666 })
667 }
668
669 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
672 pub fn visit_with_subqueries<V: for<'n> TreeNodeVisitor<'n, Node = Self>>(
673 &self,
674 visitor: &mut V,
675 ) -> Result<TreeNodeRecursion> {
676 visitor
677 .f_down(self)?
678 .visit_children(|| {
679 self.apply_subqueries(|c| c.visit_with_subqueries(visitor))?
680 .visit_sibling(|| {
681 self.apply_children(|c| c.visit_with_subqueries(visitor))
682 })
683 })?
684 .visit_parent(|| visitor.f_up(self))
685 }
686
687 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
691 pub fn rewrite_with_subqueries<R: TreeNodeRewriter<Node = Self>>(
692 self,
693 rewriter: &mut R,
694 ) -> Result<Transformed<Self>> {
695 handle_transform_recursion!(
696 rewriter.f_down(self),
697 |c| c.rewrite_with_subqueries(rewriter),
698 |n| rewriter.f_up(n)
699 )
700 }
701
702 pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
706 &self,
707 mut f: F,
708 ) -> Result<TreeNodeRecursion> {
709 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
710 fn apply_with_subqueries_impl<
711 F: FnMut(&LogicalPlan) -> Result<TreeNodeRecursion>,
712 >(
713 node: &LogicalPlan,
714 f: &mut F,
715 ) -> Result<TreeNodeRecursion> {
716 f(node)?.visit_children(|| {
717 node.apply_subqueries(|c| apply_with_subqueries_impl(c, f))?
718 .visit_sibling(|| {
719 node.apply_children(|c| apply_with_subqueries_impl(c, f))
720 })
721 })
722 }
723
724 apply_with_subqueries_impl(self, &mut f)
725 }
726
727 pub fn transform_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
731 self,
732 f: F,
733 ) -> Result<Transformed<Self>> {
734 self.transform_up_with_subqueries(f)
735 }
736
737 pub fn transform_down_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
741 self,
742 mut f: F,
743 ) -> Result<Transformed<Self>> {
744 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
745 fn transform_down_with_subqueries_impl<
746 F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
747 >(
748 node: LogicalPlan,
749 f: &mut F,
750 ) -> Result<Transformed<LogicalPlan>> {
751 f(node)?.transform_children(|n| {
752 n.map_subqueries(|c| transform_down_with_subqueries_impl(c, f))?
753 .transform_sibling(|n| {
754 n.map_children(|c| transform_down_with_subqueries_impl(c, f))
755 })
756 })
757 }
758
759 transform_down_with_subqueries_impl(self, &mut f)
760 }
761
762 pub fn transform_up_with_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
766 self,
767 mut f: F,
768 ) -> Result<Transformed<Self>> {
769 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
770 fn transform_up_with_subqueries_impl<
771 F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
772 >(
773 node: LogicalPlan,
774 f: &mut F,
775 ) -> Result<Transformed<LogicalPlan>> {
776 node.map_subqueries(|c| transform_up_with_subqueries_impl(c, f))?
777 .transform_sibling(|n| {
778 n.map_children(|c| transform_up_with_subqueries_impl(c, f))
779 })?
780 .transform_parent(f)
781 }
782
783 transform_up_with_subqueries_impl(self, &mut f)
784 }
785
786 pub fn transform_down_up_with_subqueries<
790 FD: FnMut(Self) -> Result<Transformed<Self>>,
791 FU: FnMut(Self) -> Result<Transformed<Self>>,
792 >(
793 self,
794 mut f_down: FD,
795 mut f_up: FU,
796 ) -> Result<Transformed<Self>> {
797 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
798 fn transform_down_up_with_subqueries_impl<
799 FD: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
800 FU: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>,
801 >(
802 node: LogicalPlan,
803 f_down: &mut FD,
804 f_up: &mut FU,
805 ) -> Result<Transformed<LogicalPlan>> {
806 handle_transform_recursion!(
807 f_down(node),
808 |c| transform_down_up_with_subqueries_impl(c, f_down, f_up),
809 f_up
810 )
811 }
812
813 transform_down_up_with_subqueries_impl(self, &mut f_down, &mut f_up)
814 }
815
816 pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
820 &self,
821 mut f: F,
822 ) -> Result<TreeNodeRecursion> {
823 self.apply_expressions(|expr| {
824 expr.apply(|expr| match expr {
825 Expr::Exists(Exists { subquery, .. })
826 | Expr::InSubquery(InSubquery { subquery, .. })
827 | Expr::ScalarSubquery(subquery) => {
828 f(&LogicalPlan::Subquery(subquery.clone()))
832 }
833 _ => Ok(TreeNodeRecursion::Continue),
834 })
835 })
836 }
837
838 pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
843 self,
844 mut f: F,
845 ) -> Result<Transformed<Self>> {
846 self.map_expressions(|expr| {
847 expr.transform_down(|expr| match expr {
848 Expr::Exists(Exists { subquery, negated }) => {
849 f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
850 LogicalPlan::Subquery(subquery) => {
851 Ok(Expr::Exists(Exists { subquery, negated }))
852 }
853 _ => internal_err!("Transformation should return Subquery"),
854 })
855 }
856 Expr::InSubquery(InSubquery {
857 expr,
858 subquery,
859 negated,
860 }) => f(LogicalPlan::Subquery(subquery))?.map_data(|s| match s {
861 LogicalPlan::Subquery(subquery) => Ok(Expr::InSubquery(InSubquery {
862 expr,
863 subquery,
864 negated,
865 })),
866 _ => internal_err!("Transformation should return Subquery"),
867 }),
868 Expr::ScalarSubquery(subquery) => f(LogicalPlan::Subquery(subquery))?
869 .map_data(|s| match s {
870 LogicalPlan::Subquery(subquery) => {
871 Ok(Expr::ScalarSubquery(subquery))
872 }
873 _ => internal_err!("Transformation should return Subquery"),
874 }),
875 _ => Ok(Transformed::no(expr)),
876 })
877 })
878 }
879}