// datafusion_physical_optimizer/enforce_sorting/mod.rs

pub mod replace_with_order_preserving_variants;
pub mod sort_pushdown;
use std::sync::Arc;
use crate::enforce_sorting::replace_with_order_preserving_variants::{
replace_with_order_preserving_variants, OrderPreservationContext,
};
use crate::enforce_sorting::sort_pushdown::{
assign_initial_requirements, pushdown_sorts, SortPushDown,
};
use crate::utils::{
add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
};
use crate::PhysicalOptimizerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_physical_expr::{Distribution, Partitioning};
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::windows::{
get_best_fitting_window, BoundedWindowAggExec, WindowAggExec,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrderMode};
use itertools::izip;
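
/// The `EnforceSorting` rule inspects the physical plan and ensures that the
/// input ordering requirements of every operator are satisfied: it inserts a
/// `SortExec` where a requirement is not already met, removes `SortExec`s
/// that can be proven unnecessary, and (when `repartition_sorts` is enabled)
/// parallelizes sorts by turning `CoalescePartitionsExec` + `SortExec`
/// cascades into `SortExec` + `SortPreservingMergeExec` cascades.
///
/// A minimal usage sketch (assumes a `plan: Arc<dyn ExecutionPlan>` and a
/// `config: ConfigOptions` constructed elsewhere):
///
/// ```ignore
/// use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
/// use datafusion_physical_optimizer::PhysicalOptimizerRule;
///
/// let optimized = EnforceSorting::new().optimize(plan, &config)?;
/// ```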
#[derive(Default, Debug)]
pub struct EnforceSorting {}
impl EnforceSorting {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
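
/// Context type used during the bottom-up `ensure_sorting` traversal. The
/// boolean `data` payload of each node records whether the node is a
/// `SortExec`, or is connected to one through children that maintain (or
/// propagate) their input ordering.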
pub type PlanWithCorrespondingSort = PlanContext<bool>;
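
/// Recomputes the `data` payloads of `node`'s children (see
/// [`PlanWithCorrespondingSort`]), sets `node.data` to the given value, and
/// rebuilds `node.plan` from its (possibly rewritten) children.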
fn update_sort_ctx_children(
mut node: PlanWithCorrespondingSort,
data: bool,
) -> Result<PlanWithCorrespondingSort> {
for child_node in node.children.iter_mut() {
let plan = &child_node.plan;
child_node.data = if is_sort(plan) {
true
} else if is_limit(plan) {
false
} else {
let is_spm = is_sort_preserving_merge(plan);
let required_orderings = plan.required_input_ordering();
let flags = plan.maintains_input_order();
izip!(flags, required_orderings).any(|(maintains, required_ordering)| {
let propagates_ordering =
(maintains && required_ordering.is_none()) || is_spm;
let connected_to_sort =
child_node.children.iter().any(|child| child.data);
propagates_ordering && connected_to_sort
})
}
}
node.data = data;
node.update_plan_from_children()
}
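
/// Context type used by `parallelize_sorts`. The boolean `data` payload of
/// each node records whether the node is a `CoalescePartitionsExec`, or is
/// connected to one through children that do not require a single partition.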
pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
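
/// Recomputes the `data` payload of `coalesce_context` (see
/// [`PlanWithCorrespondingCoalescePartitions`]) from its children.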
fn update_coalesce_ctx_children(
coalesce_context: &mut PlanWithCorrespondingCoalescePartitions,
) {
let children = &coalesce_context.children;
coalesce_context.data = if children.is_empty() {
false
} else if is_coalesce_partitions(&coalesce_context.plan) {
true
} else {
children.iter().enumerate().any(|(idx, node)| {
node.data
&& !matches!(
coalesce_context.plan.required_input_distribution()[idx],
Distribution::SinglePartition
)
})
};
}
impl PhysicalOptimizerRule for EnforceSorting {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
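        // Execute a bottom-up traversal to enforce sorting requirements,
        // remove unnecessary sorts, and optimize sort-sensitive operators: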
let plan_requirements = PlanWithCorrespondingSort::new_default(plan);
let adjusted = plan_requirements.transform_up(ensure_sorting)?.data;
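        // When `repartition_sorts` is enabled, run `parallelize_sorts` to
        // turn `CoalescePartitionsExec` + `SortExec` cascades into
        // `SortExec` + `SortPreservingMergeExec` cascades where possible: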
let new_plan = if config.optimizer.repartition_sorts {
let plan_with_coalesce_partitions =
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
let parallel = plan_with_coalesce_partitions
.transform_up(parallelize_sorts)
.data()?;
parallel.plan
} else {
adjusted.plan
};
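
        // Replace `CoalescePartitionsExec` and `RepartitionExec` operators
        // with order-preserving variants when doing so lets us avoid sorts
        // (see `replace_with_order_preserving_variants`):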
let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan);
let updated_plan = plan_with_pipeline_fixer
.transform_up(|plan_with_pipeline_fixer| {
replace_with_order_preserving_variants(
plan_with_pipeline_fixer,
false,
true,
config,
)
})
.data()?;
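
        // Execute a top-down traversal to push sorts down in the plan where
        // this is beneficial (see `sort_pushdown`):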
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
let adjusted = pushdown_sorts(sort_pushdown)?;
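
        // Finally, replace `SortExec`s whose input already satisfies a
        // prefix of the ordering with `PartialSortExec`: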
adjusted
.plan
.transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
.data()
}
fn name(&self) -> &str {
"EnforceSorting"
}
fn schema_check(&self) -> bool {
true
}
}
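
/// If the given plan is a `SortExec` over an unbounded input whose existing
/// ordering already satisfies a non-empty prefix of the sort expressions,
/// replaces it with a `PartialSortExec` over that common prefix.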
fn replace_with_partial_sort(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan_any = plan.as_any();
if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
let child = Arc::clone(sort_plan.children()[0]);
if !child.boundedness().is_unbounded() {
return Ok(plan);
}
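
        // Find the longest prefix of the sort requirement that the child's
        // existing ordering already satisfies: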
let child_eq_properties = child.equivalence_properties();
let sort_req = LexRequirement::from(sort_plan.expr().clone());
let mut common_prefix_length = 0;
while child_eq_properties.ordering_satisfy_requirement(&LexRequirement {
inner: sort_req[0..common_prefix_length + 1].to_vec(),
}) {
common_prefix_length += 1;
}
if common_prefix_length > 0 {
return Ok(Arc::new(
PartialSortExec::new(
LexOrdering::new(sort_plan.expr().to_vec()),
Arc::clone(sort_plan.input()),
common_prefix_length,
)
.with_preserve_partitioning(sort_plan.preserve_partitioning())
.with_fetch(sort_plan.fetch()),
));
}
}
Ok(plan)
}
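
/// Transformation used when `repartition_sorts` is enabled: turns
/// `CoalescePartitionsExec` + `SortExec` cascades into `SortExec` +
/// `SortPreservingMergeExec` cascades, enabling sorts to run in parallel
/// per partition. It also prunes redundant `CoalescePartitionsExec` and
/// `RepartitionExec` bottlenecks underneath such cascades
/// (see `remove_bottleneck_in_subplan`).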
pub fn parallelize_sorts(
mut requirements: PlanWithCorrespondingCoalescePartitions,
) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
update_coalesce_ctx_children(&mut requirements);
if requirements.children.is_empty() || !requirements.children[0].data {
Ok(Transformed::no(requirements))
} else if (is_sort(&requirements.plan)
|| is_sort_preserving_merge(&requirements.plan))
&& requirements.plan.output_partitioning().partition_count() <= 1
{
let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
let sort_reqs = LexRequirement::from(sort_exprs.clone());
let sort_exprs = sort_exprs.clone();
requirements = remove_bottleneck_in_subplan(requirements)?;
requirements = requirements.children.swap_remove(0);
requirements = add_sort_above_with_check(requirements, sort_reqs, fetch);
let spm =
SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan));
Ok(Transformed::yes(
PlanWithCorrespondingCoalescePartitions::new(
Arc::new(spm.with_fetch(fetch)),
false,
vec![requirements],
),
))
} else if is_coalesce_partitions(&requirements.plan) {
requirements = remove_bottleneck_in_subplan(requirements)?;
requirements = requirements.children.swap_remove(0);
Ok(Transformed::yes(
PlanWithCorrespondingCoalescePartitions::new(
Arc::new(CoalescePartitionsExec::new(Arc::clone(&requirements.plan))),
false,
vec![requirements],
),
))
} else {
Ok(Transformed::yes(requirements))
}
}
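
/// The core transformation of the `EnforceSorting` rule: for every child of
/// the given node, adds a `SortExec` when a required input ordering is not
/// satisfied and removes sorts that are no longer necessary. Also simplifies
/// `SortPreservingMergeExec`s with a single-partition child and window
/// operators whose input sort can be removed.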
pub fn ensure_sorting(
mut requirements: PlanWithCorrespondingSort,
) -> Result<Transformed<PlanWithCorrespondingSort>> {
requirements = update_sort_ctx_children(requirements, false)?;
if requirements.children.is_empty() {
return Ok(Transformed::no(requirements));
}
let maybe_requirements = analyze_immediate_sort_removal(requirements);
requirements = if !maybe_requirements.transformed {
maybe_requirements.data
} else {
return Ok(maybe_requirements);
};
let plan = &requirements.plan;
let mut updated_children = vec![];
for (idx, (required_ordering, mut child)) in plan
.required_input_ordering()
.into_iter()
.zip(requirements.children.into_iter())
.enumerate()
{
let physical_ordering = child.plan.output_ordering();
if let Some(required) = required_ordering {
let eq_properties = child.plan.equivalence_properties();
if !eq_properties.ordering_satisfy_requirement(&required) {
if physical_ordering.is_some() {
child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
}
child = add_sort_above(child, required, None);
child = update_sort_ctx_children(child, true)?;
}
} else if physical_ordering.is_none()
|| !plan.maintains_input_order()[idx]
|| is_union(plan)
{
child = update_child_to_remove_unnecessary_sort(idx, child, plan)?;
}
updated_children.push(child);
}
requirements.children = updated_children;
let child_node = &requirements.children[0];
if is_window(plan) && child_node.data {
return adjust_window_sort_removal(requirements).map(Transformed::yes);
} else if is_sort_preserving_merge(plan)
&& child_node.plan.output_partitioning().partition_count() <= 1
{
let child_node = requirements.children.swap_remove(0);
return Ok(Transformed::yes(child_node));
}
update_sort_ctx_children(requirements, false).map(Transformed::yes)
}
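
/// Removes a `SortExec` whose input already satisfies its output ordering.
/// Depending on partitioning and the presence of a fetch, the sort is
/// replaced by a `SortPreservingMergeExec`, a limit operator, or simply its
/// input.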
fn analyze_immediate_sort_removal(
mut node: PlanWithCorrespondingSort,
) -> Transformed<PlanWithCorrespondingSort> {
if let Some(sort_exec) = node.plan.as_any().downcast_ref::<SortExec>() {
let sort_input = sort_exec.input();
if sort_input.equivalence_properties().ordering_satisfy(
sort_exec
.properties()
.output_ordering()
.unwrap_or(LexOrdering::empty()),
) {
node.plan = if !sort_exec.preserve_partitioning()
&& sort_input.output_partitioning().partition_count() > 1
{
let expr = LexOrdering::new(sort_exec.expr().to_vec());
Arc::new(
SortPreservingMergeExec::new(expr, Arc::clone(sort_input))
.with_fetch(sort_exec.fetch()),
) as _
} else {
node.children = node.children.swap_remove(0).children;
if let Some(fetch) = sort_exec.fetch() {
if sort_exec
.properties()
.output_partitioning()
.partition_count()
== 1
{
Arc::new(GlobalLimitExec::new(
Arc::clone(sort_input),
0,
Some(fetch),
))
} else {
Arc::new(LocalLimitExec::new(Arc::clone(sort_input), fetch))
}
} else {
Arc::clone(sort_input)
}
};
for child in node.children.iter_mut() {
child.data = false;
}
node.data = false;
return Transformed::yes(node);
}
}
Transformed::no(node)
}
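
/// Updates a window operator (`WindowAggExec` or `BoundedWindowAggExec`)
/// whose input sort is being removed: either a better-fitting window
/// operator is chosen for the existing input ordering, or the required sort
/// is re-added below the window.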
fn adjust_window_sort_removal(
mut window_tree: PlanWithCorrespondingSort,
) -> Result<PlanWithCorrespondingSort> {
let child_node = remove_corresponding_sort_from_sub_plan(
window_tree.children.swap_remove(0),
matches!(
window_tree.plan.required_input_distribution()[0],
Distribution::SinglePartition
),
)?;
window_tree.children.push(child_node);
let plan = window_tree.plan.as_any();
let child_plan = &window_tree.children[0].plan;
let (window_expr, new_window) =
if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
let window_expr = exec.window_expr();
let new_window =
get_best_fitting_window(window_expr, child_plan, &exec.partition_keys)?;
(window_expr, new_window)
} else if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
let window_expr = exec.window_expr();
let new_window =
get_best_fitting_window(window_expr, child_plan, &exec.partition_keys)?;
(window_expr, new_window)
} else {
return plan_err!("Expected WindowAggExec or BoundedWindowAggExec");
};
window_tree.plan = if let Some(new_window) = new_window {
new_window
} else {
let reqs = window_tree
.plan
.required_input_ordering()
.swap_remove(0)
.unwrap_or_default();
let mut child_node = window_tree.children.swap_remove(0);
child_node = add_sort_above(child_node, reqs, None);
let child_plan = Arc::clone(&child_node.plan);
window_tree.children.push(child_node);
if window_expr.iter().all(|e| e.uses_bounded_memory()) {
Arc::new(BoundedWindowAggExec::try_new(
window_expr.to_vec(),
child_plan,
window_expr[0].partition_by().to_vec(),
InputOrderMode::Sorted,
)?) as _
} else {
Arc::new(WindowAggExec::try_new(
window_expr.to_vec(),
child_plan,
window_expr[0].partition_by().to_vec(),
)?) as _
}
};
window_tree.data = false;
Ok(window_tree)
}
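
/// Removes parallelization-limiting operators (`CoalescePartitionsExec` and
/// redundant `RepartitionExec`s) from the sub-plan rooted at `requirements`.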
fn remove_bottleneck_in_subplan(
mut requirements: PlanWithCorrespondingCoalescePartitions,
) -> Result<PlanWithCorrespondingCoalescePartitions> {
let plan = &requirements.plan;
let children = &mut requirements.children;
if is_coalesce_partitions(&children[0].plan) {
let mut new_child_node = children[0].children.swap_remove(0);
while new_child_node.plan.output_partitioning() == plan.output_partitioning()
&& is_repartition(&new_child_node.plan)
&& is_repartition(plan)
{
new_child_node = new_child_node.children.swap_remove(0)
}
children[0] = new_child_node;
} else {
requirements.children = requirements
.children
.into_iter()
.map(|node| {
if node.data {
remove_bottleneck_in_subplan(node)
} else {
Ok(node)
}
})
.collect::<Result<_>>()?;
}
let mut new_reqs = requirements.update_plan_from_children()?;
if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::<RepartitionExec>() {
let input_partitioning = repartition.input().output_partitioning();
let mut can_remove = input_partitioning.eq(repartition.partitioning());
if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
can_remove |= *n_out == input_partitioning.partition_count();
}
if can_remove {
new_reqs = new_reqs.children.swap_remove(0)
}
}
Ok(new_reqs)
}
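
/// If the child at `child_idx` is connected to a `SortExec`, removes that
/// (now unnecessary) sort from its sub-plan, taking into account whether the
/// parent requires a single input partition.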
fn update_child_to_remove_unnecessary_sort(
child_idx: usize,
mut node: PlanWithCorrespondingSort,
parent: &Arc<dyn ExecutionPlan>,
) -> Result<PlanWithCorrespondingSort> {
if node.data {
let requires_single_partition = matches!(
parent.required_input_distribution()[child_idx],
Distribution::SinglePartition
);
node = remove_corresponding_sort_from_sub_plan(node, requires_single_partition)?;
}
node.data = false;
Ok(node)
}
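
/// Removes the `SortExec`(s) feeding the given node (sorts with a fetch are
/// left in place), replaces order-preserving operators such as
/// `SortPreservingMergeExec` and order-preserving repartitions with plain
/// variants, and restores a single output partition with a merge or coalesce
/// when the caller requires one.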
fn remove_corresponding_sort_from_sub_plan(
mut node: PlanWithCorrespondingSort,
requires_single_partition: bool,
) -> Result<PlanWithCorrespondingSort> {
if let Some(sort_exec) = node.plan.as_any().downcast_ref::<SortExec>() {
if sort_exec.fetch().is_none() {
node = node.children.swap_remove(0);
}
} else {
let mut any_connection = false;
let required_dist = node.plan.required_input_distribution();
node.children = node
.children
.into_iter()
.enumerate()
.map(|(idx, child)| {
if child.data {
any_connection = true;
remove_corresponding_sort_from_sub_plan(
child,
matches!(required_dist[idx], Distribution::SinglePartition),
)
} else {
Ok(child)
}
})
.collect::<Result<_>>()?;
if any_connection || node.children.is_empty() {
node = update_sort_ctx_children(node, false)?;
}
if is_sort_preserving_merge(&node.plan) {
node.children = node.children.swap_remove(0).children;
node.plan = Arc::clone(node.plan.children().swap_remove(0));
} else if let Some(repartition) =
node.plan.as_any().downcast_ref::<RepartitionExec>()
{
node.plan = Arc::new(RepartitionExec::try_new(
Arc::clone(&node.children[0].plan),
repartition.properties().output_partitioning().clone(),
)?) as _;
}
};
if requires_single_partition && node.plan.output_partitioning().partition_count() > 1
{
let plan = Arc::clone(&node.plan);
let fetch = plan.fetch();
let plan = if let Some(ordering) = plan.output_ordering() {
Arc::new(
SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan)
.with_fetch(fetch),
) as _
} else {
Arc::new(CoalescePartitionsExec::new(plan)) as _
};
node = PlanWithCorrespondingSort::new(plan, false, vec![node]);
node = update_sort_ctx_children(node, false)?;
}
Ok(node)
}
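
/// Returns the sort expressions and optional fetch of the given `SortExec`
/// or `SortPreservingMergeExec`; errors for any other operator.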
fn get_sort_exprs(
sort_any: &Arc<dyn ExecutionPlan>,
) -> Result<(&LexOrdering, Option<usize>)> {
if let Some(sort_exec) = sort_any.as_any().downcast_ref::<SortExec>() {
Ok((sort_exec.expr(), sort_exec.fetch()))
} else if let Some(spm) = sort_any.as_any().downcast_ref::<SortPreservingMergeExec>()
{
Ok((spm.expr(), spm.fetch()))
} else {
plan_err!("Given ExecutionPlan is not a SortExec or a SortPreservingMergeExec")
}
}