Struct datafusion::datasource::physical_plan::parquet::ParquetExec

pub struct ParquetExec { /* private fields */ }

Execution plan for reading one or more Parquet files.

            ▲
            │
            │  Produce a stream of
            │  RecordBatches
            │
┌───────────────────────┐
│                       │
│      ParquetExec      │
│                       │
└───────────────────────┘
            ▲
            │  Asynchronously read from one
            │  or more parquet files via
            │  ObjectStore interface
            │
            │
  .───────────────────.
 │                     )
 │`───────────────────'│
 │    ObjectStore      │
 │.───────────────────.│
 │                     )
  `───────────────────'

Example: Create a ParquetExec

// Create a ParquetExec for reading `file1.parquet` with a file size of 100MB
// (`object_store_url`, `file_schema` and `predicate` are assumed to be in scope)
let file_scan_config = FileScanConfig::new(object_store_url, file_schema)
  .with_file(PartitionedFile::new("file1.parquet", 100 * 1024 * 1024));
let exec = ParquetExec::builder(file_scan_config)
  // Provide a predicate for filtering row groups/pages
  .with_predicate(predicate)
  .build();

Features

Supports the following optimizations:

  • Concurrent reads: Can read from one or more files in parallel as multiple partitions, including concurrently reading multiple row groups from a single file.

  • Predicate push down: skips row groups and pages based on min/max/null_counts in the row group metadata, the page index and bloom filters.

  • Projection pushdown: reads and decodes only the columns required.

  • Limit pushdown: stop execution early after some number of rows are read.

  • Custom readers: customize reading parquet files, e.g. to cache metadata, coalesce I/O operations, etc. See ParquetFileReaderFactory for more details.

  • Schema adapters: read parquet files with different schemas into a unified table schema. This can be used to implement “schema evolution”. See SchemaAdapterFactory for more details.

  • metadata_size_hint: controls the number of bytes read from the end of the file in the initial I/O request when the default ParquetFileReaderFactory is used. If a custom reader is used, it supplies the metadata directly and this parameter is ignored. See ParquetExecBuilder::with_metadata_size_hint for more details, and the sketch after this list.

  • User provided ParquetAccessPlans to skip row groups and/or pages based on external information. See “Implementing External Indexes” below.
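
A minimal sketch, not taken from these docs, of combining several of the features above through the builder; it assumes file_scan_config and predicate are constructed as in the earlier example:

// Sketch: combine predicate pruning with a metadata size hint
let exec = ParquetExec::builder(file_scan_config)
  // prune row groups/pages using statistics, the page index and bloom filters
  .with_predicate(predicate)
  // read the last 1 MiB of the file up front so the footer and metadata
  // usually arrive in a single I/O (ignored when a custom reader is used)
  .with_metadata_size_hint(1024 * 1024)
  .build();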

Implementing External Indexes

It is possible to restrict the row groups and selections within those row groups that the ParquetExec will consider by providing an initial ParquetAccessPlan as extensions on PartitionedFile. This can be used to implement external indexes on top of parquet files and select only portions of the files.

The ParquetExec will try to reduce any provided ParquetAccessPlan further based on the contents of the ParquetMetaData and other settings.

Example of providing a ParquetAccessPlan

// Create an access plan to scan row groups 0, 1 and 3, skipping row groups 2 and 4
let mut access_plan = ParquetAccessPlan::new_all(5);
access_plan.skip(2);
access_plan.skip(4);
// Provide the plan as an extension on the PartitionedFile
// (`schema()` is assumed to return the file's schema)
let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
  .with_extensions(Arc::new(access_plan));
// Create a ParquetExec to scan this file
let file_scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema())
  .with_file(partitioned_file);
// This ParquetExec will not even try to read row groups 2 and 4. Additional
// pruning based on predicates may also happen.
let exec = ParquetExec::builder(file_scan_config).build();

For a complete example, see the advanced_parquet_index example.

Execution Overview

  • Step 1: ParquetExec::execute is called, returning a FileStream configured to open parquet files with a ParquetOpener. (A sketch of driving this stream follows the list.)

  • Step 2: When the stream is polled, the ParquetOpener is called to open the file.

  • Step 3: The ParquetOpener gets the ParquetMetaData (file metadata) via ParquetFileReaderFactory, creating a ParquetAccessPlan by applying predicates to metadata. The plan and projections are used to determine what pages must be read.

  • Step 4: The stream begins reading data, fetching the required pages and incrementally decoding them.

  • Step 5: As each RecordBatch is read, it may be adapted by a SchemaAdapter to match the table schema. By default missing columns are filled with nulls, but this can be customized via SchemaAdapterFactory.
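
A minimal sketch of driving the stream from Step 1; the SessionContext named ctx and the exec from the earlier example are assumptions, not part of these docs:

use futures::StreamExt;

// Sketch: execute partition 0 and drain the resulting RecordBatch stream
let mut stream = exec.execute(0, ctx.task_ctx())?;
while let Some(batch) = stream.next().await {
  let batch = batch?;
  println!("read a batch with {} rows", batch.num_rows());
}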

Implementations

impl ParquetExec

pub fn new( base_config: FileScanConfig, predicate: Option<Arc<dyn PhysicalExpr>>, metadata_size_hint: Option<usize>, table_parquet_options: TableParquetOptions, ) -> Self

👎Deprecated since 39.0.0: use ParquetExec::builder or ParquetExecBuilder

Create a new Parquet reader execution plan from the provided file list and schema.

pub fn builder(file_scan_config: FileScanConfig) -> ParquetExecBuilder

Return a ParquetExecBuilder.

See the example on ParquetExec, and see ParquetExecBuilder for specifying parquet table options; a sketch follows below.
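
A minimal sketch, assuming the builder exposes a with_table_parquet_options method (an assumption here, suggested by the TableParquetOptions parameter of the deprecated constructor above) and that file_scan_config exists:

// Sketch: pass table-level parquet options through the builder
// (`with_table_parquet_options` is an assumed builder method)
let exec = ParquetExec::builder(file_scan_config)
  .with_table_parquet_options(TableParquetOptions::default())
  .build();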

pub fn base_config(&self) -> &FileScanConfig

FileScanConfig that controls this scan (such as which files to read)

pub fn table_parquet_options(&self) -> &TableParquetOptions

Options passed to the parquet reader for this scan

pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>>

Optional predicate.

pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>>

Optional reference to this parquet scan’s pruning predicate

pub fn with_parquet_file_reader_factory( self, parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>, ) -> Self

Optional user defined parquet file reader factory.

See documentation on ParquetExecBuilder::with_parquet_file_reader_factory

pub fn with_schema_adapter_factory( self, schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, ) -> Self

Optional schema adapter factory.

See documentation on ParquetExecBuilder::with_schema_adapter_factory

pub fn with_pushdown_filters(self, pushdown_filters: bool) -> Self

If true, any filter Exprs on the scan will be converted to a RowFilter in the ParquetRecordBatchStream. These filters are applied by the parquet decoder to avoid unnecessarily decoding columns for rows that cannot pass the predicate. Defaults to false. (A usage sketch follows with_reorder_filters below.)

pub fn with_reorder_filters(self, reorder_filters: bool) -> Self

If true, the RowFilter made by pushdown_filters may try to minimize the cost of filter evaluation by reordering the predicate Exprs. If false, the predicates are applied in the same order as specified in the query. Defaults to false.
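
A minimal usage sketch for this method together with with_pushdown_filters, assuming exec is an existing ParquetExec:

// Sketch: push filters into the parquet decoder and allow them to be
// reordered so cheaper predicates are evaluated first
let exec = exec
  .with_pushdown_filters(true)
  .with_reorder_filters(true);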

pub fn with_enable_page_index(self, enable_page_index: bool) -> Self

If enabled, the reader will read the page index. This is used to optimize filter pushdown via RowSelector and RowFilter by eliminating unnecessary I/O and decoding.

pub fn with_bloom_filter_on_read(self, bloom_filter_on_read: bool) -> Self

If enabled, the reader will use bloom filters to skip row groups that cannot contain values matching the predicate. (See the sketch after with_bloom_filter_on_write below.)

pub fn with_bloom_filter_on_write( self, enable_bloom_filter_on_write: bool, ) -> Self

If enabled, the writer will write bloom filters into the files it produces.
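
A minimal sketch combining the read-side toggles above, assuming exec is an existing ParquetExec:

// Sketch: enable page index pruning and bloom filter checks on read
let exec = exec
  .with_enable_page_index(true)
  .with_bloom_filter_on_read(true);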

Trait Implementations

impl Clone for ParquetExec

fn clone(&self) -> ParquetExec

Returns a copy of the value. Read more

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

impl Debug for ParquetExec

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

impl DisplayAs for ParquetExec

fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> Result

Format according to DisplayFormatType, used when verbose representation looks different from the default one. Read more

impl ExecutionPlan for ParquetExec

fn as_any(&self) -> &dyn Any

Return a reference to Any that can be used for downcasting

fn repartitioned( &self, target_partitions: usize, config: &ConfigOptions, ) -> Result<Option<Arc<dyn ExecutionPlan>>>

Redistribute files across partitions according to their size. See comments on FileGroupPartitioner for more detail.
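
A minimal sketch, assuming an existing exec and default ConfigOptions:

// Sketch: ask the scan to redistribute its files across 8 partitions
let config = ConfigOptions::default();
if let Some(repartitioned) = exec.repartitioned(8, &config)? {
  // the new plan reads the same files, split into up to 8 partitions
}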

fn name(&self) -> &'static str

Short name for the ExecutionPlan, such as ‘ParquetExec’. Read more

fn properties(&self) -> &PlanProperties

Return properties of the output of the ExecutionPlan, such as output ordering(s), partitioning information etc. Read more

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>

Get a list of children ExecutionPlans that act as inputs to this plan. The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).

fn with_new_children( self: Arc<Self>, _: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>>

Returns a new ExecutionPlan where all existing children were replaced by the children, in order

fn execute( &self, partition_index: usize, ctx: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream>

Begin execution of partition, returning a Stream of RecordBatches. Read more

fn metrics(&self) -> Option<MetricsSet>

Return a snapshot of the set of Metrics for this ExecutionPlan. If no Metrics are available, return None. Read more

fn statistics(&self) -> Result<Statistics>

Returns statistics for this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error.

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>

Returns a fetching variant of this ExecutionPlan node, if it supports fetch limits. Returns None otherwise.
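
A minimal sketch, assuming exec is a ParquetExec, which supports fetch limits:

// Sketch: produce a variant of this scan that stops after 10 rows
if let Some(limited) = exec.with_fetch(Some(10)) {
  // `limited` is a new plan that reads at most 10 rows
}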

fn static_name() -> &'static str
where Self: Sized,

Short name for the ExecutionPlan, such as ‘ParquetExec’. Like name but can be called without an instance.

fn schema(&self) -> Arc<Schema>

Get the schema for this execution plan

fn required_input_distribution(&self) -> Vec<Distribution>

Specifies the data distribution requirements for all the children of this ExecutionPlan. By default it is Distribution::UnspecifiedDistribution for each child.

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>>

Specifies the ordering required for all of the children of this ExecutionPlan. Read more

fn maintains_input_order(&self) -> Vec<bool>

Returns false if this ExecutionPlan’s implementation may reorder rows within or between partitions. Read more

fn benefits_from_input_partitioning(&self) -> Vec<bool>

Specifies whether the ExecutionPlan benefits from increased parallelization at its input for each child. Read more

fn supports_limit_pushdown(&self) -> bool

Returns true if a limit can be safely pushed down through this ExecutionPlan node. Read more

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more

impl<T> CloneToUninit for T
where T: Clone,

default unsafe fn clone_to_uninit(&self, dst: *mut T)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dst. Read more

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more

impl<T> Same for T

type Output = T

Should always be Self

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V