Struct datafusion::datasource::physical_plan::parquet::ParquetExec
pub struct ParquetExec { /* private fields */ }
Execution plan for reading one or more Parquet files.
            ▲
            │
            │  Produce a stream of
            │  RecordBatches
            │
┌───────────────────────┐
│                       │
│      ParquetExec      │
│                       │
└───────────────────────┘
            ▲
            │  Asynchronously read from one
            │  or more parquet files via
            │  ObjectStore interface
            │
            │
  .───────────────────.
 (                     )
  │`───────────────────'│
  │     ObjectStore     │
  │.───────────────────.│
 (                     )
  `───────────────────'
§Example: Create a ParquetExec
// (Assumed imports for this sketch; `object_store_url`, `file_schema`, and
// `predicate` are constructed elsewhere, as in the hidden rustdoc setup.)
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetExec;
use datafusion::datasource::physical_plan::FileScanConfig;

// Create a ParquetExec for reading `file1.parquet` with a file size of 100MB
let file_scan_config = FileScanConfig::new(object_store_url, file_schema)
    .with_file(PartitionedFile::new("file1.parquet", 100 * 1024 * 1024));
let exec = ParquetExec::builder(file_scan_config)
    // Provide a predicate for filtering row groups/pages
    .with_predicate(predicate)
    .build();
§Features
Supports the following optimizations:
- Concurrent reads: can read from one or more files in parallel as multiple partitions, including concurrently reading multiple row groups from a single file.
- Predicate push down: skips row groups and pages based on min/max/null_count statistics in the row group metadata, the page index, and bloom filters.
- Projection pushdown: reads and decodes only the columns required.
- Limit pushdown: stops execution early after some number of rows have been read.
- Custom readers: customize reading parquet files, e.g. to cache metadata, coalesce I/O operations, etc. See ParquetFileReaderFactory for more details.
- Schema adapters: read parquet files with different schemas into a unified table schema. This can be used to implement "schema evolution". See SchemaAdapterFactory for more details.
- metadata_size_hint: controls the number of bytes read from the end of the file in the initial I/O when the default ParquetFileReaderFactory is used. If a custom reader is used, it supplies the metadata directly and this parameter is ignored. See ParquetExecBuilder::with_metadata_size_hint for more details, and the sketch after this list.
- User provided ParquetAccessPlans to skip row groups and/or pages based on external information. See "Implementing External Indexes" below.
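A hedged sketch of the metadata_size_hint knob, reusing file_scan_config and predicate from the example above (the 512 KiB value is illustrative, not a recommendation):

// Read the final 512 KiB of each file in the initial I/O, so the footer and
// metadata are usually fetched in a single request
let exec = ParquetExec::builder(file_scan_config)
    .with_predicate(predicate)
    .with_metadata_size_hint(512 * 1024)
    .build();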
§Implementing External Indexes
It is possible to restrict the row groups and selections within those row groups that the ParquetExec will consider by providing an initial ParquetAccessPlan as extensions on PartitionedFile. This can be used to implement external indexes on top of parquet files and select only portions of the files.
The ParquetExec will try to further reduce any provided ParquetAccessPlan based on the contents of ParquetMetadata and other settings.
§Example of providing a ParquetAccessPlan
// (Imports as in the example above, plus ParquetAccessPlan and ObjectStoreUrl;
// `schema()` is constructed elsewhere.)
// Create an access plan to scan row groups 0, 1 and 3 and skip row groups 2 and 4
let mut access_plan = ParquetAccessPlan::new_all(5);
access_plan.skip(2);
access_plan.skip(4);
// Provide the plan as an extension to the FileScanConfig
let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
    .with_extensions(Arc::new(access_plan));
// Create a ParquetExec to scan this file
let file_scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema())
    .with_file(partitioned_file);
// This ParquetExec will not even try to read row groups 2 and 4. Additional
// pruning based on predicates may also happen
let exec = ParquetExec::builder(file_scan_config).build();
For a complete example, see the advanced_parquet_index example.
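An access plan can also select individual rows within a row group. A hedged sketch, assuming RowGroupAccess::Selection and ParquetAccessPlan::set as in recent DataFusion releases (verify the names against your version):

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

// Scan all 5 row groups, but in row group 1 read only rows 100..200
let mut access_plan = ParquetAccessPlan::new_all(5);
let selection = RowSelection::from(vec![
    RowSelector::skip(100),   // skip the first 100 rows
    RowSelector::select(100), // then read the next 100; later rows are not read
]);
access_plan.set(1, RowGroupAccess::Selection(selection));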
§Execution Overview
- Step 1: ParquetExec::execute is called, returning a FileStream configured to open parquet files with a ParquetOpener (see the sketch after this list).
- Step 2: When the stream is polled, the ParquetOpener is called to open the file.
- Step 3: The ParquetOpener gets the ParquetMetaData (file metadata) via ParquetFileReaderFactory, creating a ParquetAccessPlan by applying predicates to metadata. The plan and projections are used to determine what pages must be read.
- Step 4: The stream begins reading data, fetching the required pages and incrementally decoding them.
- Step 5: As each RecordBatch is read, it may be adapted by a SchemaAdapter to match the table schema. By default missing columns are filled with nulls, but this can be customized via SchemaAdapterFactory.
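A hedged sketch of Steps 1, 4 and 5 from the caller's side, assuming the exec built in the earlier example, an async context, and a default TaskContext for brevity:

use std::sync::Arc;
use datafusion::execution::TaskContext;
use futures::StreamExt;

// Step 1: obtain a stream for partition 0
let task_ctx = Arc::new(TaskContext::default());
let mut stream = exec.execute(0, task_ctx)?;
// Steps 4-5: poll the stream; each item is a (possibly schema-adapted) RecordBatch
while let Some(batch) = stream.next().await {
    println!("read {} rows", batch?.num_rows());
}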
Implementations§
impl ParquetExec
pub fn new(
    base_config: FileScanConfig,
    predicate: Option<Arc<dyn PhysicalExpr>>,
    metadata_size_hint: Option<usize>,
    table_parquet_options: TableParquetOptions,
) -> Self
👎 Deprecated since 39.0.0: use ParquetExec::builder or ParquetExecBuilder instead
Create a new Parquet reader execution plan, provided a file list and schema.
pub fn builder(file_scan_config: FileScanConfig) -> ParquetExecBuilder
Return a ParquetExecBuilder. See the example on ParquetExec and ParquetExecBuilder for specifying parquet table options.
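A hedged sketch of supplying parquet table options through the builder (with_table_parquet_options is assumed to be available on ParquetExecBuilder in this version):

// Enable filter pushdown for this scan via TableParquetOptions
let mut parquet_options = TableParquetOptions::default();
parquet_options.global.pushdown_filters = true;
let exec = ParquetExec::builder(file_scan_config)
    .with_table_parquet_options(parquet_options)
    .build();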
pub fn base_config(&self) -> &FileScanConfig
The FileScanConfig that controls this scan (such as which files to read).
pub fn table_parquet_options(&self) -> &TableParquetOptions
Options passed to the parquet reader for this scan.
pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>>
Optional predicate.
pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>>
Optional reference to this parquet scan's pruning predicate.
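A small sketch of inspecting these accessors on a built exec (PhysicalExpr implements Display, so the predicate can be printed directly):

if let Some(predicate) = exec.predicate() {
    println!("scan predicate: {predicate}");
}
println!("has pruning predicate: {}", exec.pruning_predicate().is_some());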
pub fn with_parquet_file_reader_factory(
    self,
    parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
) -> Self
Optional user defined parquet file reader factory. See the documentation on ParquetExecBuilder::with_parquet_file_reader_factory for more details.
pub fn with_schema_adapter_factory(
    self,
    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self
Optional schema adapter factory. See the documentation on ParquetExecBuilder::with_schema_adapter_factory for more details.
pub fn with_pushdown_filters(self, pushdown_filters: bool) -> Self
If true, the predicate is converted to a RowFilter and applied by the parquet decoder as rows are read, skipping rows that do not match.
pub fn with_reorder_filters(self, reorder_filters: bool) -> Self
If true, the RowFilter made by pushdown_filters may try to minimize the cost of filter evaluation by reordering the predicate Exprs. If false, the predicates are applied in the same order as specified in the query. Defaults to false.
pub fn with_enable_page_index(self, enable_page_index: bool) -> Self
If enabled, the reader will read the page index. This is used to optimise filter pushdown via RowSelector and RowFilter by eliminating unnecessary IO and decoding.
pub fn with_bloom_filter_on_read(self, bloom_filter_on_read: bool) -> Self
If enabled, the reader will use bloom filters to skip row groups that cannot match the predicate.
pub fn with_bloom_filter_on_write(
    self,
    enable_bloom_filter_on_write: bool,
) -> Self
If enabled, the writer will write bloom filters for the data it writes.
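Each of these setters consumes and returns Self, so they chain. A sketch (flag values are illustrative, not the defaults):

let exec = ParquetExec::builder(file_scan_config)
    .build()
    .with_pushdown_filters(true)
    .with_reorder_filters(true)
    .with_enable_page_index(true)
    .with_bloom_filter_on_read(true);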
Trait Implementations§
impl Clone for ParquetExec
fn clone(&self) -> ParquetExec
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl Debug for ParquetExec
impl DisplayAs for ParquetExec
impl ExecutionPlan for ParquetExec
fn repartitioned(
    &self,
    target_partitions: usize,
    config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>>
Redistribute files across partitions according to their size. See comments on FileGroupPartitioner for more detail.
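A hedged sketch of repartitioning a scan (Ok(None) means the plan was left unchanged):

use datafusion::config::ConfigOptions;

let config = ConfigOptions::default();
if let Some(new_plan) = exec.repartitioned(8, &config)? {
    // new_plan scans the same files, redistributed across 8 partitions
    let _ = new_plan;
}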
fn name(&self) -> &'static str
Short name for the ExecutionPlan, such as "ParquetExec".
fn properties(&self) -> &PlanProperties
Returns the properties of this ExecutionPlan, such as output ordering(s) and partitioning information.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>
Get a list of children ExecutionPlans that act as inputs to this plan. The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).
fn with_new_children(
    self: Arc<Self>,
    _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>
Returns a new ExecutionPlan where all existing children were replaced by the children, in order.
fn execute(
    &self,
    partition_index: usize,
    ctx: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>
Begin execution of partition_index, returning a stream of RecordBatches.
fn metrics(&self) -> Option<MetricsSet>
Return a snapshot of the set of Metrics for this ExecutionPlan. If no Metrics are available, return None.
fn statistics(&self) -> Result<Statistics>
Returns statistics for this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error.
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>
Returns a fetching variant of this ExecutionPlan node, if it supports fetch limits. Returns None otherwise.
fn static_name() -> &'static str
where
    Self: Sized,
Short name for the ExecutionPlan, like name, but can be called without an instance.
fn required_input_distribution(&self) -> Vec<Distribution>
Specifies the data distribution requirements for all the children of this ExecutionPlan. By default it is Distribution::UnspecifiedDistribution for each child.
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>>
Specifies the ordering required for all of the children of this ExecutionPlan.
fn maintains_input_order(&self) -> Vec<bool>
Returns false if this ExecutionPlan's implementation may reorder rows within or between partitions.
fn benefits_from_input_partitioning(&self) -> Vec<bool>
Specifies whether the ExecutionPlan benefits from increased parallelization at its input for each child.
fn supports_limit_pushdown(&self) -> bool
Returns true if a limit can be safely pushed down through this ExecutionPlan node.
Auto Trait Implementations§
impl Freeze for ParquetExec
impl !RefUnwindSafe for ParquetExec
impl Send for ParquetExec
impl Sync for ParquetExec
impl Unpin for ParquetExec
impl !UnwindSafe for ParquetExec
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value.
impl<T> CloneToUninit for T where T: Clone
default unsafe fn clone_to_uninit(&self, dst: *mut T)
🔬 This is a nightly-only experimental API. (clone_to_uninit)
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.