datafusion_physical_plan/
empty.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::memory::MemoryStream;
24use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
25use crate::{
26 execution_plan::{Boundedness, EmissionType},
27 DisplayFormatType, ExecutionPlan, Partitioning,
28};
29
30use arrow::datatypes::SchemaRef;
31use arrow::record_batch::RecordBatch;
32use datafusion_common::{internal_err, Result};
33use datafusion_execution::TaskContext;
34use datafusion_physical_expr::EquivalenceProperties;
35
36use log::trace;
37
38#[derive(Debug, Clone)]
40pub struct EmptyExec {
41 schema: SchemaRef,
43 partitions: usize,
45 cache: PlanProperties,
46}
47
48impl EmptyExec {
49 pub fn new(schema: SchemaRef) -> Self {
51 let cache = Self::compute_properties(Arc::clone(&schema), 1);
52 EmptyExec {
53 schema,
54 partitions: 1,
55 cache,
56 }
57 }
58
59 pub fn with_partitions(mut self, partitions: usize) -> Self {
61 self.partitions = partitions;
62 let output_partitioning = Self::output_partitioning_helper(self.partitions);
64 self.cache = self.cache.with_partitioning(output_partitioning);
65 self
66 }
67
68 fn data(&self) -> Result<Vec<RecordBatch>> {
69 Ok(vec![])
70 }
71
72 fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
73 Partitioning::UnknownPartitioning(n_partitions)
74 }
75
76 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
78 PlanProperties::new(
79 EquivalenceProperties::new(schema),
80 Self::output_partitioning_helper(n_partitions),
81 EmissionType::Incremental,
82 Boundedness::Bounded,
83 )
84 }
85}
86
87impl DisplayAs for EmptyExec {
88 fn fmt_as(
89 &self,
90 t: DisplayFormatType,
91 f: &mut std::fmt::Formatter,
92 ) -> std::fmt::Result {
93 match t {
94 DisplayFormatType::Default | DisplayFormatType::Verbose => {
95 write!(f, "EmptyExec")
96 }
97 }
98 }
99}
100
101impl ExecutionPlan for EmptyExec {
102 fn name(&self) -> &'static str {
103 "EmptyExec"
104 }
105
106 fn as_any(&self) -> &dyn Any {
108 self
109 }
110
111 fn properties(&self) -> &PlanProperties {
112 &self.cache
113 }
114
115 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
116 vec![]
117 }
118
119 fn with_new_children(
120 self: Arc<Self>,
121 _: Vec<Arc<dyn ExecutionPlan>>,
122 ) -> Result<Arc<dyn ExecutionPlan>> {
123 Ok(self)
124 }
125
126 fn execute(
127 &self,
128 partition: usize,
129 context: Arc<TaskContext>,
130 ) -> Result<SendableRecordBatchStream> {
131 trace!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
132
133 if partition >= self.partitions {
134 return internal_err!(
135 "EmptyExec invalid partition {} (expected less than {})",
136 partition,
137 self.partitions
138 );
139 }
140
141 Ok(Box::pin(MemoryStream::try_new(
142 self.data()?,
143 Arc::clone(&self.schema),
144 None,
145 )?))
146 }
147
148 fn statistics(&self) -> Result<Statistics> {
149 let batch = self
150 .data()
151 .expect("Create empty RecordBatch should not fail");
152 Ok(common::compute_record_batch_statistics(
153 &[batch],
154 &self.schema,
155 None,
156 ))
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163 use crate::test;
164 use crate::with_new_children_if_necessary;
165
166 #[tokio::test]
167 async fn empty() -> Result<()> {
168 let task_ctx = Arc::new(TaskContext::default());
169 let schema = test::aggr_test_schema();
170
171 let empty = EmptyExec::new(Arc::clone(&schema));
172 assert_eq!(empty.schema(), schema);
173
174 let iter = empty.execute(0, task_ctx)?;
176 let batches = common::collect(iter).await?;
177 assert!(batches.is_empty());
178
179 Ok(())
180 }
181
182 #[test]
183 fn with_new_children() -> Result<()> {
184 let schema = test::aggr_test_schema();
185 let empty = Arc::new(EmptyExec::new(Arc::clone(&schema)));
186
187 let empty2 = with_new_children_if_necessary(
188 Arc::clone(&empty) as Arc<dyn ExecutionPlan>,
189 vec![],
190 )?;
191 assert_eq!(empty.schema(), empty2.schema());
192
193 let too_many_kids = vec![empty2];
194 assert!(
195 with_new_children_if_necessary(empty, too_many_kids).is_err(),
196 "expected error when providing list of kids"
197 );
198 Ok(())
199 }
200
201 #[tokio::test]
202 async fn invalid_execute() -> Result<()> {
203 let task_ctx = Arc::new(TaskContext::default());
204 let schema = test::aggr_test_schema();
205 let empty = EmptyExec::new(schema);
206
207 assert!(empty.execute(1, Arc::clone(&task_ctx)).is_err());
209 assert!(empty.execute(20, task_ctx).is_err());
210 Ok(())
211 }
212}