datafusion_optimizer/
decorrelate_predicate_subquery.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DecorrelatePredicateSubquery`] converts `IN`/`EXISTS` subquery predicates to `SEMI`/`ANTI` joins
19use std::collections::BTreeSet;
20use std::ops::Deref;
21use std::sync::Arc;
22
23use crate::decorrelate::PullUpCorrelatedExpr;
24use crate::optimizer::ApplyOrder;
25use crate::utils::replace_qualified_name;
26use crate::{OptimizerConfig, OptimizerRule};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30use datafusion_common::{internal_err, plan_err, Column, Result};
31use datafusion_expr::expr::{Exists, InSubquery};
32use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
33use datafusion_expr::logical_plan::{JoinType, Subquery};
34use datafusion_expr::utils::{conjunction, split_conjunction_owned};
35use datafusion_expr::{
36    exists, in_subquery, lit, not, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
37    LogicalPlan, LogicalPlanBuilder, Operator,
38};
39
40use log::debug;
41
/// Optimizer rule for rewriting predicate (`IN`/`EXISTS`) subqueries into
/// left semi/anti joins, or into left mark joins when the subquery is nested
/// inside a larger boolean expression.
#[derive(Default, Debug)]
pub struct DecorrelatePredicateSubquery {}

impl DecorrelatePredicateSubquery {
    /// Creates a new [`DecorrelatePredicateSubquery`] rule.
    pub fn new() -> Self {
        Self::default()
    }
}
52
53impl OptimizerRule for DecorrelatePredicateSubquery {
54    fn supports_rewrite(&self) -> bool {
55        true
56    }
57
58    fn rewrite(
59        &self,
60        plan: LogicalPlan,
61        config: &dyn OptimizerConfig,
62    ) -> Result<Transformed<LogicalPlan>> {
63        let plan = plan
64            .map_subqueries(|subquery| {
65                subquery.transform_down(|p| self.rewrite(p, config))
66            })?
67            .data;
68
69        let LogicalPlan::Filter(filter) = plan else {
70            return Ok(Transformed::no(plan));
71        };
72
73        if !has_subquery(&filter.predicate) {
74            return Ok(Transformed::no(LogicalPlan::Filter(filter)));
75        }
76
77        let (with_subqueries, mut other_exprs): (Vec<_>, Vec<_>) =
78            split_conjunction_owned(filter.predicate)
79                .into_iter()
80                .partition(has_subquery);
81
82        if with_subqueries.is_empty() {
83            return internal_err!(
84                "can not find expected subqueries in DecorrelatePredicateSubquery"
85            );
86        }
87
88        // iterate through all exists clauses in predicate, turning each into a join
89        let mut cur_input = Arc::unwrap_or_clone(filter.input);
90        for subquery_expr in with_subqueries {
91            match extract_subquery_info(subquery_expr) {
92                // The subquery expression is at the top level of the filter
93                SubqueryPredicate::Top(subquery) => {
94                    match build_join_top(&subquery, &cur_input, config.alias_generator())?
95                    {
96                        Some(plan) => cur_input = plan,
97                        // If the subquery can not be converted to a Join, reconstruct the subquery expression and add it to the Filter
98                        None => other_exprs.push(subquery.expr()),
99                    }
100                }
101                // The subquery expression is embedded within another expression
102                SubqueryPredicate::Embedded(expr) => {
103                    let (plan, expr_without_subqueries) =
104                        rewrite_inner_subqueries(cur_input, expr, config)?;
105                    cur_input = plan;
106                    other_exprs.push(expr_without_subqueries);
107                }
108            }
109        }
110
111        let expr = conjunction(other_exprs);
112        if let Some(expr) = expr {
113            let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
114            cur_input = LogicalPlan::Filter(new_filter);
115        }
116        Ok(Transformed::yes(cur_input))
117    }
118
119    fn name(&self) -> &str {
120        "decorrelate_predicate_subquery"
121    }
122
123    fn apply_order(&self) -> Option<ApplyOrder> {
124        Some(ApplyOrder::TopDown)
125    }
126}
127
128fn rewrite_inner_subqueries(
129    outer: LogicalPlan,
130    expr: Expr,
131    config: &dyn OptimizerConfig,
132) -> Result<(LogicalPlan, Expr)> {
133    let mut cur_input = outer;
134    let alias = config.alias_generator();
135    let expr_without_subqueries = expr.transform(|e| match e {
136        Expr::Exists(Exists {
137            subquery: Subquery { subquery, .. },
138            negated,
139        }) => match mark_join(&cur_input, Arc::clone(&subquery), None, negated, alias)? {
140            Some((plan, exists_expr)) => {
141                cur_input = plan;
142                Ok(Transformed::yes(exists_expr))
143            }
144            None if negated => Ok(Transformed::no(not_exists(subquery))),
145            None => Ok(Transformed::no(exists(subquery))),
146        },
147        Expr::InSubquery(InSubquery {
148            expr,
149            subquery: Subquery { subquery, .. },
150            negated,
151        }) => {
152            let in_predicate = subquery
153                .head_output_expr()?
154                .map_or(plan_err!("single expression required."), |output_expr| {
155                    Ok(Expr::eq(*expr.clone(), output_expr))
156                })?;
157            match mark_join(
158                &cur_input,
159                Arc::clone(&subquery),
160                Some(in_predicate),
161                negated,
162                alias,
163            )? {
164                Some((plan, exists_expr)) => {
165                    cur_input = plan;
166                    Ok(Transformed::yes(exists_expr))
167                }
168                None if negated => Ok(Transformed::no(not_in_subquery(*expr, subquery))),
169                None => Ok(Transformed::no(in_subquery(*expr, subquery))),
170            }
171        }
172        _ => Ok(Transformed::no(e)),
173    })?;
174    Ok((cur_input, expr_without_subqueries.data))
175}
176
177enum SubqueryPredicate {
178    // The subquery expression is at the top level of the filter and can be fully replaced by a
179    // semi/anti join
180    Top(SubqueryInfo),
181    // The subquery expression is embedded within another expression and is replaced using an
182    // existence join
183    Embedded(Expr),
184}
185
186fn extract_subquery_info(expr: Expr) -> SubqueryPredicate {
187    match expr {
188        Expr::Not(not_expr) => match *not_expr {
189            Expr::InSubquery(InSubquery {
190                expr,
191                subquery,
192                negated,
193            }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
194                subquery, *expr, !negated,
195            )),
196            Expr::Exists(Exists { subquery, negated }) => {
197                SubqueryPredicate::Top(SubqueryInfo::new(subquery, !negated))
198            }
199            expr => SubqueryPredicate::Embedded(not(expr)),
200        },
201        Expr::InSubquery(InSubquery {
202            expr,
203            subquery,
204            negated,
205        }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
206            subquery, *expr, negated,
207        )),
208        Expr::Exists(Exists { subquery, negated }) => {
209            SubqueryPredicate::Top(SubqueryInfo::new(subquery, negated))
210        }
211        expr => SubqueryPredicate::Embedded(expr),
212    }
213}
214
215fn has_subquery(expr: &Expr) -> bool {
216    expr.exists(|e| match e {
217        Expr::InSubquery(_) | Expr::Exists(_) => Ok(true),
218        _ => Ok(false),
219    })
220    .unwrap()
221}
222
223/// Optimize the subquery to left-anti/left-semi join.
224/// If the subquery is a correlated subquery, we need extract the join predicate from the subquery.
225///
226/// For example, given a query like:
227/// `select t1.a, t1.b from t1 where t1 in (select t2.a from t2 where t1.b = t2.b and t1.c > t2.c)`
228///
229/// The optimized plan will be:
230///
231/// ```text
232/// Projection: t1.a, t1.b
233///   LeftSemi Join:  Filter: t1.a = __correlated_sq_1.a AND t1.b = __correlated_sq_1.b AND t1.c > __correlated_sq_1.c
234///     TableScan: t1
235///     SubqueryAlias: __correlated_sq_1
236///       Projection: t2.a, t2.b, t2.c
237///         TableScan: t2
238/// ```
239///
240/// Given another query like:
241/// `select t1.id from t1 where exists(SELECT t2.id FROM t2 WHERE t1.id = t2.id)`
242///
243/// The optimized plan will be:
244///
245/// ```text
246/// Projection: t1.id
247///   LeftSemi Join:  Filter: t1.id = __correlated_sq_1.id
248///     TableScan: t1
249///     SubqueryAlias: __correlated_sq_1
250///       Projection: t2.id
251///         TableScan: t2
252/// ```
253fn build_join_top(
254    query_info: &SubqueryInfo,
255    left: &LogicalPlan,
256    alias: &Arc<AliasGenerator>,
257) -> Result<Option<LogicalPlan>> {
258    let where_in_expr_opt = &query_info.where_in_expr;
259    let in_predicate_opt = where_in_expr_opt
260        .clone()
261        .map(|where_in_expr| {
262            query_info
263                .query
264                .subquery
265                .head_output_expr()?
266                .map_or(plan_err!("single expression required."), |expr| {
267                    Ok(Expr::eq(where_in_expr, expr))
268                })
269        })
270        .map_or(Ok(None), |v| v.map(Some))?;
271
272    let join_type = match query_info.negated {
273        true => JoinType::LeftAnti,
274        false => JoinType::LeftSemi,
275    };
276    let subquery = query_info.query.subquery.as_ref();
277    let subquery_alias = alias.next("__correlated_sq");
278    build_join(left, subquery, in_predicate_opt, join_type, subquery_alias)
279}
280
281/// This is used to handle the case when the subquery is embedded in a more complex boolean
282/// expression like and OR. For example
283///
284/// `select t1.id from t1 where t1.id < 0 OR exists(SELECT t2.id FROM t2 WHERE t1.id = t2.id)`
285///
286/// The optimized plan will be:
287///
288/// ```text
289/// Projection: t1.id
290///   Filter: t1.id < 0 OR __correlated_sq_1.mark
291///     LeftMark Join:  Filter: t1.id = __correlated_sq_1.id
292///       TableScan: t1
293///       SubqueryAlias: __correlated_sq_1
294///         Projection: t2.id
295///           TableScan: t2
296fn mark_join(
297    left: &LogicalPlan,
298    subquery: Arc<LogicalPlan>,
299    in_predicate_opt: Option<Expr>,
300    negated: bool,
301    alias_generator: &Arc<AliasGenerator>,
302) -> Result<Option<(LogicalPlan, Expr)>> {
303    let alias = alias_generator.next("__correlated_sq");
304
305    let exists_col = Expr::Column(Column::new(Some(alias.clone()), "mark"));
306    let exists_expr = if negated { !exists_col } else { exists_col };
307
308    Ok(
309        build_join(left, &subquery, in_predicate_opt, JoinType::LeftMark, alias)?
310            .map(|plan| (plan, exists_expr)),
311    )
312}
313
314fn build_join(
315    left: &LogicalPlan,
316    subquery: &LogicalPlan,
317    in_predicate_opt: Option<Expr>,
318    join_type: JoinType,
319    alias: String,
320) -> Result<Option<LogicalPlan>> {
321    let mut pull_up = PullUpCorrelatedExpr::new()
322        .with_in_predicate_opt(in_predicate_opt.clone())
323        .with_exists_sub_query(in_predicate_opt.is_none());
324
325    let new_plan = subquery.clone().rewrite(&mut pull_up).data()?;
326    if !pull_up.can_pull_up {
327        return Ok(None);
328    }
329
330    let sub_query_alias = LogicalPlanBuilder::from(new_plan)
331        .alias(alias.to_string())?
332        .build()?;
333    let mut all_correlated_cols = BTreeSet::new();
334    pull_up
335        .correlated_subquery_cols_map
336        .values()
337        .for_each(|cols| all_correlated_cols.extend(cols.clone()));
338
339    // alias the join filter
340    let join_filter_opt = conjunction(pull_up.join_filters)
341        .map_or(Ok(None), |filter| {
342            replace_qualified_name(filter, &all_correlated_cols, &alias).map(Some)
343        })?;
344
345    let join_filter = match (join_filter_opt, in_predicate_opt) {
346        (
347            Some(join_filter),
348            Some(Expr::BinaryExpr(BinaryExpr {
349                left,
350                op: Operator::Eq,
351                right,
352            })),
353        ) => {
354            let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
355            let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col));
356            in_predicate.and(join_filter)
357        }
358        (Some(join_filter), _) => join_filter,
359        (
360            _,
361            Some(Expr::BinaryExpr(BinaryExpr {
362                left,
363                op: Operator::Eq,
364                right,
365            })),
366        ) => {
367            let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
368            let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col));
369            in_predicate
370        }
371        (None, None) => lit(true),
372        _ => return Ok(None),
373    };
374    // join our sub query into the main plan
375    let new_plan = LogicalPlanBuilder::from(left.clone())
376        .join_on(sub_query_alias, join_type, Some(join_filter))?
377        .build()?;
378    debug!(
379        "predicate subquery optimized:\n{}",
380        new_plan.display_indent()
381    );
382    Ok(Some(new_plan))
383}
384
385#[derive(Debug)]
386struct SubqueryInfo {
387    query: Subquery,
388    where_in_expr: Option<Expr>,
389    negated: bool,
390}
391
392impl SubqueryInfo {
393    pub fn new(query: Subquery, negated: bool) -> Self {
394        Self {
395            query,
396            where_in_expr: None,
397            negated,
398        }
399    }
400
401    pub fn new_with_in_expr(query: Subquery, expr: Expr, negated: bool) -> Self {
402        Self {
403            query,
404            where_in_expr: Some(expr),
405            negated,
406        }
407    }
408
409    pub fn expr(self) -> Expr {
410        match self.where_in_expr {
411            Some(expr) => match self.negated {
412                true => not_in_subquery(expr, self.query.subquery),
413                false => in_subquery(expr, self.query.subquery),
414            },
415            None => match self.negated {
416                true => not_exists(self.query.subquery),
417                false => exists(self.query.subquery),
418            },
419        }
420    }
421}
422
423#[cfg(test)]
424mod tests {
425    use std::ops::Add;
426
427    use super::*;
428    use crate::test::*;
429
430    use arrow::datatypes::{DataType, Field, Schema};
431    use datafusion_expr::builder::table_source;
432    use datafusion_expr::{and, binary_expr, col, lit, not, out_ref_col, table_scan};
433
434    fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
435        assert_optimized_plan_eq_display_indent(
436            Arc::new(DecorrelatePredicateSubquery::new()),
437            plan,
438            expected,
439        );
440        Ok(())
441    }
442
443    fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
444        let table_scan = test_table_scan_with_name(name)?;
445        Ok(Arc::new(
446            LogicalPlanBuilder::from(table_scan)
447                .project(vec![col("c")])?
448                .build()?,
449        ))
450    }
451
452    /// Test for several IN subquery expressions
453    #[test]
454    fn in_subquery_multiple() -> Result<()> {
455        let table_scan = test_table_scan()?;
456        let plan = LogicalPlanBuilder::from(table_scan)
457            .filter(and(
458                in_subquery(col("c"), test_subquery_with_name("sq_1")?),
459                in_subquery(col("b"), test_subquery_with_name("sq_2")?),
460            ))?
461            .project(vec![col("test.b")])?
462            .build()?;
463
464        let expected = "Projection: test.b [b:UInt32]\
465        \n  LeftSemi Join:  Filter: test.b = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
466        \n    LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
467        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
468        \n      SubqueryAlias: __correlated_sq_1 [c:UInt32]\
469        \n        Projection: sq_1.c [c:UInt32]\
470        \n          TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\
471        \n    SubqueryAlias: __correlated_sq_2 [c:UInt32]\
472        \n      Projection: sq_2.c [c:UInt32]\
473        \n        TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]";
474        assert_optimized_plan_equal(plan, expected)
475    }
476
477    /// Test for IN subquery with additional AND filter
478    #[test]
479    fn in_subquery_with_and_filters() -> Result<()> {
480        let table_scan = test_table_scan()?;
481        let plan = LogicalPlanBuilder::from(table_scan)
482            .filter(and(
483                in_subquery(col("c"), test_subquery_with_name("sq")?),
484                and(
485                    binary_expr(col("a"), Operator::Eq, lit(1_u32)),
486                    binary_expr(col("b"), Operator::Lt, lit(30_u32)),
487                ),
488            ))?
489            .project(vec![col("test.b")])?
490            .build()?;
491
492        let expected = "Projection: test.b [b:UInt32]\
493        \n  Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\
494        \n    LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
495        \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
496        \n      SubqueryAlias: __correlated_sq_1 [c:UInt32]\
497        \n        Projection: sq.c [c:UInt32]\
498        \n          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
499
500        assert_optimized_plan_equal(plan, expected)
501    }
502
503    /// Test for nested IN subqueries
504    #[test]
505    fn in_subquery_nested() -> Result<()> {
506        let table_scan = test_table_scan()?;
507
508        let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
509            .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))?
510            .project(vec![col("a")])?
511            .build()?;
512
513        let plan = LogicalPlanBuilder::from(table_scan)
514            .filter(in_subquery(col("b"), Arc::new(subquery)))?
515            .project(vec![col("test.b")])?
516            .build()?;
517
518        let expected = "Projection: test.b [b:UInt32]\
519        \n  LeftSemi Join:  Filter: test.b = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]\
520        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
521        \n    SubqueryAlias: __correlated_sq_2 [a:UInt32]\
522        \n      Projection: sq.a [a:UInt32]\
523        \n        LeftSemi Join:  Filter: sq.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
524        \n          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
525        \n          SubqueryAlias: __correlated_sq_1 [c:UInt32]\
526        \n            Projection: sq_nested.c [c:UInt32]\
527        \n              TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]";
528
529        assert_optimized_plan_equal(plan, expected)
530    }
531
532    /// Test multiple correlated subqueries
533    /// See subqueries.rs where_in_multiple()
534    #[test]
535    fn multiple_subqueries() -> Result<()> {
536        let orders = Arc::new(
537            LogicalPlanBuilder::from(scan_tpch_table("orders"))
538                .filter(
539                    col("orders.o_custkey")
540                        .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
541                )?
542                .project(vec![col("orders.o_custkey")])?
543                .build()?,
544        );
545        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
546            .filter(
547                in_subquery(col("customer.c_custkey"), Arc::clone(&orders))
548                    .and(in_subquery(col("customer.c_custkey"), orders)),
549            )?
550            .project(vec![col("customer.c_custkey")])?
551            .build()?;
552        debug!("plan to optimize:\n{}", plan.display_indent());
553
554        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
555        \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\
556        \n    LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
557        \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
558        \n      SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
559        \n        Projection: orders.o_custkey [o_custkey:Int64]\
560        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
561        \n    SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]\
562        \n      Projection: orders.o_custkey [o_custkey:Int64]\
563        \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
564
565        assert_optimized_plan_eq_display_indent(
566            Arc::new(DecorrelatePredicateSubquery::new()),
567            plan,
568            expected,
569        );
570        Ok(())
571    }
572
573    /// Test recursive correlated subqueries
574    /// See subqueries.rs where_in_recursive()
575    #[test]
576    fn recursive_subqueries() -> Result<()> {
577        let lineitem = Arc::new(
578            LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
579                .filter(
580                    col("lineitem.l_orderkey")
581                        .eq(out_ref_col(DataType::Int64, "orders.o_orderkey")),
582                )?
583                .project(vec![col("lineitem.l_orderkey")])?
584                .build()?,
585        );
586
587        let orders = Arc::new(
588            LogicalPlanBuilder::from(scan_tpch_table("orders"))
589                .filter(
590                    in_subquery(col("orders.o_orderkey"), lineitem).and(
591                        col("orders.o_custkey")
592                            .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
593                    ),
594                )?
595                .project(vec![col("orders.o_custkey")])?
596                .build()?,
597        );
598
599        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
600            .filter(in_subquery(col("customer.c_custkey"), orders))?
601            .project(vec![col("customer.c_custkey")])?
602            .build()?;
603
604        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
605        \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]\
606        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
607        \n    SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]\
608        \n      Projection: orders.o_custkey [o_custkey:Int64]\
609        \n        LeftSemi Join:  Filter: orders.o_orderkey = __correlated_sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
610        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
611        \n          SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]\
612        \n            Projection: lineitem.l_orderkey [l_orderkey:Int64]\
613        \n              TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]";
614
615        assert_optimized_plan_eq_display_indent(
616            Arc::new(DecorrelatePredicateSubquery::new()),
617            plan,
618            expected,
619        );
620        Ok(())
621    }
622
623    /// Test for correlated IN subquery filter with additional subquery filters
624    #[test]
625    fn in_subquery_with_subquery_filters() -> Result<()> {
626        let sq = Arc::new(
627            LogicalPlanBuilder::from(scan_tpch_table("orders"))
628                .filter(
629                    out_ref_col(DataType::Int64, "customer.c_custkey")
630                        .eq(col("orders.o_custkey"))
631                        .and(col("o_orderkey").eq(lit(1))),
632                )?
633                .project(vec![col("orders.o_custkey")])?
634                .build()?,
635        );
636
637        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
638            .filter(in_subquery(col("customer.c_custkey"), sq))?
639            .project(vec![col("customer.c_custkey")])?
640            .build()?;
641
642        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
643        \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
644        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
645        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
646        \n      Projection: orders.o_custkey [o_custkey:Int64]\
647        \n        Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
648        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
649
650        assert_optimized_plan_eq_display_indent(
651            Arc::new(DecorrelatePredicateSubquery::new()),
652            plan,
653            expected,
654        );
655        Ok(())
656    }
657
658    /// Test for correlated IN subquery with no columns in schema
659    #[test]
660    fn in_subquery_no_cols() -> Result<()> {
661        let sq = Arc::new(
662            LogicalPlanBuilder::from(scan_tpch_table("orders"))
663                .filter(
664                    out_ref_col(DataType::Int64, "customer.c_custkey")
665                        .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
666                )?
667                .project(vec![col("orders.o_custkey")])?
668                .build()?,
669        );
670
671        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
672            .filter(in_subquery(col("customer.c_custkey"), sq))?
673            .project(vec![col("customer.c_custkey")])?
674            .build()?;
675
676        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
677        \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
678        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
679        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
680        \n      Projection: orders.o_custkey [o_custkey:Int64]\
681        \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
682
683        assert_optimized_plan_eq_display_indent(
684            Arc::new(DecorrelatePredicateSubquery::new()),
685            plan,
686            expected,
687        );
688        Ok(())
689    }
690
691    /// Test for IN subquery with both columns in schema
692    #[test]
693    fn in_subquery_with_no_correlated_cols() -> Result<()> {
694        let sq = Arc::new(
695            LogicalPlanBuilder::from(scan_tpch_table("orders"))
696                .filter(col("orders.o_custkey").eq(col("orders.o_custkey")))?
697                .project(vec![col("orders.o_custkey")])?
698                .build()?,
699        );
700
701        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
702            .filter(in_subquery(col("customer.c_custkey"), sq))?
703            .project(vec![col("customer.c_custkey")])?
704            .build()?;
705
706        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
707        \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
708        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
709        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
710        \n      Projection: orders.o_custkey [o_custkey:Int64]\
711        \n        Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
712        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
713
714        assert_optimized_plan_eq_display_indent(
715            Arc::new(DecorrelatePredicateSubquery::new()),
716            plan,
717            expected,
718        );
719        Ok(())
720    }
721
722    /// Test for correlated IN subquery not equal
723    #[test]
724    fn in_subquery_where_not_eq() -> Result<()> {
725        let sq = Arc::new(
726            LogicalPlanBuilder::from(scan_tpch_table("orders"))
727                .filter(
728                    out_ref_col(DataType::Int64, "customer.c_custkey")
729                        .not_eq(col("orders.o_custkey")),
730                )?
731                .project(vec![col("orders.o_custkey")])?
732                .build()?,
733        );
734
735        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
736            .filter(in_subquery(col("customer.c_custkey"), sq))?
737            .project(vec![col("customer.c_custkey")])?
738            .build()?;
739
740        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
741        \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
742        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
743        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
744        \n      Projection: orders.o_custkey [o_custkey:Int64]\
745        \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
746
747        assert_optimized_plan_eq_display_indent(
748            Arc::new(DecorrelatePredicateSubquery::new()),
749            plan,
750            expected,
751        );
752        Ok(())
753    }
754
/// Correlated `IN` subquery whose correlation predicate is a `<` comparison.
///
/// The `<` predicate is pulled out of the subquery and appended to the
/// semi-join filter after the `IN` equality.
#[test]
fn in_subquery_where_less_than() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").lt(col("orders.o_custkey"));
    let orders_sq = LogicalPlanBuilder::from(scan_tpch_table("orders"))
        .filter(correlation)?
        .project(vec![col("orders.o_custkey")])?
        .build()?;

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(in_subquery(col("customer.c_custkey"), Arc::new(orders_sq)))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
            \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
            \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
            \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
            \n      Projection: orders.o_custkey [o_custkey:Int64]\
            \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
