datafusion_physical_plan::execution_plan

Trait Accumulator

Source
pub trait Accumulator:
    Send
    + Sync
    + Debug {
    // Required methods
    fn update_batch(
        &mut self,
        values: &[Arc<dyn Array>],
    ) -> Result<(), DataFusionError>;
    fn evaluate(&mut self) -> Result<ScalarValue, DataFusionError>;
    fn size(&self) -> usize;
    fn state(&mut self) -> Result<Vec<ScalarValue>, DataFusionError>;
    fn merge_batch(
        &mut self,
        states: &[Arc<dyn Array>],
    ) -> Result<(), DataFusionError>;

    // Provided methods
    fn retract_batch(
        &mut self,
        _values: &[Arc<dyn Array>],
    ) -> Result<(), DataFusionError> { ... }
    fn supports_retract_batch(&self) -> bool { ... }
}
Expand description

Tracks an aggregate function’s state.

Accumulators are stateful objects that implement a single group. They aggregate values from multiple rows together into a final output aggregate.

[GroupsAccumulator] is an additional more performant (but also complex) API that manages state for multiple groups at once.

An accumulator knows how to:

  • update its state from inputs via update_batch

  • compute the final value from its internal state via evaluate

  • retract an update to its state from given inputs via retract_batch (when used as a window aggregate window function)

  • convert its internal state to a vector of aggregate values via state and combine the state from multiple accumulators via merge_batch, as part of efficient multi-phase grouping.

Required Methods§

Source

fn update_batch( &mut self, values: &[Arc<dyn Array>], ) -> Result<(), DataFusionError>

Updates the accumulator’s state from its input.

values contains the arguments to this aggregate function.

For example, the SUM accumulator maintains a running sum, and update_batch adds each of the input values to the running sum.

Source

fn evaluate(&mut self) -> Result<ScalarValue, DataFusionError>

Returns the final aggregate value, consuming the internal state.

For example, the SUM accumulator maintains a running sum, and evaluate will produce that running sum as its output.

This function should not be called twice, otherwise it will result in potentially non-deterministic behavior.

This function gets &mut self to allow for the accumulator to build arrow-compatible internal state that can be returned without copying when possible (for example distinct strings)

Source

fn size(&self) -> usize

Returns the allocated size required for this accumulator, in bytes, including Self.

This value is used to calculate the memory used during execution so DataFusion can stay within its allotted limit.

“Allocated” means that for internal containers such as Vec, the capacity should be used not the len.

Source

fn state(&mut self) -> Result<Vec<ScalarValue>, DataFusionError>

Returns the intermediate state of the accumulator, consuming the intermediate state.

This function should not be called twice, otherwise it will result in potentially non-deterministic behavior.

This function gets &mut self to allow for the accumulator to build arrow-compatible internal state that can be returned without copying when possible (for example distinct strings).

Intermediate state is used for “multi-phase” grouping in DataFusion, where an aggregate is computed in parallel with multiple Accumulator instances, as described below:

§Multi-Phase Grouping
                              ▲
                              │                   evaluate() is called to
                              │                   produce the final aggregate
                              │                   value per group
                              │
                 ┌─────────────────────────┐
                 │GroupBy                  │
                 │(AggregateMode::Final)   │      state() is called for each
                 │                         │      group and the resulting
                 └─────────────────────────┘      RecordBatches passed to the
                              ▲
                              │
             ┌────────────────┴───────────────┐
             │                                │
             │                                │
┌─────────────────────────┐      ┌─────────────────────────┐
│        GroubyBy         │      │        GroubyBy         │
│(AggregateMode::Partial) │      │(AggregateMode::Partial) │
└─────────────────────────┘      └─────────────────────────┘
             ▲                                ▲
             │                                │    update_batch() is called for
             │                                │    each input RecordBatch
        .─────────.                      .─────────.
     ,─'           '─.                ,─'           '─.
    ;      Input      :              ;      Input      :
    :   Partition 0   ;              :   Partition 1   ;
     ╲               ╱                ╲               ╱
      '─.         ,─'                  '─.         ,─'
         `───────'                        `───────'

