1use std::collections::BTreeSet;
20use std::ops::Deref;
21use std::sync::Arc;
22
23use crate::decorrelate::PullUpCorrelatedExpr;
24use crate::optimizer::ApplyOrder;
25use crate::utils::replace_qualified_name;
26use crate::{OptimizerConfig, OptimizerRule};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30use datafusion_common::{internal_err, plan_err, Column, Result};
31use datafusion_expr::expr::{Exists, InSubquery};
32use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
33use datafusion_expr::logical_plan::{JoinType, Subquery};
34use datafusion_expr::utils::{conjunction, split_conjunction_owned};
35use datafusion_expr::{
36 exists, in_subquery, lit, not, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
37 LogicalPlan, LogicalPlanBuilder, Operator,
38};
39
40use log::debug;
41
/// Optimizer rule that rewrites `IN` / `EXISTS` predicate subqueries found in
/// `Filter` nodes into joins (`LeftSemi`, `LeftAnti` or `LeftMark`) against
/// the filter's input.
#[derive(Default, Debug)]
pub struct DecorrelatePredicateSubquery {}

impl DecorrelatePredicateSubquery {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self {}
    }
}
52
53impl OptimizerRule for DecorrelatePredicateSubquery {
54 fn supports_rewrite(&self) -> bool {
55 true
56 }
57
58 fn rewrite(
59 &self,
60 plan: LogicalPlan,
61 config: &dyn OptimizerConfig,
62 ) -> Result<Transformed<LogicalPlan>> {
63 let plan = plan
64 .map_subqueries(|subquery| {
65 subquery.transform_down(|p| self.rewrite(p, config))
66 })?
67 .data;
68
69 let LogicalPlan::Filter(filter) = plan else {
70 return Ok(Transformed::no(plan));
71 };
72
73 if !has_subquery(&filter.predicate) {
74 return Ok(Transformed::no(LogicalPlan::Filter(filter)));
75 }
76
77 let (with_subqueries, mut other_exprs): (Vec<_>, Vec<_>) =
78 split_conjunction_owned(filter.predicate)
79 .into_iter()
80 .partition(has_subquery);
81
82 if with_subqueries.is_empty() {
83 return internal_err!(
84 "can not find expected subqueries in DecorrelatePredicateSubquery"
85 );
86 }
87
88 let mut cur_input = Arc::unwrap_or_clone(filter.input);
90 for subquery_expr in with_subqueries {
91 match extract_subquery_info(subquery_expr) {
92 SubqueryPredicate::Top(subquery) => {
94 match build_join_top(&subquery, &cur_input, config.alias_generator())?
95 {
96 Some(plan) => cur_input = plan,
97 None => other_exprs.push(subquery.expr()),
99 }
100 }
101 SubqueryPredicate::Embedded(expr) => {
103 let (plan, expr_without_subqueries) =
104 rewrite_inner_subqueries(cur_input, expr, config)?;
105 cur_input = plan;
106 other_exprs.push(expr_without_subqueries);
107 }
108 }
109 }
110
111 let expr = conjunction(other_exprs);
112 if let Some(expr) = expr {
113 let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
114 cur_input = LogicalPlan::Filter(new_filter);
115 }
116 Ok(Transformed::yes(cur_input))
117 }
118
119 fn name(&self) -> &str {
120 "decorrelate_predicate_subquery"
121 }
122
123 fn apply_order(&self) -> Option<ApplyOrder> {
124 Some(ApplyOrder::TopDown)
125 }
126}
127
128fn rewrite_inner_subqueries(
129 outer: LogicalPlan,
130 expr: Expr,
131 config: &dyn OptimizerConfig,
132) -> Result<(LogicalPlan, Expr)> {
133 let mut cur_input = outer;
134 let alias = config.alias_generator();
135 let expr_without_subqueries = expr.transform(|e| match e {
136 Expr::Exists(Exists {
137 subquery: Subquery { subquery, .. },
138 negated,
139 }) => match mark_join(&cur_input, Arc::clone(&subquery), None, negated, alias)? {
140 Some((plan, exists_expr)) => {
141 cur_input = plan;
142 Ok(Transformed::yes(exists_expr))
143 }
144 None if negated => Ok(Transformed::no(not_exists(subquery))),
145 None => Ok(Transformed::no(exists(subquery))),
146 },
147 Expr::InSubquery(InSubquery {
148 expr,
149 subquery: Subquery { subquery, .. },
150 negated,
151 }) => {
152 let in_predicate = subquery
153 .head_output_expr()?
154 .map_or(plan_err!("single expression required."), |output_expr| {
155 Ok(Expr::eq(*expr.clone(), output_expr))
156 })?;
157 match mark_join(
158 &cur_input,
159 Arc::clone(&subquery),
160 Some(in_predicate),
161 negated,
162 alias,
163 )? {
164 Some((plan, exists_expr)) => {
165 cur_input = plan;
166 Ok(Transformed::yes(exists_expr))
167 }
168 None if negated => Ok(Transformed::no(not_in_subquery(*expr, subquery))),
169 None => Ok(Transformed::no(in_subquery(*expr, subquery))),
170 }
171 }
172 _ => Ok(Transformed::no(e)),
173 })?;
174 Ok((cur_input, expr_without_subqueries.data))
175}
176
177enum SubqueryPredicate {
178 Top(SubqueryInfo),
181 Embedded(Expr),
184}
185
186fn extract_subquery_info(expr: Expr) -> SubqueryPredicate {
187 match expr {
188 Expr::Not(not_expr) => match *not_expr {
189 Expr::InSubquery(InSubquery {
190 expr,
191 subquery,
192 negated,
193 }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
194 subquery, *expr, !negated,
195 )),
196 Expr::Exists(Exists { subquery, negated }) => {
197 SubqueryPredicate::Top(SubqueryInfo::new(subquery, !negated))
198 }
199 expr => SubqueryPredicate::Embedded(not(expr)),
200 },
201 Expr::InSubquery(InSubquery {
202 expr,
203 subquery,
204 negated,
205 }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
206 subquery, *expr, negated,
207 )),
208 Expr::Exists(Exists { subquery, negated }) => {
209 SubqueryPredicate::Top(SubqueryInfo::new(subquery, negated))
210 }
211 expr => SubqueryPredicate::Embedded(expr),
212 }
213}
214
215fn has_subquery(expr: &Expr) -> bool {
216 expr.exists(|e| match e {
217 Expr::InSubquery(_) | Expr::Exists(_) => Ok(true),
218 _ => Ok(false),
219 })
220 .unwrap()
221}
222
223fn build_join_top(
254 query_info: &SubqueryInfo,
255 left: &LogicalPlan,
256 alias: &Arc<AliasGenerator>,
257) -> Result<Option<LogicalPlan>> {
258 let where_in_expr_opt = &query_info.where_in_expr;
259 let in_predicate_opt = where_in_expr_opt
260 .clone()
261 .map(|where_in_expr| {
262 query_info
263 .query
264 .subquery
265 .head_output_expr()?
266 .map_or(plan_err!("single expression required."), |expr| {
267 Ok(Expr::eq(where_in_expr, expr))
268 })
269 })
270 .map_or(Ok(None), |v| v.map(Some))?;
271
272 let join_type = match query_info.negated {
273 true => JoinType::LeftAnti,
274 false => JoinType::LeftSemi,
275 };
276 let subquery = query_info.query.subquery.as_ref();
277 let subquery_alias = alias.next("__correlated_sq");
278 build_join(left, subquery, in_predicate_opt, join_type, subquery_alias)
279}
280
281fn mark_join(
297 left: &LogicalPlan,
298 subquery: Arc<LogicalPlan>,
299 in_predicate_opt: Option<Expr>,
300 negated: bool,
301 alias_generator: &Arc<AliasGenerator>,
302) -> Result<Option<(LogicalPlan, Expr)>> {
303 let alias = alias_generator.next("__correlated_sq");
304
305 let exists_col = Expr::Column(Column::new(Some(alias.clone()), "mark"));
306 let exists_expr = if negated { !exists_col } else { exists_col };
307
308 Ok(
309 build_join(left, &subquery, in_predicate_opt, JoinType::LeftMark, alias)?
310 .map(|plan| (plan, exists_expr)),
311 )
312}
313
314fn build_join(
315 left: &LogicalPlan,
316 subquery: &LogicalPlan,
317 in_predicate_opt: Option<Expr>,
318 join_type: JoinType,
319 alias: String,
320) -> Result<Option<LogicalPlan>> {
321 let mut pull_up = PullUpCorrelatedExpr::new()
322 .with_in_predicate_opt(in_predicate_opt.clone())
323 .with_exists_sub_query(in_predicate_opt.is_none());
324
325 let new_plan = subquery.clone().rewrite(&mut pull_up).data()?;
326 if !pull_up.can_pull_up {
327 return Ok(None);
328 }
329
330 let sub_query_alias = LogicalPlanBuilder::from(new_plan)
331 .alias(alias.to_string())?
332 .build()?;
333 let mut all_correlated_cols = BTreeSet::new();
334 pull_up
335 .correlated_subquery_cols_map
336 .values()
337 .for_each(|cols| all_correlated_cols.extend(cols.clone()));
338
339 let join_filter_opt = conjunction(pull_up.join_filters)
341 .map_or(Ok(None), |filter| {
342 replace_qualified_name(filter, &all_correlated_cols, &alias).map(Some)
343 })?;
344
345 let join_filter = match (join_filter_opt, in_predicate_opt) {
346 (
347 Some(join_filter),
348 Some(Expr::BinaryExpr(BinaryExpr {
349 left,
350 op: Operator::Eq,
351 right,
352 })),
353 ) => {
354 let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
355 let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col));
356 in_predicate.and(join_filter)
357 }
358 (Some(join_filter), _) => join_filter,
359 (
360 _,
361 Some(Expr::BinaryExpr(BinaryExpr {
362 left,
363 op: Operator::Eq,
364 right,
365 })),
366 ) => {
367 let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
368 let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col));
369 in_predicate
370 }
371 (None, None) => lit(true),
372 _ => return Ok(None),
373 };
374 let new_plan = LogicalPlanBuilder::from(left.clone())
376 .join_on(sub_query_alias, join_type, Some(join_filter))?
377 .build()?;
378 debug!(
379 "predicate subquery optimized:\n{}",
380 new_plan.display_indent()
381 );
382 Ok(Some(new_plan))
383}
384
385#[derive(Debug)]
386struct SubqueryInfo {
387 query: Subquery,
388 where_in_expr: Option<Expr>,
389 negated: bool,
390}
391
392impl SubqueryInfo {
393 pub fn new(query: Subquery, negated: bool) -> Self {
394 Self {
395 query,
396 where_in_expr: None,
397 negated,
398 }
399 }
400
401 pub fn new_with_in_expr(query: Subquery, expr: Expr, negated: bool) -> Self {
402 Self {
403 query,
404 where_in_expr: Some(expr),
405 negated,
406 }
407 }
408
409 pub fn expr(self) -> Expr {
410 match self.where_in_expr {
411 Some(expr) => match self.negated {
412 true => not_in_subquery(expr, self.query.subquery),
413 false => in_subquery(expr, self.query.subquery),
414 },
415 None => match self.negated {
416 true => not_exists(self.query.subquery),
417 false => exists(self.query.subquery),
418 },
419 }
420 }
421}
422
423#[cfg(test)]
424mod tests {
425 use std::ops::Add;
426
427 use super::*;
428 use crate::test::*;
429
430 use arrow::datatypes::{DataType, Field, Schema};
431 use datafusion_expr::builder::table_source;
432 use datafusion_expr::{and, binary_expr, col, lit, not, out_ref_col, table_scan};
433
434 fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
435 assert_optimized_plan_eq_display_indent(
436 Arc::new(DecorrelatePredicateSubquery::new()),
437 plan,
438 expected,
439 );
440 Ok(())
441 }
442
443 fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
444 let table_scan = test_table_scan_with_name(name)?;
445 Ok(Arc::new(
446 LogicalPlanBuilder::from(table_scan)
447 .project(vec![col("c")])?
448 .build()?,
449 ))
450 }
451
452 #[test]
454 fn in_subquery_multiple() -> Result<()> {
455 let table_scan = test_table_scan()?;
456 let plan = LogicalPlanBuilder::from(table_scan)
457 .filter(and(
458 in_subquery(col("c"), test_subquery_with_name("sq_1")?),
459 in_subquery(col("b"), test_subquery_with_name("sq_2")?),
460 ))?
461 .project(vec![col("test.b")])?
462 .build()?;
463
464 let expected = "Projection: test.b [b:UInt32]\
465 \n LeftSemi Join: Filter: test.b = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
466 \n LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
467 \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
468 \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
469 \n Projection: sq_1.c [c:UInt32]\
470 \n TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\
471 \n SubqueryAlias: __correlated_sq_2 [c:UInt32]\
472 \n Projection: sq_2.c [c:UInt32]\
473 \n TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]";
474 assert_optimized_plan_equal(plan, expected)
475 }
476
477 #[test]
479 fn in_subquery_with_and_filters() -> Result<()> {
480 let table_scan = test_table_scan()?;
481 let plan = LogicalPlanBuilder::from(table_scan)
482 .filter(and(
483 in_subquery(col("c"), test_subquery_with_name("sq")?),
484 and(
485 binary_expr(col("a"), Operator::Eq, lit(1_u32)),
486 binary_expr(col("b"), Operator::Lt, lit(30_u32)),
487 ),
488 ))?
489 .project(vec![col("test.b")])?
490 .build()?;
491
492 let expected = "Projection: test.b [b:UInt32]\
493 \n Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\
494 \n LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
495 \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
496 \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
497 \n Projection: sq.c [c:UInt32]\
498 \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
499
500 assert_optimized_plan_equal(plan, expected)
501 }
502
503 #[test]
505 fn in_subquery_nested() -> Result<()> {
506 let table_scan = test_table_scan()?;
507
508 let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
509 .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))?
510 .project(vec![col("a")])?
511 .build()?;
512
513 let plan = LogicalPlanBuilder::from(table_scan)
514 .filter(in_subquery(col("b"), Arc::new(subquery)))?
515 .project(vec![col("test.b")])?
516 .build()?;
517
518 let expected = "Projection: test.b [b:UInt32]\
519 \n LeftSemi Join: Filter: test.b = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]\
520 \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
521 \n SubqueryAlias: __correlated_sq_2 [a:UInt32]\
522 \n Projection: sq.a [a:UInt32]\
523 \n LeftSemi Join: Filter: sq.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
524 \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
525 \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
526 \n Projection: sq_nested.c [c:UInt32]\
527 \n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]";
528
529 assert_optimized_plan_equal(plan, expected)
530 }
531
532 #[test]
535 fn multiple_subqueries() -> Result<()> {
536 let orders = Arc::new(
537 LogicalPlanBuilder::from(scan_tpch_table("orders"))
538 .filter(
539 col("orders.o_custkey")
540 .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
541 )?
542 .project(vec![col("orders.o_custkey")])?
543 .build()?,
544 );
545 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
546 .filter(
547 in_subquery(col("customer.c_custkey"), Arc::clone(&orders))
548 .and(in_subquery(col("customer.c_custkey"), orders)),
549 )?
550 .project(vec![col("customer.c_custkey")])?
551 .build()?;
552 debug!("plan to optimize:\n{}", plan.display_indent());
553
554 let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
555 \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\
556 \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
557 \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
558 \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
559 \n Projection: orders.o_custkey [o_custkey:Int64]\
560 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
561 \n SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]\
562 \n Projection: orders.o_custkey [o_custkey:Int64]\
563 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
564
565 assert_optimized_plan_eq_display_indent(
566 Arc::new(DecorrelatePredicateSubquery::new()),
567 plan,
568 expected,
569 );
570 Ok(())
571 }
572
573 #[test]
576 fn recursive_subqueries() -> Result<()> {
577 let lineitem = Arc::new(
578 LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
579 .filter(
580 col("lineitem.l_orderkey")
581 .eq(out_ref_col(DataType::Int64, "orders.o_orderkey")),
582 )?
583 .project(vec![col("lineitem.l_orderkey")])?
584 .build()?,
585 );
586
587 let orders = Arc::new(
588 LogicalPlanBuilder::from(scan_tpch_table("orders"))
589 .filter(
590 in_subquery(col("orders.o_orderkey"), lineitem).and(
591 col("orders.o_custkey")
592 .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
593 ),
594 )?
595 .project(vec![col("orders.o_custkey")])?
596 .build()?,
597 );
598
599 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
600 .filter(in_subquery(col("customer.c_custkey"), orders))?
601 .project(vec![col("customer.c_custkey")])?
602 .build()?;
603
604 let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
605 \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\
606 \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
607 \n SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]\
608 \n Projection: orders.o_custkey [o_custkey:Int64]\
609 \n LeftSemi Join: Filter: orders.o_orderkey = __correlated_sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
610 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
611 \n SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]\
612 \n Projection: lineitem.l_orderkey [l_orderkey:Int64]\
613 \n TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]";
614
615 assert_optimized_plan_eq_display_indent(
616 Arc::new(DecorrelatePredicateSubquery::new()),
617 plan,
618 expected,
619 );
620 Ok(())
621 }
622
623 #[test]
625 fn in_subquery_with_subquery_filters() -> Result<()> {
626 let sq = Arc::new(
627 LogicalPlanBuilder::from(scan_tpch_table("orders"))
628 .filter(
629 out_ref_col(DataType::Int64, "customer.c_custkey")
630 .eq(col("orders.o_custkey"))
631 .and(col("o_orderkey").eq(lit(1))),
632 )?
633 .project(vec![col("orders.o_custkey")])?
634 .build()?,
635 );
636
637 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
638 .filter(in_subquery(col("customer.c_custkey"), sq))?
639 .project(vec![col("customer.c_custkey")])?
640 .build()?;
641
642 let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
643 \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
644 \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
645 \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
646 \n Projection: orders.o_custkey [o_custkey:Int64]\
647 \n Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
648 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
649
650 assert_optimized_plan_eq_display_indent(
651 Arc::new(DecorrelatePredicateSubquery::new()),
652 plan,
653 expected,
654 );
655 Ok(())
656 }
657
658 #[test]
660 fn in_subquery_no_cols() -> Result<()> {
661 let sq = Arc::new(
662 LogicalPlanBuilder::from(scan_tpch_table("orders"))
663 .filter(
664 out_ref_col(DataType::Int64, "customer.c_custkey")
665 .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
666 )?
667 .project(vec![col("orders.o_custkey")])?
668 .build()?,
669 );
670
671 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
672 .filter(in_subquery(col("customer.c_custkey"), sq))?
673 .project(vec![col("customer.c_custkey")])?
674 .build()?;
675
676 let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
677 \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
678 \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
679 \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
680 \n Projection: orders.o_custkey [o_custkey:Int64]\
681 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
682
683 assert_optimized_plan_eq_display_indent(
684 Arc::new(DecorrelatePredicateSubquery::new()),
685 plan,
686 expected,
687 );
688 Ok(())
689 }
690
691 #[test]
693 fn in_subquery_with_no_correlated_cols() -> Result<()> {
694 let sq = Arc::new(
695 LogicalPlanBuilder::from(scan_tpch_table("orders"))
696 .filter(col("orders.o_custkey").eq(col("orders.o_custkey")))?
697 .project(vec![col("orders.o_custkey")])?
698 .build()?,
699 );
700
701 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
702 .filter(in_subquery(col("customer.c_custkey"), sq))?
703 .project(vec![col("customer.c_custkey")])?
704 .build()?;
705
706 let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
707 \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
708 \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
709 \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
710 \n Projection: orders.o_custkey [o_custkey:Int64]\
711 \n Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
712 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
713
714 assert_optimized_plan_eq_display_indent(
715 Arc::new(DecorrelatePredicateSubquery::new()),
716 plan,
717 expected,
718 );
719 Ok(())
720 }
721
722 #[test]
724 fn in_subquery_where_not_eq() -> Result<()> {
725 let sq = Arc::new(
726 LogicalPlanBuilder::from(scan_tpch_table("orders"))
727 .filter(
728 out_ref_col(DataType::Int64, "customer.c_custkey")
729 .not_eq(col("orders.o_custkey")),
730 )?
731 .project(vec![col("orders.o_custkey")])?
732 .build()?,
733 );
734
735 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
736 .filter(in_subquery(col("customer.c_custkey"), sq))?
737 .project(vec![col("customer.c_custkey")])?
738 .build()?;
739
740 let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
741 \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
742 \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
743 \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
744 \n Projection: orders.o_custkey [o_custkey:Int64]\
745 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
746
747 assert_optimized_plan_eq_display_indent(
748 Arc::new(DecorrelatePredicateSubquery::new()),
749 plan,
750 expected,
751 );
752 Ok(())
753 }
754
755 #[test]
757 fn in_subquery_where_less_than() -> Result<()> {
758 let sq = Arc::new(
759 LogicalPlanBuilder::from(scan_tpch_table("orders"))
760 .filter(
761 out_ref_col(DataType::Int64, "customer.c_custkey")
762 .lt(col("orders.o_custkey")),
763 )?
764 .project(vec![col("orders.o_custkey")])?
765 .build()?,
766 );
767
768 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
769 .filter(in_subquery(col("customer.c_custkey"), sq))?
770 .project(vec![col("customer.c_custkey")])?
771 .build()?;
772
773 let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
774 \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
775 \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
776 \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
777 \n Projection: orders.o_custkey [o_custkey:Int64]\
778 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
779
780 assert_optimized_plan_eq_display_indent(
781 Arc::new(DecorrelatePredicateSubquery::new()),
782 plan,
783 expected,
784 );
785 Ok(())
786 }
787
788 #[test]
790 fn in_subquery_with_subquery_disjunction() -> Result<()> {
791 let sq = Arc::new(
792 LogicalPlanBuilder::from(scan_tpch_table("orders"))
793 .filter(
794 out_ref_col(DataType::Int64, "customer.c_custkey")
795 .eq(col("orders.o_custkey"))
796 .or(col("o_orderkey").eq(lit(1))),
797 )?
798 .project(vec![col("orders.o_custkey")])?
799 .build()?,
800 );
801
802 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
803 .filter(in_subquery(col("customer.c_custkey"), sq))?
804 .project(vec![col("customer.c_custkey")])?
805 .build()?;
806
807 let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
808 \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND (customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1)) [c_custkey:Int64, c_name:Utf8]\
809 \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
810 \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]\
811 \n Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]\
812 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
813
814 assert_optimized_plan_eq_display_indent(
815 Arc::new(DecorrelatePredicateSubquery::new()),
816 plan,
817 expected,
818 );
819
820 Ok(())
821 }
822
823 #[test]
825 fn in_subquery_no_projection() -> Result<()> {
826 let sq = Arc::new(
827 LogicalPlanBuilder::from(scan_tpch_table("orders"))
828 .filter(col("customer.c_custkey").eq(col("orders.o_custkey")))?
829 .build()?,
830 );
831
832 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
833 .filter(in_subquery(col("customer.c_custkey"), sq))?
834 .project(vec![col("customer.c_custkey")])?
835 .build()?;
836
837 let expected = "Invalid (non-executable) plan after Analyzer\
839 \ncaused by\
840 \nError during planning: InSubquery should only return one column, but found 4";
841 assert_analyzer_check_err(vec![], plan, expected);
842
843 Ok(())
844 }
845
846 #[test]
848 fn in_subquery_join_expr() -> Result<()> {
849 let sq = Arc::new(
850 LogicalPlanBuilder::from(scan_tpch_table("orders"))
851 .filter(
852 out_ref_col(DataType::Int64, "customer.c_custkey")
853 .eq(col("orders.o_custkey")),
854 )?
855 .project(vec![col("orders.o_custkey")])?
856 .build()?,
857 );
858
859 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
860 .filter(in_subquery(col("customer.c_custkey").add(lit(1)), sq))?
861 .project(vec![col("customer.c_custkey")])?
862 .build()?;
863
864 let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
865 \n LeftSemi Join: Filter: customer.c_custkey + Int32(1) = __correlated_sq_1.o_custkey AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
866 \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
867 \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
868 \n Projection: orders.o_custkey [o_custkey:Int64]\
869 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
870
871 assert_optimized_plan_eq_display_indent(
872 Arc::new(DecorrelatePredicateSubquery::new()),
873 plan,
874 expected,
875 );
876 Ok(())
877 }
878
879 #[test]
881 fn in_subquery_project_expr() -> Result<()> {
882 let sq = Arc::new(
883 LogicalPlanBuilder::from(scan_tpch_table("orders"))
884 .filter(
885 out_ref_col(DataType::Int64, "customer.c_custkey")
886 .eq(col("orders.o_custkey")),
887 )?
888 .project(vec![col("orders.o_custkey").add(lit(1))])?
889 .build()?,
890 );
891
892 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
893 .filter(in_subquery(col("customer.c_custkey"), sq))?
894 .project(vec![col("customer.c_custkey")])?
895 .build()?;
896
897 let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
898 \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.orders.o_custkey + Int32(1) AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
899 \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
900 \n SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\
901 \n Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\
902 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
903
904 assert_optimized_plan_eq_display_indent(
905 Arc::new(DecorrelatePredicateSubquery::new()),
906 plan,
907 expected,
908 );
909 Ok(())
910 }
911
912 #[test]
914 fn in_subquery_multi_col() -> Result<()> {
915 let sq = Arc::new(
916 LogicalPlanBuilder::from(scan_tpch_table("orders"))
917 .filter(
918 out_ref_col(DataType::Int64, "customer.c_custkey")
919 .eq(col("orders.o_custkey")),
920 )?
921 .project(vec![col("orders.o_custkey"), col("orders.o_orderkey")])?
922 .build()?,
923 );
924
925 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
926 .filter(
927 in_subquery(col("customer.c_custkey"), sq)
928 .and(col("c_custkey").eq(lit(1))),
929 )?
930 .project(vec![col("customer.c_custkey")])?
931 .build()?;
932
933 let expected = "Invalid (non-executable) plan after Analyzer\
934 \ncaused by\
935 \nError during planning: InSubquery should only return one column";
936 assert_analyzer_check_err(vec![], plan, expected);
937
938 Ok(())
939 }
940
941 #[test]
943 fn should_support_additional_filters() -> Result<()> {
944 let sq = Arc::new(
945 LogicalPlanBuilder::from(scan_tpch_table("orders"))
946 .filter(
947 out_ref_col(DataType::Int64, "customer.c_custkey")
948 .eq(col("orders.o_custkey")),
949 )?
950 .project(vec![col("orders.o_custkey")])?
951 .build()?,
952 );
953
954 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
955 .filter(
956 in_subquery(col("customer.c_custkey"), sq)
957 .and(col("c_custkey").eq(lit(1))),
958 )?
959 .project(vec![col("customer.c_custkey")])?
960 .build()?;
961
962 let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
963 \n Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]\
964 \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
965 \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
966 \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
967 \n Projection: orders.o_custkey [o_custkey:Int64]\
968 \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
969
970 assert_optimized_plan_eq_display_indent(
971 Arc::new(DecorrelatePredicateSubquery::new()),
972 plan,
973 expected,
974 );
975 Ok(())
976 }
977
978 #[test]
980 fn in_subquery_correlated() -> Result<()> {
981 let sq = Arc::new(
982 LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
983 .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a")))?
984 .project(vec![col("c")])?
985 .build()?,
986 );
987
988 let plan = LogicalPlanBuilder::from(test_table_scan_with_name("test")?)
989 .filter(in_subquery(col("c"), sq))?
990 .project(vec![col("test.b")])?
991 .build()?;
992
993 let expected = "Projection: test.b [b:UInt32]\
994 \n LeftSemi Join: Filter: test.c = __correlated_sq_1.c AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
995 \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
996 \n SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]\
997 \n Projection: sq.c, sq.a [c:UInt32, a:UInt32]\
998 \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
999
1000 assert_optimized_plan_eq_display_indent(
1001 Arc::new(DecorrelatePredicateSubquery::new()),
1002 plan,
1003 expected,
1004 );
1005 Ok(())
1006 }
1007
1008 #[test]
1010 fn in_subquery_simple() -> Result<()> {
1011 let table_scan = test_table_scan()?;
1012 let plan = LogicalPlanBuilder::from(table_scan)
1013 .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))?
1014 .project(vec![col("test.b")])?
1015 .build()?;
1016
1017 let expected = "Projection: test.b [b:UInt32]\
1018 \n LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
1019 \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1020 \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
1021 \n Projection: sq.c [c:UInt32]\
1022 \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
1023
1024 assert_optimized_plan_eq_display_indent(
1025 Arc::new(DecorrelatePredicateSubquery::new()),
1026 plan,
1027 expected,
1028 );
1029 Ok(())
1030 }
1031
/// `c NOT IN (SELECT c FROM sq)` must become a LeftAnti join keyed on
/// `test.c = __correlated_sq_1.c`.
#[test]
fn not_in_subquery_simple() -> Result<()> {
    let outer = test_table_scan()?;
    let predicate = not_in_subquery(col("c"), test_subquery_with_name("sq")?);
    let query = LogicalPlanBuilder::from(outer)
        .filter(predicate)?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftAnti Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
        \n Projection: sq.c [c:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, query, expected_plan);
    Ok(())
}
1055
/// `NOT (c IN (...))` is treated like `c NOT IN (...)` and yields a LeftAnti join.
#[test]
fn wrapped_not_in_subquery() -> Result<()> {
    let outer = test_table_scan()?;
    let negated = not(in_subquery(col("c"), test_subquery_with_name("sq")?));
    let query = LogicalPlanBuilder::from(outer)
        .filter(negated)?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftAnti Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
        \n Projection: sq.c [c:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, query, expected_plan);
    Ok(())
}
1078
/// `NOT (c NOT IN (...))` cancels out and yields a LeftSemi join.
#[test]
fn wrapped_not_not_in_subquery() -> Result<()> {
    let outer = test_table_scan()?;
    let inner = test_subquery_with_name("sq")?;
    let double_negation = not(not_in_subquery(col("c"), inner));
    let query = LogicalPlanBuilder::from(outer)
        .filter(double_negation)?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
        \n Projection: sq.c [c:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, query, expected_plan);
    Ok(())
}
1104
/// Expressions on both sides of `IN` (`c + 1 IN (SELECT c * 2 ...)`) end up
/// compared directly in the semi-join filter.
#[test]
fn in_subquery_both_side_expr() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let doubled_c = LogicalPlanBuilder::from(inner_scan)
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let probe = col("c") + lit(1u32);
    let query = LogicalPlanBuilder::from(outer)
        .filter(in_subquery(probe, Arc::new(doubled_c)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32]\
        \n Projection: sq.c * UInt32(2) [sq.c * UInt32(2):UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, query, expected_plan);
    Ok(())
}
1133
/// A correlated conjunct (`test.a = sq.a`) is pulled up into the join filter,
/// while the purely local conjunct (`sq.a + 1 = sq.b`) stays inside the subquery.
#[test]
fn in_subquery_join_filter_and_inner_filter() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let correlation = out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a"));
    let local_predicate = col("sq.a").add(lit(1u32)).eq(col("sq.b"));
    let inner = LogicalPlanBuilder::from(inner_scan)
        .filter(correlation.and(local_predicate))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let probe = col("c") + lit(1u32);
    let query = LogicalPlanBuilder::from(outer)
        .filter(in_subquery(probe, Arc::new(inner)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32]\
        \n Projection: sq.c * UInt32(2), sq.a [sq.c * UInt32(2):UInt32, a:UInt32]\
        \n Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, query, expected_plan);
    Ok(())
}
1168
/// A correlation that references several subquery columns (`sq.a + sq.b`) adds
/// each of those columns to the subquery projection.
#[test]
fn in_subquery_multi_project_subquery_cols() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let outer_sum = out_ref_col(DataType::UInt32, "test.a")
        .add(out_ref_col(DataType::UInt32, "test.b"));
    let inner_sum = col("sq.a").add(col("sq.b"));
    let local_predicate = col("sq.a").add(lit(1u32)).eq(col("sq.b"));
    let inner = LogicalPlanBuilder::from(inner_scan)
        .filter(outer_sum.eq(inner_sum).and(local_predicate))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let probe = col("c") + lit(1u32);
    let query = LogicalPlanBuilder::from(outer)
        .filter(in_subquery(probe, Arc::new(inner)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a + test.b = __correlated_sq_1.a + __correlated_sq_1.b [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]\
        \n Projection: sq.c * UInt32(2), sq.a, sq.b [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]\
        \n Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, query, expected_plan);
    Ok(())
}
1204
/// Two `IN` subqueries plus a plain outer predicate. Each subquery becomes its own
/// LeftSemi join, aliased in order, and the plain predicate stays in a Filter above them.
#[test]
fn two_in_subquery_with_outer_filter() -> Result<()> {
    let outer = test_table_scan()?;
    let first_scan = test_table_scan_with_name("sq1")?;
    let second_scan = test_table_scan_with_name("sq2")?;

    let first_inner = LogicalPlanBuilder::from(first_scan)
        .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq1.a")))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let second_inner = LogicalPlanBuilder::from(second_scan)
        .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq2.a")))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let first_in = in_subquery(col("c") + lit(1u32), Arc::new(first_inner));
    let second_in = in_subquery(col("c") * lit(2u32), Arc::new(second_inner));
    let outer_only = col("test.c").gt(lit(1u32));
    let query = LogicalPlanBuilder::from(outer)
        .filter(first_in.and(second_in.and(outer_only)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]\
        \n LeftSemi Join: Filter: test.c * UInt32(2) = __correlated_sq_2.sq2.c * UInt32(2) AND test.a > __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]\
        \n LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq1.c * UInt32(2) AND test.a > __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [sq1.c * UInt32(2):UInt32, a:UInt32]\
        \n Projection: sq1.c * UInt32(2), sq1.a [sq1.c * UInt32(2):UInt32, a:UInt32]\
        \n TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_2 [sq2.c * UInt32(2):UInt32, a:UInt32]\
        \n Projection: sq2.c * UInt32(2), sq2.a [sq2.c * UInt32(2):UInt32, a:UInt32]\
        \n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, query, expected_plan);
    Ok(())
}
1250
/// The outer query and the `IN` subquery scan the same table `test`. The
/// subquery is still aliased, and its local filter is kept.
#[test]
fn in_subquery_with_same_table() -> Result<()> {
    let outer_scan = test_table_scan()?;
    let inner_scan = test_table_scan()?;
    let inner = LogicalPlanBuilder::from(inner_scan)
        .filter(col("test.a").gt(col("test.b")))?
        .project(vec![col("c")])?
        .build()?;

    let membership = in_subquery(col("test.a"), Arc::new(inner));
    let query = LogicalPlanBuilder::from(outer_scan)
        .filter(membership)?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftSemi Join: Filter: test.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
        \n Projection: test.c [c:UInt32]\
        \n Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, query, expected_plan);
    Ok(())
}
1281
/// The same correlated `EXISTS` subquery used twice in a conjunction becomes two
/// stacked LeftSemi joins with distinct aliases.
#[test]
fn multiple_exists_subqueries() -> Result<()> {
    let correlation =
        col("orders.o_custkey").eq(out_ref_col(DataType::Int64, "customer.c_custkey"));
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let both_exist = exists(Arc::clone(&orders_subquery)).and(exists(orders_subquery));
    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(both_exist)?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n LeftSemi Join: Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\
        \n LeftSemi Join: Filter: __correlated_sq_1.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
        \n Projection: orders.o_custkey [o_custkey:Int64]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
        \n SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]\
        \n Projection: orders.o_custkey [o_custkey:Int64]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
    assert_optimized_plan_equal(query, expected_plan)
}
1312
/// An `EXISTS` nested inside another `EXISTS` is decorrelated at both levels.
/// The inner subquery is rewritten first and receives alias `__correlated_sq_1`.
#[test]
fn recursive_exists_subqueries() -> Result<()> {
    let lineitem_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
            .filter(
                col("lineitem.l_orderkey")
                    .eq(out_ref_col(DataType::Int64, "orders.o_orderkey")),
            )?
            .project(vec![col("lineitem.l_orderkey")])?
            .build()?,
    );

    let customer_correlation =
        col("orders.o_custkey").eq(out_ref_col(DataType::Int64, "customer.c_custkey"));
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(exists(lineitem_subquery).and(customer_correlation))?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(orders_subquery))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n LeftSemi Join: Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]\
        \n Projection: orders.o_custkey [o_custkey:Int64]\
        \n LeftSemi Join: Filter: __correlated_sq_1.l_orderkey = orders.o_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
        \n SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]\
        \n Projection: lineitem.l_orderkey [l_orderkey:Int64]\
        \n TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]";
    assert_optimized_plan_equal(query, expected_plan)
}
1355
/// The correlated conjunct moves into the semi-join filter, and the local
/// conjunct `o_orderkey = 1` stays as a Filter inside the subquery.
#[test]
fn exists_subquery_with_subquery_filters() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let local_predicate = col("o_orderkey").eq(lit(1));
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation.and(local_predicate))?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(orders_subquery))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
        \n Projection: orders.o_custkey [o_custkey:Int64]\
        \n Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(query, expected_plan)
}
1385
/// A subquery filter that references only the outer column (compared against a
/// literal) is pulled up unchanged into the semi-join filter.
#[test]
fn exists_subquery_no_cols() -> Result<()> {
    let outer_only = out_ref_col(DataType::Int64, "customer.c_custkey").eq(lit(1u32));
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(outer_only)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(orders_subquery))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n LeftSemi Join: Filter: customer.c_custkey = UInt32(1) [c_custkey:Int64, c_name:Utf8]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
        \n Projection: orders.o_custkey [o_custkey:Int64]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(query, expected_plan)
}
1410
/// An uncorrelated `EXISTS` becomes a LeftSemi join with a constant `true`
/// filter, and the subquery's own filter stays in place.
#[test]
fn exists_subquery_with_no_correlated_cols() -> Result<()> {
    let self_comparison = col("orders.o_custkey").eq(col("orders.o_custkey"));
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(self_comparison)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(orders_subquery))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n LeftSemi Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
        \n Projection: orders.o_custkey [o_custkey:Int64]\
        \n Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
    assert_optimized_plan_equal(query, expected_plan)
}
1435
/// A `!=` correlation is carried into the semi-join filter as-is.
#[test]
fn exists_subquery_where_not_eq() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").not_eq(col("orders.o_custkey"));
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(orders_subquery))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n LeftSemi Join: Filter: customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
        \n Projection: orders.o_custkey [o_custkey:Int64]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(query, expected_plan)
}
1463
/// A `<` correlation is carried into the semi-join filter as-is.
#[test]
fn exists_subquery_where_less_than() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").lt(col("orders.o_custkey"));
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(orders_subquery))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n LeftSemi Join: Filter: customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
        \n Projection: orders.o_custkey [o_custkey:Int64]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(query, expected_plan)
}
1491
/// A disjunction inside the subquery that mixes correlated and local terms is
/// lifted whole into the join filter. `o_orderkey` is added to the subquery projection.
#[test]
fn exists_subquery_with_subquery_disjunction() -> Result<()> {
    let either = out_ref_col(DataType::Int64, "customer.c_custkey")
        .eq(col("orders.o_custkey"))
        .or(col("o_orderkey").eq(lit(1)));
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(either)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(orders_subquery))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1) [c_custkey:Int64, c_name:Utf8]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]\
        \n Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(query, expected_plan)
}
1520
/// A correlated subquery with no projection is aliased directly over its scan,
/// keeping every column.
#[test]
fn exists_subquery_no_projection() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .build()?,
    );

    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(orders_subquery))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_1 [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(query, expected_plan)
}
1546
/// A computed projection in the subquery is kept, and the correlated column is
/// appended to the projection so the join filter can reference it.
#[test]
fn exists_subquery_project_expr() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .project(vec![col("orders.o_custkey").add(lit(1))])?
            .build()?,
    );

    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(orders_subquery))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\
        \n Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(query, expected_plan)
}
1574
/// An outer conjunct next to `EXISTS` survives as a Filter above the semi-join.
#[test]
fn exists_subquery_should_support_additional_filters() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );
    let predicate = exists(orders_subquery).and(col("c_custkey").eq(lit(1)));
    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(predicate)?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]\
        \n LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
        \n Projection: orders.o_custkey [o_custkey:Int64]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(query, expected_plan)
}
1602
/// `EXISTS (...) OR <pred>` cannot become a semi-join. It is rewritten into a
/// LeftMark join, and its `mark` column is tested in the Filter above it.
#[test]
fn exists_subquery_disjunction() -> Result<()> {
    let orders_subquery = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(col("customer.c_custkey").eq(col("orders.o_custkey")))?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let predicate = exists(orders_subquery).or(col("customer.c_custkey").eq(lit(1)));
    let query = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(predicate)?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected_plan = "Projection: customer.c_custkey [c_custkey:Int64]\
        \n Filter: __correlated_sq_1.mark OR customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, mark:Boolean]\
        \n LeftMark Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, mark:Boolean]\
        \n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
        \n SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
        \n Projection: orders.o_custkey [o_custkey:Int64]\
        \n Filter: customer.c_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
        \n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(query, expected_plan)
}
1629
/// A simple equi-correlation `test.a = sq.a` becomes the semi-join filter, and
/// `sq.a` is appended to the subquery projection.
#[test]
fn exists_subquery_correlated() -> Result<()> {
    let correlation = out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a"));
    let inner = Arc::new(
        LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
            .filter(correlation)?
            .project(vec![col("c")])?
            .build()?,
    );

    let query = LogicalPlanBuilder::from(test_table_scan_with_name("test")?)
        .filter(exists(inner))?
        .project(vec![col("test.c")])?
        .build()?;

    let expected_plan = "Projection: test.c [c:UInt32]\
        \n LeftSemi Join: Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]\
        \n Projection: sq.c, sq.a [c:UInt32, a:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    assert_optimized_plan_equal(query, expected_plan)
}
1654
/// An uncorrelated `EXISTS` yields a LeftSemi join whose filter is `Boolean(true)`.
#[test]
fn exists_subquery_simple() -> Result<()> {
    let outer = test_table_scan()?;
    let predicate = exists(test_subquery_with_name("sq")?);
    let query = LogicalPlanBuilder::from(outer)
        .filter(predicate)?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftSemi Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
        \n Projection: sq.c [c:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
    assert_optimized_plan_equal(query, expected_plan)
}
1672
/// An uncorrelated `NOT EXISTS` yields a LeftAnti join whose filter is `Boolean(true)`.
#[test]
fn not_exists_subquery_simple() -> Result<()> {
    let outer = test_table_scan()?;
    let predicate = not_exists(test_subquery_with_name("sq")?);
    let query = LogicalPlanBuilder::from(outer)
        .filter(predicate)?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftAnti Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
        \n Projection: sq.c [c:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
    assert_optimized_plan_equal(query, expected_plan)
}
1690
/// Two correlated `EXISTS` subqueries plus an outer predicate. They become two
/// stacked LeftSemi joins under a Filter that holds the outer predicate.
#[test]
fn two_exists_subquery_with_outer_filter() -> Result<()> {
    let outer = test_table_scan()?;
    let first_scan = test_table_scan_with_name("sq1")?;
    let second_scan = test_table_scan_with_name("sq2")?;

    let first_inner = LogicalPlanBuilder::from(first_scan)
        .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq1.a")))?
        .project(vec![col("c")])?
        .build()?;

    let second_inner = LogicalPlanBuilder::from(second_scan)
        .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq2.a")))?
        .project(vec![col("c")])?
        .build()?;

    let first_exists = exists(Arc::new(first_inner));
    let second_exists = exists(Arc::new(second_inner));
    let outer_only = col("test.c").gt(lit(1u32));
    let query = LogicalPlanBuilder::from(outer)
        .filter(first_exists.and(second_exists.and(outer_only)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]\
        \n LeftSemi Join: Filter: test.a = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]\
        \n LeftSemi Join: Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]\
        \n Projection: sq1.c, sq1.a [c:UInt32, a:UInt32]\
        \n TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_2 [c:UInt32, a:UInt32]\
        \n Projection: sq2.c, sq2.a [c:UInt32, a:UInt32]\
        \n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]";

    assert_optimized_plan_equal(query, expected_plan)
}
1729
/// A non-equi correlation involving expressions on both sides is lifted into the
/// join filter, and the needed subquery column `sq.a` is added to the projection.
#[test]
fn exists_subquery_expr_filter() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;
    let correlation = (lit(1u32) + col("sq.a"))
        .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32));
    let inner = LogicalPlanBuilder::from(inner_scan)
        .filter(correlation)?
        .project(vec![lit(1u32)])?
        .build()?;
    let query = LogicalPlanBuilder::from(outer)
        .filter(exists(Arc::new(inner)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, a:UInt32]\
        \n Projection: UInt32(1), sq.a [UInt32(1):UInt32, a:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    assert_optimized_plan_equal(query, expected_plan)
}
1755
/// An uncorrelated `EXISTS` over the same table as the outer query becomes a
/// LeftSemi join with a `Boolean(true)` filter.
#[test]
fn exists_subquery_with_same_table() -> Result<()> {
    let outer_scan = test_table_scan()?;
    let inner_scan = test_table_scan()?;
    let inner = LogicalPlanBuilder::from(inner_scan)
        .filter(col("test.a").gt(col("test.b")))?
        .project(vec![col("c")])?
        .build()?;

    let query = LogicalPlanBuilder::from(outer_scan)
        .filter(exists(Arc::new(inner)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftSemi Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
        \n Projection: test.c [c:UInt32]\
        \n Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

    assert_optimized_plan_equal(query, expected_plan)
}
1781
/// With a DISTINCT subquery, the correlated column `sq.a` is added beneath the
/// Distinct so the join filter can reference it.
#[test]
fn exists_distinct_subquery() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;
    let correlation = (lit(1u32) + col("sq.a"))
        .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32));
    let inner = LogicalPlanBuilder::from(inner_scan)
        .filter(correlation)?
        .project(vec![col("sq.c")])?
        .distinct()?
        .build()?;
    let query = LogicalPlanBuilder::from(outer)
        .filter(exists(Arc::new(inner)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]\
        \n Distinct: [c:UInt32, a:UInt32]\
        \n Projection: sq.c, sq.a [c:UInt32, a:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    assert_optimized_plan_equal(query, expected_plan)
}
1809
/// Same as the DISTINCT case, but the subquery projects an expression
/// (`sq.b + sq.c`) instead of a bare column.
#[test]
fn exists_distinct_expr_subquery() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;
    let correlation = (lit(1u32) + col("sq.a"))
        .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32));
    let inner = LogicalPlanBuilder::from(inner_scan)
        .filter(correlation)?
        .project(vec![col("sq.b") + col("sq.c")])?
        .distinct()?
        .build()?;
    let query = LogicalPlanBuilder::from(outer)
        .filter(exists(Arc::new(inner)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [sq.b + sq.c:UInt32, a:UInt32]\
        \n Distinct: [sq.b + sq.c:UInt32, a:UInt32]\
        \n Projection: sq.b + sq.c, sq.a [sq.b + sq.c:UInt32, a:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    assert_optimized_plan_equal(query, expected_plan)
}
1837
/// A DISTINCT subquery that projects a literal plus a column keeps both, and
/// the correlated column `sq.a` is appended after them.
#[test]
fn exists_distinct_subquery_with_literal() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;
    let correlation = (lit(1u32) + col("sq.a"))
        .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32));
    let inner = LogicalPlanBuilder::from(inner_scan)
        .filter(correlation)?
        .project(vec![lit(1u32), col("sq.c")])?
        .distinct()?
        .build()?;
    let query = LogicalPlanBuilder::from(outer)
        .filter(exists(Arc::new(inner)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected_plan = "Projection: test.b [b:UInt32]\
        \n LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\
        \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
        \n SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, c:UInt32, a:UInt32]\
        \n Distinct: [UInt32(1):UInt32, c:UInt32, a:UInt32]\
        \n Projection: UInt32(1), sq.c, sq.a [UInt32(1):UInt32, c:UInt32, a:UInt32]\
        \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    assert_optimized_plan_equal(query, expected_plan)
}
1865
1866 #[test]
1867 fn exists_uncorrelated_unnest() -> Result<()> {
1868 let subquery_table_source = table_source(&Schema::new(vec![Field::new(
1869 "arr",
1870 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1871 true,
1872 )]));
1873 let subquery = LogicalPlanBuilder::scan_with_filters(
1874 "sq",
1875 subquery_table_source,
1876 None,
1877 vec![],
1878 )?
1879 .unnest_column("arr")?
1880 .build()?;
1881 let table_scan = test_table_scan()?;
1882 let plan = LogicalPlanBuilder::from(table_scan)
1883 .filter(exists(Arc::new(subquery)))?
1884 .project(vec![col("test.b")])?
1885 .build()?;
1886
1887 let expected = "Projection: test.b [b:UInt32]\
1888 \n LeftSemi Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]\
1889 \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1890 \n SubqueryAlias: __correlated_sq_1 [arr:Int32;N]\
1891 \n Unnest: lists[sq.arr|depth=1] structs[] [arr:Int32;N]\
1892 \n TableScan: sq [arr:List(Field { name: \"item\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} });N]";
1893 assert_optimized_plan_equal(plan, expected)
1894 }
1895
1896 #[test]
1897 fn exists_correlated_unnest() -> Result<()> {
1898 let table_scan = test_table_scan()?;
1899 let subquery_table_source = table_source(&Schema::new(vec![Field::new(
1900 "a",
1901 DataType::List(Arc::new(Field::new_list_field(DataType::UInt32, true))),
1902 true,
1903 )]));
1904 let subquery = LogicalPlanBuilder::scan_with_filters(
1905 "sq",
1906 subquery_table_source,
1907 None,
1908 vec![],
1909 )?
1910 .unnest_column("a")?
1911 .filter(col("a").eq(out_ref_col(DataType::UInt32, "test.b")))?
1912 .build()?;
1913 let plan = LogicalPlanBuilder::from(table_scan)
1914 .filter(exists(Arc::new(subquery)))?
1915 .project(vec![col("test.b")])?
1916 .build()?;
1917
1918 let expected = "Projection: test.b [b:UInt32]\
1919 \n LeftSemi Join: Filter: __correlated_sq_1.a = test.b [a:UInt32, b:UInt32, c:UInt32]\
1920 \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1921 \n SubqueryAlias: __correlated_sq_1 [a:UInt32;N]\
1922 \n Unnest: lists[sq.a|depth=1] structs[] [a:UInt32;N]\
1923 \n TableScan: sq [a:List(Field { name: \"item\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} });N]";
1924
1925 assert_optimized_plan_equal(plan, expected)
1926 }
1927
1928 #[test]
1929 fn upper_case_ident() -> Result<()> {
1930 let fields = vec![
1931 Field::new("A", DataType::UInt32, false),
1932 Field::new("B", DataType::UInt32, false),
1933 ];
1934
1935 let schema = Schema::new(fields);
1936 let table_scan_a = table_scan(Some("\"TEST_A\""), &schema, None)?.build()?;
1937 let table_scan_b = table_scan(Some("\"TEST_B\""), &schema, None)?.build()?;
1938
1939 let subquery = LogicalPlanBuilder::from(table_scan_b)
1940 .filter(col("\"A\"").eq(out_ref_col(DataType::UInt32, "\"TEST_A\".\"A\"")))?
1941 .project(vec![lit(1)])?
1942 .build()?;
1943
1944 let plan = LogicalPlanBuilder::from(table_scan_a)
1945 .filter(exists(Arc::new(subquery)))?
1946 .project(vec![col("\"TEST_A\".\"B\"")])?
1947 .build()?;
1948
1949 let expected = "Projection: TEST_A.B [B:UInt32]\
1950 \n LeftSemi Join: Filter: __correlated_sq_1.A = TEST_A.A [A:UInt32, B:UInt32]\
1951 \n TableScan: TEST_A [A:UInt32, B:UInt32]\
1952 \n SubqueryAlias: __correlated_sq_1 [Int32(1):Int32, A:UInt32]\
1953 \n Projection: Int32(1), TEST_B.A [Int32(1):Int32, A:UInt32]\
1954 \n TableScan: TEST_B [A:UInt32, B:UInt32]";
1955
1956 assert_optimized_plan_equal(plan, expected)
1957 }
1958}