datafusion_physical_plan/
empty.rsuse std::any::Any;
use std::sync::Arc;
use super::{
common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream,
Statistics,
};
use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
use log::trace;
/// Execution plan leaf node that produces no rows.
///
/// Every valid partition yields an empty stream carrying `schema`.
#[derive(Debug, Clone)]
pub struct EmptyExec {
    /// The schema of the (empty) output.
    schema: SchemaRef,
    /// Number of output partitions; `execute` rejects partition indices
    /// greater than or equal to this value.
    partitions: usize,
    /// Cached plan properties (equivalence properties, output partitioning,
    /// execution mode) built by `compute_properties`.
    cache: PlanProperties,
}
impl EmptyExec {
    /// Creates an `EmptyExec` with the given output `schema` and a single
    /// output partition.
    pub fn new(schema: SchemaRef) -> Self {
        let partitions = 1;
        Self {
            // Evaluated before `schema` is moved into the struct below.
            cache: Self::compute_properties(Arc::clone(&schema), partitions),
            schema,
            partitions,
        }
    }

    /// Sets the number of output partitions, updating the cached
    /// partitioning so the plan properties stay in sync with `partitions`.
    pub fn with_partitions(mut self, partitions: usize) -> Self {
        self.partitions = partitions;
        self.cache = self
            .cache
            .with_partitioning(Self::output_partitioning_helper(partitions));
        self
    }

    /// The batches every partition produces: always none.
    fn data(&self) -> Result<Vec<RecordBatch>> {
        Ok(Vec::new())
    }

    /// Builds the output partitioning for `n_partitions` partitions.
    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
        Partitioning::UnknownPartitioning(n_partitions)
    }

    /// Builds the cached [`PlanProperties`] for `schema` with
    /// `n_partitions` output partitions and a bounded execution mode.
    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Self::output_partitioning_helper(n_partitions),
            ExecutionMode::Bounded,
        )
    }
}
impl DisplayAs for EmptyExec {
    /// Renders the node as the bare label `EmptyExec`. The default and
    /// verbose formats produce the same text.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        // Exhaustive match so a new format variant forces a decision here.
        let label = match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => "EmptyExec",
        };
        f.write_str(label)
    }
}
impl ExecutionPlan for EmptyExec {
    fn name(&self) -> &'static str {
        "EmptyExec"
    }

    /// Returns `self` as [`Any`] so callers can downcast to `EmptyExec`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the plan properties cached at construction time.
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    /// `EmptyExec` is a leaf node: it has no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    /// Returns `self` unchanged; any supplied children are ignored because
    /// this node has none.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Returns a stream that yields no batches for `partition`.
    ///
    /// # Errors
    ///
    /// Returns an internal error if `partition` is not less than the number
    /// of configured partitions.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());

        if partition >= self.partitions {
            return internal_err!(
                "EmptyExec invalid partition {} (expected less than {})",
                partition,
                self.partitions
            );
        }

        Ok(Box::pin(MemoryStream::try_new(
            self.data()?,
            Arc::clone(&self.schema),
            None,
        )?))
    }

    /// Computes statistics over a single partition that contains zero batches.
    fn statistics(&self) -> Result<Statistics> {
        // Propagate instead of panicking: this method already returns `Result`,
        // so a failure in `data()` should reach the caller as an error.
        let batches = self.data()?;
        Ok(common::compute_record_batch_statistics(
            &[batches],
            &self.schema,
            None,
        ))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;
    use crate::with_new_children_if_necessary;

    /// A single-partition `EmptyExec` reports its schema and yields no batches.
    #[tokio::test]
    async fn empty() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();

        let empty = EmptyExec::new(Arc::clone(&schema));
        assert_eq!(empty.schema(), schema);

        // we should have no results
        let iter = empty.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;
        assert!(batches.is_empty());

        Ok(())
    }

    /// Rebuilding with zero children succeeds; supplying any child is rejected
    /// by `with_new_children_if_necessary`.
    #[test]
    fn with_new_children() -> Result<()> {
        let schema = test::aggr_test_schema();
        let empty = Arc::new(EmptyExec::new(Arc::clone(&schema)));

        let empty2 = with_new_children_if_necessary(
            Arc::clone(&empty) as Arc<dyn ExecutionPlan>,
            vec![],
        )?;
        assert_eq!(empty.schema(), empty2.schema());

        let too_many_kids = vec![empty2];
        assert!(
            with_new_children_if_necessary(empty, too_many_kids).is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// Partition indices at or beyond the (default single) partition count fail.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema);

        // ask for the wrong partition
        assert!(empty.execute(1, Arc::clone(&task_ctx)).is_err());
        assert!(empty.execute(20, task_ctx).is_err());
        Ok(())
    }

    /// `with_partitions` updates both the cached partitioning and the bounds
    /// check in `execute`.
    #[tokio::test]
    async fn with_partitions() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = test::aggr_test_schema();
        let empty = EmptyExec::new(schema).with_partitions(3);

        assert_eq!(empty.properties().output_partitioning().partition_count(), 3);

        // every in-range partition yields an empty stream
        for partition in 0..3 {
            let stream = empty.execute(partition, Arc::clone(&task_ctx))?;
            let batches = common::collect(stream).await?;
            assert!(batches.is_empty());
        }

        // the first out-of-range index is rejected
        assert!(empty.execute(3, task_ctx).is_err());
        Ok(())
    }
}