1use std::sync::Arc;
21
22use datafusion_common::tree_node::Transformed;
23use datafusion_common::JoinType;
24use datafusion_common::{plan_err, Result};
25use datafusion_expr::logical_plan::LogicalPlan;
26use datafusion_expr::{EmptyRelation, Projection, Union};
27
28use crate::optimizer::ApplyOrder;
29use crate::{OptimizerConfig, OptimizerRule};
30
31#[derive(Default, Debug)]
33pub struct PropagateEmptyRelation;
34
35impl PropagateEmptyRelation {
36 #[allow(missing_docs)]
37 pub fn new() -> Self {
38 Self {}
39 }
40}
41
42impl OptimizerRule for PropagateEmptyRelation {
43 fn name(&self) -> &str {
44 "propagate_empty_relation"
45 }
46
47 fn apply_order(&self) -> Option<ApplyOrder> {
48 Some(ApplyOrder::BottomUp)
49 }
50
51 fn supports_rewrite(&self) -> bool {
52 true
53 }
54
55 fn rewrite(
56 &self,
57 plan: LogicalPlan,
58 _config: &dyn OptimizerConfig,
59 ) -> Result<Transformed<LogicalPlan>> {
60 match plan {
61 LogicalPlan::EmptyRelation(_) => Ok(Transformed::no(plan)),
62 LogicalPlan::Projection(_)
63 | LogicalPlan::Filter(_)
64 | LogicalPlan::Window(_)
65 | LogicalPlan::Sort(_)
66 | LogicalPlan::SubqueryAlias(_)
67 | LogicalPlan::Repartition(_)
68 | LogicalPlan::Limit(_) => {
69 let empty = empty_child(&plan)?;
70 if let Some(empty_plan) = empty {
71 return Ok(Transformed::yes(empty_plan));
72 }
73 Ok(Transformed::no(plan))
74 }
75 LogicalPlan::Join(ref join) => {
76 let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
82
83 match join.join_type {
84 JoinType::Full if left_empty && right_empty => Ok(Transformed::yes(
86 LogicalPlan::EmptyRelation(EmptyRelation {
87 produce_one_row: false,
88 schema: Arc::clone(&join.schema),
89 }),
90 )),
91 JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes(
92 LogicalPlan::EmptyRelation(EmptyRelation {
93 produce_one_row: false,
94 schema: Arc::clone(&join.schema),
95 }),
96 )),
97 JoinType::Left if left_empty => Ok(Transformed::yes(
98 LogicalPlan::EmptyRelation(EmptyRelation {
99 produce_one_row: false,
100 schema: Arc::clone(&join.schema),
101 }),
102 )),
103 JoinType::Right if right_empty => Ok(Transformed::yes(
104 LogicalPlan::EmptyRelation(EmptyRelation {
105 produce_one_row: false,
106 schema: Arc::clone(&join.schema),
107 }),
108 )),
109 JoinType::LeftSemi if left_empty || right_empty => Ok(
110 Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
111 produce_one_row: false,
112 schema: Arc::clone(&join.schema),
113 })),
114 ),
115 JoinType::RightSemi if left_empty || right_empty => Ok(
116 Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
117 produce_one_row: false,
118 schema: Arc::clone(&join.schema),
119 })),
120 ),
121 JoinType::LeftAnti if left_empty => Ok(Transformed::yes(
122 LogicalPlan::EmptyRelation(EmptyRelation {
123 produce_one_row: false,
124 schema: Arc::clone(&join.schema),
125 }),
126 )),
127 JoinType::LeftAnti if right_empty => {
128 Ok(Transformed::yes((*join.left).clone()))
129 }
130 JoinType::RightAnti if left_empty => {
131 Ok(Transformed::yes((*join.right).clone()))
132 }
133 JoinType::RightAnti if right_empty => Ok(Transformed::yes(
134 LogicalPlan::EmptyRelation(EmptyRelation {
135 produce_one_row: false,
136 schema: Arc::clone(&join.schema),
137 }),
138 )),
139 _ => Ok(Transformed::no(plan)),
140 }
141 }
142 LogicalPlan::Aggregate(ref agg) => {
143 if !agg.group_expr.is_empty() {
144 if let Some(empty_plan) = empty_child(&plan)? {
145 return Ok(Transformed::yes(empty_plan));
146 }
147 }
148 Ok(Transformed::no(LogicalPlan::Aggregate(agg.clone())))
149 }
150 LogicalPlan::Union(ref union) => {
151 let new_inputs = union
152 .inputs
153 .iter()
154 .filter(|input| match &***input {
155 LogicalPlan::EmptyRelation(empty) => empty.produce_one_row,
156 _ => true,
157 })
158 .cloned()
159 .collect::<Vec<_>>();
160
161 if new_inputs.len() == union.inputs.len() {
162 Ok(Transformed::no(plan))
163 } else if new_inputs.is_empty() {
164 Ok(Transformed::yes(LogicalPlan::EmptyRelation(
165 EmptyRelation {
166 produce_one_row: false,
167 schema: Arc::clone(plan.schema()),
168 },
169 )))
170 } else if new_inputs.len() == 1 {
171 let mut new_inputs = new_inputs;
172 let input_plan = new_inputs.pop().unwrap(); let child = Arc::unwrap_or_clone(input_plan);
174 if child.schema().eq(plan.schema()) {
175 Ok(Transformed::yes(child))
176 } else {
177 Ok(Transformed::yes(LogicalPlan::Projection(
178 Projection::new_from_schema(
179 Arc::new(child),
180 Arc::clone(plan.schema()),
181 ),
182 )))
183 }
184 } else {
185 Ok(Transformed::yes(LogicalPlan::Union(Union {
186 inputs: new_inputs,
187 schema: Arc::clone(&union.schema),
188 })))
189 }
190 }
191
192 _ => Ok(Transformed::no(plan)),
193 }
194 }
195}
196
197fn binary_plan_children_is_empty(plan: &LogicalPlan) -> Result<(bool, bool)> {
198 match plan.inputs()[..] {
199 [left, right] => {
200 let left_empty = match left {
201 LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row,
202 _ => false,
203 };
204 let right_empty = match right {
205 LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row,
206 _ => false,
207 };
208 Ok((left_empty, right_empty))
209 }
210 _ => plan_err!("plan just can have two child"),
211 }
212}
213
214fn empty_child(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
215 match plan.inputs()[..] {
216 [child] => match child {
217 LogicalPlan::EmptyRelation(empty) => {
218 if !empty.produce_one_row {
219 Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
220 produce_one_row: false,
221 schema: Arc::clone(plan.schema()),
222 })))
223 } else {
224 Ok(None)
225 }
226 }
227 _ => Ok(None),
228 },
229 _ => plan_err!("plan just can have one child"),
230 }
231}
232
233#[cfg(test)]
234mod tests {
235 use std::sync::Arc;
236
237 use arrow::datatypes::{DataType, Field, Schema};
238
239 use datafusion_common::{Column, DFSchema, JoinType};
240 use datafusion_expr::logical_plan::table_scan;
241 use datafusion_expr::{
242 binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Operator,
243 };
244
245 use crate::eliminate_filter::EliminateFilter;
246 use crate::eliminate_nested_union::EliminateNestedUnion;
247 use crate::test::{
248 assert_optimized_plan_eq, assert_optimized_plan_with_rules, test_table_scan,
249 test_table_scan_fields, test_table_scan_with_name,
250 };
251
252 use super::*;
253
254 fn assert_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
255 assert_optimized_plan_eq(Arc::new(PropagateEmptyRelation::new()), plan, expected)
256 }
257
258 fn assert_together_optimized_plan(
259 plan: LogicalPlan,
260 expected: &str,
261 eq: bool,
262 ) -> Result<()> {
263 assert_optimized_plan_with_rules(
264 vec![
265 Arc::new(EliminateFilter::new()),
266 Arc::new(EliminateNestedUnion::new()),
267 Arc::new(PropagateEmptyRelation::new()),
268 ],
269 plan,
270 expected,
271 eq,
272 )
273 }
274
275 #[test]
276 fn propagate_empty() -> Result<()> {
277 let plan = LogicalPlanBuilder::empty(false)
278 .filter(lit(true))?
279 .limit(10, None)?
280 .project(vec![binary_expr(lit(1), Operator::Plus, lit(1))])?
281 .build()?;
282
283 let expected = "EmptyRelation";
284 assert_eq(plan, expected)
285 }
286
287 #[test]
288 fn cooperate_with_eliminate_filter() -> Result<()> {
289 let table_scan = test_table_scan()?;
290 let left = LogicalPlanBuilder::from(table_scan).build()?;
291 let right_table_scan = test_table_scan_with_name("test2")?;
292 let right = LogicalPlanBuilder::from(right_table_scan)
293 .project(vec![col("a")])?
294 .filter(lit(false))?
295 .build()?;
296
297 let plan = LogicalPlanBuilder::from(left)
298 .join_using(
299 right,
300 JoinType::Inner,
301 vec![Column::from_name("a".to_string())],
302 )?
303 .filter(col("a").lt_eq(lit(1i64)))?
304 .build()?;
305
306 let expected = "EmptyRelation";
307 assert_together_optimized_plan(plan, expected, true)
308 }
309
310 #[test]
311 fn propagate_union_empty() -> Result<()> {
312 let left = LogicalPlanBuilder::from(test_table_scan()?).build()?;
313 let right = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
314 .filter(lit(false))?
315 .build()?;
316
317 let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;
318
319 let expected = "Projection: a, b, c\n TableScan: test";
320 assert_together_optimized_plan(plan, expected, true)
321 }
322
323 #[test]
324 fn propagate_union_multi_empty() -> Result<()> {
325 let one =
326 LogicalPlanBuilder::from(test_table_scan_with_name("test1")?).build()?;
327 let two = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
328 .filter(lit(false))?
329 .build()?;
330 let three = LogicalPlanBuilder::from(test_table_scan_with_name("test3")?)
331 .filter(lit(false))?
332 .build()?;
333 let four =
334 LogicalPlanBuilder::from(test_table_scan_with_name("test4")?).build()?;
335
336 let plan = LogicalPlanBuilder::from(one)
337 .union(two)?
338 .union(three)?
339 .union(four)?
340 .build()?;
341
342 let expected = "Union\
343 \n TableScan: test1\
344 \n TableScan: test4";
345 assert_together_optimized_plan(plan, expected, true)
346 }
347
348 #[test]
349 fn propagate_union_all_empty() -> Result<()> {
350 let one = LogicalPlanBuilder::from(test_table_scan_with_name("test1")?)
351 .filter(lit(false))?
352 .build()?;
353 let two = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
354 .filter(lit(false))?
355 .build()?;
356 let three = LogicalPlanBuilder::from(test_table_scan_with_name("test3")?)
357 .filter(lit(false))?
358 .build()?;
359 let four = LogicalPlanBuilder::from(test_table_scan_with_name("test4")?)
360 .filter(lit(false))?
361 .build()?;
362
363 let plan = LogicalPlanBuilder::from(one)
364 .union(two)?
365 .union(three)?
366 .union(four)?
367 .build()?;
368
369 let expected = "EmptyRelation";
370 assert_together_optimized_plan(plan, expected, true)
371 }
372
373 #[test]
374 fn propagate_union_children_different_schema() -> Result<()> {
375 let one_schema = Schema::new(vec![Field::new("t1a", DataType::UInt32, false)]);
376 let t1_scan = table_scan(Some("test1"), &one_schema, None)?.build()?;
377 let one = LogicalPlanBuilder::from(t1_scan)
378 .filter(lit(false))?
379 .build()?;
380
381 let two_schema = Schema::new(vec![Field::new("t2a", DataType::UInt32, false)]);
382 let t2_scan = table_scan(Some("test2"), &two_schema, None)?.build()?;
383 let two = LogicalPlanBuilder::from(t2_scan).build()?;
384
385 let three_schema = Schema::new(vec![Field::new("t3a", DataType::UInt32, false)]);
386 let t3_scan = table_scan(Some("test3"), &three_schema, None)?.build()?;
387 let three = LogicalPlanBuilder::from(t3_scan).build()?;
388
389 let plan = LogicalPlanBuilder::from(one)
390 .union(two)?
391 .union(three)?
392 .build()?;
393
394 let expected = "Union\
395 \n TableScan: test2\
396 \n TableScan: test3";
397 assert_together_optimized_plan(plan, expected, true)
398 }
399
400 #[test]
401 fn propagate_union_alias() -> Result<()> {
402 let left = LogicalPlanBuilder::from(test_table_scan()?).build()?;
403 let right = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
404 .filter(lit(false))?
405 .build()?;
406
407 let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;
408
409 let expected = "Projection: a, b, c\n TableScan: test";
410 assert_together_optimized_plan(plan, expected, true)
411 }
412
413 #[test]
414 fn cross_join_empty() -> Result<()> {
415 let table_scan = test_table_scan()?;
416 let left = LogicalPlanBuilder::from(table_scan).build()?;
417 let right = LogicalPlanBuilder::empty(false).build()?;
418
419 let plan = LogicalPlanBuilder::from(left)
420 .cross_join(right)?
421 .filter(col("a").lt_eq(lit(1i64)))?
422 .build()?;
423
424 let expected = "EmptyRelation";
425 assert_together_optimized_plan(plan, expected, true)
426 }
427
428 fn assert_empty_left_empty_right_lp(
429 left_empty: bool,
430 right_empty: bool,
431 join_type: JoinType,
432 eq: bool,
433 ) -> Result<()> {
434 let left_lp = if left_empty {
435 let left_table_scan = test_table_scan()?;
436
437 LogicalPlanBuilder::from(left_table_scan)
438 .filter(lit(false))?
439 .build()
440 } else {
441 let scan = test_table_scan_with_name("left").unwrap();
442 LogicalPlanBuilder::from(scan).build()
443 }?;
444
445 let right_lp = if right_empty {
446 let right_table_scan = test_table_scan_with_name("right")?;
447
448 LogicalPlanBuilder::from(right_table_scan)
449 .filter(lit(false))?
450 .build()
451 } else {
452 let scan = test_table_scan_with_name("right").unwrap();
453 LogicalPlanBuilder::from(scan).build()
454 }?;
455
456 let plan = LogicalPlanBuilder::from(left_lp)
457 .join_using(
458 right_lp,
459 join_type,
460 vec![Column::from_name("a".to_string())],
461 )?
462 .build()?;
463
464 let expected = "EmptyRelation";
465 assert_together_optimized_plan(plan, expected, eq)
466 }
467
468 fn assert_anti_join_empty_join_table_is_base_table(
470 anti_left_join: bool,
471 ) -> Result<()> {
472 let (left, right, join_type, expected) = if anti_left_join {
474 let left = test_table_scan()?;
475 let right = LogicalPlanBuilder::from(test_table_scan()?)
476 .filter(lit(false))?
477 .build()?;
478 let expected = left.display_indent().to_string();
479 (left, right, JoinType::LeftAnti, expected)
480 } else {
481 let right = test_table_scan()?;
482 let left = LogicalPlanBuilder::from(test_table_scan()?)
483 .filter(lit(false))?
484 .build()?;
485 let expected = right.display_indent().to_string();
486 (left, right, JoinType::RightAnti, expected)
487 };
488
489 let plan = LogicalPlanBuilder::from(left)
490 .join_using(right, join_type, vec![Column::from_name("a".to_string())])?
491 .build()?;
492
493 assert_together_optimized_plan(plan, &expected, true)
494 }
495
496 #[test]
497 fn test_join_empty_propagation_rules() -> Result<()> {
498 assert_empty_left_empty_right_lp(true, true, JoinType::Full, true)?;
500
501 assert_empty_left_empty_right_lp(true, false, JoinType::Left, true)?;
503
504 assert_empty_left_empty_right_lp(false, true, JoinType::Right, true)?;
506
507 assert_empty_left_empty_right_lp(true, false, JoinType::LeftSemi, true)?;
509
510 assert_empty_left_empty_right_lp(false, true, JoinType::LeftSemi, true)?;
512
513 assert_empty_left_empty_right_lp(true, false, JoinType::RightSemi, true)?;
515
516 assert_empty_left_empty_right_lp(false, true, JoinType::RightSemi, true)?;
518
519 assert_empty_left_empty_right_lp(true, false, JoinType::LeftAnti, true)?;
521
522 assert_empty_left_empty_right_lp(false, true, JoinType::RightAnti, true)?;
524
525 assert_anti_join_empty_join_table_is_base_table(true)?;
527
528 assert_anti_join_empty_join_table_is_base_table(false)
530 }
531
532 #[test]
533 fn test_join_empty_propagation_rules_noop() -> Result<()> {
534 assert_empty_left_empty_right_lp(false, true, JoinType::Left, false)?;
538
539 assert_empty_left_empty_right_lp(true, false, JoinType::Right, false)?;
541
542 assert_empty_left_empty_right_lp(false, false, JoinType::LeftSemi, false)?;
544
545 assert_empty_left_empty_right_lp(false, false, JoinType::RightSemi, false)?;
547
548 assert_empty_left_empty_right_lp(false, false, JoinType::LeftAnti, false)?;
550
551 assert_empty_left_empty_right_lp(false, true, JoinType::LeftAnti, false)?;
553
554 assert_empty_left_empty_right_lp(false, false, JoinType::RightAnti, false)?;
556
557 assert_empty_left_empty_right_lp(true, false, JoinType::RightAnti, false)
559 }
560
561 #[test]
562 fn test_empty_with_non_empty() -> Result<()> {
563 let table_scan = test_table_scan()?;
564
565 let fields = test_table_scan_fields();
566
567 let empty = LogicalPlan::EmptyRelation(EmptyRelation {
568 produce_one_row: false,
569 schema: Arc::new(DFSchema::from_unqualified_fields(
570 fields.into(),
571 Default::default(),
572 )?),
573 });
574
575 let one = LogicalPlanBuilder::from(empty.clone()).build()?;
576 let two = LogicalPlanBuilder::from(table_scan).build()?;
577 let three = LogicalPlanBuilder::from(empty).build()?;
578
579 let plan = LogicalPlanBuilder::from(one)
584 .union(two)?
585 .union(three)?
586 .build()?;
587
588 let expected = "Projection: a, b, c\
589 \n TableScan: test";
590
591 assert_together_optimized_plan(plan, expected, true)
592 }
593}