Struct datafusion_physical_plan::joins::HashJoinExec
pub struct HashJoinExec {
pub left: Arc<dyn ExecutionPlan>,
pub right: Arc<dyn ExecutionPlan>,
pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
pub filter: Option<JoinFilter>,
pub join_type: JoinType,
pub mode: PartitionMode,
pub projection: Option<Vec<usize>>,
pub null_equals_null: bool,
/* private fields */
}
Join execution plan: evaluates equijoin predicates in parallel on multiple partitions using a hash table, with an optional filter applied after the join.
§Join Expressions
This implementation is optimized for evaluating equijoin predicates (<col1> = <col2>), which are represented as a list of Columns in Self::on.
Non-equality predicates, which cannot be pushed down to the join inputs (e.g. <col1> != <col2>), are known as “filter expressions” and are evaluated after the equijoin predicates.
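The split between equijoin predicates and filter expressions can be sketched with a minimal, std-only model (this is an illustration, not DataFusion's implementation): equality on the join keys locates candidate pairs via the hash table, and the residual filter is applied to each candidate afterwards.

```rust
use std::collections::HashMap;

// (join key, payload) pairs stand in for build- and probe-side rows.
fn hash_join_with_filter(
    build: &[(i32, i32)],
    probe: &[(i32, i32)],
    filter: impl Fn(i32, i32) -> bool, // post-join filter on payloads
) -> Vec<(i32, i32, i32)> {
    // Build phase: hash the build-side rows by join key.
    let mut table: HashMap<i32, Vec<i32>> = HashMap::new();
    for &(k, v) in build {
        table.entry(k).or_default().push(v);
    }
    // Probe phase: equality match first, then the filter expression.
    let mut out = Vec::new();
    for &(k, pv) in probe {
        if let Some(build_vals) = table.get(&k) {
            for &bv in build_vals {
                if filter(bv, pv) {
                    out.push((k, bv, pv));
                }
            }
        }
    }
    out
}

fn main() {
    // Key 1 matches on equality; the filter keeps only pairs with
    // differing payloads, mimicking a `l.v != r.v` filter expression.
    let joined =
        hash_join_with_filter(&[(1, 10), (2, 20)], &[(1, 10), (1, 30)], |b, p| b != p);
    assert_eq!(joined, vec![(1, 10, 30)]);
}
```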
§“Build Side” vs “Probe Side”
HashJoin takes two inputs, which are referred to as the “build” and the “probe”. The build side is the first child, and the probe side is the second child.
The two inputs are treated differently and it is VERY important that the smaller input is placed on the build side to minimize the work of creating the hash table.
┌───────────┐
│ HashJoin │
│ │
└───────────┘
│ │
┌─────┘ └─────┐
▼ ▼
┌────────────┐ ┌─────────────┐
│ Input │ │ Input │
│ [0] │ │ [1] │
└────────────┘ └─────────────┘
"build side" "probe side"
Execution proceeds in 2 stages:
- the build phase creates a hash table from the tuples of the build side, and a single concatenated batch containing data from all fetched record batches. The resulting hash table stores the hashed join-key fields of each row as keys, and the indices of the corresponding rows in the concatenated batch as values.
Hash join uses a LIFO data structure as its hash table. In order to retain the original build-side input order while fetching data during the probe phase, the hash table is updated by iterating the batch sequence in reverse order: this keeps rows with smaller indices “on top” of the hash table, while still maintaining correct indexing into the concatenated build-side batch.
Example of build phase for 3 record batches:
Original build-side data Inserting build-side values into hashmap Concatenated build-side batch
┌───────────────────────────┐
hashmap.insert(row-hash, row-idx + offset) │ idx │
┌───────┐ │ ┌───────┐ │
│ Row 1 │ 1) update_hash for batch 3 with offset 0 │ │ Row 6 │ 0 │
Batch 1 │ │ - hashmap.insert(Row 7, idx 1) │ Batch 3 │ │ │
│ Row 2 │ - hashmap.insert(Row 6, idx 0) │ │ Row 7 │ 1 │
└───────┘ │ └───────┘ │
│ │
┌───────┐ │ ┌───────┐ │
│ Row 3 │ 2) update_hash for batch 2 with offset 2 │ │ Row 3 │ 2 │
│ │ - hashmap.insert(Row 5, idx 4) │ │ │ │
Batch 2 │ Row 4 │ - hashmap.insert(Row 4, idx 3) │ Batch 2 │ Row 4 │ 3 │
│ │ - hashmap.insert(Row 3, idx 2) │ │ │ │
│ Row 5 │ │ │ Row 5 │ 4 │
└───────┘ │ └───────┘ │
│ │
┌───────┐ │ ┌───────┐ │
│ Row 6 │ 3) update_hash for batch 1 with offset 5 │ │ Row 1 │ 5 │
Batch 3 │ │ - hashmap.insert(Row 2, idx 5) │ Batch 1 │ │ │
│ Row 7 │ - hashmap.insert(Row 1, idx 6) │ │ Row 2 │ 6 │
└───────┘ │ └───────┘ │
│ │
└───────────────────────────┘
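The build phase pictured above can be sketched in std-only Rust (a simplified model; DataFusion's actual `JoinHashMap` is a more compact chained structure). Batches are concatenated in input order, while the hash table is populated by iterating the batches, and the rows within each batch, in reverse, so the smallest row index ends up at the head of each key's chain.

```rust
use std::collections::HashMap;

fn build_phase(batches: &[Vec<&str>]) -> (HashMap<String, Vec<usize>>, Vec<String>) {
    // Concatenated build-side batch, preserving input order.
    let concatenated: Vec<String> =
        batches.iter().flatten().map(|s| s.to_string()).collect();

    // Row offset of each batch within the concatenated batch.
    let mut offsets = Vec::with_capacity(batches.len());
    let mut start = 0;
    for batch in batches {
        offsets.push(start);
        start += batch.len();
    }

    let mut table: HashMap<String, Vec<usize>> = HashMap::new();
    // Iterate batches, and rows within each batch, in reverse order.
    for (batch, offset) in batches.iter().zip(offsets.iter()).rev() {
        for (row_idx, key) in batch.iter().enumerate().rev() {
            // The newly inserted index becomes the head of the chain,
            // i.e. rows with smaller indices sit "on top".
            table
                .entry(key.to_string())
                .or_default()
                .insert(0, offset + row_idx);
        }
    }
    (table, concatenated)
}

fn main() {
    let (table, concat) = build_phase(&[vec!["a", "b"], vec!["a", "c"]]);
    // "a" occurs at concatenated indices 0 and 2; probing visits 0
    // first, preserving the original build-side order.
    assert_eq!(table["a"], vec![0, 2]);
    assert_eq!(concat, vec!["a", "b", "a", "c"]);
}
```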
- the probe phase, where the tuples of the probe side are streamed through, checking for matches of the join keys in the hash table.
┌────────────────┐ ┌────────────────┐
│ ┌─────────┐ │ │ ┌─────────┐ │
│ │ Hash │ │ │ │ Hash │ │
│ │ Table │ │ │ │ Table │ │
│ │(keys are│ │ │ │(keys are│ │
│ │equi join│ │ │ │equi join│ │ Stage 2: batches from
Stage 1: the │ │columns) │ │ │ │columns) │ │ the probe side are
*entire* build │ │ │ │ │ │ │ │ streamed through, and
side is read │ └─────────┘ │ │ └─────────┘ │ checked against the
into the hash │ ▲ │ │ ▲ │ contents of the hash
table │ HashJoin │ │ HashJoin │ table
└──────┼─────────┘ └──────────┼─────┘
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
│ │
│ │
┌────────────┐ ┌────────────┐
│RecordBatch │ │RecordBatch │
└────────────┘ └────────────┘
┌────────────┐ ┌────────────┐
│RecordBatch │ │RecordBatch │
└────────────┘ └────────────┘
... ...
┌────────────┐ ┌────────────┐
│RecordBatch │ │RecordBatch │
└────────────┘ └────────────┘
build side probe side
§Example “Optimal” Plans
The difference between the inputs means that for a classic “Star Schema Query”, the optimal plan is a “Right Deep Tree”. A Star Schema Query is one where there is one large “fact” table and several smaller “dimension” tables, joined on Foreign Key = Primary Key predicates.
A “Right Deep Tree” looks like this, with the large table as the probe side of the lowest join:
┌───────────┐
│ HashJoin │
│ │
└───────────┘
│ │
┌───────┘ └──────────┐
▼ ▼
┌───────────────┐ ┌───────────┐
│ small table 1 │ │ HashJoin │
│ "dimension" │ │ │
└───────────────┘ └───┬───┬───┘
┌──────────┘ └───────┐
│ │
▼ ▼
┌───────────────┐ ┌───────────┐
│ small table 2 │ │ HashJoin │
│ "dimension" │ │ │
└───────────────┘ └───┬───┬───┘
┌────────┘ └────────┐
│ │
▼ ▼
┌───────────────┐ ┌───────────────┐
│ small table 3 │ │ large table │
│ "dimension" │ │ "fact" │
└───────────────┘ └───────────────┘
Fields§
left: Arc<dyn ExecutionPlan>
left (build) side which gets hashed
right: Arc<dyn ExecutionPlan>
right (probe) side which is filtered by the hash table
on: Vec<(PhysicalExprRef, PhysicalExprRef)>
Set of equijoin columns from the relations: (left_col, right_col)
filter: Option<JoinFilter>
Filters which are applied while finding matching rows
join_type: JoinType
How the join is performed (OUTER, INNER, etc)
mode: PartitionMode
Partitioning mode to use
projection: Option<Vec<usize>>
The projection indices of the columns in the output schema of the join
null_equals_null: bool
Null matching behavior: If null_equals_null is true, rows that have nulls in both left and right equijoin columns will be matched. Otherwise, rows that have nulls in the join columns will not be matched and thus will not appear in the output.
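The null_equals_null behavior can be illustrated with a minimal sketch, modeling a nullable join key as Option<i32> (the real operator works on Arrow arrays, not Rust Options):

```rust
fn keys_match(l: Option<i32>, r: Option<i32>, null_equals_null: bool) -> bool {
    match (l, r) {
        (Some(a), Some(b)) => a == b,     // both non-null: plain equality
        (None, None) => null_equals_null, // both null: match only if enabled
        _ => false,                       // one null, one not: never a match
    }
}

fn main() {
    assert!(keys_match(Some(1), Some(1), false));
    assert!(!keys_match(None, None, false)); // SQL-style: NULL does not equal NULL
    assert!(keys_match(None, None, true));   // null_equals_null = true
    assert!(!keys_match(None, Some(1), true));
}
```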
Implementations§
impl HashJoinExec
pub fn try_new(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    on: JoinOn,
    filter: Option<JoinFilter>,
    join_type: &JoinType,
    projection: Option<Vec<usize>>,
    partition_mode: PartitionMode,
    null_equals_null: bool,
) -> Result<Self>
Tries to create a new HashJoinExec.
§Error
This function errors when it is not possible to join the left and right sides on the keys in on.
pub fn left(&self) -> &Arc<dyn ExecutionPlan>
left (build) side which gets hashed
pub fn right(&self) -> &Arc<dyn ExecutionPlan>
right (probe) side which is filtered by the hash table
pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)]
Set of common columns used to join on
pub fn filter(&self) -> Option<&JoinFilter>
Filters applied before join output
pub fn partition_mode(&self) -> &PartitionMode
The partitioning mode of this hash join
pub fn null_equals_null(&self) -> bool
Get null_equals_null
pub fn probe_side() -> JoinSide
Get probe side information for the hash join.
pub fn contain_projection(&self) -> bool
Return whether the join contains a projection
pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>
Return a new instance of HashJoinExec with the given projection.
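The meaning of the projection indices can be sketched as follows: they select (and reorder) columns of the join's output schema. This is a plain-Vec illustration of the idea, not the Arrow-based implementation.

```rust
fn apply_projection(row: &[i32], projection: Option<&[usize]>) -> Vec<i32> {
    match projection {
        // Keep only the listed column indices, in the listed order.
        Some(indices) => indices.iter().map(|&i| row[i]).collect(),
        // No projection: pass the full output row through.
        None => row.to_vec(),
    }
}

fn main() {
    let row = [10, 20, 30];
    assert_eq!(apply_projection(&row, Some(&[2, 0])), vec![30, 10]);
    assert_eq!(apply_projection(&row, None), vec![10, 20, 30]);
}
```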
Trait Implementations§
impl Debug for HashJoinExec
impl DisplayAs for HashJoinExec
impl ExecutionPlan for HashJoinExec
fn name(&self) -> &'static str
fn as_any(&self) -> &dyn Any
Returns the execution plan as Any so that it can be downcast to a specific implementation.
fn properties(&self) -> &PlanProperties
Return properties of the output of this ExecutionPlan, such as output ordering(s), partitioning information etc.
fn required_input_distribution(&self) -> Vec<Distribution>
Specifies the data distribution requirements for all the children of this ExecutionPlan. By default it’s [Distribution::UnspecifiedDistribution] for each child.
fn maintains_input_order(&self) -> Vec<bool>
Returns false if this ExecutionPlan’s implementation may reorder rows within or between partitions.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>
Get a list of children ExecutionPlans that act as inputs to this plan. The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).
fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>
Returns a new ExecutionPlan where all existing children were replaced by the children, in order.
fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>
fn metrics(&self) -> Option<MetricsSet>
Return a snapshot of the set of Metrics for this ExecutionPlan. If no Metrics are available, return None.
fn statistics(&self) -> Result<Statistics>
Returns statistics for this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error.
fn static_name() -> &'static str
where Self: Sized,
Like name but can be called without an instance.
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>>
Specifies the ordering required for all of the children of this ExecutionPlan.
fn benefits_from_input_partitioning(&self) -> Vec<bool>
Specifies whether this ExecutionPlan benefits from increased parallelization at its input for each child.
fn repartitioned(
    &self,
    _target_partitions: usize,
    _config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>>
If supported, attempt to increase the partitioning of this ExecutionPlan to produce target_partitions partitions.
fn supports_limit_pushdown(&self) -> bool
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>
Returns a fetching variant of this ExecutionPlan node, if it supports fetch limits. Returns None otherwise.
Auto Trait Implementations§
impl !Freeze for HashJoinExec
impl !RefUnwindSafe for HashJoinExec
impl Send for HashJoinExec
impl Sync for HashJoinExec
impl Unpin for HashJoinExec
impl !UnwindSafe for HashJoinExec
Blanket Implementations§
impl<T> BorrowMut<T> for T
where T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.