datafusion_physical_plan/
values.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Values execution plan
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::execution_plan::{Boundedness, EmissionType};
24use crate::memory::MemoryStream;
25use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
26use crate::{
27    ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
28};
29use arrow::datatypes::{Schema, SchemaRef};
30use arrow::record_batch::{RecordBatch, RecordBatchOptions};
31use datafusion_common::{internal_err, plan_err, Result, ScalarValue};
32use datafusion_execution::TaskContext;
33use datafusion_physical_expr::EquivalenceProperties;
34
35/// Execution plan for values list based relation (produces constant rows)
36#[deprecated(
37    since = "45.0.0",
38    note = "Use `MemorySourceConfig::try_new_as_values` instead"
39)]
40#[derive(Debug, Clone)]
41pub struct ValuesExec {
42    /// The schema
43    schema: SchemaRef,
44    /// The data
45    data: Vec<RecordBatch>,
46    /// Cache holding plan properties like equivalences, output partitioning etc.
47    cache: PlanProperties,
48}
49
50#[allow(deprecated)]
51impl ValuesExec {
52    /// Create a new values exec from data as expr
53    #[deprecated(since = "45.0.0", note = "Use `MemoryExec::try_new` instead")]
54    pub fn try_new(
55        schema: SchemaRef,
56        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
57    ) -> Result<Self> {
58        if data.is_empty() {
59            return plan_err!("Values list cannot be empty");
60        }
61        let n_row = data.len();
62        let n_col = schema.fields().len();
63        // We have this single row batch as a placeholder to satisfy evaluation argument
64        // and generate a single output row
65        let batch = RecordBatch::try_new_with_options(
66            Arc::new(Schema::empty()),
67            vec![],
68            &RecordBatchOptions::new().with_row_count(Some(1)),
69        )?;
70
71        let arr = (0..n_col)
72            .map(|j| {
73                (0..n_row)
74                    .map(|i| {
75                        let r = data[i][j].evaluate(&batch);
76
77                        match r {
78                            Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
79                            Ok(ColumnarValue::Array(a)) if a.len() == 1 => {
80                                ScalarValue::try_from_array(&a, 0)
81                            }
82                            Ok(ColumnarValue::Array(a)) => {
83                                plan_err!(
84                                    "Cannot have array values {a:?} in a values list"
85                                )
86                            }
87                            Err(err) => Err(err),
88                        }
89                    })
90                    .collect::<Result<Vec<_>>>()
91                    .and_then(ScalarValue::iter_to_array)
92            })
93            .collect::<Result<Vec<_>>>()?;
94        let batch = RecordBatch::try_new_with_options(
95            Arc::clone(&schema),
96            arr,
97            &RecordBatchOptions::new().with_row_count(Some(n_row)),
98        )?;
99        let data: Vec<RecordBatch> = vec![batch];
100        Self::try_new_from_batches(schema, data)
101    }
102
103    /// Create a new plan using the provided schema and batches.
104    ///
105    /// Errors if any of the batches don't match the provided schema, or if no
106    /// batches are provided.
107    #[deprecated(
108        since = "45.0.0",
109        note = "Use `MemoryExec::try_new_from_batches` instead"
110    )]
111    pub fn try_new_from_batches(
112        schema: SchemaRef,
113        batches: Vec<RecordBatch>,
114    ) -> Result<Self> {
115        if batches.is_empty() {
116            return plan_err!("Values list cannot be empty");
117        }
118
119        for batch in &batches {
120            let batch_schema = batch.schema();
121            if batch_schema != schema {
122                return plan_err!(
123                    "Batch has invalid schema. Expected: {schema}, got: {batch_schema}"
124                );
125            }
126        }
127
128        let cache = Self::compute_properties(Arc::clone(&schema));
129        #[allow(deprecated)]
130        Ok(ValuesExec {
131            schema,
132            data: batches,
133            cache,
134        })
135    }
136
137    /// Provides the data
138    pub fn data(&self) -> Vec<RecordBatch> {
139        #[allow(deprecated)]
140        self.data.clone()
141    }
142
143    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
144    fn compute_properties(schema: SchemaRef) -> PlanProperties {
145        PlanProperties::new(
146            EquivalenceProperties::new(schema),
147            Partitioning::UnknownPartitioning(1),
148            EmissionType::Incremental,
149            Boundedness::Bounded,
150        )
151    }
152}
153
154#[allow(deprecated)]
155impl DisplayAs for ValuesExec {
156    fn fmt_as(
157        &self,
158        t: DisplayFormatType,
159        f: &mut std::fmt::Formatter,
160    ) -> std::fmt::Result {
161        match t {
162            DisplayFormatType::Default | DisplayFormatType::Verbose => {
163                write!(f, "ValuesExec")
164            }
165        }
166    }
167}
168
169#[allow(deprecated)]
170impl ExecutionPlan for ValuesExec {
171    fn name(&self) -> &'static str {
172        "ValuesExec"
173    }
174
175    /// Return a reference to Any that can be used for downcasting
176    fn as_any(&self) -> &dyn Any {
177        self
178    }
179
180    fn properties(&self) -> &PlanProperties {
181        #[allow(deprecated)]
182        &self.cache
183    }
184
185    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
186        vec![]
187    }
188
189    fn with_new_children(
190        self: Arc<Self>,
191        _: Vec<Arc<dyn ExecutionPlan>>,
192    ) -> Result<Arc<dyn ExecutionPlan>> {
193        #[allow(deprecated)]
194        ValuesExec::try_new_from_batches(Arc::clone(&self.schema), self.data.clone())
195            .map(|e| Arc::new(e) as _)
196    }
197
198    fn execute(
199        &self,
200        partition: usize,
201        _context: Arc<TaskContext>,
202    ) -> Result<SendableRecordBatchStream> {
203        // ValuesExec has a single output partition
204        if 0 != partition {
205            return internal_err!(
206                "ValuesExec invalid partition {partition} (expected 0)"
207            );
208        }
209
210        Ok(Box::pin(MemoryStream::try_new(
211            self.data(),
212            #[allow(deprecated)]
213            Arc::clone(&self.schema),
214            None,
215        )?))
216    }
217
218    fn statistics(&self) -> Result<Statistics> {
219        let batch = self.data();
220        Ok(common::compute_record_batch_statistics(
221            &[batch],
222            #[allow(deprecated)]
223            &self.schema,
224            None,
225        ))
226    }
227}
228
#[cfg(test)]
#[allow(deprecated)] // these tests exercise the deprecated `ValuesExec` API on purpose
mod tests {
    use super::*;
    use crate::expressions::lit;
    use crate::test::{self, make_partition};

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::stats::{ColumnStatistics, Precision};