The partial state is serialized as Arrays and then combined with other partial states from different instances of this Accumulator (that ran on different partitions, for example).

The state can be and often is a different type than the output type of the Accumulator and needs different merge operations (for example, the partial state for COUNT needs to be summed together)

Some accumulators can return multiple values for their intermediate states. For example, the average accumulator tracks sum and n, and this function should return a vector of two values, sum and n.

Note that ScalarValue::List can be used to pass multiple values if the number of intermediate values is not known at planning time (e.g. for MEDIAN)

§Multi-phase repartitioned Grouping

Many multi-phase grouping plans contain a Repartition operation as well as shown below:

               ▲                          ▲
               │                          │
               │                          │
               │                          │
               │                          │
               │                          │
   ┌───────────────────────┐  ┌───────────────────────┐       4. Each AggregateMode::Final
   │GroupBy                │  │GroupBy                │       GroupBy has an entry for its
   │(AggregateMode::Final) │  │(AggregateMode::Final) │       subset of groups (in this case
   │                       │  │                       │       that means half the entries)
   └───────────────────────┘  └───────────────────────┘
               ▲                          ▲
               │                          │
               └─────────────┬────────────┘
                             │
                             │
                             │
                ┌─────────────────────────┐                   3. Repartitioning by hash(group
                │       Repartition       │                   keys) ensures that each distinct
                │         HASH(x)         │                   group key now appears in exactly
                └─────────────────────────┘                   one partition
                             ▲
                             │
             ┌───────────────┴─────────────┐
             │                             │
             │                             │
┌─────────────────────────┐  ┌──────────────────────────┐     2. Each AggregateMode::Partial
│        GroubyBy         │  │         GroubyBy         │     GroupBy has an entry for *all*
│(AggregateMode::Partial) │  │ (AggregateMode::Partial) │     the groups
└─────────────────────────┘  └──────────────────────────┘
             ▲                             ▲
             │                             │
             │                             │
        .─────────.                   .─────────.
     ,─'           '─.             ,─'           '─.
    ;      Input      :           ;      Input      :         1. Since input data is
    :   Partition 0   ;           :   Partition 1   ;         arbitrarily or RoundRobin
     ╲               ╱             ╲               ╱          distributed, each partition
      '─.         ,─'               '─.         ,─'           likely has all distinct
         `───────'                     `───────'

This structure is used so that the AggregateMode::Partial accumulators reduces the cardinality of the input as soon as possible. Typically, each partial accumulator sees all groups in the input as the group keys are evenly distributed across the input.

The final output is computed by repartitioning the result of Self::state from each Partial aggregate and hash(group keys) so that each distinct group key appears in exactly one of the AggregateMode::Final GroupBy nodes. The outputs of the final nodes are then unioned together to produce the overall final output.

Here is an example that shows the distribution of groups in the different phases

              ┌─────┐                ┌─────┐
              │  1  │                │  3  │
              ├─────┤                ├─────┤
              │  2  │                │  4  │                After repartitioning by
              └─────┘                └─────┘                hash(group keys), each distinct
              ┌─────┐                ┌─────┐                group key now appears in exactly
              │  1  │                │  3  │                one partition
              ├─────┤                ├─────┤
              │  2  │                │  4  │
              └─────┘                └─────┘


─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─

              ┌─────┐                ┌─────┐
              │  2  │                │  2  │
              ├─────┤                ├─────┤
              │  1  │                │  2  │
              ├─────┤                ├─────┤
              │  3  │                │  3  │
              ├─────┤                ├─────┤
              │  4  │                │  1  │
              └─────┘                └─────┘                Input data is arbitrarily or
                ...                    ...                  RoundRobin distributed, each
              ┌─────┐                ┌─────┐                partition likely has all
              │  1  │                │  4  │                distinct group keys
              ├─────┤                ├─────┤
              │  4  │                │  3  │
              ├─────┤                ├─────┤
              │  1  │                │  1  │
              ├─────┤                ├─────┤
              │  4  │                │  3  │
              └─────┘                └─────┘

          group values           group values
          in partition 0         in partition 1
