DataFusion is an extensible query engine written in Rust that uses Apache Arrow as its in-memory format. DataFusion’s target users are developers building fast and feature-rich database and analytic systems, customized to particular workloads. See use cases for examples.
“Out of the box,” DataFusion offers SQL and DataFrame APIs, excellent performance, built-in support for CSV, Parquet, JSON, and Avro, extensive customization, and a great community. Python Bindings are also available.
DataFusion features a full query planner, a columnar, streaming, multi-threaded, vectorized execution engine, and partitioned data sources. You can customize DataFusion at almost all points including additional data sources, query languages, functions, custom operators and more. See the Architecture section below for more details.
§Examples
The main entry point for interacting with DataFusion is the SessionContext. Exprs represent expressions such as a + b.
§DataFrame
To execute a query against data stored in a CSV file using a DataFrame:
use datafusion::arrow;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::functions_aggregate::expr_fn::min;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // create the dataframe
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // create a plan
    let df = df.filter(col("a").lt_eq(col("b")))?
        .aggregate(vec![col("a")], vec![min(col("b"))])?
        .limit(0, Some(100))?;
    // execute the plan
    let results: Vec<RecordBatch> = df.collect().await?;
    // format the results
    let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
        .to_string();
    let expected = vec![
        "+---+----------------+",
        "| a | min(?table?.b) |",
        "+---+----------------+",
        "| 1 | 2              |",
        "+---+----------------+"
    ];
    assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
    Ok(())
}
§SQL
To execute a query against a CSV file using SQL:
use datafusion::arrow;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
    // create a plan
    let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
    // execute the plan
    let results: Vec<RecordBatch> = df.collect().await?;
    // format the results
    let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
        .to_string();
    let expected = vec![
        "+---+----------------+",
        "| a | min(example.b) |",
        "+---+----------------+",
        "| 1 | 2              |",
        "+---+----------------+"
    ];
    assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);
    Ok(())
}
§More Examples
There are many additional annotated examples of using DataFusion in the datafusion-examples directory.
§Architecture
You can find a formal description of DataFusion’s architecture in our SIGMOD 2024 Paper.
§Design Goals
DataFusion’s Architecture Goals are:
- Work “out of the box”: Provide a very fast, world class query engine with minimal setup or required configuration.
- Customizable everything: All behavior should be customizable by implementing traits.
- Architecturally boring 🥱: Follow industrial best practice rather than trying cutting edge, but unproven, techniques.
With these principles, users start with a basic, high-performance engine and specialize it over time to suit their needs and available engineering capacity.
§Overview Presentations
The following presentations offer high-level overviews of the different components and how they interact.
- [Apr 2023]: The Apache DataFusion Architecture talks
- [July 2022]: DataFusion and Arrow: Supercharge Your Data Analytical Tool with a Rusty Query Engine: recording and slides
- [March 2021]: The DataFusion architecture is described in Query Engine Design and the Rust-Based DataFusion in Apache Arrow: recording (DataFusion content starts ~ 15 minutes in) and slides
- [February 2021]: How DataFusion is used within the Ballista Project is described in Ballista: Distributed Compute with Rust and Apache Arrow: recording
§Customization and Extension
DataFusion is designed to be highly extensible, so you can start with a working, full-featured engine and then specialize any behavior for your use case. For example, some projects may add custom ExecutionPlan operators, or create their own query language that directly creates LogicalPlans rather than using the built-in SQL planner, SqlToRel.
In order to achieve this, DataFusion supports extension at many points:
- read from any data source (TableProvider)
- define your own catalogs, schemas, and table lists (catalog and CatalogProvider)
- build your own query language or plans (LogicalPlanBuilder)
- declare and use user-defined functions (ScalarUDF, AggregateUDF, and WindowUDF)
- add custom plan rewrite passes (AnalyzerRule, OptimizerRule, and PhysicalOptimizerRule)
- extend the planner to use user-defined logical and physical nodes (QueryPlanner)
You can find examples of each of them in the datafusion-examples directory.
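For instance, a user-defined scalar function can be declared and registered in a few lines. Below is a minimal sketch: the add_one name and its kernel are illustrative, and the exact create_udf signature varies slightly across DataFusion versions, so treat this as a shape rather than a drop-in snippet.

use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::common::cast::as_int64_array;
use datafusion::error::Result;
use datafusion::logical_expr::{create_udf, ColumnarValue, Volatility};
use datafusion::prelude::*;

fn main() -> Result<()> {
    // The kernel: evaluates the function against a batch of ColumnarValues
    let add_one = Arc::new(|args: &[ColumnarValue]| {
        let args = ColumnarValue::values_to_arrays(args)?;
        let ints = as_int64_array(&args[0])?;
        let out: Int64Array = ints.iter().map(|v| v.map(|x| x + 1)).collect();
        Ok(ColumnarValue::Array(Arc::new(out) as ArrayRef))
    });
    // Declare the signature and register the function with the session
    let udf = create_udf(
        "add_one",             // illustrative name
        vec![DataType::Int64], // argument types
        DataType::Int64,       // return type
        Volatility::Immutable,
        add_one,
    );
    let ctx = SessionContext::new();
    ctx.register_udf(udf);
    // The function is now callable from SQL, e.g. SELECT add_one(a) FROM t
    Ok(())
}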
§Query Planning and Execution Overview
§SQL
Parsed with SqlToRel creates
sqlparser initial plan
┌───────────────┐ ┌─────────┐ ┌─────────────┐
│ SELECT * │ │Query { │ │Project │
│ FROM ... │──────────▶│.. │────────────▶│ TableScan │
│ │ │} │ │ ... │
└───────────────┘ └─────────┘ └─────────────┘
SQL String sqlparser LogicalPlan
AST nodes
- The query string is parsed to an Abstract Syntax Tree (AST) Statement using sqlparser.
- The AST is converted to a LogicalPlan and logical expressions (Exprs) to compute the desired result by the SqlToRel planner.
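These two steps can be observed directly: SessionContext::state exposes create_logical_plan, which parses and plans a SQL string without executing it. A minimal sketch, reusing the example.csv file from the examples above:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
    // Runs sqlparser and the SqlToRel planner, but does not execute the query
    let plan = ctx.state().create_logical_plan("SELECT a, b FROM example").await?;
    println!("{}", plan.display_indent());
    Ok(())
}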
§DataFrame
When executing plans using the DataFrame API, the process is identical to that of SQL, except the DataFrame API builds the LogicalPlan directly using LogicalPlanBuilder. Systems that have their own custom query languages typically also build LogicalPlans directly.
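A minimal sketch of building a LogicalPlan programmatically with LogicalPlanBuilder (starting from an empty one-row relation so the example is self-contained; the column name is illustrative):

use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::prelude::*;

fn main() -> Result<()> {
    // Build a plan directly, with no SQL involved:
    // a single-row source projected to one computed column
    let plan = LogicalPlanBuilder::empty(true)
        .project(vec![(lit(1) + lit(2)).alias("three")])?
        .build()?;
    println!("{}", plan.display_indent());
    Ok(())
}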
§Planning
AnalyzerRules and PhysicalPlanner PhysicalOptimizerRules
OptimizerRules creates ExecutionPlan improve performance
rewrite plan
┌─────────────┐ ┌─────────────┐ ┌───────────────┐ ┌───────────────┐
│Project │ │Project(x, y)│ │ProjectExec │ │ProjectExec │
│ TableScan │──...──▶│ TableScan │─────▶│ ... │──...──▶│ ... │
│ ... │ │ ... │ │ ParquetExec│ │ ParquetExec│
└─────────────┘ └─────────────┘ └───────────────┘ └───────────────┘
LogicalPlan LogicalPlan ExecutionPlan ExecutionPlan
To process large datasets with many rows as efficiently as possible, significant effort is spent planning and optimizing, in the following manner:
- The LogicalPlan is checked and rewritten to enforce semantic rules, such as type coercion, by AnalyzerRules.
- The LogicalPlan is rewritten by OptimizerRules, such as projection and filter pushdown, to improve its efficiency.
- The LogicalPlan is converted to an ExecutionPlan by a PhysicalPlanner.
- The ExecutionPlan is rewritten by PhysicalOptimizerRules, such as sort and join selection, to improve its efficiency.
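These steps can also be driven explicitly. The following sketch goes from a DataFrame’s optimized LogicalPlan to an ExecutionPlan, again reusing example.csv from the examples above:

use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
    let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a").await?;
    // AnalyzerRules and OptimizerRules have been applied to this LogicalPlan
    let logical = df.into_optimized_plan()?;
    // The PhysicalPlanner and PhysicalOptimizerRules produce the ExecutionPlan
    let physical = ctx.state().create_physical_plan(&logical).await?;
    println!("{}", displayable(physical.as_ref()).indent(true));
    Ok(())
}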
§Data Sources
Planning │
requests │ TableProvider::scan
information │ creates an
such as schema │ ExecutionPlan
│
▼
┌─────────────────────────┐ ┌──────────────┐
│ │ │ │
│impl TableProvider │────────▶│ParquetExec │
│ │ │ │
└─────────────────────────┘ └──────────────┘
TableProvider
(built in or user provided) ExecutionPlan
DataFusion includes several built-in data sources for common use cases, and can be extended by implementing the TableProvider trait. A TableProvider provides information for planning and an ExecutionPlan for execution.
- ListingTable: Reads data from Parquet, JSON, CSV, or AVRO files. Supports single files or multiple files with HIVE style partitioning, optional compression, directly reading from remote object stores, and more.
- MemTable: Reads data from in-memory RecordBatches (see the sketch after this list).
- StreamingTable: Reads data from potentially unbounded inputs.
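As a sketch of the simplest built-in TableProvider, here is a MemTable registered and queried (the table name t and the sample data are illustrative):

use std::sync::Arc;
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    // MemTable implements TableProvider over in-memory RecordBatches
    let table = MemTable::try_new(schema, vec![vec![batch]])?;
    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(table))?;
    ctx.sql("SELECT a FROM t").await?.show().await?;
    Ok(())
}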
§Plan Representations
§Logical Plans
Logical planning yields LogicalPlan nodes and Exprs representing expressions, which are Schema-aware and represent statements independent of how they are physically executed.
A LogicalPlan is a Directed Acyclic Graph (DAG) of other LogicalPlans, each potentially containing embedded Exprs.
LogicalPlans can be rewritten with the TreeNode API; see the tree_node module for more details.
Exprs can also be rewritten with the TreeNode API and simplified using ExprSimplifier. Examples of working with and executing Exprs can be found in the expr_api.rs example.
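A minimal sketch of rewriting an Expr with the TreeNode API; the particular rewrite (renaming column a to b) is illustrative:

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::Result;
use datafusion::prelude::*;

fn main() -> Result<()> {
    let expr = col("a") + lit(1);
    // Walk the expression tree bottom-up, replacing references to column "a"
    let rewritten = expr
        .transform(|e| {
            Ok(if e == col("a") {
                Transformed::yes(col("b"))
            } else {
                Transformed::no(e)
            })
        })?
        .data;
    assert_eq!(rewritten, col("b") + lit(1));
    Ok(())
}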
§Physical Plans
An ExecutionPlan (sometimes referred to as a “physical plan”) is a plan that can be executed against data. It is a DAG of other ExecutionPlans, each potentially containing expressions that implement the PhysicalExpr trait.
Compared to a LogicalPlan, an ExecutionPlan has additional concrete information about how to perform calculations (e.g. hash vs merge join), and how data flows during execution (e.g. partitioning and sortedness).
cp_solver performs range propagation analysis on PhysicalExprs, and PruningPredicate can prove that certain boolean PhysicalExprs used for filtering can never be true, using additional statistical information.
§Execution
ExecutionPlan::execute Calling next() on the
produces a stream stream produces the data
┌───────────────┐ ┌─────────────────────────┐ ┌────────────┐
│ProjectExec │ │impl │ ┌───▶│RecordBatch │
│ ... │─────▶│SendableRecordBatchStream│────┤ └────────────┘
│ ParquetExec│ │ │ │ ┌────────────┐
└───────────────┘ └─────────────────────────┘ ├───▶│RecordBatch │
▲ │ └────────────┘
ExecutionPlan │ │ ...
│ │
│ │ ┌────────────┐
PhysicalOptimizerRules ├───▶│RecordBatch │
request information │ └────────────┘
such as partitioning │ ┌ ─ ─ ─ ─ ─ ─
└───▶ None │
└ ─ ─ ─ ─ ─ ─
ExecutionPlans process data using the Apache Arrow memory format, making heavy use of functions from the arrow crate. Values are represented with ColumnarValue, which is either a ScalarValue (a single constant value) or an ArrayRef (an Arrow Array).
Calling execute produces one or more partitions of data, each as a SendableRecordBatchStream, which implements a pull-based execution API. Calling .next().await will incrementally compute and return the next RecordBatch. Balanced parallelism is achieved using Volcano-style “Exchange” operations implemented by RepartitionExec.
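A sketch of driving a single partition’s stream by hand, assuming the futures crate for StreamExt and reusing example.csv from the examples above:

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
    let plan = ctx.sql("SELECT a FROM example").await?.create_physical_plan().await?;
    // Each partition is an independent stream; pull RecordBatches from partition 0
    let mut stream = plan.execute(0, ctx.task_ctx())?;
    while let Some(batch) = stream.next().await {
        println!("got {} rows", batch?.num_rows());
    }
    Ok(())
}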
While some recent research, such as Morsel-Driven Parallelism, describes challenges with the pull-style Volcano execution model on NUMA architectures, in practice DataFusion achieves similar scalability to systems that use a morsel-driven approach, such as DuckDB. See the SIGMOD 2024 paper above for more details.
See the implementors of ExecutionPlan for a list of the available physical operators.
§Thread Scheduling
DataFusion incrementally computes output from a SendableRecordBatchStream with target_partitions threads. Parallelism is implemented using multiple Tokio tasks, which are executed by threads managed by a tokio Runtime. While tokio is most commonly used for asynchronous network I/O, its combination of an efficient, work-stealing scheduler, first-class compiler support for automatic continuation generation, and exceptional performance makes it a compelling choice for CPU-intensive applications as well. This is explained in more detail in Using Rustlang’s Async Tokio Runtime for CPU-Bound Tasks.
§State Management and Configuration
ConfigOptions contain options to control DataFusion’s execution.
The state required to execute queries is managed by the following structures:
- SessionContext: State needed to create LogicalPlans, such as the table definitions and the function registries.
- TaskContext: State needed for execution, such as the MemoryPool, DiskManager, and ObjectStoreRegistry.
- ExecutionProps: Per-execution properties and data (such as starting timestamps, etc.).
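A sketch of adjusting a few common options through SessionConfig, which wraps ConfigOptions (the particular values are illustrative):

use datafusion::prelude::*;

fn main() {
    // Tune plan-wide parallelism and the size of produced RecordBatches
    let config = SessionConfig::new()
        .with_target_partitions(8)
        .with_batch_size(4096);
    let _ctx = SessionContext::new_with_config(config);
    // ... register tables and run queries as usual
}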
§Resource Management
The amount of memory and temporary local disk space used by DataFusion when running a plan can be controlled using the MemoryPool and DiskManager. Other runtime options can be found on RuntimeEnv.
§Crate Organization
DataFusion is organized into multiple crates to enforce modularity and improve compilation times. The crates are:
- datafusion_common: Common traits and types
- datafusion_execution: State and structures needed for execution
- datafusion_expr: LogicalPlan, Expr, and related logical planning structures
- datafusion_functions: Scalar function packages
- datafusion_functions_nested: Scalar function packages for ARRAYs, MAPs, and STRUCTs
- datafusion_optimizer: OptimizerRules and AnalyzerRules
- datafusion_physical_expr: PhysicalExpr and related expressions
- datafusion_physical_plan: ExecutionPlan and related expressions
- datafusion_sql: SQL planner (SqlToRel)
§Citing DataFusion in Academic Papers
You can use the following citation to reference DataFusion in academic papers:
@inproceedings{lamb2024apache,
  title={Apache Arrow DataFusion: A Fast, Embeddable, Modular Analytic Query Engine},
  author={Lamb, Andrew and Shen, Yijie and Heres, Dani{\"e}l and Chakraborty, Jayjeet and Kabak, Mehmet Ozan and Hsieh, Liang-Chi and Sun, Chao},
  booktitle={Companion of the 2024 International Conference on Management of Data},
  pages={5--17},
  year={2024}
}
§Modules
- re-export of datafusion_catalog crate
- Interfaces and default implementations of catalogs and schemas.
- re-export of datafusion_common crate
- Runtime configuration, via ConfigOptions
- DataFrame API for building and executing query plans.
- DataFusion data sources: TableProvider and ListingTable
- DataFusion error type DataFusionError and Result.
- Shared state for query planning and execution.
- re-export of datafusion_functions crate
- re-export of datafusion_functions_aggregate crate
- functions_array (Deprecated): re-export of datafusion_functions_nested crate as functions_array for backward compatibility, if “nested_expressions” feature is enabled
- re-export of datafusion_functions_nested crate, if “nested_expressions” feature is enabled
- re-export of datafusion_functions_window crate
- re-export of datafusion_expr crate
- re-export of datafusion_optimizer crate
- re-export of datafusion_physical_expr crate
- re-export of datafusion_physical_expr crate
- Optimizer that rewrites ExecutionPlans.
- re-export of datafusion_physical_plan crate
- Planner for LogicalPlan to ExecutionPlan
- DataFusion “prelude” to simplify importing common types.
- ScalarValue single value representation.
- re-export of datafusion_sql crate
- Utility functions to make testing DataFusion based crates easier
- re-export of variable provider for @name and @@name style runtime values.
§Macros
- Compares the pretty-formatted output of a record batch with an expected vector of strings. This is a macro so errors appear on the correct line.
- Compares the pretty-formatted output of a record batch with an expected vector of strings in a way that order does not matter. This is a macro so errors appear on the correct line.
§Constants
- DataFusion crate version