    #[test]
    fn values_empty_case() {
        let schema = test::aggr_test_schema();
        let empty = ValuesExec::try_new(schema, vec![]);
        assert!(empty.is_err());
    }

    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap();
    }

    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }

    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err();
    }

    // Test issue: https://github.com/apache/datafusion/issues/8763
    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        let _ = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]).unwrap();
        // Test that a null value is rejected
        let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]])
            .unwrap_err();
    }

    #[test]
    fn execute_rejects_nonzero_partition() -> Result<()> {
        let batch = make_partition(7);
        let values = ValuesExec::try_new_from_batches(batch.schema(), vec![batch])?;
        let ctx = Arc::new(TaskContext::default());

        assert!(values.execute(1, Arc::clone(&ctx)).is_err());
        assert!(values.execute(0, ctx).is_ok());
        Ok(())
    }

    #[test]
    fn with_new_children_preserves_data() -> Result<()> {
        let batch = make_partition(7);
        let values = Arc::new(ValuesExec::try_new_from_batches(
            batch.schema(),
            vec![batch.clone()],
        )?);

        let plan = Arc::clone(&values).with_new_children(vec![])?;
        let rebuilt = plan
            .as_any()
            .downcast_ref::<ValuesExec>()
            .expect("with_new_children should return a ValuesExec");
        assert_eq!(rebuilt.data(), vec![batch]);
        Ok(())
    }

    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        let values = ValuesExec::try_new(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        assert_eq!(
            values.statistics()?,
            Statistics {
                num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8), // not important
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(rows), // there are only nulls
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                },],
            }
        );

        Ok(())
    }
}