pub struct SymmetricHashJoinExec { /* private fields */ }
A symmetric hash join with range conditions hashes both input streams on the join key and uses the resulting hash tables to join the streams. The join is considered symmetric because a hash table is built on the join keys of each stream, and rows are matched on the values of the join keys in both streams. This type of join is efficient in a streaming context because it relies on fast hash table lookups rather than scanning one or both streams to find matching rows. With range conditions, it also only considers the elements of each stream that fall within a certain sliding window, which makes it more efficient and less likely to store stale data. This makes it possible to operate on unbounded streaming data while keeping the buffered state bounded.
For each input stream, create a hash table.
- For each new RecordBatch on the build side, hash it and insert it into that input's hash table. Update offsets.
- Test whether the input is equal to a predefined set of other inputs.
- If so, record the visited rows. If the matched rows must be produced (INNER, LEFT), output the RecordBatch.
- Try to prune the other (probe) side with the new RecordBatch.
- If the join type requires the unmatched rows to be produced (LEFT, FULL etc.), output the RecordBatch when pruning happens or at the end of the data.
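The insert-then-probe steps above can be sketched with plain standard-library types. This is a minimal illustration for an inner join only, without pruning or range conditions; `OneSide`, `SymmetricJoin`, `push_left` and `push_right` are hypothetical names chosen for the sketch and are not part of this crate's API.

```rust
use std::collections::HashMap;

/// Hypothetical per-input state: a hash table from join key to buffered rows.
#[derive(Default)]
struct OneSide {
    table: HashMap<i64, Vec<String>>,
}

impl OneSide {
    /// Hash and insert every row of a newly arrived batch.
    fn insert(&mut self, batch: &[(i64, String)]) {
        for (key, payload) in batch {
            self.table.entry(*key).or_default().push(payload.clone());
        }
    }

    /// Return the buffered rows whose key equals `key` (empty if none).
    fn probe(&self, key: i64) -> &[String] {
        match self.table.get(&key) {
            Some(rows) => rows.as_slice(),
            None => &[],
        }
    }
}

/// Hypothetical symmetric join: each input both builds and probes.
#[derive(Default)]
struct SymmetricJoin {
    left: OneSide,
    right: OneSide,
}

impl SymmetricJoin {
    /// A batch arrives on the left: buffer it, then probe the right table.
    fn push_left(&mut self, batch: &[(i64, String)]) -> Vec<(String, String)> {
        self.left.insert(batch);
        let mut out = Vec::new();
        for (key, l) in batch {
            for r in self.right.probe(*key) {
                out.push((l.clone(), r.clone()));
            }
        }
        out
    }

    /// A batch arrives on the right: buffer it, then probe the left table.
    fn push_right(&mut self, batch: &[(i64, String)]) -> Vec<(String, String)> {
        self.right.insert(batch);
        let mut out = Vec::new();
        for (key, r) in batch {
            for l in self.left.probe(*key) {
                out.push((l.clone(), r.clone()));
            }
        }
        out
    }
}
```

Because each arriving batch is joined only against rows that were already buffered on the opposite side, every matching pair is emitted exactly once, regardless of which input delivers its row first.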
                       +-------------------------+
                       |                         |
  left stream ---------|  Left OneSideHashJoiner |---+
                       |                         |   |
                       +-------------------------+   |
                                                     |
                                                     |--------- Joined output
                                                     |
                       +-------------------------+   |
                       |                         |   |
 right stream ---------| Right OneSideHashJoiner |---+
                       |                         |
                       +-------------------------+
Prune build side when the new RecordBatch comes to the probe side. We utilize interval arithmetic
on JoinFilter's sorted PhysicalExprs to calculate the joinable range.
  PROBE SIDE          BUILD SIDE
    BUFFER              BUFFER
+-------------+     +------------+
|             |     |            |    Unjoinable
|             |     |            |    Range
|             |     |            |
|             |  |---------------------------------
|             |  |  |            |
|             |  |  |            |
|             | /   |            |
|             | |   |            |
|             | |   |            |
|             | |   |            |
|             | |   |            |
|             | |   |            |    Joinable
|             |/    |            |    Range
|             ||    |            |
|+-----------+||    |            |
|| Record    ||     |            |
|| Batch     ||     |            |
|+-----------+||    |            |
+-------------+\    +------------+
                |
                \
                 |---------------------------------
This happens when range conditions are provided on sorted columns. E.g.
    SELECT * FROM left_table JOIN right_table
    ON
      left_key = right_key AND
      left_time > right_time - INTERVAL '12' MINUTE AND left_time < right_time + INTERVAL '2' HOUR
or
    SELECT * FROM left_table JOIN right_table
    ON
      left_key = right_key AND
      left_sorted > right_sorted - 3 AND left_sorted < right_sorted + 10
More generally, in the second scenario, when new data arrives at the probe side, the conditions can be used to
determine a specific threshold for discarding rows from the inner buffer. For example, suppose the sort order of the
two columns ("left_sorted" and "right_sorted") is ascending (it can be different in other scenarios),
the join condition is "left_sorted > right_sorted - 3", and the latest value on the right input is 1234. Every future
right row then has "right_sorted >= 1234", so the left side buffer must only keep rows where
"left_sorted > right_sorted - 3 >= 1234 - 3 = 1231". Any row whose "left_sorted" value is at or below 1231
(since the order is ascending) can no longer match and can be dropped from the inner buffer.
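The threshold reasoning above can be sketched as follows. This is an illustration under stated assumptions, not the operator's actual pruning code: both inputs are assumed to be ascending, the filter is assumed to have the form "left_sorted > right_sorted - lower_offset", and `prune_threshold` and `prune_left_buffer` are hypothetical helper names.

```rust
use std::collections::VecDeque;

/// For the filter `left_sorted > right_sorted - lower_offset` on ascending
/// inputs, returns the value a buffered left row must exceed in order to
/// match the latest right row or any later one.
fn prune_threshold(latest_right: i64, lower_offset: i64) -> i64 {
    latest_right - lower_offset
}

/// Drops rows from the front of an ascending left buffer that can no longer
/// match any future right row. Returns the number of dropped rows.
fn prune_left_buffer(
    buffer: &mut VecDeque<i64>,
    latest_right: i64,
    lower_offset: i64,
) -> usize {
    let threshold = prune_threshold(latest_right, lower_offset);
    let mut dropped = 0;
    // The buffer is ascending, so we can stop at the first row that survives.
    while let Some(&front) = buffer.front() {
        if front > threshold {
            break;
        }
        buffer.pop_front();
        dropped += 1;
    }
    dropped
}
```

With a latest right value of 1234 and an offset of 3, the threshold is 1231, so a left buffer holding 1229, 1230, 1231, 1232 and 1240 would keep only 1232 and 1240. The sketch assumes the latest right batch has already been joined against the buffer before pruning runs.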
Implementations§
impl SymmetricHashJoinExec
pub fn try_new(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    on: JoinOn,
    filter: Option<JoinFilter>,
    join_type: &JoinType,
    null_equals_null: bool,
    left_sort_exprs: Option<Vec<PhysicalSortExpr>>,
    right_sort_exprs: Option<Vec<PhysicalSortExpr>>,
    mode: StreamJoinPartitionMode,
) -> Result<Self>
Tries to create a new SymmetricHashJoinExec.
§Error
This function errors when:
- It is not possible to join the left and right sides on keys "on", or
- It fails to construct SortedFilterExprs, or
- It fails to create the ExprIntervalGraph.
pub fn left(&self) -> &Arc<dyn ExecutionPlan>
left stream
pub fn right(&self) -> &Arc<dyn ExecutionPlan>
right stream
pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)]
Set of common columns used to join on
pub fn filter(&self) -> Option<&JoinFilter>
Filters applied before join output
pub fn null_equals_null(&self) -> bool
Get null_equals_null
pub fn partition_mode(&self) -> StreamJoinPartitionMode
Get partition mode
pub fn left_sort_exprs(&self) -> Option<&[PhysicalSortExpr]>
Get left_sort_exprs
pub fn right_sort_exprs(&self) -> Option<&[PhysicalSortExpr]>
Get right_sort_exprs
pub fn check_if_order_information_available(&self) -> Result<bool>
Check if order information covers every column in the filter expression.
Trait Implementations§
impl Debug for SymmetricHashJoinExec
impl DisplayAs for SymmetricHashJoinExec
impl ExecutionPlan for SymmetricHashJoinExec
fn name(&self) -> &'static str
fn as_any(&self) -> &dyn Any
Returns the execution plan as Any so that it can be downcast to a specific implementation.
fn properties(&self) -> &PlanProperties
Return properties of the output of the ExecutionPlan, such as output ordering(s), partitioning information etc.
fn required_input_distribution(&self) -> Vec<Distribution>
Specifies the data distribution requirements for all the children of this ExecutionPlan. By default it is Distribution::UnspecifiedDistribution for each child.
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>>
Specifies the ordering required for all of the children of this ExecutionPlan.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>
Get a list of children ExecutionPlans that act as inputs to this plan.
The returned list will be empty for leaf nodes such as scans, will contain
a single value for unary nodes, or two values for binary nodes (such as
joins).
fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>
Returns a new ExecutionPlan where all existing children were replaced by the children, in order.
fn metrics(&self) -> Option<MetricsSet>
Return a snapshot of the set of Metrics for this ExecutionPlan. If no Metrics are available, return None.
fn statistics(&self) -> Result<Statistics>
Returns statistics for this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error.
fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>
fn static_name() -> &'static str
where
    Self: Sized,
Like name but can be called without an instance.
fn maintains_input_order(&self) -> Vec<bool>
Returns false if this ExecutionPlan's implementation may reorder rows within or between partitions.
fn benefits_from_input_partitioning(&self) -> Vec<bool>
Specifies whether the ExecutionPlan benefits from increased parallelization at its input for each child.
fn repartitioned(
    &self,
    _target_partitions: usize,
    _config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>>
If supported, attempt to increase the partitioning of this ExecutionPlan to produce target_partitions partitions.
fn supports_limit_pushdown(&self) -> bool
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>
Returns a fetch-limited version of this ExecutionPlan node, if it supports fetch limits. Returns None otherwise.
Auto Trait Implementations§
impl Freeze for SymmetricHashJoinExec
impl !RefUnwindSafe for SymmetricHashJoinExec
impl Send for SymmetricHashJoinExec
impl Sync for SymmetricHashJoinExec
impl Unpin for SymmetricHashJoinExec
impl !UnwindSafe for SymmetricHashJoinExec
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true.
Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self> otherwise.