787
/// Correlated `IN` subquery whose filter is a disjunction of a correlated
/// and an uncorrelated predicate.
///
/// The whole `OR` is lifted into the semi-join filter, so `o_orderkey` must
/// also be exposed by the subquery projection.
#[test]
fn in_subquery_with_subquery_disjunction() -> Result<()> {
    let correlated_eq =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let local_eq = col("o_orderkey").eq(lit(1));

    let orders_sq = LogicalPlanBuilder::from(scan_tpch_table("orders"))
        .filter(correlated_eq.or(local_eq))?
        .project(vec![col("orders.o_custkey")])?
        .build()?;

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(in_subquery(col("customer.c_custkey"), Arc::new(orders_sq)))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
            \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND (customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1)) [c_custkey:Int64, c_name:Utf8]\
            \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
            \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]\
            \n      Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]\
            \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);

    Ok(())
}
822
/// `IN` subquery with no explicit projection.
///
/// Without a projection the subquery exposes all four `orders` columns, and
/// the analyzer rejects the plan because `IN` needs exactly one column.
#[test]
fn in_subquery_no_projection() -> Result<()> {
    let unprojected = LogicalPlanBuilder::from(scan_tpch_table("orders"))
        .filter(col("customer.c_custkey").eq(col("orders.o_custkey")))?
        .build()?;

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(in_subquery(col("customer.c_custkey"), Arc::new(unprojected)))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    // NOTE(review): presumably a subquery over a single-column table would
    // pass this check even without a projection — confirm if that matters.
    let expected = "Invalid (non-executable) plan after Analyzer\
            \ncaused by\
            \nError during planning: InSubquery should only return one column, but found 4";
    assert_analyzer_check_err(vec![], plan, expected);

    Ok(())
}
845
/// Correlated `IN` subquery whose outer operand is an expression
/// (`c_custkey + 1`) rather than a bare column.
///
/// The expression appears verbatim on the outer side of the join equality.
#[test]
fn in_subquery_join_expr() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let orders_sq = LogicalPlanBuilder::from(scan_tpch_table("orders"))
        .filter(correlation)?
        .project(vec![col("orders.o_custkey")])?
        .build()?;

    let in_operand = col("customer.c_custkey").add(lit(1));
    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(in_subquery(in_operand, Arc::new(orders_sq)))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
            \n  LeftSemi Join:  Filter: customer.c_custkey + Int32(1) = __correlated_sq_1.o_custkey AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
            \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
            \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
            \n      Projection: orders.o_custkey [o_custkey:Int64]\
            \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
