deltalake_core/delta_datafusion/
physical.rsuse std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion::error::Result as DataFusionResult;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::{
DisplayAs, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};
use futures::{Stream, StreamExt};
use crate::DeltaTableError;
pub(crate) type MetricObserverFunction = fn(&RecordBatch, &ExecutionPlanMetricsSet) -> ();
pub(crate) struct MetricObserverExec {
parent: Arc<dyn ExecutionPlan>,
id: String,
metrics: ExecutionPlanMetricsSet,
update: MetricObserverFunction,
}
impl MetricObserverExec {
pub fn new(id: String, parent: Arc<dyn ExecutionPlan>, f: MetricObserverFunction) -> Self {
MetricObserverExec {
parent,
id,
metrics: ExecutionPlanMetricsSet::new(),
update: f,
}
}
pub fn try_new(
id: String,
inputs: &[Arc<dyn ExecutionPlan>],
f: MetricObserverFunction,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
match inputs {
[input] => Ok(Arc::new(MetricObserverExec::new(id, input.clone(), f))),
_ => Err(datafusion_common::DataFusionError::External(Box::new(
DeltaTableError::Generic("MetricObserverExec expects only one child".into()),
))),
}
}
pub fn id(&self) -> &str {
&self.id
}
}
impl std::fmt::Debug for MetricObserverExec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MetricObserverExec")
.field("id", &self.id)
.field("metrics", &self.metrics)
.finish()
}
}
impl DisplayAs for MetricObserverExec {
fn fmt_as(
&self,
_: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "MetricObserverExec id={}", self.id)
}
}
impl ExecutionPlan for MetricObserverExec {
fn name(&self) -> &str {
Self::static_name()
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> arrow_schema::SchemaRef {
self.parent.schema()
}
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
self.parent.properties()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.parent]
}
fn execute(
&self,
partition: usize,
context: Arc<datafusion::execution::context::TaskContext>,
) -> datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> {
let res = self.parent.execute(partition, context)?;
Ok(Box::pin(MetricObserverStream {
schema: self.schema(),
input: res,
metrics: self.metrics.clone(),
update: self.update,
}))
}
fn statistics(&self) -> DataFusionResult<datafusion_common::Statistics> {
self.parent.statistics()
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
MetricObserverExec::try_new(self.id.clone(), &children, self.update)
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
}
struct MetricObserverStream {
schema: SchemaRef,
input: SendableRecordBatchStream,
metrics: ExecutionPlanMetricsSet,
update: MetricObserverFunction,
}
impl Stream for MetricObserverStream {
type Item = DataFusionResult<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.input.poll_next_unpin(cx).map(|x| match x {
Some(Ok(batch)) => {
(self.update)(&batch, &self.metrics);
Some(Ok(batch))
}
other => other,
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.input.size_hint()
}
}
impl RecordBatchStream for MetricObserverStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
pub(crate) fn find_metric_node(
id: &str,
parent: &Arc<dyn ExecutionPlan>,
) -> Option<Arc<dyn ExecutionPlan>> {
if let Some(metric) = parent.as_any().downcast_ref::<MetricObserverExec>() {
if metric.id().eq(id) {
return Some(parent.to_owned());
}
}
for child in &parent.children() {
let res = find_metric_node(id, child);
if res.is_some() {
return res;
}
}
None
}
pub(crate) fn get_metric(metrics: &MetricsSet, name: &str) -> usize {
metrics.sum_by_name(name).map(|m| m.as_usize()).unwrap_or(0)
}