datafusion_physical_plan::joins

Struct HashJoinExec

pub struct HashJoinExec {
    pub left: Arc<dyn ExecutionPlan>,
    pub right: Arc<dyn ExecutionPlan>,
    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
    pub filter: Option<JoinFilter>,
    pub join_type: JoinType,
    pub mode: PartitionMode,
    pub projection: Option<Vec<usize>>,
    pub null_equals_null: bool,
    /* private fields */
}

Join execution plan: evaluates equijoin predicates in parallel on multiple partitions using a hash table, with an optional filter applied after the join.

§Join Expressions

This implementation is optimized for evaluating equijoin predicates (<col1> = <col2>), which are represented as a list of Columns in Self::on.

Non-equality predicates, which cannot be pushed down to the join inputs (e.g. <col1> != <col2>), are known as “filter expressions” and are evaluated after the equijoin predicates.
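
A minimal sketch of how one such equijoin key pair can be constructed (the column names and indices here are made up; each Column index refers to the schema of its own input):

use std::sync::Arc;

use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExprRef;

// One (left, right) pair per equality predicate; together the pairs form
// the conjunction `left.o_custkey = right.c_custkey AND ...`.
fn equijoin_keys() -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
    vec![(
        Arc::new(Column::new("o_custkey", 1)) as PhysicalExprRef,
        Arc::new(Column::new("c_custkey", 0)) as PhysicalExprRef,
    )]
}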

§“Build Side” vs “Probe Side”

HashJoin takes two inputs, referred to as the “build” side and the “probe” side. The build side is the first child, and the probe side is the second child.

The two inputs are treated differently and it is VERY important that the smaller input is placed on the build side to minimize the work of creating the hash table.

         ┌───────────┐
         │ HashJoin  │
         │           │
         └───────────┘
             │   │
       ┌─────┘   └─────┐
       ▼               ▼
┌────────────┐  ┌─────────────┐
│   Input    │  │    Input    │
│    [0]     │  │     [1]     │
└────────────┘  └─────────────┘

 "build side"    "probe side"

Execution proceeds in 2 stages:

  1. the build phase creates a hash table from the tuples of the build side, and a single concatenated batch containing data from all fetched record batches. The resulting hash table stores the hashed join-key fields of each row as keys, mapped to the indices of the corresponding rows in the concatenated batch.

The hash join uses a LIFO data structure as its hash table. In order to retain the original build-side input order while fetching data during the probe phase, the hash table is updated by iterating over the batch sequence in reverse order: this keeps rows with smaller indices “on the top” of the hash table, while still maintaining correct indexing into the concatenated build-side data batch.

Example of build phase for 3 record batches:


 Original build-side data   Inserting build-side values into hashmap    Concatenated build-side batch
                                                                        ┌───────────────────────────┐
                            hashmap.insert(row-hash, row-idx + offset)  │                      idx  │
           ┌───────┐                                                    │          ┌───────┐        │
           │ Row 1 │        1) update_hash for batch 3 with offset 0    │          │ Row 6 │    0   │
  Batch 1  │       │           - hashmap.insert(Row 7, idx 1)           │ Batch 3  │       │        │
           │ Row 2 │           - hashmap.insert(Row 6, idx 0)           │          │ Row 7 │    1   │
           └───────┘                                                    │          └───────┘        │
                                                                        │                           │
           ┌───────┐                                                    │          ┌───────┐        │
           │ Row 3 │        2) update_hash for batch 2 with offset 2    │          │ Row 3 │    2   │
           │       │           - hashmap.insert(Row 5, idx 4)           │          │       │        │
  Batch 2  │ Row 4 │           - hashmap.insert(Row 4, idx 3)           │ Batch 2  │ Row 4 │    3   │
           │       │           - hashmap.insert(Row 3, idx 2)           │          │       │        │
           │ Row 5 │                                                    │          │ Row 5 │    4   │
           └───────┘                                                    │          └───────┘        │
                                                                        │                           │
           ┌───────┐                                                    │          ┌───────┐        │
           │ Row 6 │        3) update_hash for batch 1 with offset 5    │          │ Row 1 │    5   │
   Batch 3  │       │           - hashmap.insert(Row 2, idx 6)           │ Batch 1  │       │        │
            │ Row 7 │           - hashmap.insert(Row 1, idx 5)           │          │ Row 2 │    6   │
           └───────┘                                                    │          └───────┘        │
                                                                        │                           │
                                                                        └───────────────────────────┘
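
A minimal, self-contained sketch of this reverse-order build phase, using a plain std HashMap and u64 row hashes in place of DataFusion's actual hash table and Arrow batches (all names are illustrative):

use std::collections::HashMap;

// Stand-in for a row's hashed join key; one Vec stands in for one batch.
type RowKeyHash = u64;

// Builds the hash table and the concatenated build-side "batch".
fn build_phase(
    batches: &[Vec<RowKeyHash>],
) -> (HashMap<RowKeyHash, Vec<usize>>, Vec<RowKeyHash>) {
    // The concatenated batch holds the input batches in reverse arrival
    // order, matching the `idx` column in the diagram above.
    let concatenated: Vec<RowKeyHash> =
        batches.iter().rev().flatten().copied().collect();

    let mut hash_table: HashMap<RowKeyHash, Vec<usize>> = HashMap::new();
    let mut offset = 0;
    // Iterate the batch sequence in reverse (step 1 handles the last
    // arrived batch), inserting each batch's rows from last to first,
    // exactly as in steps 1)-3) above.
    for batch in batches.iter().rev() {
        for (i, key) in batch.iter().enumerate().rev() {
            hash_table.entry(*key).or_default().push(offset + i);
        }
        offset += batch.len();
    }
    (hash_table, concatenated)
}
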
  2. the probe phase, where the tuples of the probe side are streamed through, checking for matches of the join keys in the hash table.
                ┌────────────────┐          ┌────────────────┐
                │ ┌─────────┐    │          │ ┌─────────┐    │
                │ │  Hash   │    │          │ │  Hash   │    │
                │ │  Table  │    │          │ │  Table  │    │
                │ │(keys are│    │          │ │(keys are│    │
                │ │equi join│    │          │ │equi join│    │  Stage 2: batches from
 Stage 1: the   │ │columns) │    │          │ │columns) │    │    the probe side are
*entire* build  │ │         │    │          │ │         │    │  streamed through, and
 side is read   │ └─────────┘    │          │ └─────────┘    │   checked against the
into the hash   │      ▲         │          │          ▲     │   contents of the hash
    table       │       HashJoin │          │  HashJoin      │          table
                └──────┼─────────┘          └──────────┼─────┘
            ─ ─ ─ ─ ─ ─                                 ─ ─ ─ ─ ─ ─ ─
           │                                                         │

           │                                                         │
    ┌────────────┐                                            ┌────────────┐
    │RecordBatch │                                            │RecordBatch │
    └────────────┘                                            └────────────┘
    ┌────────────┐                                            ┌────────────┐
    │RecordBatch │                                            │RecordBatch │
    └────────────┘                                            └────────────┘
          ...                                                       ...
    ┌────────────┐                                            ┌────────────┐
    │RecordBatch │                                            │RecordBatch │
    └────────────┘                                            └────────────┘

       build side                                                probe side
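
A matching sketch of the probe phase for an inner join, again with plain std types (hash collisions and output batch assembly are glossed over):

use std::collections::HashMap;

// Streams one probe-side batch through the hash table built from the
// build side, returning (build row index, probe row index) pairs.
fn probe_one_batch(
    hash_table: &HashMap<u64, Vec<usize>>, // keys are hashed join keys
    probe_keys: &[u64],                    // hashed join keys of one batch
) -> Vec<(usize, usize)> {
    let mut matched = Vec::new();
    for (probe_idx, key) in probe_keys.iter().enumerate() {
        if let Some(build_indices) = hash_table.get(key) {
            // One output row per matching (build, probe) pair; the real
            // operator gathers these rows into output RecordBatches.
            for &build_idx in build_indices {
                matched.push((build_idx, probe_idx));
            }
        }
    }
    matched
}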

§Example “Optimal” Plans

The difference between the inputs means that for a classic “Star Schema Query”, the optimal plan will be a “Right Deep Tree”. A Star Schema Query is one where there is one large table and several smaller “dimension” tables, joined on Foreign Key = Primary Key predicates.

A “Right Deep Tree” looks like this, with the large table as the probe side of the lowest join (a code sketch of assembling such a plan follows the diagram):

            ┌───────────┐
            │ HashJoin  │
            │           │
            └───────────┘
                │   │
        ┌───────┘   └──────────┐
        ▼                      ▼
┌───────────────┐        ┌───────────┐
│ small table 1 │        │ HashJoin  │
│  "dimension"  │        │           │
└───────────────┘        └───┬───┬───┘
                  ┌──────────┘   └───────┐
                  │                      │
                  ▼                      ▼
          ┌───────────────┐        ┌───────────┐
          │ small table 2 │        │ HashJoin  │
          │  "dimension"  │        │           │
          └───────────────┘        └───┬───┬───┘
                              ┌────────┘   └────────┐
                              │                     │
                              ▼                     ▼
                      ┌───────────────┐     ┌───────────────┐
                      │ small table 3 │     │  large table  │
                      │  "dimension"  │     │    "fact"     │
                      └───────────────┘     └───────────────┘
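
A hedged sketch of assembling such a right-deep plan by folding the dimension tables over the fact table with HashJoinExec::try_new (documented below). The key pairs are supplied by a hypothetical caller, and each pair's column indices must be valid for that join's child schemas:

use std::sync::Arc;

use datafusion_common::{JoinType, Result};
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion_physical_plan::ExecutionPlan;

fn right_deep_tree(
    dimensions: Vec<(Arc<dyn ExecutionPlan>, (PhysicalExprRef, PhysicalExprRef))>,
    fact: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Start from the large fact table and wrap one join per dimension,
    // keeping each small dimension table on the build (left) side.
    let mut plan = fact;
    for (dim, on_pair) in dimensions.into_iter().rev() {
        plan = Arc::new(HashJoinExec::try_new(
            dim,                        // build side: small "dimension" table
            plan,                       // probe side: fact table, then prior joins
            vec![on_pair],              // equijoin keys for this level
            None,                       // no extra filter expression
            &JoinType::Inner,
            None,                       // no output projection
            PartitionMode::CollectLeft, // collect the build side once
            false,                      // nulls in the keys never match
        )?);
    }
    Ok(plan)
}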

Note that the Clone trait is not implemented for this struct due to the left_fut OnceAsync, which is used to coordinate the loading of the left side with the processing in each output stream.

Fields§

§left: Arc<dyn ExecutionPlan>

left (build) side which gets hashed

§right: Arc<dyn ExecutionPlan>

right (probe) side which is filtered by the hash table

§on: Vec<(PhysicalExprRef, PhysicalExprRef)>

Set of equijoin columns from the relations: (left_col, right_col)

§filter: Option<JoinFilter>

Filters which are applied while finding matching rows

§join_type: JoinType

How the join is performed (OUTER, INNER, etc)

§mode: PartitionMode

Partitioning mode to use

§projection: Option<Vec<usize>>

The projection indices of the columns in the output schema of the join

§null_equals_null: bool

Null matching behavior: If null_equals_null is true, rows that have nulls in both left and right equijoin columns will be matched. Otherwise, rows that have nulls in the join columns will not be matched and thus will not appear in the output.
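
The matching rule for a single pair of equijoin key values, sketched as a plain function (illustrative only, not DataFusion's actual implementation):

fn keys_match<T: PartialEq>(
    left: Option<T>,
    right: Option<T>,
    null_equals_null: bool,
) -> bool {
    match (left, right) {
        (Some(l), Some(r)) => l == r,     // both non-null: compare values
        (None, None) => null_equals_null, // both null: match only if enabled
        _ => false,                       // exactly one side null: no match
    }
}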

Implementations§

impl HashJoinExec

pub fn try_new(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    on: JoinOn,
    filter: Option<JoinFilter>,
    join_type: &JoinType,
    projection: Option<Vec<usize>>,
    partition_mode: PartitionMode,
    null_equals_null: bool,
) -> Result<Self>

Tries to create a new HashJoinExec.

§Error

This function errors when it is not possible to join the left and right sides on the keys given in on.

pub fn left(&self) -> &Arc<dyn ExecutionPlan>

left (build) side which gets hashed

pub fn right(&self) -> &Arc<dyn ExecutionPlan>

right (probe) side which is filtered by the hash table

pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)]

Set of common columns used to join on

pub fn filter(&self) -> Option<&JoinFilter>

Filters applied before join output

pub fn join_type(&self) -> &JoinType

How the join is performed

pub fn join_schema(&self) -> &SchemaRef

The schema after the join. Be careful when using this schema: if there is a projection, it is not the same as the output schema.
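
A small illustration of the difference, using arrow's Schema::project (the field names are made up):

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

fn projected_output_schema() -> SchemaRef {
    // The join schema: all left fields followed by all right fields.
    let join_schema = Schema::new(vec![
        Field::new("l_id", DataType::Int64, false),
        Field::new("l_name", DataType::Utf8, true),
        Field::new("r_id", DataType::Int64, false),
        Field::new("r_total", DataType::Float64, true),
    ]);
    // With projection = Some(vec![1, 3]), the output schema keeps only
    // those columns, so schema() and join_schema() differ.
    Arc::new(join_schema.project(&[1, 3]).expect("valid indices"))
}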

pub fn partition_mode(&self) -> &PartitionMode

The partitioning mode of this hash join

pub fn null_equals_null(&self) -> bool

Get null_equals_null

pub fn probe_side() -> JoinSide

Get probe side information for the hash join.

pub fn contain_projection(&self) -> bool

Return whether the join contains a projection

pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>

Return a new instance of HashJoinExec with the given projection.

Trait Implementations§

impl Debug for HashJoinExec

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl DisplayAs for HashJoinExec

fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> Result

Format according to DisplayFormatType, used when the verbose representation looks different from the default one.

impl ExecutionPlan for HashJoinExec

fn name(&self) -> &'static str

Short name for the ExecutionPlan, such as ‘ParquetExec’.

fn as_any(&self) -> &dyn Any

Returns the execution plan as Any so that it can be downcast to a specific implementation.

fn properties(&self) -> &PlanProperties

Return properties of the output of the ExecutionPlan, such as output ordering(s), partitioning information, etc.

fn required_input_distribution(&self) -> Vec<Distribution>

Specifies the data distribution requirements for all the children of this ExecutionPlan. By default it is Distribution::UnspecifiedDistribution for each child.

fn maintains_input_order(&self) -> Vec<bool>

Returns false if this ExecutionPlan’s implementation may reorder rows within or between partitions.

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>

Get a list of children ExecutionPlans that act as inputs to this plan. The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).

fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>

Returns a new ExecutionPlan where all existing children were replaced by the new children, in order.

fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>

Begin execution of partition, returning a Stream of RecordBatches.

fn metrics(&self) -> Option<MetricsSet>

Return a snapshot of the set of Metrics for this ExecutionPlan. If no Metrics are available, return None.

fn statistics(&self) -> Result<Statistics>

Returns statistics for this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error.

fn static_name() -> &'static str
where Self: Sized,

Short name for the ExecutionPlan, such as ‘ParquetExec’. Like name but can be called without an instance.

fn schema(&self) -> SchemaRef

Get the schema for this execution plan

fn required_input_ordering(&self) -> Vec<Option<LexRequirement>>

Specifies the ordering required for all of the children of this ExecutionPlan.

fn benefits_from_input_partitioning(&self) -> Vec<bool>

Specifies whether the ExecutionPlan benefits from increased parallelization at its input for each child.

fn repartitioned(
    &self,
    _target_partitions: usize,
    _config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>>

If supported, attempt to increase the partitioning of this ExecutionPlan to produce target_partitions partitions.

fn supports_limit_pushdown(&self) -> bool

Returns true if a limit can be safely pushed down through this ExecutionPlan node.

fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>

Returns a fetching variant of this ExecutionPlan node, if it supports fetch limits. Returns None otherwise.

fn fetch(&self) -> Option<usize>

Gets the fetch count for the operator; None means there is no fetch.

fn cardinality_effect(&self) -> CardinalityEffect

Gets the effect on cardinality, if known