878
/// Correlated `IN` subquery that projects an expression (`o_custkey + 1`).
///
/// The join filter refers to the projected expression by its derived name.
/// `o_custkey` is appended to the projection so the correlation predicate
/// can be evaluated in the join.
#[test]
fn in_subquery_project_expr() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let projected = col("orders.o_custkey").add(lit(1));
    let orders_sq = LogicalPlanBuilder::from(scan_tpch_table("orders"))
        .filter(correlation)?
        .project(vec![projected])?
        .build()?;

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(in_subquery(col("customer.c_custkey"), Arc::new(orders_sq)))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
            \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.orders.o_custkey + Int32(1) AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
            \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
            \n    SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\
            \n      Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\
            \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
911
/// Correlated `IN` subquery that projects two columns.
///
/// The analyzer rejects the plan because `IN` requires exactly one column.
#[test]
fn in_subquery_multi_col() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let two_col_sq = LogicalPlanBuilder::from(scan_tpch_table("orders"))
        .filter(correlation)?
        .project(vec![col("orders.o_custkey"), col("orders.o_orderkey")])?
        .build()?;

    let predicate = in_subquery(col("customer.c_custkey"), Arc::new(two_col_sq))
        .and(col("c_custkey").eq(lit(1)));
    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(predicate)?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected = "Invalid (non-executable) plan after Analyzer\
            \ncaused by\
            \nError during planning: InSubquery should only return one column";
    assert_analyzer_check_err(vec![], plan, expected);

    Ok(())
}
940
/// Correlated `IN` subquery combined (via `AND`) with an ordinary predicate.
///
/// The ordinary predicate `c_custkey = 1` is kept in a `Filter` placed above
/// the new semi join.
#[test]
fn should_support_additional_filters() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let orders_sq = LogicalPlanBuilder::from(scan_tpch_table("orders"))
        .filter(correlation)?
        .project(vec![col("orders.o_custkey")])?
        .build()?;

    let predicate = in_subquery(col("customer.c_custkey"), Arc::new(orders_sq))
        .and(col("c_custkey").eq(lit(1)));
    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(predicate)?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
            \n  Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]\
            \n    LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
            \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
            \n      SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
            \n        Projection: orders.o_custkey [o_custkey:Int64]\
            \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
