datafusion_physical_plan/execution_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
19pub use crate::metrics::Metric;
20pub use crate::ordering::InputOrderMode;
21pub use crate::stream::EmptyRecordBatchStream;
22
23pub use datafusion_common::hash_utils;
24pub use datafusion_common::utils::project_schema;
25pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
26pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
27pub use datafusion_expr::{Accumulator, ColumnarValue};
28pub use datafusion_physical_expr::window::WindowExpr;
29pub use datafusion_physical_expr::{
30    expressions, udf, Distribution, Partitioning, PhysicalExpr,
31};
32
33use std::any::Any;
34use std::fmt::Debug;
35use std::sync::Arc;
36
37use crate::coalesce_partitions::CoalescePartitionsExec;
38use crate::display::DisplayableExecutionPlan;
39use crate::metrics::MetricsSet;
40use crate::projection::ProjectionExec;
41use crate::repartition::RepartitionExec;
42use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
43use crate::stream::RecordBatchStreamAdapter;
44
45use arrow::array::{Array, RecordBatch};
46use arrow::datatypes::SchemaRef;
47use datafusion_common::config::ConfigOptions;
48use datafusion_common::{exec_err, Constraints, Result};
49use datafusion_execution::TaskContext;
50use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
51use datafusion_physical_expr_common::sort_expr::LexRequirement;
52
53use futures::stream::{StreamExt, TryStreamExt};
54use tokio::task::JoinSet;
55
56/// Represent nodes in the DataFusion Physical Plan.
57///
58/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
59/// [`RecordBatch`] that incrementally computes a partition of the
60/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
61/// details on partitioning.
62///
63/// Methods such as [`Self::schema`] and [`Self::properties`] communicate
64/// properties of the output to the DataFusion optimizer, and methods such as
65/// [`required_input_distribution`] and [`required_input_ordering`] express
66/// requirements of the `ExecutionPlan` from its input.
67///
68/// [`ExecutionPlan`] can be displayed in a simplified form using the
69/// return value from [`displayable`] in addition to the (normally
70/// quite verbose) `Debug` output.
71///
72/// [`execute`]: ExecutionPlan::execute
73/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
74/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
75pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
76    /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
77    ///
    /// Implementation note: this method can simply proxy to
    /// [`static_name`](ExecutionPlan::static_name) if no special action is
    /// needed. A default implementation is not provided because this method
    /// deliberately avoids a `Sized` constraint, allowing a wider range of
    /// use cases (such as calling it on trait objects).
83    fn name(&self) -> &str;
84
85    /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
86    /// Like [`name`](ExecutionPlan::name) but can be called without an instance.
87    fn static_name() -> &'static str
88    where
89        Self: Sized,
90    {
91        let full_name = std::any::type_name::<Self>();
92        let maybe_start_idx = full_name.rfind(':');
93        match maybe_start_idx {
94            Some(start_idx) => &full_name[start_idx + 1..],
95            None => "UNKNOWN",
96        }
97    }
98
99    /// Returns the execution plan as [`Any`] so that it can be
100    /// downcast to a specific implementation.
101    fn as_any(&self) -> &dyn Any;
102
103    /// Get the schema for this execution plan
104    fn schema(&self) -> SchemaRef {
105        Arc::clone(self.properties().schema())
106    }
107
108    /// Return properties of the output of the `ExecutionPlan`, such as output
109    /// ordering(s), partitioning information etc.
110    ///
111    /// This information is available via methods on [`ExecutionPlanProperties`]
112    /// trait, which is implemented for all `ExecutionPlan`s.
113    fn properties(&self) -> &PlanProperties;
114
115    /// Returns an error if this individual node does not conform to its invariants.
116    /// These invariants are typically only checked in debug mode.
117    ///
    /// The default implementation performs no checks. Extension nodes can
    /// override this method to enforce their own invariants.
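    ///
    /// For example, a node that must always have exactly two children could
    /// check that as an "always" invariant (a sketch; `MyJoinExec` is a
    /// hypothetical operator):
    ///
    /// ```ignore
    /// fn check_invariants(&self, _check: InvariantLevel) -> Result<()> {
    ///     if self.children().len() != 2 {
    ///         return internal_err!("MyJoinExec must have exactly two children");
    ///     }
    ///     Ok(())
    /// }
    /// ```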
120    fn check_invariants(&self, _check: InvariantLevel) -> Result<()> {
121        Ok(())
122    }
123
    /// Specifies the data distribution requirements for all the children of
    /// this `ExecutionPlan`. By default it is
    /// [`Distribution::UnspecifiedDistribution`] for each child.
126    fn required_input_distribution(&self) -> Vec<Distribution> {
127        vec![Distribution::UnspecifiedDistribution; self.children().len()]
128    }
129
130    /// Specifies the ordering required for all of the children of this
131    /// `ExecutionPlan`.
132    ///
    /// For each child, this is the required ordering within each partition
    /// (a local requirement), not a global ordering across partitions.
    ///
    /// NOTE: checking `!is_empty()` on the returned vector does **not**
    /// determine whether an input ordering is required. The correct check is
    /// whether at least one entry is `Some`.
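    ///
    /// For example, a sketch of the correct check (`plan` is assumed to be an
    /// `Arc<dyn ExecutionPlan>`):
    ///
    /// ```ignore
    /// let requires_ordering = plan
    ///     .required_input_ordering()
    ///     .iter()
    ///     .any(|requirement| requirement.is_some());
    /// ```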
139    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
140        vec![None; self.children().len()]
141    }
142
    /// Returns, for each child, `false` if this `ExecutionPlan`'s
    /// implementation may reorder rows within or between partitions.
    ///
    /// For example, Projection, Filter, and Limit maintain the order of their
    /// inputs -- they may transform values (Projection) or produce fewer rows
    /// than they receive (Filter and Limit), but the rows they do produce are
    /// emitted in the same order as their input.
    ///
    /// DataFusion uses this metadata to apply certain optimizations
    /// such as automatically repartitioning correctly.
    ///
    /// The default implementation returns `false` for every child.
    ///
    /// WARNING: if you override this default, you *MUST* ensure that the
    /// `ExecutionPlan` actually maintains the ordering invariant, or else
    /// DataFusion may produce incorrect results.
159    fn maintains_input_order(&self) -> Vec<bool> {
160        vec![false; self.children().len()]
161    }
162
163    /// Specifies whether the `ExecutionPlan` benefits from increased
164    /// parallelization at its input for each child.
165    ///
    /// If this returns `true`, the `ExecutionPlan` would benefit from
    /// partitioning the corresponding child (and thus from more parallelism).
    /// For `ExecutionPlan`s that do very little work, the overhead of extra
    /// parallelism may outweigh any benefits.
170    ///
171    /// The default implementation returns `true` unless this `ExecutionPlan`
172    /// has signalled it requires a single child input partition.
173    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
174        // By default try to maximize parallelism with more CPUs if
175        // possible
176        self.required_input_distribution()
177            .into_iter()
178            .map(|dist| !matches!(dist, Distribution::SinglePartition))
179            .collect()
180    }
181
182    /// Get a list of children `ExecutionPlan`s that act as inputs to this plan.
183    /// The returned list will be empty for leaf nodes such as scans, will contain
184    /// a single value for unary nodes, or two values for binary nodes (such as
185    /// joins).
186    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
187
188    /// Returns a new `ExecutionPlan` where all existing children were replaced
189    /// by the `children`, in order
190    fn with_new_children(
191        self: Arc<Self>,
192        children: Vec<Arc<dyn ExecutionPlan>>,
193    ) -> Result<Arc<dyn ExecutionPlan>>;
194
195    /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
196    /// produce `target_partitions` partitions.
197    ///
198    /// If the `ExecutionPlan` does not support changing its partitioning,
199    /// returns `Ok(None)` (the default).
200    ///
    /// If the `ExecutionPlan` can increase its partitioning, but not all the
    /// way to `target_partitions`, it may return a plan with fewer
    /// partitions. This might happen, for example, if each new partition would
    /// be too small to be processed efficiently on its own.
    ///
    /// The DataFusion optimizer attempts to use as many threads as possible by
    /// repartitioning its inputs to match the target number of threads
    /// available (`target_partitions`). Some data sources, such as the built-in
    /// CSV and Parquet readers, implement this method because they are able to
    /// read from their input files in parallel, regardless of how the source
    /// data is split amongst files.
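    ///
    /// A sketch of how a data source might override this method
    /// (`max_supported_partitions` and `with_partition_count` are hypothetical
    /// helpers, not part of this trait):
    ///
    /// ```ignore
    /// fn repartitioned(
    ///     &self,
    ///     target_partitions: usize,
    ///     _config: &ConfigOptions,
    /// ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    ///     // Split the work into at most `target_partitions` pieces, bounded
    ///     // by what this source can actually support.
    ///     let new_partitions = target_partitions.min(self.max_supported_partitions());
    ///     if new_partitions <= self.properties().output_partitioning().partition_count() {
    ///         // Cannot improve on the current partitioning
    ///         return Ok(None);
    ///     }
    ///     Ok(Some(Arc::new(self.with_partition_count(new_partitions))))
    /// }
    /// ```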
212    fn repartitioned(
213        &self,
214        _target_partitions: usize,
215        _config: &ConfigOptions,
216    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
217        Ok(None)
218    }
219
220    /// Begin execution of `partition`, returning a [`Stream`] of
221    /// [`RecordBatch`]es.
222    ///
223    /// # Notes
224    ///
225    /// The `execute` method itself is not `async` but it returns an `async`
226    /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
227    /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
228    /// Most `ExecutionPlan`s should not do any work before the first
229    /// `RecordBatch` is requested from the stream.
230    ///
231    /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
232    /// [`Stream`] into a [`SendableRecordBatchStream`].
233    ///
    /// Using `async` `Stream`s allows for network I/O during execution and
    /// takes advantage of Rust's built-in support for `async` continuations
    /// and its crate ecosystem.
237    ///
238    /// [`Stream`]: futures::stream::Stream
239    /// [`StreamExt`]: futures::stream::StreamExt
240    /// [`TryStreamExt`]: futures::stream::TryStreamExt
241    /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
242    ///
243    /// # Error handling
244    ///
245    /// Any error that occurs during execution is sent as an `Err` in the output
246    /// stream.
247    ///
    /// `ExecutionPlan` implementations in DataFusion cancel additional work
    /// immediately once an error occurs. The rationale is that if the overall
    /// query will return an error, any additional work such as continued
    /// polling of inputs is wasted, as its results will be thrown away.
252    ///
253    /// # Cancellation / Aborting Execution
254    ///
255    /// The [`Stream`] that is returned must ensure that any allocated resources
256    /// are freed when the stream itself is dropped. This is particularly
257    /// important for [`spawn`]ed tasks or threads. Unless care is taken to
258    /// "abort" such tasks, they may continue to consume resources even after
259    /// the plan is dropped, generating intermediate results that are never
260    /// used.
    /// For this reason, [`spawn`] is disallowed; use [`SpawnedTask`] instead.
262    ///
263    /// For more details see [`SpawnedTask`], [`JoinSet`] and [`RecordBatchReceiverStreamBuilder`]
264    /// for structures to help ensure all background tasks are cancelled.
265    ///
266    /// [`spawn`]: tokio::task::spawn
267    /// [`JoinSet`]: tokio::task::JoinSet
268    /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask
269    /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder
270    ///
271    /// # Implementation Examples
272    ///
    /// While `async` `Stream`s have a non-trivial learning curve, the
    /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`],
    /// which help simplify many common operations.
276    ///
277    /// Here are some common patterns:
278    ///
279    /// ## Return Precomputed `RecordBatch`
280    ///
281    /// We can return a precomputed `RecordBatch` as a `Stream`:
282    ///
283    /// ```
284    /// # use std::sync::Arc;
285    /// # use arrow::array::RecordBatch;
286    /// # use arrow::datatypes::SchemaRef;
287    /// # use datafusion_common::Result;
288    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
289    /// # use datafusion_physical_plan::memory::MemoryStream;
290    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
291    /// struct MyPlan {
292    ///     batch: RecordBatch,
293    /// }
294    ///
295    /// impl MyPlan {
296    ///     fn execute(
297    ///         &self,
298    ///         partition: usize,
299    ///         context: Arc<TaskContext>
300    ///     ) -> Result<SendableRecordBatchStream> {
    ///         // use functions from the futures crate to convert the batch into a stream
302    ///         let fut = futures::future::ready(Ok(self.batch.clone()));
303    ///         let stream = futures::stream::once(fut);
304    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
305    ///     }
306    /// }
307    /// ```
308    ///
309    /// ## Lazily (async) Compute `RecordBatch`
310    ///
311    /// We can also lazily compute a `RecordBatch` when the returned `Stream` is polled
312    ///
313    /// ```
314    /// # use std::sync::Arc;
315    /// # use arrow::array::RecordBatch;
316    /// # use arrow::datatypes::SchemaRef;
317    /// # use datafusion_common::Result;
318    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
319    /// # use datafusion_physical_plan::memory::MemoryStream;
320    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
321    /// struct MyPlan {
322    ///     schema: SchemaRef,
323    /// }
324    ///
325    /// /// Returns a single batch when the returned stream is polled
326    /// async fn get_batch() -> Result<RecordBatch> {
327    ///     todo!()
328    /// }
329    ///
330    /// impl MyPlan {
331    ///     fn execute(
332    ///         &self,
333    ///         partition: usize,
334    ///         context: Arc<TaskContext>
335    ///     ) -> Result<SendableRecordBatchStream> {
336    ///         let fut = get_batch();
337    ///         let stream = futures::stream::once(fut);
338    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
339    ///     }
340    /// }
341    /// ```
342    ///
343    /// ## Lazily (async) create a Stream
344    ///
345    /// If you need to create the return `Stream` using an `async` function,
346    /// you can do so by flattening the result:
347    ///
348    /// ```
349    /// # use std::sync::Arc;
350    /// # use arrow::array::RecordBatch;
351    /// # use arrow::datatypes::SchemaRef;
352    /// # use futures::TryStreamExt;
353    /// # use datafusion_common::Result;
354    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
355    /// # use datafusion_physical_plan::memory::MemoryStream;
356    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
357    /// struct MyPlan {
358    ///     schema: SchemaRef,
359    /// }
360    ///
361    /// /// async function that returns a stream
362    /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
363    ///     todo!()
364    /// }
365    ///
366    /// impl MyPlan {
367    ///     fn execute(
368    ///         &self,
369    ///         partition: usize,
370    ///         context: Arc<TaskContext>
371    ///     ) -> Result<SendableRecordBatchStream> {
372    ///         // A future that yields a stream
373    ///         let fut = get_batch_stream();
374    ///         // Use TryStreamExt::try_flatten to flatten the stream of streams
375    ///         let stream = futures::stream::once(fut).try_flatten();
376    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
377    ///     }
378    /// }
379    /// ```
380    fn execute(
381        &self,
382        partition: usize,
383        context: Arc<TaskContext>,
384    ) -> Result<SendableRecordBatchStream>;
385
386    /// Return a snapshot of the set of [`Metric`]s for this
387    /// [`ExecutionPlan`]. If no `Metric`s are available, return None.
388    ///
389    /// While the values of the metrics in the returned
390    /// [`MetricsSet`]s may change as execution progresses, the
391    /// specific metrics will not.
392    ///
    /// Once `self.execute()` has returned (technically the future has
    /// resolved) for all available partitions, the set of metrics
    /// should be complete. If this function is called prior to
    /// `execute()`, new metrics may appear in subsequent calls.
397    fn metrics(&self) -> Option<MetricsSet> {
398        None
399    }
400
401    /// Returns statistics for this `ExecutionPlan` node. If statistics are not
402    /// available, should return [`Statistics::new_unknown`] (the default), not
403    /// an error.
404    ///
    /// For TableScan executors, which support filter pushdown, special
    /// attention needs to be paid to whether the statistics returned by this
    /// method are exact or not.
407    fn statistics(&self) -> Result<Statistics> {
408        Ok(Statistics::new_unknown(&self.schema()))
409    }
410
411    /// Returns `true` if a limit can be safely pushed down through this
412    /// `ExecutionPlan` node.
413    ///
414    /// If this method returns `true`, and the query plan contains a limit at
415    /// the output of this node, DataFusion will push the limit to the input
416    /// of this node.
417    fn supports_limit_pushdown(&self) -> bool {
418        false
419    }
420
421    /// Returns a fetching variant of this `ExecutionPlan` node, if it supports
422    /// fetch limits. Returns `None` otherwise.
423    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
424        None
425    }
426
    /// Gets the fetch count for the operator; `None` means there is no fetch.
428    fn fetch(&self) -> Option<usize> {
429        None
430    }
431
432    /// Gets the effect on cardinality, if known
433    fn cardinality_effect(&self) -> CardinalityEffect {
434        CardinalityEffect::Unknown
435    }
436
437    /// Attempts to push down the given projection into the input of this `ExecutionPlan`.
438    ///
439    /// If the operator supports this optimization, the resulting plan will be:
440    /// `self_new <- projection <- source`, starting from `projection <- self <- source`.
    /// Otherwise, `Ok(None)` is returned and the plan is left unchanged.
442    ///
443    /// Returns `Ok(Some(...))` if pushdown is applied, `Ok(None)` if it is not supported
444    /// or not possible, or `Err` on failure.
445    fn try_swapping_with_projection(
446        &self,
447        _projection: &ProjectionExec,
448    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
449        Ok(None)
450    }
451}
452
453/// [`ExecutionPlan`] Invariant Level
454///
/// Describes which set of assertions ([Invariant]s) holds for a particular `ExecutionPlan`.
456///
457/// [Invariant]: https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science
458#[derive(Clone, Copy)]
459pub enum InvariantLevel {
460    /// Invariants that are always true for the [`ExecutionPlan`] node
461    /// such as the number of expected children.
462    Always,
463    /// Invariants that must hold true for the [`ExecutionPlan`] node
464    /// to be "executable", such as ordering and/or distribution requirements
465    /// being fulfilled.
466    Executable,
467}
468
469/// Extension trait provides an easy API to fetch various properties of
470/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
471pub trait ExecutionPlanProperties {
472    /// Specifies how the output of this `ExecutionPlan` is split into
473    /// partitions.
474    fn output_partitioning(&self) -> &Partitioning;
475
476    /// If the output of this `ExecutionPlan` within each partition is sorted,
477    /// returns `Some(keys)` describing the ordering. A `None` return value
478    /// indicates no assumptions should be made on the output ordering.
479    ///
480    /// For example, `SortExec` (obviously) produces sorted output as does
481    /// `SortPreservingMergeStream`. Less obviously, `Projection` produces sorted
482    /// output if its input is sorted as it does not reorder the input rows.
483    fn output_ordering(&self) -> Option<&LexOrdering>;
484
485    /// Boundedness information of the stream corresponding to this `ExecutionPlan`.
486    /// For more details, see [`Boundedness`].
487    fn boundedness(&self) -> Boundedness;
488
489    /// Indicates how the stream of this `ExecutionPlan` emits its results.
490    /// For more details, see [`EmissionType`].
491    fn pipeline_behavior(&self) -> EmissionType;
492
493    /// Get the [`EquivalenceProperties`] within the plan.
494    ///
    /// Equivalence properties tell DataFusion which columns are known to be
    /// equal during various optimization passes. By default, this returns "no
    /// known equivalences", which is always correct but may cause DataFusion
    /// to unnecessarily re-sort data.
499    ///
500    /// If this ExecutionPlan makes no changes to the schema of the rows flowing
501    /// through it or how columns within each row relate to each other, it
502    /// should return the equivalence properties of its input. For
503    /// example, since `FilterExec` may remove rows from its input, but does not
504    /// otherwise modify them, it preserves its input equivalence properties.
505    /// However, since `ProjectionExec` may calculate derived expressions, it
506    /// needs special handling.
507    ///
508    /// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`]
509    /// for related concepts.
510    fn equivalence_properties(&self) -> &EquivalenceProperties;
511}
512
513impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
514    fn output_partitioning(&self) -> &Partitioning {
515        self.properties().output_partitioning()
516    }
517
518    fn output_ordering(&self) -> Option<&LexOrdering> {
519        self.properties().output_ordering()
520    }
521
522    fn boundedness(&self) -> Boundedness {
523        self.properties().boundedness
524    }
525
526    fn pipeline_behavior(&self) -> EmissionType {
527        self.properties().emission_type
528    }
529
530    fn equivalence_properties(&self) -> &EquivalenceProperties {
531        self.properties().equivalence_properties()
532    }
533}
534
535impl ExecutionPlanProperties for &dyn ExecutionPlan {
536    fn output_partitioning(&self) -> &Partitioning {
537        self.properties().output_partitioning()
538    }
539
540    fn output_ordering(&self) -> Option<&LexOrdering> {
541        self.properties().output_ordering()
542    }
543
544    fn boundedness(&self) -> Boundedness {
545        self.properties().boundedness
546    }
547
548    fn pipeline_behavior(&self) -> EmissionType {
549        self.properties().emission_type
550    }
551
552    fn equivalence_properties(&self) -> &EquivalenceProperties {
553        self.properties().equivalence_properties()
554    }
555}
556
557/// Represents whether a stream of data **generated** by an operator is bounded (finite)
558/// or unbounded (infinite).
559///
560/// This is used to determine whether an execution plan will eventually complete
561/// processing all its data (bounded) or could potentially run forever (unbounded).
562///
563/// For unbounded streams, it also tracks whether the operator requires finite memory
564/// to process the stream or if memory usage could grow unbounded.
565///
/// Boundedness of the output stream is based on the boundedness of the input stream and the nature of
/// the operator. For example, a limit or top-k operator with a fetch can convert an unbounded stream into a bounded one.
568#[derive(Debug, Clone, Copy, PartialEq, Eq)]
569pub enum Boundedness {
570    /// The data stream is bounded (finite) and will eventually complete
571    Bounded,
572    /// The data stream is unbounded (infinite) and could run forever
573    Unbounded {
574        /// Whether this operator requires infinite memory to process the unbounded stream.
575        /// If false, the operator can process an infinite stream with bounded memory.
576        /// If true, memory usage may grow unbounded while processing the stream.
577        ///
578        /// For example, `Median` requires infinite memory to compute the median of an unbounded stream.
579        /// `Min/Max` requires infinite memory if the stream is unordered, but can be computed with bounded memory if the stream is ordered.
580        requires_infinite_memory: bool,
581    },
582}
583
584impl Boundedness {
585    pub fn is_unbounded(&self) -> bool {
586        matches!(self, Boundedness::Unbounded { .. })
587    }
588}
589
590/// Represents how an operator emits its output records.
591///
/// This is used to determine whether an operator emits records incrementally as they arrive,
/// only emits a final result at the end, or can do both. Note that even an "incremental" operator
/// emits its output as `RecordBatch`es of up to `batch_size` rows, so it may still buffer data
/// internally until it has enough rows to emit a batch or its input is exhausted.
595///
596/// For example, in the following plan:
597/// ```text
598///   SortExec [EmissionType::Final]
599///     |_ on: [col1 ASC]
600///     FilterExec [EmissionType::Incremental]
601///       |_ pred: col2 > 100
602///       DataSourceExec [EmissionType::Incremental]
603///         |_ file: "data.csv"
604/// ```
605/// - DataSourceExec emits records incrementally as it reads from the file
606/// - FilterExec processes and emits filtered records incrementally as they arrive
607/// - SortExec must wait for all input records before it can emit the sorted result,
608///   since it needs to see all values to determine their final order
609///
610/// Left joins can emit both incrementally and finally:
611/// - Incrementally emit matches as they are found
612/// - Finally emit non-matches after all input is processed
613#[derive(Debug, Clone, Copy, PartialEq, Eq)]
614pub enum EmissionType {
615    /// Records are emitted incrementally as they arrive and are processed
616    Incremental,
617    /// Records are only emitted once all input has been processed
618    Final,
619    /// Records can be emitted both incrementally and as a final result
620    Both,
621}
622
623/// Utility to determine an operator's boundedness based on its children's boundedness.
624///
625/// Assumes boundedness can be inferred from child operators:
626/// - Unbounded (requires_infinite_memory: true) takes precedence.
627/// - Unbounded (requires_infinite_memory: false) is considered next.
628/// - Otherwise, the operator is bounded.
629///
630/// **Note:** This is a general-purpose utility and may not apply to
631/// all multi-child operators. Ensure your operator's behavior aligns
632/// with these assumptions before using.
633pub(crate) fn boundedness_from_children<'a>(
634    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
635) -> Boundedness {
636    let mut unbounded_with_finite_mem = false;
637
638    for child in children {
639        match child.boundedness() {
640            Boundedness::Unbounded {
641                requires_infinite_memory: true,
642            } => {
643                return Boundedness::Unbounded {
644                    requires_infinite_memory: true,
645                }
646            }
647            Boundedness::Unbounded {
648                requires_infinite_memory: false,
649            } => {
650                unbounded_with_finite_mem = true;
651            }
652            Boundedness::Bounded => {}
653        }
654    }
655
656    if unbounded_with_finite_mem {
657        Boundedness::Unbounded {
658            requires_infinite_memory: false,
659        }
660    } else {
661        Boundedness::Bounded
662    }
663}
664
665/// Determines the emission type of an operator based on its children's pipeline behavior.
666///
667/// The precedence of emission types is:
668/// - `Final` has the highest precedence.
669/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present.
670/// - `Incremental` is the default if all children emit incremental results.
671///
672/// **Note:** This is a general-purpose utility and may not apply to
673/// all multi-child operators. Verify your operator's behavior aligns
674/// with these assumptions.
675pub(crate) fn emission_type_from_children<'a>(
676    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
677) -> EmissionType {
678    let mut inc_and_final = false;
679
680    for child in children {
681        match child.pipeline_behavior() {
682            EmissionType::Final => return EmissionType::Final,
683            EmissionType::Both => inc_and_final = true,
684            EmissionType::Incremental => continue,
685        }
686    }
687
688    if inc_and_final {
689        EmissionType::Both
690    } else {
691        EmissionType::Incremental
692    }
693}
694
695/// Stores certain, often expensive to compute, plan properties used in query
696/// optimization.
697///
/// These properties are stored in a single structure so that they can be
/// computed once and the cached results reused multiple times without
/// recomputation (i.e. the structure acts as a cache).
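///
/// A sketch of constructing `PlanProperties` for a simple bounded,
/// single-partition operator (`schema` is assumed to be a `SchemaRef`):
///
/// ```ignore
/// let props = PlanProperties::new(
///     EquivalenceProperties::new(Arc::clone(&schema)), // no known equivalences
///     Partitioning::UnknownPartitioning(1),            // one output partition
///     EmissionType::Incremental,                       // emits batches as they are produced
///     Boundedness::Bounded,                            // the input is finite
/// );
/// ```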
701#[derive(Debug, Clone)]
702pub struct PlanProperties {
703    /// See [ExecutionPlanProperties::equivalence_properties]
704    pub eq_properties: EquivalenceProperties,
705    /// See [ExecutionPlanProperties::output_partitioning]
706    pub partitioning: Partitioning,
707    /// See [ExecutionPlanProperties::pipeline_behavior]
708    pub emission_type: EmissionType,
709    /// See [ExecutionPlanProperties::boundedness]
710    pub boundedness: Boundedness,
711    /// See [ExecutionPlanProperties::output_ordering]
712    output_ordering: Option<LexOrdering>,
713}
714
715impl PlanProperties {
    /// Construct a new `PlanProperties` from its constituent properties.
717    pub fn new(
718        eq_properties: EquivalenceProperties,
719        partitioning: Partitioning,
720        emission_type: EmissionType,
721        boundedness: Boundedness,
722    ) -> Self {
723        // Output ordering can be derived from `eq_properties`.
724        let output_ordering = eq_properties.output_ordering();
725        Self {
726            eq_properties,
727            partitioning,
728            emission_type,
729            boundedness,
730            output_ordering,
731        }
732    }
733
734    /// Overwrite output partitioning with its new value.
735    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
736        self.partitioning = partitioning;
737        self
738    }
739
740    /// Overwrite equivalence properties with its new value.
741    pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
742        // Changing equivalence properties also changes output ordering, so
743        // make sure to overwrite it:
744        self.output_ordering = eq_properties.output_ordering();
745        self.eq_properties = eq_properties;
746        self
747    }
748
749    /// Overwrite boundedness with its new value.
750    pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
751        self.boundedness = boundedness;
752        self
753    }
754
755    /// Overwrite emission type with its new value.
756    pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
757        self.emission_type = emission_type;
758        self
759    }
760
761    /// Overwrite constraints with its new value.
762    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
763        self.eq_properties = self.eq_properties.with_constraints(constraints);
764        self
765    }
766
767    pub fn equivalence_properties(&self) -> &EquivalenceProperties {
768        &self.eq_properties
769    }
770
771    pub fn output_partitioning(&self) -> &Partitioning {
772        &self.partitioning
773    }
774
775    pub fn output_ordering(&self) -> Option<&LexOrdering> {
776        self.output_ordering.as_ref()
777    }
778
779    /// Get schema of the node.
780    pub(crate) fn schema(&self) -> &SchemaRef {
781        self.eq_properties.schema()
782    }
783}
784
/// Indicates whether a data exchange is needed for the input of `plan`. This is particularly
/// useful for distributed engines when deciding whether shuffling must be handled.
/// Currently there are three kinds of execution plans that need a data exchange:
///     1. `RepartitionExec` for changing the partition count between two `ExecutionPlan`s
///     2. `CoalescePartitionsExec` for collapsing all of the partitions into one without an ordering guarantee
///     3. `SortPreservingMergeExec` for collapsing all of the sorted partitions into one with an ordering guarantee
791pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
792    if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
793        !matches!(
794            repartition.properties().output_partitioning(),
795            Partitioning::RoundRobinBatch(_)
796        )
797    } else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>()
798    {
799        coalesce.input().output_partitioning().partition_count() > 1
800    } else if let Some(sort_preserving_merge) =
801        plan.as_any().downcast_ref::<SortPreservingMergeExec>()
802    {
803        sort_preserving_merge
804            .input()
805            .output_partitioning()
806            .partition_count()
807            > 1
808    } else {
809        false
810    }
811}
812
/// Returns a copy of this plan with the given children if any child differs
/// (by pointer comparison) from the corresponding existing child; otherwise
/// returns the original plan unchanged.
/// The length of `children` must equal the length of `ExecutionPlan::children()`.
815pub fn with_new_children_if_necessary(
816    plan: Arc<dyn ExecutionPlan>,
817    children: Vec<Arc<dyn ExecutionPlan>>,
818) -> Result<Arc<dyn ExecutionPlan>> {
819    let old_children = plan.children();
820    if children.len() != old_children.len() {
821        internal_err!("Wrong number of children")
822    } else if children.is_empty()
823        || children
824            .iter()
825            .zip(old_children.iter())
826            .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
827    {
828        plan.with_new_children(children)
829    } else {
830        Ok(plan)
831    }
832}
833
/// Return a [`DisplayableExecutionPlan`] wrapper around an
/// [`ExecutionPlan`], which can be displayed in a variety of formats that
/// are easier to understand than the default `Debug` output.
837///
838/// See examples on [`DisplayableExecutionPlan`]
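///
/// For example (a sketch; `plan` is assumed to be an `Arc<dyn ExecutionPlan>`):
///
/// ```ignore
/// // Print an indented, human readable rendering of the plan
/// println!("{}", displayable(plan.as_ref()).indent(true));
/// ```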
839pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
840    DisplayableExecutionPlan::new(plan)
841}
842
843/// Execute the [ExecutionPlan] and collect the results in memory
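///
/// For example (a sketch; `plan` and `task_ctx` are assumed to exist):
///
/// ```ignore
/// // Buffer all output batches of the plan into a Vec<RecordBatch>
/// let batches = collect(plan, task_ctx).await?;
/// ```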
844pub async fn collect(
845    plan: Arc<dyn ExecutionPlan>,
846    context: Arc<TaskContext>,
847) -> Result<Vec<RecordBatch>> {
848    let stream = execute_stream(plan, context)?;
849    crate::common::collect(stream).await
850}
851
852/// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
853///
854/// See [collect] to buffer the `RecordBatch`es in memory.
855///
856/// # Aborting Execution
857///
858/// Dropping the stream will abort the execution of the query, and free up
859/// any allocated resources
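///
/// For example, consuming the stream incrementally (a sketch; `plan` and
/// `task_ctx` are assumed to exist, and `futures::StreamExt` must be in scope):
///
/// ```ignore
/// let mut stream = execute_stream(plan, task_ctx)?;
/// while let Some(batch) = stream.next().await {
///     // each item is a Result<RecordBatch>
///     println!("got {} rows", batch?.num_rows());
/// }
/// ```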
860pub fn execute_stream(
861    plan: Arc<dyn ExecutionPlan>,
862    context: Arc<TaskContext>,
863) -> Result<SendableRecordBatchStream> {
864    match plan.output_partitioning().partition_count() {
865        0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
866        1 => plan.execute(0, context),
867        2.. => {
868            // merge into a single partition
869            let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
870            // CoalescePartitionsExec must produce a single partition
871            assert_eq!(1, plan.properties().output_partitioning().partition_count());
872            plan.execute(0, context)
873        }
874    }
875}
876
/// Execute the [ExecutionPlan] and collect the results in memory, returning
/// one `Vec<RecordBatch>` per output partition.
878pub async fn collect_partitioned(
879    plan: Arc<dyn ExecutionPlan>,
880    context: Arc<TaskContext>,
881) -> Result<Vec<Vec<RecordBatch>>> {
882    let streams = execute_stream_partitioned(plan, context)?;
883
884    let mut join_set = JoinSet::new();
885    // Execute the plan and collect the results into batches.
886    streams.into_iter().enumerate().for_each(|(idx, stream)| {
887        join_set.spawn(async move {
888            let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
889            (idx, result)
890        });
891    });
892
893    let mut batches = vec![];
894    // Note that currently this doesn't identify the thread that panicked
895    //
    // TODO: Replace with [join_next_with_id](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html#method.join_next_with_id)
897    // once it is stable
898    while let Some(result) = join_set.join_next().await {
899        match result {
900            Ok((idx, res)) => batches.push((idx, res?)),
901            Err(e) => {
902                if e.is_panic() {
903                    std::panic::resume_unwind(e.into_panic());
904                } else {
905                    unreachable!();
906                }
907            }
908        }
909    }
910
911    batches.sort_by_key(|(idx, _)| *idx);
912    let batches = batches.into_iter().map(|(_, batch)| batch).collect();
913
914    Ok(batches)
915}
916
917/// Execute the [ExecutionPlan] and return a vec with one stream per output
918/// partition
919///
920/// # Aborting Execution
921///
922/// Dropping the stream will abort the execution of the query, and free up
923/// any allocated resources
924pub fn execute_stream_partitioned(
925    plan: Arc<dyn ExecutionPlan>,
926    context: Arc<TaskContext>,
927) -> Result<Vec<SendableRecordBatchStream>> {
928    let num_partitions = plan.output_partitioning().partition_count();
929    let mut streams = Vec::with_capacity(num_partitions);
930    for i in 0..num_partitions {
931        streams.push(plan.execute(i, Arc::clone(&context))?);
932    }
933    Ok(streams)
934}
935
936/// Executes an input stream and ensures that the resulting stream adheres to
937/// the `not null` constraints specified in the `sink_schema`.
938///
939/// # Arguments
940///
941/// * `input` - An execution plan
942/// * `sink_schema` - The schema to be applied to the output stream
943/// * `partition` - The partition index to be executed
944/// * `context` - The task context
945///
946/// # Returns
947///
948/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if successful
949///
950/// This function first executes the given input plan for the specified partition
951/// and context. It then checks if there are any columns in the input that might
952/// violate the `not null` constraints specified in the `sink_schema`. If there are
953/// such columns, it wraps the resulting stream to enforce the `not null` constraints
954/// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
955pub fn execute_input_stream(
956    input: Arc<dyn ExecutionPlan>,
957    sink_schema: SchemaRef,
958    partition: usize,
959    context: Arc<TaskContext>,
960) -> Result<SendableRecordBatchStream> {
961    let input_stream = input.execute(partition, context)?;
962
963    debug_assert_eq!(sink_schema.fields().len(), input.schema().fields().len());
964
965    // Find input columns that may violate the not null constraint.
966    let risky_columns: Vec<_> = sink_schema
967        .fields()
968        .iter()
969        .zip(input.schema().fields().iter())
970        .enumerate()
971        .filter_map(|(idx, (sink_field, input_field))| {
972            (!sink_field.is_nullable() && input_field.is_nullable()).then_some(idx)
973        })
974        .collect();
975
976    if risky_columns.is_empty() {
977        Ok(input_stream)
978    } else {
979        // Check not null constraint on the input stream
980        Ok(Box::pin(RecordBatchStreamAdapter::new(
981            sink_schema,
982            input_stream
983                .map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
984        )))
985    }
986}
987
988/// Checks a `RecordBatch` for `not null` constraints on specified columns.
989///
990/// # Arguments
991///
992/// * `batch` - The `RecordBatch` to be checked
993/// * `column_indices` - A vector of column indices that should be checked for
994///   `not null` constraints.
995///
996/// # Returns
997///
998/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints are met
999///
1000/// This function iterates over the specified column indices and ensures that none
1001/// of the columns contain null values. If any column contains null values, an error
1002/// is returned.
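///
/// For example (a sketch; `batch` is assumed to be a `RecordBatch`):
///
/// ```ignore
/// // Returns an error if column 0 of `batch` contains any null values
/// let batch = check_not_null_constraints(batch, &vec![0])?;
/// ```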
1003pub fn check_not_null_constraints(
1004    batch: RecordBatch,
1005    column_indices: &Vec<usize>,
1006) -> Result<RecordBatch> {
1007    for &index in column_indices {
1008        if batch.num_columns() <= index {
1009            return exec_err!(
1010                "Invalid batch column count {} expected > {}",
1011                batch.num_columns(),
1012                index
1013            );
1014        }
1015
1016        if batch
1017            .column(index)
1018            .logical_nulls()
1019            .map(|nulls| nulls.null_count())
1020            .unwrap_or_default()
1021            > 0
1022        {
1023            return exec_err!(
1024                "Invalid batch column at '{}' has null but schema specifies non-nullable",
1025                index
1026            );
1027        }
1028    }
1029
1030    Ok(batch)
1031}
1032
1033/// Utility function yielding a string representation of the given [`ExecutionPlan`].
1034pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
1035    let formatted = displayable(plan.as_ref()).indent(true).to_string();
1036    let actual: Vec<&str> = formatted.trim().lines().collect();
1037    actual.iter().map(|elem| elem.to_string()).collect()
1038}
1039
/// Indicates the effect an execution plan operator will have on the cardinality
/// of its output stream relative to its input stream
1042pub enum CardinalityEffect {
1043    /// Unknown effect. This is the default
1044    Unknown,
1045    /// The operator is guaranteed to produce exactly one row for
1046    /// each input row
1047    Equal,
1048    /// The operator may produce fewer output rows than it receives input rows
1049    LowerEqual,
1050    /// The operator may produce more output rows than it receives input rows
1051    GreaterEqual,
1052}
1053
1054#[cfg(test)]
1055mod tests {
1056    use super::*;
1057    use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
1058    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1059    use std::any::Any;
1060    use std::sync::Arc;
1061
1062    use datafusion_common::{Result, Statistics};
1063    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
1064
1065    use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
1066
1067    #[derive(Debug)]
1068    pub struct EmptyExec;
1069
1070    impl EmptyExec {
1071        pub fn new(_schema: SchemaRef) -> Self {
1072            Self
1073        }
1074    }
1075
1076    impl DisplayAs for EmptyExec {
1077        fn fmt_as(
1078            &self,
1079            _t: DisplayFormatType,
1080            _f: &mut std::fmt::Formatter,
1081        ) -> std::fmt::Result {
1082            unimplemented!()
1083        }
1084    }
1085
1086    impl ExecutionPlan for EmptyExec {
1087        fn name(&self) -> &'static str {
1088            Self::static_name()
1089        }
1090
1091        fn as_any(&self) -> &dyn Any {
1092            self
1093        }
1094
1095        fn properties(&self) -> &PlanProperties {
1096            unimplemented!()
1097        }
1098
1099        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1100            vec![]
1101        }
1102
1103        fn with_new_children(
1104            self: Arc<Self>,
1105            _: Vec<Arc<dyn ExecutionPlan>>,
1106        ) -> Result<Arc<dyn ExecutionPlan>> {
1107            unimplemented!()
1108        }
1109
1110        fn execute(
1111            &self,
1112            _partition: usize,
1113            _context: Arc<TaskContext>,
1114        ) -> Result<SendableRecordBatchStream> {
1115            unimplemented!()
1116        }
1117
1118        fn statistics(&self) -> Result<Statistics> {
1119            unimplemented!()
1120        }
1121    }
1122
1123    #[derive(Debug)]
1124    pub struct RenamedEmptyExec;
1125
1126    impl RenamedEmptyExec {
1127        pub fn new(_schema: SchemaRef) -> Self {
1128            Self
1129        }
1130    }
1131
1132    impl DisplayAs for RenamedEmptyExec {
1133        fn fmt_as(
1134            &self,
1135            _t: DisplayFormatType,
1136            _f: &mut std::fmt::Formatter,
1137        ) -> std::fmt::Result {
1138            unimplemented!()
1139        }
1140    }
1141
1142    impl ExecutionPlan for RenamedEmptyExec {
1143        fn name(&self) -> &'static str {
1144            Self::static_name()
1145        }
1146
1147        fn static_name() -> &'static str
1148        where
1149            Self: Sized,
1150        {
1151            "MyRenamedEmptyExec"
1152        }
1153
1154        fn as_any(&self) -> &dyn Any {
1155            self
1156        }
1157
1158        fn properties(&self) -> &PlanProperties {
1159            unimplemented!()
1160        }
1161
1162        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1163            vec![]
1164        }
1165
1166        fn with_new_children(
1167            self: Arc<Self>,
1168            _: Vec<Arc<dyn ExecutionPlan>>,
1169        ) -> Result<Arc<dyn ExecutionPlan>> {
1170            unimplemented!()
1171        }
1172
1173        fn execute(
1174            &self,
1175            _partition: usize,
1176            _context: Arc<TaskContext>,
1177        ) -> Result<SendableRecordBatchStream> {
1178            unimplemented!()
1179        }
1180
1181        fn statistics(&self) -> Result<Statistics> {
1182            unimplemented!()
1183        }
1184    }
1185
1186    #[test]
1187    fn test_execution_plan_name() {
1188        let schema1 = Arc::new(Schema::empty());
1189        let default_name_exec = EmptyExec::new(schema1);
1190        assert_eq!(default_name_exec.name(), "EmptyExec");
1191
1192        let schema2 = Arc::new(Schema::empty());
1193        let renamed_exec = RenamedEmptyExec::new(schema2);
1194        assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec");
1195        assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
1196    }
1197
1198    /// A compilation test to ensure that the `ExecutionPlan::name()` method can
1199    /// be called from a trait object.
1200    /// Related ticket: https://github.com/apache/datafusion/pull/11047
1201    #[allow(dead_code)]
1202    fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
1203        let _ = plan.name();
1204    }
1205
1206    #[test]
1207    fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
1208        check_not_null_constraints(
1209            RecordBatch::try_new(
1210                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1211                vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
1212            )?,
1213            &vec![0],
1214        )?;
1215        Ok(())
1216    }
1217
1218    #[test]
1219    fn test_check_not_null_constraints_reject_null() -> Result<()> {
1220        let result = check_not_null_constraints(
1221            RecordBatch::try_new(
1222                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1223                vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
1224            )?,
1225            &vec![0],
1226        );
1227        assert!(result.is_err());
1228        assert_eq!(
1229            result.err().unwrap().strip_backtrace(),
1230            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1231        );
1232        Ok(())
1233    }
1234
1235    #[test]
1236    fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
1237        // some null value inside REE array
1238        let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
1239        let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
1240        let run_end_array = RunArray::try_new(&run_ends, &values)?;
1241        let result = check_not_null_constraints(
1242            RecordBatch::try_new(
1243                Arc::new(Schema::new(vec![Field::new(
1244                    "a",
1245                    run_end_array.data_type().to_owned(),
1246                    true,
1247                )])),
1248                vec![Arc::new(run_end_array)],
1249            )?,
1250            &vec![0],
1251        );
1252        assert!(result.is_err());
1253        assert_eq!(
1254            result.err().unwrap().strip_backtrace(),
1255            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1256        );
1257        Ok(())
1258    }
1259
1260    #[test]
1261    fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
1262        let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
1263        let keys = Int32Array::from(vec![0, 1, 2, 3]);
1264        let dictionary = DictionaryArray::new(keys, values);
1265        let result = check_not_null_constraints(
1266            RecordBatch::try_new(
1267                Arc::new(Schema::new(vec![Field::new(
1268                    "a",
1269                    dictionary.data_type().to_owned(),
1270                    true,
1271                )])),
1272                vec![Arc::new(dictionary)],
1273            )?,
1274            &vec![0],
1275        );
1276        assert!(result.is_err());
1277        assert_eq!(
1278            result.err().unwrap().strip_backtrace(),
1279            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1280        );
1281        Ok(())
1282    }
1283
1284    #[test]
1285    fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
1286        // some null value marked out by dictionary array
1287        let values = Arc::new(Int32Array::from(vec![
1288            Some(1),
1289            None, // this null value is masked by dictionary keys
1290            Some(3),
1291            Some(4),
1292        ]));
1293        let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
1294        let dictionary = DictionaryArray::new(keys, values);
1295        check_not_null_constraints(
1296            RecordBatch::try_new(
1297                Arc::new(Schema::new(vec![Field::new(
1298                    "a",
1299                    dictionary.data_type().to_owned(),
1300                    true,
1301                )])),
1302                vec![Arc::new(dictionary)],
1303            )?,
1304            &vec![0],
1305        )?;
1306        Ok(())
1307    }
1308
1309    #[test]
1310    fn test_check_not_null_constraints_on_null_type() -> Result<()> {
1311        // null value of Null type
1312        let result = check_not_null_constraints(
1313            RecordBatch::try_new(
1314                Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)])),
1315                vec![Arc::new(NullArray::new(3))],
1316            )?,
1317            &vec![0],
1318        );
1319        assert!(result.is_err());
1320        assert_eq!(
1321            result.err().unwrap().strip_backtrace(),
1322            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1323        );
1324        Ok(())
1325    }
1326}