datafusion_execution::memory_pool

Trait MemoryPool

pub trait MemoryPool:
    Send
    + Sync
    + Debug {
    // Required methods
    fn grow(&self, reservation: &MemoryReservation, additional: usize);
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);
    fn try_grow(
        &self,
        reservation: &MemoryReservation,
        additional: usize,
    ) -> Result<()>;
    fn reserved(&self) -> usize;

    // Provided methods
    fn register(&self, _consumer: &MemoryConsumer) { ... }
    fn unregister(&self, _consumer: &MemoryConsumer) { ... }
}

Tracks and potentially limits memory use across operators during execution.

Memory Management Overview

DataFusion is a streaming query engine, processing most queries without buffering the entire input. Most operators require a fixed amount of memory based on the schema and target batch size. However, certain operations, such as sorting, grouping, and joining, require buffering intermediate results, which can require memory proportional to the number of input rows.

Rather than tracking all allocations, DataFusion takes a pragmatic approach: intermediate memory used as data streams through the system is not accounted for (it is assumed to be “small”), but the large consumers of memory must register and constrain their use. This design trades the additional code complexity of memory tracking against the ability to limit resource usage.

When limiting memory with a MemoryPool, you should typically reserve some overhead (e.g. 10%) for the “small” allocations that are not tracked.
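As a worked example of that headroom, the sketch below computes the limit you might hand to a pool after setting aside a percentage for untracked allocations. `pool_limit` is a hypothetical helper for illustration, not a DataFusion function; the 10% figure is just the example value from the text.

```rust
/// Hypothetical helper (not part of DataFusion): given the total memory you
/// are willing to let queries use, return the limit to configure on a
/// MemoryPool after setting aside `overhead_percent` for untracked allocations.
fn pool_limit(total_bytes: usize, overhead_percent: usize) -> usize {
    assert!(overhead_percent <= 100, "overhead must be between 0 and 100 percent");
    // Widen to u128 so the multiplication cannot overflow; the division
    // rounds the set-aside amount down.
    let overhead = (total_bytes as u128 * overhead_percent as u128 / 100) as usize;
    total_bytes - overhead
}

fn main() {
    // A 1 GiB budget with 10% headroom for untracked "small" allocations.
    let total: usize = 1 << 30;
    println!("pool limit: {} bytes", pool_limit(total, 10));
}
```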

Memory Management Design

As explained above, DataFusion’s design ONLY limits operators that require “large” amounts of memory (proportional to the number of input rows), such as GroupByHashExec. It does NOT track and limit memory used internally by other operators such as ParquetExec or the RecordBatches that flow between operators.

To avoid allocating memory until the OS or container runtime kills the process, DataFusion ExecutionPlans (operators) that consume large amounts of memory must first request their desired allocation from a MemoryPool before allocating more. The request is typically managed via a MemoryReservation and MemoryConsumer.

If the allocation is successful, the operator should proceed and allocate the desired memory. If the allocation fails, the operator must either free memory first (e.g. by spilling to local disk) and try again, or return an error.
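The reserve-then-allocate protocol can be sketched with plain `std` types. `Pool`, `Reservation`, and `buffer_batch` below are hypothetical stand-ins, not DataFusion's `MemoryPool` or `MemoryReservation`; the sketch assumes a reservation tracks the bytes one operator holds and returns them to the pool when dropped.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical pool: a byte limit plus a running total (not DataFusion's type).
#[derive(Debug)]
struct Pool {
    limit: usize,
    used: AtomicUsize,
}

/// Hypothetical reservation: the bytes one operator currently holds.
/// Dropping it hands those bytes back to the pool.
struct Reservation {
    pool: Arc<Pool>,
    size: usize,
}

impl Reservation {
    fn new(pool: Arc<Pool>) -> Self {
        Reservation { pool, size: 0 }
    }

    /// Ask the pool for `additional` bytes; on failure nothing changes.
    fn try_grow(&mut self, additional: usize) -> Result<(), String> {
        let limit = self.pool.limit;
        self.pool
            .used
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |used| {
                used.checked_add(additional).filter(|&new| new <= limit)
            })
            .map_err(|used| format!("requested {additional} bytes, {used} of {limit} in use"))?;
        self.size += additional;
        Ok(())
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        // Release everything this reservation still holds.
        self.pool.used.fetch_sub(self.size, Ordering::SeqCst);
    }
}

/// Reserve first, allocate second: the buffer is created only after the
/// pool has agreed to account for its size.
fn buffer_batch(reservation: &mut Reservation, rows: usize) -> Result<Vec<u64>, String> {
    let bytes = rows * std::mem::size_of::<u64>();
    reservation.try_grow(bytes)?;
    Ok(vec![0u64; rows])
}

fn main() {
    let pool = Arc::new(Pool { limit: 1024, used: AtomicUsize::new(0) });
    {
        let mut reservation = Reservation::new(Arc::clone(&pool));
        let first = buffer_batch(&mut reservation, 100); // 800 bytes: fits
        let second = buffer_batch(&mut reservation, 100); // would exceed 1024
        println!("first ok: {}, second ok: {}", first.is_ok(), second.is_ok());
        println!("in use: {}", pool.used.load(Ordering::SeqCst));
    }
    println!("after drop: {}", pool.used.load(Ordering::SeqCst));
}
```

When `buffer_batch` returns an error, the caller is at exactly the decision point described above: spill and retry, or propagate the error.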

Note that a MemoryPool can be shared by concurrently executing plans, which makes it possible to control total memory usage in a multi-tenant system.
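To illustrate sharing, here is a minimal sketch in which several threads (standing in for concurrently executing plans) reserve from one first-come-first-served pool behind an `Arc`. `SharedPool` and `run_queries` are hypothetical names; a real setup would share a DataFusion pool through the session's runtime configuration instead.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical first-come-first-served pool shared by several queries.
#[derive(Debug)]
struct SharedPool {
    limit: usize,
    used: Mutex<usize>,
}

impl SharedPool {
    fn new(limit: usize) -> Self {
        SharedPool { limit, used: Mutex::new(0) }
    }

    fn try_grow(&self, additional: usize) -> Result<(), String> {
        // Holding the lock makes the check and the update one atomic step.
        let mut used = self.used.lock().unwrap();
        if used.saturating_add(additional) > self.limit {
            return Err(format!(
                "cannot reserve {additional} bytes: {} of {} in use",
                *used, self.limit
            ));
        }
        *used += additional;
        Ok(())
    }
}

/// Runs `queries` threads that each try to reserve `bytes` from the same
/// pool and returns how many reservations were granted.
fn run_queries(pool: Arc<SharedPool>, queries: usize, bytes: usize) -> usize {
    let handles: Vec<_> = (0..queries)
        .map(|_| {
            let pool = Arc::clone(&pool);
            thread::spawn(move || pool.try_grow(bytes).is_ok())
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&granted| granted)
        .count()
}

fn main() {
    let pool = Arc::new(SharedPool::new(1_000));
    let granted = run_queries(Arc::clone(&pool), 4, 300);
    // With a 1,000-byte limit, only three 300-byte requests can succeed,
    // whichever threads happen to ask first.
    println!("granted {granted} of 4; {} bytes in use", *pool.used.lock().unwrap());
}
```

Which thread is refused depends on scheduling, but the total never exceeds the shared limit.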

How MemoryPool works by example

Scenario 1: For the Filter operator, RecordBatches stream through it, so it does not need to track memory usage through the MemoryPool.

Scenario 2: For the CrossJoin operator, the intermediate state grows as the input grows, so CrossJoin uses the MemoryPool to limit its memory usage.

  2.1 CrossJoin reads a new batch and asks the memory pool for additional memory. The memory pool updates its usage and returns success.

  2.2 CrossJoin reads another batch and tries to reserve more memory, but the memory pool does not have enough available. Because CrossJoin does not implement spilling, it stops execution and returns an error.

Scenario 3: The Aggregate operator's intermediate state also accumulates as the input grows, but Aggregate can spill. When it tries to reserve more memory and the pool has already reached its limit, the pool returns an error. The Aggregate operator then spills its intermediate buffers to disk, releases the corresponding memory back to the pool, and retries the reservation.
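Scenarios 2 and 3 differ only in what happens after a refused reservation. The single-threaded sketch below contrasts the two; `Pool`, `buffer_without_spill`, and `buffer_with_spill` are hypothetical, and "spilling" is simulated by releasing the buffered bytes rather than writing anything to disk.

```rust
/// Hypothetical single-threaded pool with a hard byte limit.
#[derive(Debug)]
struct Pool {
    limit: usize,
    used: usize,
}

impl Pool {
    fn try_grow(&mut self, additional: usize) -> Result<(), String> {
        if self.used.saturating_add(additional) > self.limit {
            return Err(format!(
                "need {additional} bytes, only {} free",
                self.limit - self.used
            ));
        }
        self.used += additional;
        Ok(())
    }

    fn shrink(&mut self, bytes: usize) {
        self.used -= bytes;
    }
}

/// Scenario 2: an operator that cannot spill (like CrossJoin above) keeps
/// every batch in memory and fails as soon as the pool refuses.
fn buffer_without_spill(pool: &mut Pool, batches: &[usize]) -> Result<usize, String> {
    let mut buffered = 0;
    for &size in batches {
        pool.try_grow(size)?; // no fallback: surface the error
        buffered += size;
    }
    Ok(buffered)
}

/// Scenario 3: an operator that can spill (like Aggregate above) frees its
/// buffered bytes when refused, then retries the same reservation once.
/// Returns the number of spills performed.
fn buffer_with_spill(pool: &mut Pool, batches: &[usize]) -> Result<usize, String> {
    let mut in_memory = 0;
    let mut spills = 0;
    for &size in batches {
        if pool.try_grow(size).is_err() {
            // Stand-in for writing `in_memory` bytes to disk and releasing them.
            pool.shrink(in_memory);
            in_memory = 0;
            spills += 1;
            pool.try_grow(size)?; // a single batch larger than the limit still fails
        }
        in_memory += size;
    }
    Ok(spills)
}

fn main() {
    let batches: [usize; 3] = [40, 40, 40];
    let mut pool = Pool { limit: 100, used: 0 };
    println!("no spill: {:?}", buffer_without_spill(&mut pool, &batches));
    let mut pool = Pool { limit: 100, used: 0 };
    println!("with spill: {:?}", buffer_with_spill(&mut pool, &batches));
}
```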

Implementing MemoryPool

You can implement a custom allocation policy by implementing the MemoryPool trait and configuring a SessionContext appropriately. However, DataFusion comes with the following simple memory pool implementations that handle many common cases:

  • UnboundedMemoryPool: no memory limits (the default)

  • GreedyMemoryPool: Limits memory usage to a fixed size using a “first come first served” policy

  • FairSpillPool: Limits memory usage to a fixed size, allocating memory to all spilling operators fairly
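The sketch below shows the shape of a custom policy. Because it must compile without DataFusion, it uses a hypothetical `ToyMemoryPool` trait that mirrors the four required methods but drops the `&MemoryReservation` argument and uses `String` errors. `ToyUnbounded` and `ToyGreedy` are illustrative analogues of the built-in pools listed above, not their actual code.

```rust
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical, simplified mirror of MemoryPool's required methods: the
/// `&MemoryReservation` argument is dropped and errors are plain Strings.
trait ToyMemoryPool: Send + Sync + Debug {
    fn grow(&self, additional: usize);
    fn shrink(&self, shrink: usize);
    fn try_grow(&self, additional: usize) -> Result<(), String>;
    fn reserved(&self) -> usize;
}

/// Tracks usage but never refuses (in the spirit of UnboundedMemoryPool).
#[derive(Debug, Default)]
struct ToyUnbounded {
    used: AtomicUsize,
}

impl ToyMemoryPool for ToyUnbounded {
    fn grow(&self, additional: usize) {
        self.used.fetch_add(additional, Ordering::SeqCst);
    }
    fn shrink(&self, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::SeqCst);
    }
    fn try_grow(&self, additional: usize) -> Result<(), String> {
        self.grow(additional);
        Ok(())
    }
    fn reserved(&self) -> usize {
        self.used.load(Ordering::SeqCst)
    }
}

/// First come, first served up to a fixed limit (in the spirit of GreedyMemoryPool).
#[derive(Debug)]
struct ToyGreedy {
    limit: usize,
    used: AtomicUsize,
}

impl ToyMemoryPool for ToyGreedy {
    fn grow(&self, additional: usize) {
        // `grow` must always succeed, so it may push usage past the limit.
        self.used.fetch_add(additional, Ordering::SeqCst);
    }
    fn shrink(&self, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::SeqCst);
    }
    fn try_grow(&self, additional: usize) -> Result<(), String> {
        // Check and update in one atomic step; on failure `used` is untouched.
        self.used
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |used| {
                used.checked_add(additional).filter(|&new| new <= self.limit)
            })
            .map(|_| ())
            .map_err(|used| {
                format!("cannot grow by {additional}: {used} of {} bytes reserved", self.limit)
            })
    }
    fn reserved(&self) -> usize {
        self.used.load(Ordering::SeqCst)
    }
}

fn main() {
    // Operators see only the trait object, so the policy can be swapped freely.
    let pools: Vec<Arc<dyn ToyMemoryPool>> = vec![
        Arc::new(ToyUnbounded::default()),
        Arc::new(ToyGreedy { limit: 100, used: AtomicUsize::new(0) }),
    ];
    for pool in &pools {
        let results = [pool.try_grow(60).is_ok(), pool.try_grow(60).is_ok()];
        println!("{pool:?}: {results:?}, reserved = {}", pool.reserved());
    }
}
```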

Required Methods


fn grow(&self, reservation: &MemoryReservation, additional: usize)

Infallibly grow the provided reservation by additional bytes

This must always succeed


fn shrink(&self, reservation: &MemoryReservation, shrink: usize)

Infallibly shrink the provided reservation by shrink bytes


fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>

Attempt to grow the provided reservation by additional bytes

On error the allocation will not be increased in size


fn reserved(&self) -> usize

Return the total amount of memory reserved

Provided Methods


fn register(&self, _consumer: &MemoryConsumer)

Registers a new MemoryConsumer

Note: Subsequent calls to Self::grow must be made to reserve memory


fn unregister(&self, _consumer: &MemoryConsumer)

Records the destruction of a MemoryReservation with MemoryConsumer

Note: Prior calls to Self::shrink must be made to free any reserved memory
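Since register and unregister have no-op defaults, a pool overrides them only when its policy depends on which consumers exist. The sketch assumes a policy that splits memory among spilling operators needs to count them. `Consumer`, `ToyPool`, `SpillAwarePool`, and `fair_share` are hypothetical simplifications, not FairSpillPool's actual logic.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical, simplified stand-in for MemoryConsumer.
#[derive(Debug)]
struct Consumer {
    name: String,
    can_spill: bool,
}

trait ToyPool {
    fn reserved(&self) -> usize;

    // Provided methods: a pool that does not care about consumers
    // inherits these no-op defaults.
    fn register(&self, _consumer: &Consumer) {}
    fn unregister(&self, _consumer: &Consumer) {}
}

/// Overrides the hooks to count registered consumers that can spill, the
/// kind of bookkeeping a policy that divides memory among spilling
/// operators needs.
#[derive(Debug, Default)]
struct SpillAwarePool {
    used: AtomicUsize,
    spillable: AtomicUsize,
}

impl ToyPool for SpillAwarePool {
    fn reserved(&self) -> usize {
        self.used.load(Ordering::SeqCst)
    }
    fn register(&self, consumer: &Consumer) {
        if consumer.can_spill {
            self.spillable.fetch_add(1, Ordering::SeqCst);
        }
    }
    fn unregister(&self, consumer: &Consumer) {
        if consumer.can_spill {
            self.spillable.fetch_sub(1, Ordering::SeqCst);
        }
    }
}

impl SpillAwarePool {
    /// Hypothetical even split of `limit` across spilling consumers.
    fn fair_share(&self, limit: usize) -> usize {
        limit / self.spillable.load(Ordering::SeqCst).max(1)
    }
}

fn main() {
    let pool = SpillAwarePool::default();
    let consumers = [
        Consumer { name: "sort".into(), can_spill: true },
        Consumer { name: "aggregate".into(), can_spill: true },
        Consumer { name: "cross_join".into(), can_spill: false },
    ];
    for c in &consumers {
        pool.register(c);
        println!("registered {} (can_spill = {})", c.name, c.can_spill);
    }
    println!("share of 1000 bytes per spiller: {}", pool.fair_share(1000));
    // Per the note above, each consumer shrinks its reservation to zero
    // before it is unregistered; these toy consumers never reserved anything.
    for c in &consumers {
        pool.unregister(c);
    }
}
```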