977
/// Correlated `IN` subquery on the generic `test` / `sq` tables.
///
/// The correlation column `sq.a` is appended to the subquery projection so
/// the `test.a = sq.a` predicate can move into the join filter.
#[test]
fn in_subquery_correlated() -> Result<()> {
    let inner_scan = test_table_scan_with_name("sq")?;
    let correlated_sq = LogicalPlanBuilder::from(inner_scan)
        .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a")))?
        .project(vec![col("c")])?
        .build()?;

    let outer_scan = test_table_scan_with_name("test")?;
    let plan = LogicalPlanBuilder::from(outer_scan)
        .filter(in_subquery(col("c"), Arc::new(correlated_sq)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected = "Projection: test.b [b:UInt32]\
            \n  LeftSemi Join:  Filter: test.c = __correlated_sq_1.c AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
            \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
            \n    SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]\
            \n      Projection: sq.c, sq.a [c:UInt32, a:UInt32]\
            \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
1007
/// A single uncorrelated `IN` subquery becomes a `LeftSemi` join whose
/// filter is just the `IN` equality.
#[test]
fn in_subquery_simple() -> Result<()> {
    let outer_scan = test_table_scan()?;
    let subquery = test_subquery_with_name("sq")?;
    let plan = LogicalPlanBuilder::from(outer_scan)
        .filter(in_subquery(col("c"), subquery))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected = "Projection: test.b [b:UInt32]\
            \n  LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
            \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
            \n    SubqueryAlias: __correlated_sq_1 [c:UInt32]\
            \n      Projection: sq.c [c:UInt32]\
            \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
1031
/// A single uncorrelated `NOT IN` subquery becomes a `LeftAnti` join.
#[test]
fn not_in_subquery_simple() -> Result<()> {
    let outer_scan = test_table_scan()?;
    let subquery = test_subquery_with_name("sq")?;
    let plan = LogicalPlanBuilder::from(outer_scan)
        .filter(not_in_subquery(col("c"), subquery))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected = "Projection: test.b [b:UInt32]\
            \n  LeftAnti Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
            \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
            \n    SubqueryAlias: __correlated_sq_1 [c:UInt32]\
            \n      Projection: sq.c [c:UInt32]\
            \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
1055
/// `NOT (c IN (subquery))` is handled like `c NOT IN (subquery)` and
/// becomes a `LeftAnti` join.
#[test]
fn wrapped_not_in_subquery() -> Result<()> {
    let outer_scan = test_table_scan()?;
    let negated_in = not(in_subquery(col("c"), test_subquery_with_name("sq")?));
    let plan = LogicalPlanBuilder::from(outer_scan)
        .filter(negated_in)?
        .project(vec![col("test.b")])?
        .build()?;

    let expected = "Projection: test.b [b:UInt32]\
            \n  LeftAnti Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
            \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
            \n    SubqueryAlias: __correlated_sq_1 [c:UInt32]\
            \n      Projection: sq.c [c:UInt32]\
            \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
1078
/// `NOT (c NOT IN (subquery))` cancels out to a plain `IN` and becomes a
/// `LeftSemi` join.
#[test]
fn wrapped_not_not_in_subquery() -> Result<()> {
    let outer_scan = test_table_scan()?;
    let subquery = test_subquery_with_name("sq")?;
    let double_negation = not(not_in_subquery(col("c"), subquery));
    let plan = LogicalPlanBuilder::from(outer_scan)
        .filter(double_negation)?
        .project(vec![col("test.b")])?
        .build()?;

    let expected = "Projection: test.b [b:UInt32]\
            \n  LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
            \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
            \n    SubqueryAlias: __correlated_sq_1 [c:UInt32]\
            \n      Projection: sq.c [c:UInt32]\
            \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
1104
/// Uncorrelated `IN` where both the outer operand (`c + 1`) and the
/// subquery projection (`c * 2`) are expressions.
///
/// The join filter compares the outer expression with the projected
/// expression by its derived column name.
#[test]
fn in_subquery_both_side_expr() -> Result<()> {
    let outer_scan = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let doubled_sq = LogicalPlanBuilder::from(inner_scan)
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let in_operand = col("c") + lit(1u32);
    let plan = LogicalPlanBuilder::from(outer_scan)
        .filter(in_subquery(in_operand, Arc::new(doubled_sq)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected = "Projection: test.b [b:UInt32]\
            \n  LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\
            \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
            \n    SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32]\
            \n      Projection: sq.c * UInt32(2) [sq.c * UInt32(2):UInt32]\
            \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
1133
/// Correlated `IN` whose subquery filter has both a correlated conjunct
/// (`test.a = sq.a`) and an uncorrelated one (`sq.a + 1 = sq.b`).
///
/// The correlated conjunct moves into the join filter. The uncorrelated one
/// stays as a `Filter` inside the subquery.
#[test]
fn in_subquery_join_filter_and_inner_filter() -> Result<()> {
    let outer_scan = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let correlated = out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a"));
    let uncorrelated = col("sq.a").add(lit(1u32)).eq(col("sq.b"));
    let inner_plan = LogicalPlanBuilder::from(inner_scan)
        .filter(correlated.and(uncorrelated))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let plan = LogicalPlanBuilder::from(outer_scan)
        .filter(in_subquery(col("c") + lit(1u32), Arc::new(inner_plan)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected = "Projection: test.b [b:UInt32]\
            \n  LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
            \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
            \n    SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32]\
            \n      Projection: sq.c * UInt32(2), sq.a [sq.c * UInt32(2):UInt32, a:UInt32]\
            \n        Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]\
            \n          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
1168
/// Correlated `IN` whose correlation predicate uses two outer columns and
/// two subquery columns (`test.a + test.b = sq.a + sq.b`).
///
/// Both `sq.a` and `sq.b` are appended to the subquery projection so the
/// predicate can be evaluated in the join filter.
#[test]
fn in_subquery_multi_project_subquery_cols() -> Result<()> {
    let outer_scan = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let outer_sum = out_ref_col(DataType::UInt32, "test.a")
        .add(out_ref_col(DataType::UInt32, "test.b"));
    let correlated = outer_sum.eq(col("sq.a").add(col("sq.b")));
    let uncorrelated = col("sq.a").add(lit(1u32)).eq(col("sq.b"));
    let inner_plan = LogicalPlanBuilder::from(inner_scan)
        .filter(correlated.and(uncorrelated))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let plan = LogicalPlanBuilder::from(outer_scan)
        .filter(in_subquery(col("c") + lit(1u32), Arc::new(inner_plan)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected = "Projection: test.b [b:UInt32]\
            \n  LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a + test.b = __correlated_sq_1.a + __correlated_sq_1.b [a:UInt32, b:UInt32, c:UInt32]\
            \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
            \n    SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]\
            \n      Projection: sq.c * UInt32(2), sq.a, sq.b [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]\
            \n        Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]\
            \n          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
1204
/// Two correlated `IN` subqueries and a plain predicate in one conjunction.
///
/// Each subquery becomes its own `LeftSemi` join, stacked in order of
/// appearance (`__correlated_sq_1` innermost). The plain predicate
/// `test.c > 1` remains in a `Filter` above both joins.
#[test]
fn two_in_subquery_with_outer_filter() -> Result<()> {
    let outer_scan = test_table_scan()?;
    let first_scan = test_table_scan_with_name("sq1")?;
    let second_scan = test_table_scan_with_name("sq2")?;

    let first_sq = LogicalPlanBuilder::from(first_scan)
        .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq1.a")))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let second_sq = LogicalPlanBuilder::from(second_scan)
        .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq2.a")))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    // Keep the original nesting: first AND (second AND plain).
    let first_in = in_subquery(col("c") + lit(1u32), Arc::new(first_sq));
    let second_in = in_subquery(col("c") * lit(2u32), Arc::new(second_sq));
    let plain = col("test.c").gt(lit(1u32));
    let plan = LogicalPlanBuilder::from(outer_scan)
        .filter(first_in.and(second_in.and(plain)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected = "Projection: test.b [b:UInt32]\
            \n  Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]\
            \n    LeftSemi Join:  Filter: test.c * UInt32(2) = __correlated_sq_2.sq2.c * UInt32(2) AND test.a > __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]\
            \n      LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq1.c * UInt32(2) AND test.a > __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
            \n        TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
            \n        SubqueryAlias: __correlated_sq_1 [sq1.c * UInt32(2):UInt32, a:UInt32]\
            \n          Projection: sq1.c * UInt32(2), sq1.a [sq1.c * UInt32(2):UInt32, a:UInt32]\
            \n            TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\
            \n      SubqueryAlias: __correlated_sq_2 [sq2.c * UInt32(2):UInt32, a:UInt32]\
            \n        Projection: sq2.c * UInt32(2), sq2.a [sq2.c * UInt32(2):UInt32, a:UInt32]\
            \n          TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
1250
/// Uncorrelated `IN` where the outer query and the subquery scan the same
/// `test` table.
///
/// The subquery side is wrapped in `SubqueryAlias: __correlated_sq_1`, and
/// its local filter `test.a > test.b` stays inside it.
#[test]
fn in_subquery_with_same_table() -> Result<()> {
    let outer_scan = test_table_scan()?;
    let inner_scan = test_table_scan()?;
    let same_table_sq = LogicalPlanBuilder::from(inner_scan)
        .filter(col("test.a").gt(col("test.b")))?
        .project(vec![col("c")])?
        .build()?;

    let plan = LogicalPlanBuilder::from(outer_scan)
        .filter(in_subquery(col("test.a"), Arc::new(same_table_sq)))?
        .project(vec![col("test.b")])?
        .build()?;

    let expected = "Projection: test.b [b:UInt32]\
            \n  LeftSemi Join:  Filter: test.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
            \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
            \n    SubqueryAlias: __correlated_sq_1 [c:UInt32]\
            \n      Projection: test.c [c:UInt32]\
            \n        Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]\
            \n          TableScan: test [a:UInt32, b:UInt32, c:UInt32]";

    let rule = Arc::new(DecorrelatePredicateSubquery::new());
    assert_optimized_plan_eq_display_indent(rule, plan, expected);
    Ok(())
}
1281
/// The same correlated `EXISTS` subquery used twice in one conjunction.
///
/// Each occurrence becomes a separate `LeftSemi` join with its own alias
/// (`__correlated_sq_1`, `__correlated_sq_2`).
#[test]
fn multiple_exists_subqueries() -> Result<()> {
    let correlation =
        col("orders.o_custkey").eq(out_ref_col(DataType::Int64, "customer.c_custkey"));
    let orders = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let first_exists = exists(Arc::clone(&orders));
    let second_exists = exists(orders);
    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(first_exists.and(second_exists))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
            \n  LeftSemi Join:  Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\
            \n    LeftSemi Join:  Filter: __correlated_sq_1.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\
            \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
            \n      SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
            \n        Projection: orders.o_custkey [o_custkey:Int64]\
            \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
            \n    SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]\
            \n      Projection: orders.o_custkey [o_custkey:Int64]\
            \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
    assert_optimized_plan_equal(plan, expected)?;
    Ok(())
}
1312
/// Nested correlated `EXISTS` subqueries:
/// customer → orders → lineitem.
///
/// The inner `EXISTS` (`__correlated_sq_1`) becomes a semi join inside the
/// outer subquery. The outer `EXISTS` (`__correlated_sq_2`) becomes a semi
/// join on `customer`.
#[test]
fn recursive_exists_subqueries() -> Result<()> {
    let lineitem_correlation =
        col("lineitem.l_orderkey").eq(out_ref_col(DataType::Int64, "orders.o_orderkey"));
    let lineitem = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
            .filter(lineitem_correlation)?
            .project(vec![col("lineitem.l_orderkey")])?
            .build()?,
    );

    let orders_correlation =
        col("orders.o_custkey").eq(out_ref_col(DataType::Int64, "customer.c_custkey"));
    let orders = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(exists(lineitem).and(orders_correlation))?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(orders))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
            \n  LeftSemi Join:  Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]\
            \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
            \n    SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]\
            \n      Projection: orders.o_custkey [o_custkey:Int64]\
            \n        LeftSemi Join:  Filter: __correlated_sq_1.l_orderkey = orders.o_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
            \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
            \n          SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]\
            \n            Projection: lineitem.l_orderkey [l_orderkey:Int64]\
            \n              TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]";
    assert_optimized_plan_equal(plan, expected)?;
    Ok(())
}
1355
/// Correlated `EXISTS` whose subquery filter also has an uncorrelated
/// conjunct (`o_orderkey = 1`).
///
/// The correlated equality moves into the join filter. The uncorrelated
/// conjunct stays as a `Filter` inside the subquery.
#[test]
fn exists_subquery_with_subquery_filters() -> Result<()> {
    let correlated =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let local = col("o_orderkey").eq(lit(1));
    let orders_sq = LogicalPlanBuilder::from(scan_tpch_table("orders"))
        .filter(correlated.and(local))?
        .project(vec![col("orders.o_custkey")])?
        .build()?;

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(Arc::new(orders_sq)))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
            \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
            \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
            \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
            \n      Projection: orders.o_custkey [o_custkey:Int64]\
            \n        Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
            \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(plan, expected)?;
    Ok(())
}
1385
/// Correlated `EXISTS` whose only predicate references an outer column but
/// no subquery column (`customer.c_custkey = 1`).
///
/// The predicate is moved as-is into the semi-join filter.
#[test]
fn exists_subquery_no_cols() -> Result<()> {
    let outer_only = out_ref_col(DataType::Int64, "customer.c_custkey").eq(lit(1u32));
    let orders_sq = LogicalPlanBuilder::from(scan_tpch_table("orders"))
        .filter(outer_only)?
        .project(vec![col("orders.o_custkey")])?
        .build()?;

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(Arc::new(orders_sq)))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    // Pushing `customer.c_custkey = 1` further down is left to another rule.
    let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
            \n  LeftSemi Join:  Filter: customer.c_custkey = UInt32(1) [c_custkey:Int64, c_name:Utf8]\
            \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
            \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
            \n      Projection: orders.o_custkey [o_custkey:Int64]\
            \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(plan, expected)?;
    Ok(())
}
1410
/// `EXISTS` subquery whose filter references only its own columns (both
/// sides of `o_custkey = o_custkey` come from `orders`).
///
/// With nothing to correlate, the filter stays inside the subquery and the
/// semi-join filter is just `Boolean(true)`.
#[test]
fn exists_subquery_with_no_correlated_cols() -> Result<()> {
    let self_eq = col("orders.o_custkey").eq(col("orders.o_custkey"));
    let orders_sq = LogicalPlanBuilder::from(scan_tpch_table("orders"))
        .filter(self_eq)?
        .project(vec![col("orders.o_custkey")])?
        .build()?;

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(Arc::new(orders_sq)))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
            \n  LeftSemi Join:  Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8]\
            \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
            \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
            \n      Projection: orders.o_custkey [o_custkey:Int64]\
            \n        Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
            \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
    assert_optimized_plan_equal(plan, expected)?;
    Ok(())
}
1435
/// Correlated `EXISTS` whose correlation predicate is `!=`.
///
/// The non-equality predicate becomes the whole semi-join filter.
#[test]
fn exists_subquery_where_not_eq() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").not_eq(col("orders.o_custkey"));
    let orders_sq = LogicalPlanBuilder::from(scan_tpch_table("orders"))
        .filter(correlation)?
        .project(vec![col("orders.o_custkey")])?
        .build()?;

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(Arc::new(orders_sq)))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
            \n  LeftSemi Join:  Filter: customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
            \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
            \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
            \n      Projection: orders.o_custkey [o_custkey:Int64]\
            \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";

    assert_optimized_plan_equal(plan, expected)?;
    Ok(())
}
1463
1464    /// Test for correlated exists subquery less than
1465    #[test]
1466    fn exists_subquery_where_less_than() -> Result<()> {
1467        let sq = Arc::new(
1468            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1469                .filter(
1470                    out_ref_col(DataType::Int64, "customer.c_custkey")
1471                        .lt(col("orders.o_custkey")),
1472                )?
1473                .project(vec![col("orders.o_custkey")])?
1474                .build()?,
1475        );
1476
1477        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1478            .filter(exists(sq))?
1479            .project(vec![col("customer.c_custkey")])?
1480            .build()?;
1481
1482        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
1483                        \n  LeftSemi Join:  Filter: customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
1484                        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
1485                        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
1486                        \n      Projection: orders.o_custkey [o_custkey:Int64]\
1487                        \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
1488
1489        assert_optimized_plan_equal(plan, expected)
1490    }
1491
1492    /// Test for correlated exists subquery filter with subquery disjunction
1493    #[test]
1494    fn exists_subquery_with_subquery_disjunction() -> Result<()> {
1495        let sq = Arc::new(
1496            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1497                .filter(
1498                    out_ref_col(DataType::Int64, "customer.c_custkey")
1499                        .eq(col("orders.o_custkey"))
1500                        .or(col("o_orderkey").eq(lit(1))),
1501                )?
1502                .project(vec![col("orders.o_custkey")])?
1503                .build()?,
1504        );
1505
1506        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1507            .filter(exists(sq))?
1508            .project(vec![col("customer.c_custkey")])?
1509            .build()?;
1510
1511        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
1512                        \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1) [c_custkey:Int64, c_name:Utf8]\
1513                        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
1514                        \n    SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]\
1515                        \n      Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]\
1516                        \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
1517
1518        assert_optimized_plan_equal(plan, expected)
1519    }
1520
1521    /// Test for correlated exists without projection
1522    #[test]
1523    fn exists_subquery_no_projection() -> Result<()> {
1524        let sq = Arc::new(
1525            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1526                .filter(
1527                    out_ref_col(DataType::Int64, "customer.c_custkey")
1528                        .eq(col("orders.o_custkey")),
1529                )?
1530                .build()?,
1531        );
1532
1533        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1534            .filter(exists(sq))?
1535            .project(vec![col("customer.c_custkey")])?
1536            .build()?;
1537
1538        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
1539                        \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
1540                        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
1541                        \n    SubqueryAlias: __correlated_sq_1 [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
1542                        \n      TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
1543
1544        assert_optimized_plan_equal(plan, expected)
1545    }
1546
1547    /// Test for correlated exists expressions
1548    #[test]
1549    fn exists_subquery_project_expr() -> Result<()> {
1550        let sq = Arc::new(
1551            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1552                .filter(
1553                    out_ref_col(DataType::Int64, "customer.c_custkey")
1554                        .eq(col("orders.o_custkey")),
1555                )?
1556                .project(vec![col("orders.o_custkey").add(lit(1))])?
1557                .build()?,
1558        );
1559
1560        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1561            .filter(exists(sq))?
1562            .project(vec![col("customer.c_custkey")])?
1563            .build()?;
1564
1565        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
1566                        \n  LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
1567                        \n    TableScan: customer [c_custkey:Int64, c_name:Utf8]\
1568                        \n    SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\
1569                        \n      Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]\
1570                        \n        TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
1571
1572        assert_optimized_plan_equal(plan, expected)
1573    }
1574
1575    /// Test for correlated exists subquery filter with additional filters
1576    #[test]
1577    fn exists_subquery_should_support_additional_filters() -> Result<()> {
1578        let sq = Arc::new(
1579            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1580                .filter(
1581                    out_ref_col(DataType::Int64, "customer.c_custkey")
1582                        .eq(col("orders.o_custkey")),
1583                )?
1584                .project(vec![col("orders.o_custkey")])?
1585                .build()?,
1586        );
1587        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1588            .filter(exists(sq).and(col("c_custkey").eq(lit(1))))?
1589            .project(vec![col("customer.c_custkey")])?
1590            .build()?;
1591
1592        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
1593                        \n  Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]\
1594                        \n    LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]\
1595                        \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
1596                        \n      SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
1597                        \n        Projection: orders.o_custkey [o_custkey:Int64]\
1598                        \n          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
1599
1600        assert_optimized_plan_equal(plan, expected)
1601    }
1602
1603    /// Test for correlated exists subquery filter with disjunctions
1604    #[test]
1605    fn exists_subquery_disjunction() -> Result<()> {
1606        let sq = Arc::new(
1607            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1608                .filter(col("customer.c_custkey").eq(col("orders.o_custkey")))?
1609                .project(vec![col("orders.o_custkey")])?
1610                .build()?,
1611        );
1612
1613        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1614            .filter(exists(sq).or(col("customer.c_custkey").eq(lit(1))))?
1615            .project(vec![col("customer.c_custkey")])?
1616            .build()?;
1617
1618        let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
1619                        \n  Filter: __correlated_sq_1.mark OR customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, mark:Boolean]\
1620                        \n    LeftMark Join:  Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, mark:Boolean]\
1621                        \n      TableScan: customer [c_custkey:Int64, c_name:Utf8]\
1622                        \n      SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]\
1623                        \n        Projection: orders.o_custkey [o_custkey:Int64]\
1624                        \n          Filter: customer.c_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\
1625                        \n            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]";
1626
1627        assert_optimized_plan_equal(plan, expected)
1628    }
1629
1630    /// Test for correlated EXISTS subquery filter
1631    #[test]
1632    fn exists_subquery_correlated() -> Result<()> {
1633        let sq = Arc::new(
1634            LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
1635                .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a")))?
1636                .project(vec![col("c")])?
1637                .build()?,
1638        );
1639
1640        let plan = LogicalPlanBuilder::from(test_table_scan_with_name("test")?)
1641            .filter(exists(sq))?
1642            .project(vec![col("test.c")])?
1643            .build()?;
1644
1645        let expected  = "Projection: test.c [c:UInt32]\
1646                        \n  LeftSemi Join:  Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
1647                        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1648                        \n    SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]\
1649                        \n      Projection: sq.c, sq.a [c:UInt32, a:UInt32]\
1650                        \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
1651
1652        assert_optimized_plan_equal(plan, expected)
1653    }
1654
1655    /// Test for single exists subquery filter
1656    #[test]
1657    fn exists_subquery_simple() -> Result<()> {
1658        let table_scan = test_table_scan()?;
1659        let plan = LogicalPlanBuilder::from(table_scan)
1660            .filter(exists(test_subquery_with_name("sq")?))?
1661            .project(vec![col("test.b")])?
1662            .build()?;
1663
1664        let expected  = "Projection: test.b [b:UInt32]\
1665                        \n  LeftSemi Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]\
1666                        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1667                        \n    SubqueryAlias: __correlated_sq_1 [c:UInt32]\
1668                        \n      Projection: sq.c [c:UInt32]\
1669                        \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
1670        assert_optimized_plan_equal(plan, expected)
1671    }
1672
1673    /// Test for single NOT exists subquery filter
1674    #[test]
1675    fn not_exists_subquery_simple() -> Result<()> {
1676        let table_scan = test_table_scan()?;
1677        let plan = LogicalPlanBuilder::from(table_scan)
1678            .filter(not_exists(test_subquery_with_name("sq")?))?
1679            .project(vec![col("test.b")])?
1680            .build()?;
1681
1682        let expected  = "Projection: test.b [b:UInt32]\
1683                        \n  LeftAnti Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]\
1684                        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1685                        \n    SubqueryAlias: __correlated_sq_1 [c:UInt32]\
1686                        \n      Projection: sq.c [c:UInt32]\
1687                        \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
1688        assert_optimized_plan_equal(plan, expected)
1689    }
1690
1691    #[test]
1692    fn two_exists_subquery_with_outer_filter() -> Result<()> {
1693        let table_scan = test_table_scan()?;
1694        let subquery_scan1 = test_table_scan_with_name("sq1")?;
1695        let subquery_scan2 = test_table_scan_with_name("sq2")?;
1696
1697        let subquery1 = LogicalPlanBuilder::from(subquery_scan1)
1698            .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq1.a")))?
1699            .project(vec![col("c")])?
1700            .build()?;
1701
1702        let subquery2 = LogicalPlanBuilder::from(subquery_scan2)
1703            .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq2.a")))?
1704            .project(vec![col("c")])?
1705            .build()?;
1706
1707        let plan = LogicalPlanBuilder::from(table_scan)
1708            .filter(
1709                exists(Arc::new(subquery1))
1710                    .and(exists(Arc::new(subquery2)).and(col("test.c").gt(lit(1u32)))),
1711            )?
1712            .project(vec![col("test.b")])?
1713            .build()?;
1714
1715        let expected = "Projection: test.b [b:UInt32]\
1716                        \n  Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]\
1717                        \n    LeftSemi Join:  Filter: test.a = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]\
1718                        \n      LeftSemi Join:  Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]\
1719                        \n        TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1720                        \n        SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]\
1721                        \n          Projection: sq1.c, sq1.a [c:UInt32, a:UInt32]\
1722                        \n            TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]\
1723                        \n      SubqueryAlias: __correlated_sq_2 [c:UInt32, a:UInt32]\
1724                        \n        Projection: sq2.c, sq2.a [c:UInt32, a:UInt32]\
1725                        \n          TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]";
1726
1727        assert_optimized_plan_equal(plan, expected)
1728    }
1729
1730    #[test]
1731    fn exists_subquery_expr_filter() -> Result<()> {
1732        let table_scan = test_table_scan()?;
1733        let subquery_scan = test_table_scan_with_name("sq")?;
1734        let subquery = LogicalPlanBuilder::from(subquery_scan)
1735            .filter(
1736                (lit(1u32) + col("sq.a"))
1737                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1738            )?
1739            .project(vec![lit(1u32)])?
1740            .build()?;
1741        let plan = LogicalPlanBuilder::from(table_scan)
1742            .filter(exists(Arc::new(subquery)))?
1743            .project(vec![col("test.b")])?
1744            .build()?;
1745
1746        let expected = "Projection: test.b [b:UInt32]\
1747                        \n  LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\
1748                        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1749                        \n    SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, a:UInt32]\
1750                        \n      Projection: UInt32(1), sq.a [UInt32(1):UInt32, a:UInt32]\
1751                        \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
1752
1753        assert_optimized_plan_equal(plan, expected)
1754    }
1755
1756    #[test]
1757    fn exists_subquery_with_same_table() -> Result<()> {
1758        let outer_scan = test_table_scan()?;
1759        let subquery_scan = test_table_scan()?;
1760        let subquery = LogicalPlanBuilder::from(subquery_scan)
1761            .filter(col("test.a").gt(col("test.b")))?
1762            .project(vec![col("c")])?
1763            .build()?;
1764
1765        let plan = LogicalPlanBuilder::from(outer_scan)
1766            .filter(exists(Arc::new(subquery)))?
1767            .project(vec![col("test.b")])?
1768            .build()?;
1769
1770        // Subquery and outer query refer to the same table.
1771        let expected = "Projection: test.b [b:UInt32]\
1772                      \n  LeftSemi Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]\
1773                      \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1774                      \n    SubqueryAlias: __correlated_sq_1 [c:UInt32]\
1775                      \n      Projection: test.c [c:UInt32]\
1776                      \n        Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]\
1777                      \n          TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
1778
1779        assert_optimized_plan_equal(plan, expected)
1780    }
1781
1782    #[test]
1783    fn exists_distinct_subquery() -> Result<()> {
1784        let table_scan = test_table_scan()?;
1785        let subquery_scan = test_table_scan_with_name("sq")?;
1786        let subquery = LogicalPlanBuilder::from(subquery_scan)
1787            .filter(
1788                (lit(1u32) + col("sq.a"))
1789                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1790            )?
1791            .project(vec![col("sq.c")])?
1792            .distinct()?
1793            .build()?;
1794        let plan = LogicalPlanBuilder::from(table_scan)
1795            .filter(exists(Arc::new(subquery)))?
1796            .project(vec![col("test.b")])?
1797            .build()?;
1798
1799        let expected = "Projection: test.b [b:UInt32]\
1800                        \n  LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\
1801                        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1802                        \n    SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]\
1803                        \n      Distinct: [c:UInt32, a:UInt32]\
1804                        \n        Projection: sq.c, sq.a [c:UInt32, a:UInt32]\
1805                        \n          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
1806
1807        assert_optimized_plan_equal(plan, expected)
1808    }
1809
1810    #[test]
1811    fn exists_distinct_expr_subquery() -> Result<()> {
1812        let table_scan = test_table_scan()?;
1813        let subquery_scan = test_table_scan_with_name("sq")?;
1814        let subquery = LogicalPlanBuilder::from(subquery_scan)
1815            .filter(
1816                (lit(1u32) + col("sq.a"))
1817                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1818            )?
1819            .project(vec![col("sq.b") + col("sq.c")])?
1820            .distinct()?
1821            .build()?;
1822        let plan = LogicalPlanBuilder::from(table_scan)
1823            .filter(exists(Arc::new(subquery)))?
1824            .project(vec![col("test.b")])?
1825            .build()?;
1826
1827        let expected = "Projection: test.b [b:UInt32]\
1828                        \n  LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\
1829                        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1830                        \n    SubqueryAlias: __correlated_sq_1 [sq.b + sq.c:UInt32, a:UInt32]\
1831                        \n      Distinct: [sq.b + sq.c:UInt32, a:UInt32]\
1832                        \n        Projection: sq.b + sq.c, sq.a [sq.b + sq.c:UInt32, a:UInt32]\
1833                        \n          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
1834
1835        assert_optimized_plan_equal(plan, expected)
1836    }
1837
1838    #[test]
1839    fn exists_distinct_subquery_with_literal() -> Result<()> {
1840        let table_scan = test_table_scan()?;
1841        let subquery_scan = test_table_scan_with_name("sq")?;
1842        let subquery = LogicalPlanBuilder::from(subquery_scan)
1843            .filter(
1844                (lit(1u32) + col("sq.a"))
1845                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1846            )?
1847            .project(vec![lit(1u32), col("sq.c")])?
1848            .distinct()?
1849            .build()?;
1850        let plan = LogicalPlanBuilder::from(table_scan)
1851            .filter(exists(Arc::new(subquery)))?
1852            .project(vec![col("test.b")])?
1853            .build()?;
1854
1855        let expected = "Projection: test.b [b:UInt32]\
1856                        \n  LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]\
1857                        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1858                        \n    SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, c:UInt32, a:UInt32]\
1859                        \n      Distinct: [UInt32(1):UInt32, c:UInt32, a:UInt32]\
1860                        \n        Projection: UInt32(1), sq.c, sq.a [UInt32(1):UInt32, c:UInt32, a:UInt32]\
1861                        \n          TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
1862
1863        assert_optimized_plan_equal(plan, expected)
1864    }
1865
1866    #[test]
1867    fn exists_uncorrelated_unnest() -> Result<()> {
1868        let subquery_table_source = table_source(&Schema::new(vec![Field::new(
1869            "arr",
1870            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1871            true,
1872        )]));
1873        let subquery = LogicalPlanBuilder::scan_with_filters(
1874            "sq",
1875            subquery_table_source,
1876            None,
1877            vec![],
1878        )?
1879        .unnest_column("arr")?
1880        .build()?;
1881        let table_scan = test_table_scan()?;
1882        let plan = LogicalPlanBuilder::from(table_scan)
1883            .filter(exists(Arc::new(subquery)))?
1884            .project(vec![col("test.b")])?
1885            .build()?;
1886
1887        let expected = "Projection: test.b [b:UInt32]\
1888                        \n  LeftSemi Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]\
1889                        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1890                        \n    SubqueryAlias: __correlated_sq_1 [arr:Int32;N]\
1891                        \n      Unnest: lists[sq.arr|depth=1] structs[] [arr:Int32;N]\
1892                        \n        TableScan: sq [arr:List(Field { name: \"item\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} });N]";
1893        assert_optimized_plan_equal(plan, expected)
1894    }
1895
1896    #[test]
1897    fn exists_correlated_unnest() -> Result<()> {
1898        let table_scan = test_table_scan()?;
1899        let subquery_table_source = table_source(&Schema::new(vec![Field::new(
1900            "a",
1901            DataType::List(Arc::new(Field::new_list_field(DataType::UInt32, true))),
1902            true,
1903        )]));
1904        let subquery = LogicalPlanBuilder::scan_with_filters(
1905            "sq",
1906            subquery_table_source,
1907            None,
1908            vec![],
1909        )?
1910        .unnest_column("a")?
1911        .filter(col("a").eq(out_ref_col(DataType::UInt32, "test.b")))?
1912        .build()?;
1913        let plan = LogicalPlanBuilder::from(table_scan)
1914            .filter(exists(Arc::new(subquery)))?
1915            .project(vec![col("test.b")])?
1916            .build()?;
1917
1918        let expected = "Projection: test.b [b:UInt32]\
1919                        \n  LeftSemi Join:  Filter: __correlated_sq_1.a = test.b [a:UInt32, b:UInt32, c:UInt32]\
1920                        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1921                        \n    SubqueryAlias: __correlated_sq_1 [a:UInt32;N]\
1922                        \n      Unnest: lists[sq.a|depth=1] structs[] [a:UInt32;N]\
1923                        \n        TableScan: sq [a:List(Field { name: \"item\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} });N]";
1924
1925        assert_optimized_plan_equal(plan, expected)
1926    }
1927
1928    #[test]
1929    fn upper_case_ident() -> Result<()> {
1930        let fields = vec![
1931            Field::new("A", DataType::UInt32, false),
1932            Field::new("B", DataType::UInt32, false),
1933        ];
1934
1935        let schema = Schema::new(fields);
1936        let table_scan_a = table_scan(Some("\"TEST_A\""), &schema, None)?.build()?;
1937        let table_scan_b = table_scan(Some("\"TEST_B\""), &schema, None)?.build()?;
1938
1939        let subquery = LogicalPlanBuilder::from(table_scan_b)
1940            .filter(col("\"A\"").eq(out_ref_col(DataType::UInt32, "\"TEST_A\".\"A\"")))?
1941            .project(vec![lit(1)])?
1942            .build()?;
1943
1944        let plan = LogicalPlanBuilder::from(table_scan_a)
1945            .filter(exists(Arc::new(subquery)))?
1946            .project(vec![col("\"TEST_A\".\"B\"")])?
1947            .build()?;
1948
1949        let expected = "Projection: TEST_A.B [B:UInt32]\
1950        \n  LeftSemi Join:  Filter: __correlated_sq_1.A = TEST_A.A [A:UInt32, B:UInt32]\
1951        \n    TableScan: TEST_A [A:UInt32, B:UInt32]\
1952        \n    SubqueryAlias: __correlated_sq_1 [Int32(1):Int32, A:UInt32]\
1953        \n      Projection: Int32(1), TEST_B.A [Int32(1):Int32, A:UInt32]\
1954        \n        TableScan: TEST_B [A:UInt32, B:UInt32]";
1955
1956        assert_optimized_plan_equal(plan, expected)
1957    }
1958}