datafusion_physical_plan/
empty.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! EmptyRelation with produce_one_row=false execution plan
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::memory::MemoryStream;
24use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
25use crate::{
26    execution_plan::{Boundedness, EmissionType},
27    DisplayFormatType, ExecutionPlan, Partitioning,
28};
29
30use arrow::datatypes::SchemaRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::{internal_err, Result};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use log::trace;
37
38/// Execution plan for empty relation with produce_one_row=false
39#[derive(Debug, Clone)]
40pub struct EmptyExec {
41    /// The schema for the produced row
42    schema: SchemaRef,
43    /// Number of partitions
44    partitions: usize,
45    cache: PlanProperties,
46}
47
48impl EmptyExec {
49    /// Create a new EmptyExec
50    pub fn new(schema: SchemaRef) -> Self {
51        let cache = Self::compute_properties(Arc::clone(&schema), 1);
52        EmptyExec {
53            schema,
54            partitions: 1,
55            cache,
56        }
57    }
58
59    /// Create a new EmptyExec with specified partition number
60    pub fn with_partitions(mut self, partitions: usize) -> Self {
61        self.partitions = partitions;
62        // Changing partitions may invalidate output partitioning, so update it:
63        let output_partitioning = Self::output_partitioning_helper(self.partitions);
64        self.cache = self.cache.with_partitioning(output_partitioning);
65        self
66    }
67
68    fn data(&self) -> Result<Vec<RecordBatch>> {
69        Ok(vec![])
70    }
71
72    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
73        Partitioning::UnknownPartitioning(n_partitions)
74    }
75
76    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
77    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
78        PlanProperties::new(
79            EquivalenceProperties::new(schema),
80            Self::output_partitioning_helper(n_partitions),
81            EmissionType::Incremental,
82            Boundedness::Bounded,
83        )
84    }
85}
86
87impl DisplayAs for EmptyExec {
88    fn fmt_as(
89        &self,
90        t: DisplayFormatType,
91        f: &mut std::fmt::Formatter,
92    ) -> std::fmt::Result {
93        match t {
94            DisplayFormatType::Default | DisplayFormatType::Verbose => {
95                write!(f, "EmptyExec")
96            }
97        }
98    }
99}
100
101impl ExecutionPlan for EmptyExec {
102    fn name(&self) -> &'static str {
103        "EmptyExec"
104    }
105
106    /// Return a reference to Any that can be used for downcasting
107    fn as_any(&self) -> &dyn Any {
108        self
109    }
110
111    fn properties(&self) -> &PlanProperties {
112        &self.cache
113    }
114
115    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
116        vec![]
117    }
118
119    fn with_new_children(
120        self: Arc<Self>,
121        _: Vec<Arc<dyn ExecutionPlan>>,
122    ) -> Result<Arc<dyn ExecutionPlan>> {
123        Ok(self)
124    }
125
126    fn execute(
127        &self,
128        partition: usize,
129        context: Arc<TaskContext>,
130    ) -> Result<SendableRecordBatchStream> {
131        trace!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
132
133        if partition >= self.partitions {
134            return internal_err!(
135                "EmptyExec invalid partition {} (expected less than {})",
136                partition,
137                self.partitions
138            );
139        }
140
141        Ok(Box::pin(MemoryStream::try_new(
142            self.data()?,
143            Arc::clone(&self.schema),
144            None,
145        )?))
146    }
147
148    fn statistics(&self) -> Result<Statistics> {
149        let batch = self
150            .data()
151            .expect("Create empty RecordBatch should not fail");
152        Ok(common::compute_record_batch_statistics(
153            &[batch],
154            &self.schema,
155            None,
156        ))
157    }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;

    /// Executing the only partition of a fresh plan produces no batches.
    #[tokio::test]
    async fn empty() -> Result<()> {
        let schema = test::aggr_test_schema();
        let plan = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(plan.schema(), schema);

        // We should have no results
        let stream = plan.execute(0, Arc::new(TaskContext::default()))?;
        let collected = common::collect(stream).await?;
        assert!(collected.is_empty());

        Ok(())
    }

    /// An empty child list is accepted; a non-empty one is rejected.
    #[test]
    fn with_new_children() -> Result<()> {
        let plan = Arc::new(EmptyExec::new(test::aggr_test_schema()));

        let unchanged = with_new_children_if_necessary(
            Arc::clone(&plan) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(plan.schema(), unchanged.schema());

        let outcome = with_new_children_if_necessary(plan, vec![unchanged]);
        assert!(
            outcome.is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// Partition indices beyond the single default partition are errors.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let plan = EmptyExec::new(test::aggr_test_schema());
        let task_ctx = Arc::new(TaskContext::default());

        // ask for the wrong partition
        for bad_partition in [1, 20] {
            assert!(plan.execute(bad_partition, Arc::clone(&task_ctx)).is_err());
        }
        Ok(())
    }
}