datafusion_physical_plan/
placeholder_row.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Execution plan for an `EmptyRelation` with `produce_one_row=true`
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::execution_plan::{Boundedness, EmissionType};
24use crate::memory::MemoryStream;
25use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
26use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
27use arrow::array::{ArrayRef, NullArray};
28use arrow::array::{RecordBatch, RecordBatchOptions};
29use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
30use datafusion_common::{internal_err, Result};
31use datafusion_execution::TaskContext;
32use datafusion_physical_expr::EquivalenceProperties;
33
34use log::trace;
35
36/// Execution plan for empty relation with produce_one_row=true
37#[derive(Debug, Clone)]
38pub struct PlaceholderRowExec {
39    /// The schema for the produced row
40    schema: SchemaRef,
41    /// Number of partitions
42    partitions: usize,
43    cache: PlanProperties,
44}
45
46impl PlaceholderRowExec {
47    /// Create a new PlaceholderRowExec
48    pub fn new(schema: SchemaRef) -> Self {
49        let partitions = 1;
50        let cache = Self::compute_properties(Arc::clone(&schema), partitions);
51        PlaceholderRowExec {
52            schema,
53            partitions,
54            cache,
55        }
56    }
57
58    /// Create a new PlaceholderRowExecPlaceholderRowExec with specified partition number
59    pub fn with_partitions(mut self, partitions: usize) -> Self {
60        self.partitions = partitions;
61        // Update output partitioning when updating partitions:
62        let output_partitioning = Self::output_partitioning_helper(self.partitions);
63        self.cache = self.cache.with_partitioning(output_partitioning);
64        self
65    }
66
67    fn data(&self) -> Result<Vec<RecordBatch>> {
68        Ok({
69            let n_field = self.schema.fields.len();
70            vec![RecordBatch::try_new_with_options(
71                Arc::new(Schema::new(
72                    (0..n_field)
73                        .map(|i| {
74                            Field::new(format!("placeholder_{i}"), DataType::Null, true)
75                        })
76                        .collect::<Fields>(),
77                )),
78                (0..n_field)
79                    .map(|_i| {
80                        let ret: ArrayRef = Arc::new(NullArray::new(1));
81                        ret
82                    })
83                    .collect(),
84                // Even if column number is empty we can generate single row.
85                &RecordBatchOptions::new().with_row_count(Some(1)),
86            )?]
87        })
88    }
89
90    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
91        Partitioning::UnknownPartitioning(n_partitions)
92    }
93
94    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
95    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
96        PlanProperties::new(
97            EquivalenceProperties::new(schema),
98            Self::output_partitioning_helper(n_partitions),
99            EmissionType::Incremental,
100            Boundedness::Bounded,
101        )
102    }
103}
104
105impl DisplayAs for PlaceholderRowExec {
106    fn fmt_as(
107        &self,
108        t: DisplayFormatType,
109        f: &mut std::fmt::Formatter,
110    ) -> std::fmt::Result {
111        match t {
112            DisplayFormatType::Default | DisplayFormatType::Verbose => {
113                write!(f, "PlaceholderRowExec")
114            }
115        }
116    }
117}
118
119impl ExecutionPlan for PlaceholderRowExec {
120    fn name(&self) -> &'static str {
121        "PlaceholderRowExec"
122    }
123
124    /// Return a reference to Any that can be used for downcasting
125    fn as_any(&self) -> &dyn Any {
126        self
127    }
128
129    fn properties(&self) -> &PlanProperties {
130        &self.cache
131    }
132
133    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
134        vec![]
135    }
136
137    fn with_new_children(
138        self: Arc<Self>,
139        _: Vec<Arc<dyn ExecutionPlan>>,
140    ) -> Result<Arc<dyn ExecutionPlan>> {
141        Ok(self)
142    }
143
144    fn execute(
145        &self,
146        partition: usize,
147        context: Arc<TaskContext>,
148    ) -> Result<SendableRecordBatchStream> {
149        trace!("Start PlaceholderRowExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
150
151        if partition >= self.partitions {
152            return internal_err!(
153                "PlaceholderRowExec invalid partition {} (expected less than {})",
154                partition,
155                self.partitions
156            );
157        }
158
159        Ok(Box::pin(MemoryStream::try_new(
160            self.data()?,
161            Arc::clone(&self.schema),
162            None,
163        )?))
164    }
165
166    fn statistics(&self) -> Result<Statistics> {
167        let batch = self
168            .data()
169            .expect("Create single row placeholder RecordBatch should not fail");
170        Ok(common::compute_record_batch_statistics(
171            &[batch],
172            &self.schema,
173            None,
174        ))
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;

    /// An empty child list is accepted and keeps the schema; a non-empty
    /// child list is rejected.
    #[test]
    fn with_new_children() -> Result<()> {
        let exec = Arc::new(PlaceholderRowExec::new(test::aggr_test_schema()));

        let rebuilt = with_new_children_if_necessary(
            Arc::clone(&exec) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(exec.schema(), rebuilt.schema());

        let outcome = with_new_children_if_necessary(exec, vec![rebuilt]);
        assert!(
            outcome.is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// Executing a partition index outside the configured range fails.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let exec = PlaceholderRowExec::new(test::aggr_test_schema());

        // Ask for the wrong partition: a freshly built plan only has partition 0
        for wrong_partition in [1, 20] {
            assert!(exec
                .execute(wrong_partition, Arc::clone(&task_ctx))
                .is_err());
        }
        Ok(())
    }

    /// The only partition of a default plan yields exactly one batch.
    #[tokio::test]
    async fn produce_one_row() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let exec = PlaceholderRowExec::new(test::aggr_test_schema());

        let stream = exec.execute(0, task_ctx)?;
        let collected = common::collect(stream).await?;

        // Should have one item
        assert_eq!(collected.len(), 1);

        Ok(())
    }

    /// Each partition of a multi-partition plan yields exactly one batch.
    #[tokio::test]
    async fn produce_one_row_multiple_partition() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let partitions = 3;
        let exec =
            PlaceholderRowExec::new(test::aggr_test_schema()).with_partitions(partitions);

        for idx in 0..partitions {
            let stream = exec.execute(idx, Arc::clone(&task_ctx))?;
            let collected = common::collect(stream).await?;

            // Should have one item
            assert_eq!(collected.len(), 1);
        }

        Ok(())
    }
}