pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
    // Required methods
    fn name(&self) -> &str;
    fn as_any(&self) -> &dyn Any;
    fn properties(&self) -> &PlanProperties;
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;

    // Provided methods
    fn static_name() -> &'static str
       where Self: Sized { ... }
    fn schema(&self) -> SchemaRef { ... }
    fn required_input_distribution(&self) -> Vec<Distribution> { ... }
    fn required_input_ordering(
        &self,
    ) -> Vec<Option<Vec<PhysicalSortRequirement>>> { ... }
    fn maintains_input_order(&self) -> Vec<bool> { ... }
    fn benefits_from_input_partitioning(&self) -> Vec<bool> { ... }
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> { ... }
    fn metrics(&self) -> Option<MetricsSet> { ... }
    fn statistics(&self) -> Result<Statistics> { ... }
    fn supports_limit_pushdown(&self) -> bool { ... }
    fn with_fetch(
        &self,
        _limit: Option<usize>,
    ) -> Option<Arc<dyn ExecutionPlan>> { ... }
}
Represents nodes in the DataFusion Physical Plan.
Calling execute produces an async SendableRecordBatchStream of RecordBatch that incrementally computes a partition of the ExecutionPlan’s output from its input. See Partitioning for more details on partitioning.
Methods such as schema and properties communicate properties of the output to the DataFusion optimizer, and methods such as required_input_distribution and required_input_ordering express requirements of the ExecutionPlan from its input.
ExecutionPlan can be displayed in a simplified form using the return value from displayable in addition to the (normally quite verbose) Debug output.
Required Methods§
fn name(&self) -> &str
Short name for the ExecutionPlan, such as ‘ParquetExec’.
Implementation note: this method can simply proxy to static_name if no special action is needed. It has no default implementation that does so because this method does not require the Sized constraint, which allows a wider range of use cases.
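The relationship between name and static_name can be sketched with a simplified stand-in trait. The trait Node, the type ScanNode, and the function describe below are hypothetical and use only the standard library; they are not DataFusion types.

```rust
/// Hypothetical stand-in for a trait with both an instance-level and a
/// type-level name (not DataFusion's `ExecutionPlan`).
trait Node {
    /// Usable through `dyn Node`: takes `&self` and has no `Sized` bound.
    fn name(&self) -> &str;

    /// Callable without an instance, but only on concrete (`Sized`) types.
    fn static_name() -> &'static str
    where
        Self: Sized,
    {
        "Node"
    }
}

struct ScanNode;

impl Node for ScanNode {
    fn name(&self) -> &str {
        // Proxy to the type-level name, as the implementation note suggests.
        Self::static_name()
    }

    fn static_name() -> &'static str {
        "ScanNode"
    }
}

/// Works on a trait object, where `static_name` cannot be called.
fn describe(node: &dyn Node) -> String {
    format!("node: {}", node.name())
}
```

Because static_name is bounded by Self: Sized, it is excluded from the trait object, while name remains callable through dyn Node.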
fn as_any(&self) -> &dyn Any
Returns the execution plan as Any so that it can be downcast to a specific implementation.
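A minimal sketch of the downcasting pattern, using only std::any::Any. The trait Node, the types FilterNode and ScanNode, and the helper filter_predicate are hypothetical stand-ins rather than DataFusion types.

```rust
use std::any::Any;

/// Hypothetical stand-in trait exposing `as_any`, mirroring the pattern above.
trait Node {
    fn as_any(&self) -> &dyn Any;
}

struct FilterNode {
    predicate: String,
}

struct ScanNode;

impl Node for FilterNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Node for ScanNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Returns the predicate if `node` is a `FilterNode`, otherwise `None`.
fn filter_predicate(node: &dyn Node) -> Option<&str> {
    node.as_any()
        .downcast_ref::<FilterNode>()
        .map(|filter| filter.predicate.as_str())
}
```

downcast_ref returns None when the concrete type does not match, so callers can probe for a specific implementation without panicking.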
fn properties(&self) -> &PlanProperties
Returns properties of the output of the ExecutionPlan, such as output ordering(s), partitioning information, etc.
This information is available via methods on the ExecutionPlanProperties trait, which is implemented for all ExecutionPlans.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>
Get a list of child ExecutionPlans that act as inputs to this plan. The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).
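As an illustration of how children exposes the plan tree, here is a std-only sketch that walks a tree recursively. The trait Node, the types Scan and Join, and the function count_nodes are hypothetical stand-ins, not DataFusion's own API.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a plan node (not DataFusion's `ExecutionPlan`).
trait Node {
    /// Inputs of this node: empty for leaves, one entry for unary nodes,
    /// two entries for binary nodes.
    fn children(&self) -> Vec<&Arc<dyn Node>>;
}

struct Scan;

struct Join {
    left: Arc<dyn Node>,
    right: Arc<dyn Node>,
}

impl Node for Scan {
    fn children(&self) -> Vec<&Arc<dyn Node>> {
        vec![] // leaf node: no inputs
    }
}

impl Node for Join {
    fn children(&self) -> Vec<&Arc<dyn Node>> {
        vec![&self.left, &self.right] // binary node: two inputs
    }
}

/// Counts the nodes in the tree rooted at `node`, including `node` itself.
fn count_nodes(node: &dyn Node) -> usize {
    1 + node
        .children()
        .into_iter()
        .map(|child| count_nodes(child.as_ref()))
        .sum::<usize>()
}
```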
fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>>
Returns a new ExecutionPlan in which all existing children are replaced by the given children, in order.
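The replace-children pattern can be sketched with a std-only stand-in. The trait Node, the types Scan and Filter, the label method, and the String error type are all assumptions made for illustration; DataFusion's real trait uses its own Result type.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a plan node (not DataFusion's `ExecutionPlan`).
trait Node {
    fn label(&self) -> String;
    fn children(&self) -> Vec<&Arc<dyn Node>>;
    /// Returns a node like this one whose inputs are `children`, in order.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn Node>>,
    ) -> Result<Arc<dyn Node>, String>;
}

struct Scan {
    table: String,
}

struct Filter {
    predicate: String,
    input: Arc<dyn Node>,
}

impl Node for Scan {
    fn label(&self) -> String {
        format!("Scan({})", self.table)
    }
    fn children(&self) -> Vec<&Arc<dyn Node>> {
        vec![]
    }
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn Node>>,
    ) -> Result<Arc<dyn Node>, String> {
        // A leaf has nothing to replace, so it can return itself unchanged.
        if children.is_empty() {
            Ok(self)
        } else {
            Err("Scan takes no children".to_string())
        }
    }
}

impl Node for Filter {
    fn label(&self) -> String {
        format!("Filter({}) <- {}", self.predicate, self.input.label())
    }
    fn children(&self) -> Vec<&Arc<dyn Node>> {
        vec![&self.input]
    }
    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn Node>>,
    ) -> Result<Arc<dyn Node>, String> {
        if children.len() != 1 {
            return Err("Filter takes exactly one child".to_string());
        }
        // Build a new node that keeps this node's settings but uses the new input.
        let input = children.remove(0);
        Ok(Arc::new(Filter { predicate: self.predicate.clone(), input }))
    }
}
```

Taking self as Arc<Self> lets a node return itself without cloning when nothing needs to change, as the leaf case here does.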
fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream>
Begin execution of partition, returning a Stream of RecordBatches.
§Notes
The execute method itself is not async, but it returns an async futures::stream::Stream. This Stream should incrementally compute the output, RecordBatch by RecordBatch (in a streaming fashion). Most ExecutionPlans should not do any work before the first RecordBatch is requested from the stream.
RecordBatchStreamAdapter can be used to convert an async Stream into a SendableRecordBatchStream.
Using async Streams allows for network I/O during execution and takes advantage of Rust’s built-in support for async continuations and its crate ecosystem.
§Cancellation / Aborting Execution
The Stream that is returned must ensure that any allocated resources are freed when the stream itself is dropped. This is particularly important for spawned tasks or threads. Unless care is taken to “abort” such tasks, they may continue to consume resources even after the plan is dropped, generating intermediate results that are never used. Thus, calling spawn directly is disallowed; use SpawnedTask instead.
For more details see SpawnedTask, JoinSet and RecordBatchReceiverStreamBuilder for structures to help ensure all background tasks are cancelled.
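The underlying idea, that dropping the owner must stop its background work, can be illustrated with a std-only analogy. This sketch uses an OS thread and an AtomicBool flag rather than async tasks, and the type BackgroundWorker is hypothetical; it does not show how SpawnedTask or JoinSet are implemented.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical guard that owns a background worker and stops it on drop.
struct BackgroundWorker {
    stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<u64>>,
}

impl BackgroundWorker {
    fn start() -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let stop_flag = Arc::clone(&stop);
        let handle = thread::spawn(move || {
            let mut iterations = 0u64;
            // Keep producing "intermediate results" until asked to stop.
            while !stop_flag.load(Ordering::Acquire) {
                iterations += 1;
                thread::sleep(Duration::from_millis(1));
            }
            iterations
        });
        Self { stop, handle: Some(handle) }
    }
}

impl Drop for BackgroundWorker {
    fn drop(&mut self) {
        // Signal the worker to stop, then wait so no work outlives the guard.
        self.stop.store(true, Ordering::Release);
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}
```

Without the Drop implementation, the thread would keep looping after its owner is gone, which is the resource leak the section above warns about.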
§Implementation Examples
While async Streams have a non-trivial learning curve, the futures crate provides StreamExt and TryStreamExt, which help simplify many common operations.
Here are some common patterns:
§Return Precomputed RecordBatch
We can return a precomputed RecordBatch as a Stream:
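// Imports assumed from the DataFusion and Arrow subcrates
use std::sync::Arc;
use arrow_array::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;

struct MyPlan {
    batch: RecordBatch,
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // Use functions from the futures crate to convert the batch into a stream
        let fut = futures::future::ready(Ok(self.batch.clone()));
        let stream = futures::stream::once(fut);
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
    }
}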
§Lazily (async) Compute RecordBatch
We can also lazily compute a RecordBatch when the returned Stream is polled:
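// Imports assumed from the DataFusion and Arrow subcrates
use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion_common::Result;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;

struct MyPlan {
    schema: SchemaRef,
}

/// Returns a single batch when the returned stream is polled
async fn get_batch() -> Result<RecordBatch> {
    todo!()
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // The future does no work until the stream is first polled
        let fut = get_batch();
        let stream = futures::stream::once(fut);
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
    }
}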
§Lazily (async) create a Stream
If you need to create the returned Stream using an async function, you can do so by flattening the result:
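// Imports assumed from the DataFusion, Arrow, and futures crates
use std::sync::Arc;
use arrow_schema::SchemaRef;
use datafusion_common::Result;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream::TryStreamExt;

struct MyPlan {
    schema: SchemaRef,
}

/// async function that returns a stream
async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
    todo!()
}

impl MyPlan {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // A future that yields a stream
        let fut = get_batch_stream();
        // Use TryStreamExt::try_flatten to flatten the stream of streams
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
    }
}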
Provided Methods§
fn static_name() -> &'static str where Self: Sized
Short name for the ExecutionPlan, such as ‘ParquetExec’. Like name but can be called without an instance.
fn required_input_distribution(&self) -> Vec<Distribution>
Specifies the data distribution requirements for all the children of this ExecutionPlan. By default, it is Distribution::UnspecifiedDistribution for each child.
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>>
Specifies the ordering required for all of the children of this ExecutionPlan.
For each child, this is the local ordering requirement within each partition rather than the global ordering.
NOTE that checking !is_empty() does not check for a required input ordering. Instead, the correct check is that at least one entry must be Some.
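The difference between the two checks can be shown with a small std-only sketch. SortRequirement is a hypothetical placeholder for PhysicalSortRequirement, and has_required_input_ordering is an illustrative helper, not a DataFusion function.

```rust
/// Hypothetical placeholder standing in for `PhysicalSortRequirement`.
#[derive(Debug, Clone, PartialEq)]
struct SortRequirement {
    column: String,
    descending: bool,
}

/// Returns true if at least one child has a required input ordering.
fn has_required_input_ordering(required: &[Option<Vec<SortRequirement>>]) -> bool {
    // One entry exists per child, so the slice is non-empty whenever the
    // plan has children; only `Some` entries express a requirement.
    required.iter().any(Option::is_some)
}
```

A plan with two children and no ordering requirement yields vec![None, None], which is non-empty yet requires nothing.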
fn maintains_input_order(&self) -> Vec<bool>
Returns false if this ExecutionPlan’s implementation may reorder rows within or between partitions.
For example, Projection, Filter, and Limit maintain the order of their inputs: they may transform values (Projection) or produce fewer rows than went in (Filter and Limit), but the rows that are produced appear in the same order as in the input.
DataFusion uses this metadata to apply certain optimizations, such as automatically repartitioning correctly.
The default implementation returns false.
WARNING: if you override this default, you MUST ensure that the ExecutionPlan maintains the ordering invariant, or else DataFusion may produce incorrect results.
fn benefits_from_input_partitioning(&self) -> Vec<bool>
Specifies whether the ExecutionPlan benefits from increased parallelization at its input for each child.
If it returns true for a child, the ExecutionPlan would benefit from partitioning that child (and thus from more parallelism). For ExecutionPlans that do very little work, the overhead of extra parallelism may outweigh any benefits.
The default implementation returns true unless this ExecutionPlan has signalled it requires a single child input partition.
fn repartitioned(&self, _target_partitions: usize, _config: &ConfigOptions) -> Result<Option<Arc<dyn ExecutionPlan>>>
If supported, attempt to increase the partitioning of this ExecutionPlan to produce target_partitions partitions.
If the ExecutionPlan does not support changing its partitioning, returns Ok(None) (the default).
If the ExecutionPlan can increase its partitioning, but not to target_partitions, it may return an ExecutionPlan with fewer partitions. This might happen, for example, if each new partition would be too small to be efficiently processed individually.
The DataFusion optimizer attempts to use as many threads as possible by repartitioning its inputs to match the target number of threads available (target_partitions). Some data sources, such as the built-in CSV and Parquet readers, implement this method as they are able to read from their input files in parallel, regardless of how the source data is split amongst files.
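To make the "fewer partitions than requested" case concrete, here is a hypothetical std-only sketch of how a source might pick its partition count. The function choose_partition_count and its min_rows_per_partition threshold are assumptions for illustration; they do not describe how DataFusion's readers actually decide.

```rust
/// Hypothetical helper: picks a partition count for a source with `total_rows`
/// rows, never creating partitions smaller than `min_rows_per_partition`.
/// Returns `None` when the current partitioning cannot be increased.
fn choose_partition_count(
    current_partitions: usize,
    target_partitions: usize,
    total_rows: usize,
    min_rows_per_partition: usize,
) -> Option<usize> {
    // Largest count that keeps every partition at or above the minimum size.
    let max_useful = (total_rows / min_rows_per_partition.max(1)).max(1);
    let chosen = target_partitions.min(max_useful);
    if chosen > current_partitions {
        Some(chosen)
    } else {
        None
    }
}
```

Returning None here plays the role of Ok(None) above: the caller keeps the existing plan unchanged.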
fn metrics(&self) -> Option<MetricsSet>
Return a snapshot of the set of Metrics for this ExecutionPlan. If no Metrics are available, return None.
While the values of the metrics in the returned MetricsSets may change as execution progresses, the specific metrics will not.
Once self.execute() has returned (technically the future is resolved) for all available partitions, the set of metrics should be complete. If this function is called prior to execute(), new metrics may appear in subsequent calls.
fn statistics(&self) -> Result<Statistics>
Returns statistics for this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error.
fn supports_limit_pushdown(&self) -> bool
Returns true if a limit can be safely pushed down through this ExecutionPlan node.
If this method returns true, and the query plan contains a limit at the output of this node, DataFusion will push the limit to the input of this node.
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>
Returns a fetching variant of this ExecutionPlan node, if it supports fetch limits. Returns None otherwise.
Trait Implementations§
impl DynTreeNode for dyn ExecutionPlan
impl ExecutionPlanProperties for &dyn ExecutionPlan
fn output_partitioning(&self) -> &Partitioning
Specifies how the output of this ExecutionPlan is split into partitions.
fn execution_mode(&self) -> ExecutionMode
Specifies whether this plan generates an infinite stream of records. If the plan does not support pipelining, but its input(s) are infinite, returns ExecutionMode::PipelineBreaking to indicate this.
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>
If the output of this ExecutionPlan within each partition is sorted, returns Some(keys) describing the ordering. A None return value indicates no assumptions should be made on the output ordering.
fn equivalence_properties(&self) -> &EquivalenceProperties
Get the EquivalenceProperties within the plan.