// datafusion_physical_plan/values.rs

use std::any::Any;
use std::sync::Arc;
use super::{
common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream,
Statistics,
};
use crate::{
memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning,
PhysicalExpr,
};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use datafusion_common::{internal_err, plan_err, Result, ScalarValue};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
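
/// Execution plan for a values-list based relation: it produces a fixed set
/// of constant rows (the physical counterpart of a SQL `VALUES` clause).
///
/// A minimal construction sketch, marked `ignore` since it is illustrative
/// only; it assumes the `lit` helper from `crate::expressions` (the same one
/// used in the tests below):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// // VALUES (1), (2): two rows, one UInt32 column
/// let schema = Arc::new(Schema::new(vec![Field::new("col0", DataType::UInt32, false)]));
/// let exec = ValuesExec::try_new(schema, vec![vec![lit(1u32)], vec![lit(2u32)]]).unwrap();
/// ```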
#[derive(Debug, Clone)]
pub struct ValuesExec {
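    /// The schema of the rows that this plan produces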
schema: SchemaRef,
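    /// The values, pre-materialized into record batches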
data: Vec<RecordBatch>,
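    /// Cached plan properties (equivalences, partitioning, execution mode)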
cache: PlanProperties,
}

impl ValuesExec {
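    /// Create a new values exec from rows of expressions.
    ///
    /// Each expression must evaluate to a single scalar; expressions that
    /// produce multi-element arrays are rejected.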
pub fn try_new(
schema: SchemaRef,
data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
) -> Result<Self> {
if data.is_empty() {
return plan_err!("Values list cannot be empty");
}
let n_row = data.len();
let n_col = schema.fields().len();
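        // A zero-column, single-row placeholder batch: the value expressions
        // are constants, but `PhysicalExpr::evaluate` still needs a batch to
        // run against.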
let batch = RecordBatch::try_new_with_options(
Arc::new(Schema::empty()),
vec![],
&RecordBatchOptions::new().with_row_count(Some(1)),
)?;
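        // Evaluate the expressions column by column, collecting each column's
        // scalars into a single array.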
let arr = (0..n_col)
.map(|j| {
(0..n_row)
.map(|i| {
let r = data[i][j].evaluate(&batch);
match r {
Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
Ok(ColumnarValue::Array(a)) if a.len() == 1 => {
ScalarValue::try_from_array(&a, 0)
}
Ok(ColumnarValue::Array(a)) => {
plan_err!(
"Cannot have array values {a:?} in a values list"
)
}
Err(err) => Err(err),
}
})
.collect::<Result<Vec<_>>>()
.and_then(ScalarValue::iter_to_array)
})
.collect::<Result<Vec<_>>>()?;
let batch = RecordBatch::try_new_with_options(
Arc::clone(&schema),
arr,
&RecordBatchOptions::new().with_row_count(Some(n_row)),
)?;
let data: Vec<RecordBatch> = vec![batch];
Self::try_new_from_batches(schema, data)
}
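
    /// Create a new plan from pre-built record batches.
    ///
    /// Errors if `batches` is empty or if any batch does not match `schema`.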
pub fn try_new_from_batches(
schema: SchemaRef,
batches: Vec<RecordBatch>,
) -> Result<Self> {
if batches.is_empty() {
return plan_err!("Values list cannot be empty");
}
for batch in &batches {
let batch_schema = batch.schema();
if batch_schema != schema {
return plan_err!(
"Batch has invalid schema. Expected: {schema}, got: {batch_schema}"
);
}
}
let cache = Self::compute_properties(Arc::clone(&schema));
Ok(ValuesExec {
schema,
data: batches,
cache,
})
}
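
    /// Returns a clone of the batches produced by this plan.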
pub fn data(&self) -> Vec<RecordBatch> {
self.data.clone()
}
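
    /// Creates the cache object that stores the plan properties: equivalence
    /// properties, output partitioning, and execution mode.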
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
)
}
}

impl DisplayAs for ValuesExec {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "ValuesExec")
}
}
}
}

impl ExecutionPlan for ValuesExec {
fn name(&self) -> &'static str {
"ValuesExec"
}

    fn as_any(&self) -> &dyn Any {
self
}

    fn properties(&self) -> &PlanProperties {
&self.cache
}

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
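
    // `ValuesExec` is a leaf node: the new-children list is ignored and a
    // fresh copy of the plan is returned.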
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
ValuesExec::try_new_from_batches(Arc::clone(&self.schema), self.data.clone())
.map(|e| Arc::new(e) as _)
}

    fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
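        // `ValuesExec` always produces exactly one output partition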
if 0 != partition {
return internal_err!(
"ValuesExec invalid partition {partition} (expected 0)"
);
}
Ok(Box::pin(MemoryStream::try_new(
self.data(),
Arc::clone(&self.schema),
None,
)?))
}

    fn statistics(&self) -> Result<Statistics> {
        let batches = self.data();
        Ok(common::compute_record_batch_statistics(
            &[batches],
&self.schema,
None,
))
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::lit;
use crate::test::{self, make_partition};
use arrow_schema::{DataType, Field};
use datafusion_common::stats::{ColumnStatistics, Precision};

    #[tokio::test]
async fn values_empty_case() -> Result<()> {
let schema = test::aggr_test_schema();
let empty = ValuesExec::try_new(schema, vec![]);
assert!(empty.is_err());
Ok(())
}

    #[test]
fn new_exec_with_batches() {
let batch = make_partition(7);
let schema = batch.schema();
let batches = vec![batch.clone(), batch];
let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap();
}

    #[test]
fn new_exec_with_batches_empty() {
let batch = make_partition(7);
let schema = batch.schema();
let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err();
}

    #[test]
fn new_exec_with_batches_invalid_schema() {
let batch = make_partition(7);
let batches = vec![batch.clone(), batch];
let invalid_schema = Arc::new(Schema::new(vec![
Field::new("col0", DataType::UInt32, false),
Field::new("col1", DataType::Utf8, false),
]));
let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err();
}

    #[test]
fn new_exec_with_non_nullable_schema() {
let schema = Arc::new(Schema::new(vec![Field::new(
"col0",
DataType::UInt32,
false,
)]));
let _ = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]).unwrap();
let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]])
.unwrap_err();
}

    #[test]
fn values_stats_with_nulls_only() -> Result<()> {
let data = vec![
vec![lit(ScalarValue::Null)],
vec![lit(ScalarValue::Null)],
vec![lit(ScalarValue::Null)],
];
let rows = data.len();
let values = ValuesExec::try_new(
Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
data,
)?;
assert_eq!(
values.statistics()?,
Statistics {
num_rows: Precision::Exact(rows),
                total_byte_size: Precision::Exact(8),
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(rows),
                    distinct_count: Precision::Absent,
                    max_value: Precision::Absent,
                    min_value: Precision::Absent,
                }],
}
);
Ok(())
}
}