// datafusion_physical_plan/values.rs

use std::any::Any;
use std::sync::Arc;

use crate::execution_plan::{Boundedness, EmissionType};
use crate::memory::MemoryStream;
use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics};
use crate::{
    ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use datafusion_common::{internal_err, plan_err, Result, ScalarValue};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
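/// Execution plan for a values-list relation: evaluates rows of literal
/// expressions once and serves the result as in-memory record batches.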
#[deprecated(
    since = "45.0.0",
    note = "Use `MemorySourceConfig::try_new_as_values` instead"
)]
#[derive(Debug, Clone)]
pub struct ValuesExec {
    /// Schema of the emitted record batches
    schema: SchemaRef,
    /// The materialized data, held as record batches
    data: Vec<RecordBatch>,
    /// Cached plan properties (equivalence, partitioning, emission, boundedness)
    cache: PlanProperties,
}

#[allow(deprecated)]
impl ValuesExec {
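    /// Creates a `ValuesExec` from rows of literal expressions, one row per
    /// output row and one expression per column of `schema`.
    ///
    /// Returns an error if `data` is empty or if any expression does not
    /// evaluate to a single scalar value.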
    #[deprecated(since = "45.0.0", note = "Use `MemoryExec::try_new` instead")]
    pub fn try_new(
        schema: SchemaRef,
        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
    ) -> Result<Self> {
        if data.is_empty() {
            return plan_err!("Values list cannot be empty");
        }
        let n_row = data.len();
        let n_col = schema.fields().len();
        // Placeholder batch with no columns and a single row: it only serves as
        // the evaluation context for the literal expressions below.
        let batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &RecordBatchOptions::new().with_row_count(Some(1)),
        )?;

        let arr = (0..n_col)
            .map(|j| {
                (0..n_row)
                    .map(|i| {
                        let r = data[i][j].evaluate(&batch);

                        match r {
                            Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
                            Ok(ColumnarValue::Array(a)) if a.len() == 1 => {
                                ScalarValue::try_from_array(&a, 0)
                            }
                            Ok(ColumnarValue::Array(a)) => {
                                plan_err!(
                                    "Cannot have array values {a:?} in a values list"
                                )
                            }
                            Err(err) => Err(err),
                        }
                    })
                    .collect::<Result<Vec<_>>>()
                    .and_then(ScalarValue::iter_to_array)
            })
            .collect::<Result<Vec<_>>>()?;
        let batch = RecordBatch::try_new_with_options(
            Arc::clone(&schema),
            arr,
            &RecordBatchOptions::new().with_row_count(Some(n_row)),
        )?;
        let data: Vec<RecordBatch> = vec![batch];
        Self::try_new_from_batches(schema, data)
    }

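    /// Creates a `ValuesExec` directly from pre-built record batches.
    ///
    /// Returns an error if `batches` is empty or if any batch does not match
    /// the provided `schema`.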
    #[deprecated(
        since = "45.0.0",
        note = "Use `MemoryExec::try_new_from_batches` instead"
    )]
    pub fn try_new_from_batches(
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
    ) -> Result<Self> {
        if batches.is_empty() {
            return plan_err!("Values list cannot be empty");
        }

        for batch in &batches {
            let batch_schema = batch.schema();
            if batch_schema != schema {
                return plan_err!(
                    "Batch has invalid schema. Expected: {schema}, got: {batch_schema}"
                );
            }
        }

        let cache = Self::compute_properties(Arc::clone(&schema));
        #[allow(deprecated)]
        Ok(ValuesExec {
            schema,
            data: batches,
            cache,
        })
    }

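    /// Returns a clone of the record batches held by this plan.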
    pub fn data(&self) -> Vec<RecordBatch> {
        #[allow(deprecated)]
        self.data.clone()
    }

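    /// Builds the cached plan properties: equivalence properties over the
    /// schema, a single (unknown) output partition, incremental emission, and
    /// bounded output.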
    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

#[allow(deprecated)]
impl DisplayAs for ValuesExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ValuesExec")
            }
        }
    }
}

#[allow(deprecated)]
impl ExecutionPlan for ValuesExec {
    fn name(&self) -> &'static str {
        "ValuesExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        #[allow(deprecated)]
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        #[allow(deprecated)]
        ValuesExec::try_new_from_batches(Arc::clone(&self.schema), self.data.clone())
            .map(|e| Arc::new(e) as _)
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // ValuesExec produces a single output partition
        if 0 != partition {
            return internal_err!(
                "ValuesExec invalid partition {partition} (expected 0)"
            );
        }

        Ok(Box::pin(MemoryStream::try_new(
            self.data(),
            #[allow(deprecated)]
            Arc::clone(&self.schema),
            None,
        )?))
    }

    fn statistics(&self) -> Result<Statistics> {
        let batch = self.data();
        Ok(common::compute_record_batch_statistics(
            &[batch],
            #[allow(deprecated)]
            &self.schema,
            None,
        ))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::expressions::lit;
    use crate::test::{self, make_partition};

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::stats::{ColumnStatistics, Precision};

    #[tokio::test]
    async fn values_empty_case() -> Result<()> {
        let schema = test::aggr_test_schema();
        #[allow(deprecated)]
        let empty = ValuesExec::try_new(schema, vec![]);
        assert!(empty.is_err());
        Ok(())
    }

    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        #[allow(deprecated)]
        let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap();
    }

    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        #[allow(deprecated)]
        let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }

    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        #[allow(deprecated)]
        let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err();
    }

    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        // A non-null literal is accepted for the non-nullable column ...
        #[allow(deprecated)]
        let _ = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]).unwrap();
        // ... while a null literal for the same column is rejected.
        #[allow(deprecated)]
        let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]])
            .unwrap_err();
    }
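
    // Illustrative sketch, not part of the original test suite: build a
    // `ValuesExec` from literal expressions, execute its single partition, and
    // collect the resulting batch. Assumes `TaskContext::default()` and
    // `common::collect` are available, as in other physical-plan tests.
    #[tokio::test]
    async fn values_round_trip_sketch() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));
        #[allow(deprecated)]
        let exec = ValuesExec::try_new(
            Arc::clone(&schema),
            vec![vec![lit(1u32)], vec![lit(2u32)]],
        )?;
        // The plan exposes exactly one partition (partition 0).
        let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
        let batches = common::collect(stream).await?;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);
        Ok(())
    }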

    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        #[allow(deprecated)]
        let values = ValuesExec::try_new(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        assert_eq!(
            values.statistics()?,
            Statistics {
                num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8),
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(rows),
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                    sum_value: Precision::Absent,
                },],
            }
        );

        Ok(())
    }
}