// datafusion_physical_plan/memory.rs

use std::any::Any;
use std::fmt;
use std::sync::Arc;
use std::task::{Context, Poll};
use super::{
common, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, project_schema, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use futures::Stream;
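/// Execution plan for reading in-memory batches of data.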
#[derive(Clone)]
pub struct MemoryExec {
    /// The partitions to query
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema representing the data before projection
    schema: SchemaRef,
    /// Schema representing the data after the optional projection is applied
    projected_schema: SchemaRef,
    /// Optional projection
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings
    sort_information: Vec<LexOrdering>,
    /// Cached plan properties (equivalences, partitioning, execution mode)
    cache: PlanProperties,
    /// Whether to display partition sizes when formatting the plan
    show_sizes: bool,
}
impl fmt::Debug for MemoryExec {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("MemoryExec")
.field("partitions", &"[...]")
.field("schema", &self.schema)
.field("projection", &self.projection)
.field("sort_information", &self.sort_information)
.finish()
}
}
impl DisplayAs for MemoryExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let partition_sizes: Vec<_> =
self.partitions.iter().map(|b| b.len()).collect();
let output_ordering = self
.sort_information
.first()
.map(|output_ordering| {
format!(", output_ordering={}", output_ordering)
})
.unwrap_or_default();
if self.show_sizes {
write!(
f,
"MemoryExec: partitions={}, partition_sizes={partition_sizes:?}{output_ordering}",
partition_sizes.len(),
)
} else {
write!(f, "MemoryExec: partitions={}", partition_sizes.len(),)
}
}
}
}
}
impl ExecutionPlan for MemoryExec {
fn name(&self) -> &'static str {
"MemoryExec"
}
fn as_any(&self) -> &dyn Any {
self
}
fn properties(&self) -> &PlanProperties {
&self.cache
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
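        // MemoryExec is a leaf node and cannot take any children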
if children.is_empty() {
Ok(self)
} else {
internal_err!("Children cannot be replaced in {self:?}")
}
}
fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(MemoryStream::try_new(
self.partitions[partition].clone(),
Arc::clone(&self.projected_schema),
self.projection.clone(),
)?))
}
fn statistics(&self) -> Result<Statistics> {
Ok(common::compute_record_batch_statistics(
&self.partitions,
&self.schema,
self.projection.clone(),
))
}
}
impl MemoryExec {
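    /// Create a new execution plan for reading in-memory record batches.
    /// The provided `schema` should not have the projection applied.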
pub fn try_new(
partitions: &[Vec<RecordBatch>],
schema: SchemaRef,
projection: Option<Vec<usize>>,
) -> Result<Self> {
let projected_schema = project_schema(&schema, projection.as_ref())?;
let cache =
Self::compute_properties(Arc::clone(&projected_schema), &[], partitions);
Ok(Self {
partitions: partitions.to_vec(),
schema,
projected_schema,
projection,
sort_information: vec![],
cache,
show_sizes: true,
})
}
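    /// Set `show_sizes` to determine whether to display partition sizes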
pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
self.show_sizes = show_sizes;
self
}
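    /// Ref to the partitions held by this plan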
pub fn partitions(&self) -> &[Vec<RecordBatch>] {
&self.partitions
}
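    /// Ref to the optional projection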
pub fn projection(&self) -> &Option<Vec<usize>> {
&self.projection
}
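    /// Attach sort information describing how the in-memory data is ordered.
    ///
    /// A memory table can be ordered by multiple expressions simultaneously;
    /// each ordering must reference columns of the original (un-projected)
    /// schema. If this plan carries an internal projection, the orderings are
    /// projected accordingly before being stored in the plan properties.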
pub fn try_with_sort_information(
mut self,
mut sort_information: Vec<LexOrdering>,
) -> Result<Self> {
let fields = self.schema.fields();
let ambiguous_column = sort_information
.iter()
.flat_map(|ordering| ordering.inner.clone())
.flat_map(|expr| collect_columns(&expr.expr))
.find(|col| {
fields
.get(col.index())
.map(|field| field.name() != col.name())
.unwrap_or(true)
});
if let Some(col) = ambiguous_column {
return internal_err!(
"Column {:?} is not found in the original schema of the MemoryExec",
col
);
}
if let Some(projection) = &self.projection {
let base_eqp = EquivalenceProperties::new_with_orderings(
self.original_schema(),
&sort_information,
);
let proj_exprs = projection
.iter()
.map(|idx| {
let base_schema = self.original_schema();
let name = base_schema.field(*idx).name();
(Arc::new(Column::new(name, *idx)) as _, name.to_string())
})
.collect::<Vec<_>>();
let projection_mapping =
ProjectionMapping::try_new(&proj_exprs, &self.original_schema())?;
sort_information = base_eqp
.project(&projection_mapping, self.schema())
.oeq_class
.orderings;
}
self.sort_information = sort_information;
let eq_properties = EquivalenceProperties::new_with_orderings(
self.schema(),
&self.sort_information,
);
self.cache = self.cache.with_eq_properties(eq_properties);
Ok(self)
}
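    /// Returns the original (un-projected) schema of this execution plan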
pub fn original_schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
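    /// Creates the cached [`PlanProperties`] object (equivalence properties,
    /// output partitioning and execution mode) for this plan.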
fn compute_properties(
schema: SchemaRef,
orderings: &[LexOrdering],
partitions: &[Vec<RecordBatch>],
) -> PlanProperties {
let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
PlanProperties::new(
        PlanProperties::new(
            eq_properties,
            Partitioning::UnknownPartitioning(partitions.len()),
            ExecutionMode::Bounded,
        )
}
}
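/// Stream that yields the record batches of a single in-memory partition,
/// applying the optional projection to each batch.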
pub struct MemoryStream {
    /// Vector of record batches to yield
    data: Vec<RecordBatch>,
    /// Optional memory reservation bound to the data, freed on drop
    reservation: Option<MemoryReservation>,
    /// Schema representing the data
    schema: SchemaRef,
    /// Optional projection for which columns to load
    projection: Option<Vec<usize>>,
    /// Index into the data
    index: usize,
}
impl MemoryStream {
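    /// Create a stream over a vector of record batches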
pub fn try_new(
data: Vec<RecordBatch>,
schema: SchemaRef,
projection: Option<Vec<usize>>,
) -> Result<Self> {
Ok(Self {
data,
reservation: None,
schema,
projection,
index: 0,
})
}
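    /// Set the memory reservation for the data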
pub(super) fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
self.reservation = Some(reservation);
self
}
}
impl Stream for MemoryStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(if self.index < self.data.len() {
self.index += 1;
let batch = &self.data[self.index - 1];
let batch = match self.projection.as_ref() {
Some(columns) => batch.project(columns)?,
None => batch.clone(),
};
Some(Ok(batch))
} else {
None
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.data.len(), Some(self.data.len()))
}
}
impl RecordBatchStream for MemoryStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::memory::MemoryExec;
use crate::ExecutionPlan;
use arrow_schema::{DataType, Field, Schema, SortOptions};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
#[test]
fn test_memory_order_eq() -> datafusion_common::Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Int64, false),
Field::new("c", DataType::Int64, false),
]));
let sort1 = LexOrdering::new(vec![
PhysicalSortExpr {
expr: col("a", &schema)?,
options: SortOptions::default(),
},
PhysicalSortExpr {
expr: col("b", &schema)?,
options: SortOptions::default(),
},
]);
let sort2 = LexOrdering::new(vec![PhysicalSortExpr {
expr: col("c", &schema)?,
options: SortOptions::default(),
}]);
let mut expected_output_order = LexOrdering::default();
expected_output_order.extend(sort1.clone());
expected_output_order.extend(sort2.clone());
let sort_information = vec![sort1.clone(), sort2.clone()];
let mem_exec = MemoryExec::try_new(&[vec![]], schema, None)?
.try_with_sort_information(sort_information)?;
assert_eq!(
mem_exec.properties().output_ordering().unwrap().to_vec(),
expected_output_order.inner
);
let eq_properties = mem_exec.properties().equivalence_properties();
assert!(eq_properties.oeq_class().contains(&sort1));
assert!(eq_properties.oeq_class().contains(&sort2));
Ok(())
}
}