pub struct SortMergeJoinExec {
pub left: Arc<dyn ExecutionPlan>,
pub right: Arc<dyn ExecutionPlan>,
pub on: JoinOn,
pub filter: Option<JoinFilter>,
pub join_type: JoinType,
pub sort_options: Vec<SortOptions>,
pub null_equals_null: bool,
/* private fields */
}
Join execution plan that executes equi-join predicates on multiple partitions using the Sort-Merge join algorithm and applies an optional filter after the join. It can be used to join arbitrarily large inputs where one or both of the inputs do not fit in the available memory.
§Join Expressions
Equi-join predicate expressions (e.g. <col1> = <col2>) are represented by Self::on.
Non-equality predicates, which cannot be pushed down to the join inputs (e.g. <col1> != <col2>), are known as “filter expressions” and are evaluated after the equi-join predicates. They are represented by Self::filter. These are optional expressions.
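As a rough illustration of how the two kinds of predicate interact, the sketch below matches rows on an equality key first and only then applies a non-equality filter to each matched pair. It uses plain tuples and a hypothetical `join_with_filter` helper rather than DataFusion's `JoinOn` and `JoinFilter` types, and a nested loop rather than a merge, so it shows only the evaluation order for an inner join, not the actual implementation.

```rust
/// Hypothetical helper (not part of DataFusion): inner-joins two row sets on
/// the first tuple field, then keeps only the pairs that pass `filter`.
fn join_with_filter(
    left: &[(i32, i32)],
    right: &[(i32, i32)],
    filter: impl Fn(i32, i32) -> bool,
) -> Vec<((i32, i32), (i32, i32))> {
    let mut out = Vec::new();
    for &l in left {
        for &r in right {
            // Equi-join predicate: the role played by `on`.
            if l.0 != r.0 {
                continue;
            }
            // Filter expression: evaluated only for pairs that matched on the key.
            if filter(l.1, r.1) {
                out.push((l, r));
            }
        }
    }
    out
}
```

Calling `join_with_filter(&left, &right, |a, b| a != b)` corresponds to an equi-join on the first column with a `<col1> != <col2>` filter on the second.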
§Sorting
Assumes that both the left and right input to the join are pre-sorted. It is not the responsibility of this execution plan to sort the inputs.
§“Streamed” vs “Buffered”
The number of streamed record batches held in memory at any time depends on the output batch size of the execution plan. There is no spilling support for streamed input.
Join key values from the streamed input are compared with join key values from the buffered input. One row in a streamed record batch can match multiple rows in the buffered input batches. The streamed input is managed through the states in StreamedState, and streamed input batches are represented by StreamedBatch.
The buffered side holds all record batches that contain rows with the same join key value.
If memory usage exceeds the specified limit and spilling is enabled, buffered batches may be spilled to disk. If spilling is disabled, the execution will fail under the same conditions. Multiple buffered record batches can reside in memory or on disk at once during execution. Their number depends on how many buffered rows share a join key value with the streamed rows currently present in memory. Because the inputs are pre-sorted, the algorithm can tell when a buffered batch is no longer needed and releases it from memory or disk. The buffered input is managed through the states in BufferedState, and buffered input batches are represented by BufferedBatch.
Depending on the type of join, left or right input may be selected as streamed or buffered respectively. For example, in a left-outer join, the left execution plan will be selected as streamed input while in a right-outer join, the right execution plan will be selected as the streamed input.
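The streamed/buffered split described above can be sketched on plain in-memory slices. This is a simplified inner join under stated assumptions: both inputs are already sorted ascending by key, rows are `(key, value)` tuples instead of record batches, and there is no spilling. The function name `sort_merge_inner_join` and the `group` buffer are illustrative and not taken from DataFusion.

```rust
/// Minimal inner sort-merge join over two inputs sorted ascending by key.
/// `streamed` is advanced one row at a time; for each key, every `buffered`
/// row with that key is held in `group` until the streamed side moves past it.
fn sort_merge_inner_join<'a>(
    streamed: &[(i32, &'a str)],
    buffered: &[(i32, &'a str)],
) -> Vec<(i32, &'a str, &'a str)> {
    let mut out = Vec::new();
    let mut b = 0; // next unread position in the buffered input
    let mut group: Vec<(i32, &'a str)> = Vec::new(); // buffered rows sharing one key

    for &(key, s_val) in streamed {
        // Because both inputs are sorted, a group whose key differs from the
        // current streamed key is no longer needed and can be released.
        if group.first().map_or(true, |g| g.0 != key) {
            group.clear();
            // Skip buffered rows with smaller keys; they can never match again.
            while b < buffered.len() && buffered[b].0 < key {
                b += 1;
            }
            // Buffer every row that shares the current streamed key.
            while b < buffered.len() && buffered[b].0 == key {
                group.push(buffered[b]);
                b += 1;
            }
        }
        // One streamed row may match several buffered rows.
        for &(_, b_val) in &group {
            out.push((key, s_val, b_val));
        }
    }
    out
}
```

Only the buffered rows for the current key are retained, which is why the memory needed on the buffered side tracks the size of the largest group of equal keys rather than the size of the whole input.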
Reference for the algorithm: https://en.wikipedia.org/wiki/Sort-merge_join.
Helpful short video demonstration: https://www.youtube.com/watch?v=jiWCPJtDE2c.
Fields§
left: Arc<dyn ExecutionPlan>
Left sorted joining execution plan
right: Arc<dyn ExecutionPlan>
Right sorted joining execution plan
on: JoinOn
Set of common columns used to join on
filter: Option<JoinFilter>
Filters which are applied while finding matching rows
join_type: JoinType
How the join is performed
sort_options: Vec<SortOptions>
Sort options of join columns used in sorting left and right execution plans
null_equals_null: bool
If null_equals_null is true, null == null else null != null
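The effect of null_equals_null on key comparison can be sketched as follows, with `Option<i32>` standing in for a nullable join key. The helper `keys_match` is hypothetical and only mirrors the semantics stated above; it is not DataFusion's comparison code.

```rust
/// Compares two nullable join keys, with `None` standing in for SQL NULL.
fn keys_match(left: Option<i32>, right: Option<i32>, null_equals_null: bool) -> bool {
    match (left, right) {
        (Some(l), Some(r)) => l == r,
        // Two nulls match only when the flag is set.
        (None, None) => null_equals_null,
        // A null never matches a non-null value.
        _ => false,
    }
}
```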
Implementations§
impl SortMergeJoinExec
pub fn try_new( left: Arc<dyn ExecutionPlan>, right: Arc<dyn ExecutionPlan>, on: JoinOn, filter: Option<JoinFilter>, join_type: JoinType, sort_options: Vec<SortOptions>, null_equals_null: bool, ) -> Result<Self>
Tries to create a new SortMergeJoinExec.
The inputs are expected to be sorted on the columns in on, with sort_options applied to those columns.
§Error
This function errors when it is not possible to join the left and right sides on the keys in on.
pub fn probe_side(join_type: &JoinType) -> JoinSide
Get probe side (i.e. streamed side) information for this sort merge join. In the current implementation, the probe side is determined by the join type.
pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)]
Set of common columns used to join on
pub fn right(&self) -> &Arc<dyn ExecutionPlan>
Ref to right execution plan
pub fn left(&self) -> &Arc<dyn ExecutionPlan>
Ref to left execution plan
pub fn filter(&self) -> &Option<JoinFilter>
Ref to join filter
pub fn sort_options(&self) -> &[SortOptions]
Ref to sort options
pub fn null_equals_null(&self) -> bool
Null equals null
pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>>
Trait Implementations§
impl Clone for SortMergeJoinExec
fn clone(&self) -> SortMergeJoinExec
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl Debug for SortMergeJoinExec
impl DisplayAs for SortMergeJoinExec
impl ExecutionPlan for SortMergeJoinExec
fn try_swapping_with_projection( &self, projection: &ProjectionExec, ) -> Result<Option<Arc<dyn ExecutionPlan>>>
Tries to swap the projection with its input SortMergeJoinExec. If it can be done, it returns the new swapped version having the SortMergeJoinExec as the top plan. Otherwise, it returns None.
fn name(&self) -> &'static str
fn as_any(&self) -> &dyn Any
Returns the execution plan as Any so that it can be downcast to a specific implementation.
fn properties(&self) -> &PlanProperties
Returns properties of the output of the ExecutionPlan, such as output ordering(s), partitioning information etc.
fn required_input_distribution(&self) -> Vec<Distribution>
Specifies the data distribution requirements for all the children of this ExecutionPlan. By default it is Distribution::UnspecifiedDistribution for each child.
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>>
Specifies the ordering required for all of the children of this ExecutionPlan.
fn maintains_input_order(&self) -> Vec<bool>
Returns false if this ExecutionPlan’s implementation may reorder rows within or between partitions.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>
Returns the list of child ExecutionPlans that act as inputs to this plan. The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).
fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>>
Returns a new ExecutionPlan where all existing children were replaced by the children, in order.
fn execute( &self, partition: usize, context: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream>
fn metrics(&self) -> Option<MetricsSet>
Returns a snapshot of the set of Metrics for this ExecutionPlan. If no Metrics are available, return None.
fn statistics(&self) -> Result<Statistics>
Returns statistics for this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error.
fn static_name() -> &'static str
where
Self: Sized,
Like name, but can be called without an instance.
fn check_invariants(&self, _check: InvariantLevel) -> Result<()>
fn benefits_from_input_partitioning(&self) -> Vec<bool>
Specifies whether the ExecutionPlan benefits from increased parallelization at its input for each child.
fn repartitioned( &self, _target_partitions: usize, _config: &ConfigOptions, ) -> Result<Option<Arc<dyn ExecutionPlan>>>
If supported, attempts to increase the partitioning of this ExecutionPlan to produce target_partitions partitions.
fn supports_limit_pushdown(&self) -> bool
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>
Returns a fetching variant of this ExecutionPlan node, if it supports fetch limits. Returns None otherwise.
fn fetch(&self) -> Option<usize>
Gets the fetch count for the operator. None means there is no fetch.
fn cardinality_effect(&self) -> CardinalityEffect
Auto Trait Implementations§
impl Freeze for SortMergeJoinExec
impl !RefUnwindSafe for SortMergeJoinExec
impl Send for SortMergeJoinExec
impl Sync for SortMergeJoinExec
impl Unpin for SortMergeJoinExec
impl !UnwindSafe for SortMergeJoinExec
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
T: Clone,
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.