Source

fn merge_batch( &mut self, states: &[Arc<dyn Array>], ) -> Result<(), DataFusionError>

Updates the accumulator’s state from an Array containing one or more intermediate values.

For some aggregates (such as SUM), merge_batch is the same as update_batch, but for some aggregrates (such as COUNT) the operations differ. See Self::state for more details on how state is used and merged.

The states array passed was formed by concatenating the results of calling Self::state on zero or more other Accumulator instances.

Provided Methods§

Source

fn retract_batch( &mut self, _values: &[Arc<dyn Array>], ) -> Result<(), DataFusionError>

Retracts (removed) an update (caused by the given inputs) to accumulator’s state.

This is the inverse operation of Self::update_batch and is used to incrementally calculate window aggregates where the OVER clause defines a bounded window.

§Example

For example, given the following input partition

                    │      current      │
                           window
                    │                   │
               ┌────┬────┬────┬────┬────┬────┬────┬────┬────┐
    Input      │ A  │ B  │ C  │ D  │ E  │ F  │ G  │ H  │ I  │
  partition    └────┴────┴────┴────┼────┴────┴────┴────┼────┘

                                   │         next      │
                                            window

First, Self::evaluate will be called to produce the output for the current window.

Then, to advance to the next window:

First, Self::retract_batch will be called with the values that are leaving the window, [B, C, D] and then Self::update_batch will be called with the values that are entering the window, [F, G, H].

Source

fn supports_retract_batch(&self) -> bool

Does the accumulator support incrementally updating its value by removing values.

If this function returns true, Self::retract_batch will be called for sliding window functions such as queries with an OVER (ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING)

Implementations on Foreign Types§

Source§

impl Accumulator for BytesViewDistinctCountAccumulator

Source§

fn state(&mut self) -> Result<Vec<ScalarValue>, DataFusionError>

Source§

fn update_batch( &mut self, values: &[Arc<dyn Array>], ) -> Result<(), DataFusionError>

Source§

fn merge_batch( &mut self, states: &[Arc<dyn Array>], ) -> Result<(), DataFusionError>

Source§

fn evaluate(&mut self) -> Result<ScalarValue, DataFusionError>

Source§

fn size(&self) -> usize

Source§

impl<O> Accumulator for BytesDistinctCountAccumulator<O>
where O: OffsetSizeTrait,

Source§

fn state(&mut self) -> Result<Vec<ScalarValue>, DataFusionError>

Source§

fn update_batch( &mut self, values: &[Arc<dyn Array>], ) -> Result<(), DataFusionError>

Source§

fn merge_batch( &mut self, states: &[Arc<dyn Array>], ) -> Result<(), DataFusionError>

Source§

fn evaluate(&mut self) -> Result<ScalarValue, DataFusionError>

Source§

fn size(&self) -> usize

Source§

impl<T> Accumulator for FloatDistinctCountAccumulator<T>

Source§

fn state(&mut self) -> Result<Vec<ScalarValue>, DataFusionError>

Source§

fn update_batch( &mut self, values: &[Arc<dyn Array>], ) -> Result<(), DataFusionError>

Source§

fn merge_batch( &mut self, states: &[Arc<dyn Array>], ) -> Result<(), DataFusionError>

Source§

fn evaluate(&mut self) -> Result<ScalarValue, DataFusionError>

Source§

fn size(&self) -> usize

Source§

impl<T> Accumulator for PrimitiveDistinctCountAccumulator<T>

Source§

fn state(&mut self) -> Result<Vec<ScalarValue>, DataFusionError>

Source§

fn update_batch( &mut self, values: &[Arc<dyn Array>], ) -> Result<(), DataFusionError>

Source§

fn merge_batch( &mut self, states: &[Arc<dyn Array>], ) -> Result<(), DataFusionError>

Source§

fn evaluate(&mut self) -> Result<ScalarValue, DataFusionError>

Source§

fn size(&self) -> usize

Implementors§