datafusion_physical_plan/execution_plan.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
19pub use crate::metrics::Metric;
20pub use crate::ordering::InputOrderMode;
21pub use crate::stream::EmptyRecordBatchStream;
22
23pub use datafusion_common::hash_utils;
24pub use datafusion_common::utils::project_schema;
25pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
26pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
27pub use datafusion_expr::{Accumulator, ColumnarValue};
28pub use datafusion_physical_expr::window::WindowExpr;
29pub use datafusion_physical_expr::{
30 expressions, udf, Distribution, Partitioning, PhysicalExpr,
31};
32
33use std::any::Any;
34use std::fmt::Debug;
35use std::sync::Arc;
36
37use crate::coalesce_partitions::CoalescePartitionsExec;
38use crate::display::DisplayableExecutionPlan;
39use crate::metrics::MetricsSet;
40use crate::projection::ProjectionExec;
41use crate::repartition::RepartitionExec;
42use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
43use crate::stream::RecordBatchStreamAdapter;
44
45use arrow::array::{Array, RecordBatch};
46use arrow::datatypes::SchemaRef;
47use datafusion_common::config::ConfigOptions;
48use datafusion_common::{exec_err, Constraints, Result};
49use datafusion_execution::TaskContext;
50use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
51use datafusion_physical_expr_common::sort_expr::LexRequirement;
52
53use futures::stream::{StreamExt, TryStreamExt};
54use tokio::task::JoinSet;
55
56/// Represent nodes in the DataFusion Physical Plan.
57///
58/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
59/// [`RecordBatch`] that incrementally computes a partition of the
60/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
61/// details on partitioning.
62///
63/// Methods such as [`Self::schema`] and [`Self::properties`] communicate
64/// properties of the output to the DataFusion optimizer, and methods such as
65/// [`required_input_distribution`] and [`required_input_ordering`] express
66/// requirements of the `ExecutionPlan` from its input.
67///
68/// [`ExecutionPlan`] can be displayed in a simplified form using the
69/// return value from [`displayable`] in addition to the (normally
70/// quite verbose) `Debug` output.
71///
72/// [`execute`]: ExecutionPlan::execute
73/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
74/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
75pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
76 /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
77 ///
78 /// Implementation note: this method can just proxy to
79 /// [`static_name`](ExecutionPlan::static_name) if no special action is
80 /// needed. A default implementation like that is not provided because this
81 /// method must not require the `Sized` constraint, which allows a wider
82 /// range of use cases (such as calling it through a trait object).
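///
/// A minimal sketch of such a proxy (purely illustrative; a real node may do
/// something else entirely):
///
/// ```ignore
/// fn name(&self) -> &str {
///     Self::static_name()
/// }
/// ```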
83 fn name(&self) -> &str;
84
85 /// Short name for the ExecutionPlan, such as 'DataSourceExec'.
86 /// Like [`name`](ExecutionPlan::name) but can be called without an instance.
87 fn static_name() -> &'static str
88 where
89 Self: Sized,
90 {
91 let full_name = std::any::type_name::<Self>();
92 let maybe_start_idx = full_name.rfind(':');
93 match maybe_start_idx {
94 Some(start_idx) => &full_name[start_idx + 1..],
95 None => "UNKNOWN",
96 }
97 }
98
99 /// Returns the execution plan as [`Any`] so that it can be
100 /// downcast to a specific implementation.
101 fn as_any(&self) -> &dyn Any;
102
103 /// Get the schema for this execution plan
104 fn schema(&self) -> SchemaRef {
105 Arc::clone(self.properties().schema())
106 }
107
108 /// Return properties of the output of the `ExecutionPlan`, such as output
109 /// ordering(s), partitioning information etc.
110 ///
111 /// This information is available via methods on [`ExecutionPlanProperties`]
112 /// trait, which is implemented for all `ExecutionPlan`s.
113 fn properties(&self) -> &PlanProperties;
114
115 /// Returns an error if this individual node does not conform to its invariants.
116 /// These invariants are typically only checked in debug mode.
117 ///
118 /// A default set of invariants is provided in the default implementation.
119 /// Extension nodes can provide their own invariants.
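///
/// A sketch of a node-specific override; the node name and the particular
/// checks are hypothetical:
///
/// ```ignore
/// fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
///     // Structural invariant: this (imaginary) join node always has two children.
///     if matches!(check, InvariantLevel::Always) && self.children().len() != 2 {
///         return internal_err!("MyJoinExec must have exactly two children");
///     }
///     Ok(())
/// }
/// ```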
120 fn check_invariants(&self, _check: InvariantLevel) -> Result<()> {
121 Ok(())
122 }
123
124 /// Specifies the data distribution requirements for all the children of
125 /// this `ExecutionPlan`. By default it is [`Distribution::UnspecifiedDistribution`] for each child.
126 fn required_input_distribution(&self) -> Vec<Distribution> {
127 vec![Distribution::UnspecifiedDistribution; self.children().len()]
128 }
129
130 /// Specifies the ordering required for all of the children of this
131 /// `ExecutionPlan`.
132 ///
133 /// For each child, it's the local ordering requirement within
134 /// each partition rather than the global ordering
135 ///
136 /// NOTE that checking `!is_empty()` does **not** check for a
137 /// required input ordering. Instead, the correct check is that at
138 /// least one entry must be `Some`, as sketched below.
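///
/// A sketch of that check, using a hypothetical helper function:
///
/// ```ignore
/// fn has_required_input_ordering(plan: &dyn ExecutionPlan) -> bool {
///     // `!is_empty()` would also be true for `vec![None, None]`, so look
///     // for at least one `Some` entry instead.
///     plan.required_input_ordering().iter().any(|requirement| requirement.is_some())
/// }
/// ```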
139 fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
140 vec![None; self.children().len()]
141 }
142
143 /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
144 /// rows within or between partitions.
145 ///
146 /// For example, Projection, Filter, and Limit maintain the order
147 /// of inputs -- they may transform values (Projection) or not
148 /// produce the same number of rows that went in (Filter and
149 /// Limit), but the rows that are produced remain in the same order.
150 ///
151 /// DataFusion uses this metadata to apply certain optimizations
152 /// such as automatically repartitioning correctly.
153 ///
154 /// The default implementation returns `false`
155 ///
156 /// WARNING: if you override this default, you *MUST* ensure that
157 /// the `ExecutionPlan` maintains the ordering invariant or else
158 /// DataFusion may produce incorrect results.
159 fn maintains_input_order(&self) -> Vec<bool> {
160 vec![false; self.children().len()]
161 }
162
163 /// Specifies whether the `ExecutionPlan` benefits from increased
164 /// parallelization at its input for each child.
165 ///
166 /// If this returns `true`, the `ExecutionPlan` would benefit from partitioning
167 /// its corresponding child (and thus from more parallelism). For
168 /// `ExecutionPlan`s that do very little work, the overhead of extra
169 /// parallelism may outweigh any benefits.
170 ///
171 /// The default implementation returns `true` unless this `ExecutionPlan`
172 /// has signalled it requires a single child input partition.
173 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
174 // By default try to maximize parallelism with more CPUs if
175 // possible
176 self.required_input_distribution()
177 .into_iter()
178 .map(|dist| !matches!(dist, Distribution::SinglePartition))
179 .collect()
180 }
181
182 /// Get a list of children `ExecutionPlan`s that act as inputs to this plan.
183 /// The returned list will be empty for leaf nodes such as scans, will contain
184 /// a single value for unary nodes, or two values for binary nodes (such as
185 /// joins).
186 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
187
188 /// Returns a new `ExecutionPlan` where all existing children were replaced
189 /// by the `children`, in order
190 fn with_new_children(
191 self: Arc<Self>,
192 children: Vec<Arc<dyn ExecutionPlan>>,
193 ) -> Result<Arc<dyn ExecutionPlan>>;
194
195 /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
196 /// produce `target_partitions` partitions.
197 ///
198 /// If the `ExecutionPlan` does not support changing its partitioning,
199 /// returns `Ok(None)` (the default).
200 ///
201 /// If the `ExecutionPlan` can increase its partitioning, but not all the way
202 /// to `target_partitions`, it may return an `ExecutionPlan` with fewer
203 /// partitions. This might happen, for example, if each new partition would
204 /// be too small to be efficiently processed individually.
205 ///
206 /// The DataFusion optimizer attempts to use as many threads as possible by
207 /// repartitioning its inputs to match the target number of threads
208 /// available (`target_partitions`). Some data sources, such as the built-in
209 /// CSV and Parquet readers, implement this method as they are able to read
210 /// from their input files in parallel, regardless of how the source data is
211 /// split amongst files.
212 fn repartitioned(
213 &self,
214 _target_partitions: usize,
215 _config: &ConfigOptions,
216 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
217 Ok(None)
218 }
219
220 /// Begin execution of `partition`, returning a [`Stream`] of
221 /// [`RecordBatch`]es.
222 ///
223 /// # Notes
224 ///
225 /// The `execute` method itself is not `async` but it returns an `async`
226 /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
227 /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
228 /// Most `ExecutionPlan`s should not do any work before the first
229 /// `RecordBatch` is requested from the stream.
230 ///
231 /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
232 /// [`Stream`] into a [`SendableRecordBatchStream`].
233 ///
234 /// Using `async` `Streams` allows for network I/O during execution and
235 /// takes advantage of Rust's built-in support for `async` continuations and
236 /// its crate ecosystem.
237 ///
238 /// [`Stream`]: futures::stream::Stream
239 /// [`StreamExt`]: futures::stream::StreamExt
240 /// [`TryStreamExt`]: futures::stream::TryStreamExt
241 /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
242 ///
243 /// # Error handling
244 ///
245 /// Any error that occurs during execution is sent as an `Err` in the output
246 /// stream.
247 ///
248 /// `ExecutionPlan` implementations in DataFusion cancel additional work
249 /// immediately once an error occurs. The rationale is that if the overall
250 /// query will return an error, any additional work such as continued
251 /// polling of inputs will be wasted as it will be thrown away.
252 ///
253 /// # Cancellation / Aborting Execution
254 ///
255 /// The [`Stream`] that is returned must ensure that any allocated resources
256 /// are freed when the stream itself is dropped. This is particularly
257 /// important for [`spawn`]ed tasks or threads. Unless care is taken to
258 /// "abort" such tasks, they may continue to consume resources even after
259 /// the plan is dropped, generating intermediate results that are never
260 /// used.
261 /// Thus, [`spawn`] is disallowed; use [`SpawnedTask`] instead.
262 ///
263 /// For more details see [`SpawnedTask`], [`JoinSet`] and [`RecordBatchReceiverStreamBuilder`]
264 /// for structures to help ensure all background tasks are cancelled.
265 ///
266 /// [`spawn`]: tokio::task::spawn
267 /// [`JoinSet`]: tokio::task::JoinSet
268 /// [`SpawnedTask`]: datafusion_common_runtime::SpawnedTask
269 /// [`RecordBatchReceiverStreamBuilder`]: crate::stream::RecordBatchReceiverStreamBuilder
270 ///
271 /// # Implementation Examples
272 ///
273 /// While `async` `Stream`s have a non trivial learning curve, the
274 /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
275 /// which help simplify many common operations.
276 ///
277 /// Here are some common patterns:
278 ///
279 /// ## Return Precomputed `RecordBatch`
280 ///
281 /// We can return a precomputed `RecordBatch` as a `Stream`:
282 ///
283 /// ```
284 /// # use std::sync::Arc;
285 /// # use arrow::array::RecordBatch;
286 /// # use arrow::datatypes::SchemaRef;
287 /// # use datafusion_common::Result;
288 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
289 /// # use datafusion_physical_plan::memory::MemoryStream;
290 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
291 /// struct MyPlan {
292 /// batch: RecordBatch,
293 /// }
294 ///
295 /// impl MyPlan {
296 /// fn execute(
297 /// &self,
298 /// partition: usize,
299 /// context: Arc<TaskContext>
300 /// ) -> Result<SendableRecordBatchStream> {
301 /// // use functions from the futures crate to convert the batch into a stream
302 /// let fut = futures::future::ready(Ok(self.batch.clone()));
303 /// let stream = futures::stream::once(fut);
304 /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
305 /// }
306 /// }
307 /// ```
308 ///
309 /// ## Lazily (async) Compute `RecordBatch`
310 ///
311 /// We can also lazily compute a `RecordBatch` when the returned `Stream` is polled
312 ///
313 /// ```
314 /// # use std::sync::Arc;
315 /// # use arrow::array::RecordBatch;
316 /// # use arrow::datatypes::SchemaRef;
317 /// # use datafusion_common::Result;
318 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
319 /// # use datafusion_physical_plan::memory::MemoryStream;
320 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
321 /// struct MyPlan {
322 /// schema: SchemaRef,
323 /// }
324 ///
325 /// /// Returns a single batch when the returned stream is polled
326 /// async fn get_batch() -> Result<RecordBatch> {
327 /// todo!()
328 /// }
329 ///
330 /// impl MyPlan {
331 /// fn execute(
332 /// &self,
333 /// partition: usize,
334 /// context: Arc<TaskContext>
335 /// ) -> Result<SendableRecordBatchStream> {
336 /// let fut = get_batch();
337 /// let stream = futures::stream::once(fut);
338 /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
339 /// }
340 /// }
341 /// ```
342 ///
343 /// ## Lazily (async) create a Stream
344 ///
345 /// If you need to create the returned `Stream` using an `async` function,
346 /// you can do so by flattening the result:
347 ///
348 /// ```
349 /// # use std::sync::Arc;
350 /// # use arrow::array::RecordBatch;
351 /// # use arrow::datatypes::SchemaRef;
352 /// # use futures::TryStreamExt;
353 /// # use datafusion_common::Result;
354 /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
355 /// # use datafusion_physical_plan::memory::MemoryStream;
356 /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
357 /// struct MyPlan {
358 /// schema: SchemaRef,
359 /// }
360 ///
361 /// /// async function that returns a stream
362 /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
363 /// todo!()
364 /// }
365 ///
366 /// impl MyPlan {
367 /// fn execute(
368 /// &self,
369 /// partition: usize,
370 /// context: Arc<TaskContext>
371 /// ) -> Result<SendableRecordBatchStream> {
372 /// // A future that yields a stream
373 /// let fut = get_batch_stream();
374 /// // Use TryStreamExt::try_flatten to flatten the stream of streams
375 /// let stream = futures::stream::once(fut).try_flatten();
376 /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
377 /// }
378 /// }
379 /// ```
380 fn execute(
381 &self,
382 partition: usize,
383 context: Arc<TaskContext>,
384 ) -> Result<SendableRecordBatchStream>;
385
386 /// Return a snapshot of the set of [`Metric`]s for this
387 /// [`ExecutionPlan`]. If no `Metric`s are available, return None.
388 ///
389 /// While the values of the metrics in the returned
390 /// [`MetricsSet`]s may change as execution progresses, the
391 /// specific metrics will not.
392 ///
393 /// Once `self.execute()` has returned (technically the future is
394 /// resolved) for all available partitions, the set of metrics
395 /// should be complete. If this function is called prior to
396 /// `execute()`, new metrics may appear in subsequent calls.
397 fn metrics(&self) -> Option<MetricsSet> {
398 None
399 }
400
401 /// Returns statistics for this `ExecutionPlan` node. If statistics are not
402 /// available, should return [`Statistics::new_unknown`] (the default), not
403 /// an error.
404 ///
405 /// For TableScan executors, which support filter pushdown, special attention
406 /// needs to be paid to whether the stats returned by this method are exact or not.
407 fn statistics(&self) -> Result<Statistics> {
408 Ok(Statistics::new_unknown(&self.schema()))
409 }
410
411 /// Returns `true` if a limit can be safely pushed down through this
412 /// `ExecutionPlan` node.
413 ///
414 /// If this method returns `true`, and the query plan contains a limit at
415 /// the output of this node, DataFusion will push the limit to the input
416 /// of this node.
417 fn supports_limit_pushdown(&self) -> bool {
418 false
419 }
420
421 /// Returns a fetching variant of this `ExecutionPlan` node, if it supports
422 /// fetch limits. Returns `None` otherwise.
423 fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
424 None
425 }
426
427 /// Gets the fetch count for the operator, `None` means there is no fetch.
428 fn fetch(&self) -> Option<usize> {
429 None
430 }
431
432 /// Gets the effect on cardinality, if known
433 fn cardinality_effect(&self) -> CardinalityEffect {
434 CardinalityEffect::Unknown
435 }
436
437 /// Attempts to push down the given projection into the input of this `ExecutionPlan`.
438 ///
439 /// If the operator supports this optimization, the resulting plan will be:
440 /// `self_new <- projection <- source`, starting from `projection <- self <- source`.
441 /// Otherwise, it returns the current `ExecutionPlan` as-is.
442 ///
443 /// Returns `Ok(Some(...))` if pushdown is applied, `Ok(None)` if it is not supported
444 /// or not possible, or `Err` on failure.
445 fn try_swapping_with_projection(
446 &self,
447 _projection: &ProjectionExec,
448 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
449 Ok(None)
450 }
451}
452
453/// [`ExecutionPlan`] Invariant Level
454///
455/// What set of assertions ([Invariant]s) holds for a particular `ExecutionPlan`
456///
457/// [Invariant]: https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science
458#[derive(Clone, Copy)]
459pub enum InvariantLevel {
460 /// Invariants that are always true for the [`ExecutionPlan`] node
461 /// such as the number of expected children.
462 Always,
463 /// Invariants that must hold true for the [`ExecutionPlan`] node
464 /// to be "executable", such as ordering and/or distribution requirements
465 /// being fulfilled.
466 Executable,
467}
468
469/// Extension trait that provides an easy API to fetch various properties of
470/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
471pub trait ExecutionPlanProperties {
472 /// Specifies how the output of this `ExecutionPlan` is split into
473 /// partitions.
474 fn output_partitioning(&self) -> &Partitioning;
475
476 /// If the output of this `ExecutionPlan` within each partition is sorted,
477 /// returns `Some(keys)` describing the ordering. A `None` return value
478 /// indicates no assumptions should be made on the output ordering.
479 ///
480 /// For example, `SortExec` (obviously) produces sorted output as does
481 /// `SortPreservingMergeStream`. Less obviously, `Projection` produces sorted
482 /// output if its input is sorted as it does not reorder the input rows.
483 fn output_ordering(&self) -> Option<&LexOrdering>;
484
485 /// Boundedness information of the stream corresponding to this `ExecutionPlan`.
486 /// For more details, see [`Boundedness`].
487 fn boundedness(&self) -> Boundedness;
488
489 /// Indicates how the stream of this `ExecutionPlan` emits its results.
490 /// For more details, see [`EmissionType`].
491 fn pipeline_behavior(&self) -> EmissionType;
492
493 /// Get the [`EquivalenceProperties`] within the plan.
494 ///
495 /// Equivalence properties tell DataFusion what columns are known to be
496 /// equal, during various optimization passes. By default, this returns "no
497 /// known equivalences" which is always correct, but may cause DataFusion to
498 /// unnecessarily resort data.
499 ///
500 /// If this ExecutionPlan makes no changes to the schema of the rows flowing
501 /// through it or how columns within each row relate to each other, it
502 /// should return the equivalence properties of its input. For
503 /// example, since `FilterExec` may remove rows from its input, but does not
504 /// otherwise modify them, it preserves its input equivalence properties.
505 /// However, since `ProjectionExec` may calculate derived expressions, it
506 /// needs special handling.
507 ///
508 /// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`]
509 /// for related concepts.
510 fn equivalence_properties(&self) -> &EquivalenceProperties;
511}
512
513impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
514 fn output_partitioning(&self) -> &Partitioning {
515 self.properties().output_partitioning()
516 }
517
518 fn output_ordering(&self) -> Option<&LexOrdering> {
519 self.properties().output_ordering()
520 }
521
522 fn boundedness(&self) -> Boundedness {
523 self.properties().boundedness
524 }
525
526 fn pipeline_behavior(&self) -> EmissionType {
527 self.properties().emission_type
528 }
529
530 fn equivalence_properties(&self) -> &EquivalenceProperties {
531 self.properties().equivalence_properties()
532 }
533}
534
535impl ExecutionPlanProperties for &dyn ExecutionPlan {
536 fn output_partitioning(&self) -> &Partitioning {
537 self.properties().output_partitioning()
538 }
539
540 fn output_ordering(&self) -> Option<&LexOrdering> {
541 self.properties().output_ordering()
542 }
543
544 fn boundedness(&self) -> Boundedness {
545 self.properties().boundedness
546 }
547
548 fn pipeline_behavior(&self) -> EmissionType {
549 self.properties().emission_type
550 }
551
552 fn equivalence_properties(&self) -> &EquivalenceProperties {
553 self.properties().equivalence_properties()
554 }
555}
556
557/// Represents whether a stream of data **generated** by an operator is bounded (finite)
558/// or unbounded (infinite).
559///
560/// This is used to determine whether an execution plan will eventually complete
561/// processing all its data (bounded) or could potentially run forever (unbounded).
562///
563/// For unbounded streams, it also tracks whether the operator requires finite memory
564/// to process the stream or if memory usage could grow unbounded.
565///
566/// Boundedness of the output stream is based on the boundedness of the input stream and the nature of
567/// the operator. For example, a limit or a TopK operator with a fetch can convert an unbounded stream into a bounded one.
568#[derive(Debug, Clone, Copy, PartialEq, Eq)]
569pub enum Boundedness {
570 /// The data stream is bounded (finite) and will eventually complete
571 Bounded,
572 /// The data stream is unbounded (infinite) and could run forever
573 Unbounded {
574 /// Whether this operator requires infinite memory to process the unbounded stream.
575 /// If false, the operator can process an infinite stream with bounded memory.
576 /// If true, memory usage may grow unbounded while processing the stream.
577 ///
578 /// For example, `Median` requires infinite memory to compute the median of an unbounded stream.
579 /// `Min/Max` requires infinite memory if the stream is unordered, but can be computed with bounded memory if the stream is ordered.
580 requires_infinite_memory: bool,
581 },
582}
583
584impl Boundedness {
585 pub fn is_unbounded(&self) -> bool {
586 matches!(self, Boundedness::Unbounded { .. })
587 }
588}
589
590/// Represents how an operator emits its output records.
591///
592/// This is used to determine whether an operator emits records incrementally as they arrive,
593/// only emits a final result at the end, or can do both. Note that even an incremental operator generates its output as record batches of up to `batch_size` rows,
594/// so it may still buffer data internally until it has enough data to emit a record batch or the source is exhausted.
595///
596/// For example, in the following plan:
597/// ```text
598/// SortExec [EmissionType::Final]
599/// |_ on: [col1 ASC]
600/// FilterExec [EmissionType::Incremental]
601/// |_ pred: col2 > 100
602/// DataSourceExec [EmissionType::Incremental]
603/// |_ file: "data.csv"
604/// ```
605/// - DataSourceExec emits records incrementally as it reads from the file
606/// - FilterExec processes and emits filtered records incrementally as they arrive
607/// - SortExec must wait for all input records before it can emit the sorted result,
608/// since it needs to see all values to determine their final order
609///
610/// Left joins can emit both incrementally and finally:
611/// - Incrementally emit matches as they are found
612/// - Finally emit non-matches after all input is processed
613#[derive(Debug, Clone, Copy, PartialEq, Eq)]
614pub enum EmissionType {
615 /// Records are emitted incrementally as they arrive and are processed
616 Incremental,
617 /// Records are only emitted once all input has been processed
618 Final,
619 /// Records can be emitted both incrementally and as a final result
620 Both,
621}
622
623/// Utility to determine an operator's boundedness based on its children's boundedness.
624///
625/// Assumes boundedness can be inferred from child operators:
626/// - Unbounded (requires_infinite_memory: true) takes precedence.
627/// - Unbounded (requires_infinite_memory: false) is considered next.
628/// - Otherwise, the operator is bounded.
629///
630/// **Note:** This is a general-purpose utility and may not apply to
631/// all multi-child operators. Ensure your operator's behavior aligns
632/// with these assumptions before using.
633pub(crate) fn boundedness_from_children<'a>(
634 children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
635) -> Boundedness {
636 let mut unbounded_with_finite_mem = false;
637
638 for child in children {
639 match child.boundedness() {
640 Boundedness::Unbounded {
641 requires_infinite_memory: true,
642 } => {
643 return Boundedness::Unbounded {
644 requires_infinite_memory: true,
645 }
646 }
647 Boundedness::Unbounded {
648 requires_infinite_memory: false,
649 } => {
650 unbounded_with_finite_mem = true;
651 }
652 Boundedness::Bounded => {}
653 }
654 }
655
656 if unbounded_with_finite_mem {
657 Boundedness::Unbounded {
658 requires_infinite_memory: false,
659 }
660 } else {
661 Boundedness::Bounded
662 }
663}
664
665/// Determines the emission type of an operator based on its children's pipeline behavior.
666///
667/// The precedence of emission types is:
668/// - `Final` has the highest precedence.
669/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present.
670/// - `Incremental` is the default if all children emit incremental results.
671///
672/// **Note:** This is a general-purpose utility and may not apply to
673/// all multi-child operators. Verify your operator's behavior aligns
674/// with these assumptions.
675pub(crate) fn emission_type_from_children<'a>(
676 children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
677) -> EmissionType {
678 let mut inc_and_final = false;
679
680 for child in children {
681 match child.pipeline_behavior() {
682 EmissionType::Final => return EmissionType::Final,
683 EmissionType::Both => inc_and_final = true,
684 EmissionType::Incremental => continue,
685 }
686 }
687
688 if inc_and_final {
689 EmissionType::Both
690 } else {
691 EmissionType::Incremental
692 }
693}
694
695/// Stores certain, often expensive to compute, plan properties used in query
696/// optimization.
697///
698/// These properties are stored in a single structure so that this information can
699/// be computed once and the cached results then reused multiple times without
700/// recomputation (i.e., a cache).
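///
/// A minimal construction sketch; the concrete values below (empty schema,
/// single unknown partition) are illustrative assumptions only:
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
/// # use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType, PlanProperties};
/// let schema = Arc::new(Schema::empty());
/// let properties = PlanProperties::new(
///     EquivalenceProperties::new(schema),    // no known equivalences
///     Partitioning::UnknownPartitioning(1),  // one output partition
///     EmissionType::Incremental,             // emits batches as they are produced
///     Boundedness::Bounded,                  // the stream eventually ends
/// );
/// ```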
701#[derive(Debug, Clone)]
702pub struct PlanProperties {
703 /// See [ExecutionPlanProperties::equivalence_properties]
704 pub eq_properties: EquivalenceProperties,
705 /// See [ExecutionPlanProperties::output_partitioning]
706 pub partitioning: Partitioning,
707 /// See [ExecutionPlanProperties::pipeline_behavior]
708 pub emission_type: EmissionType,
709 /// See [ExecutionPlanProperties::boundedness]
710 pub boundedness: Boundedness,
711 /// See [ExecutionPlanProperties::output_ordering]
712 output_ordering: Option<LexOrdering>,
713}
714
715impl PlanProperties {
716 /// Construct a new `PlanProperties` from the given properties.
717 pub fn new(
718 eq_properties: EquivalenceProperties,
719 partitioning: Partitioning,
720 emission_type: EmissionType,
721 boundedness: Boundedness,
722 ) -> Self {
723 // Output ordering can be derived from `eq_properties`.
724 let output_ordering = eq_properties.output_ordering();
725 Self {
726 eq_properties,
727 partitioning,
728 emission_type,
729 boundedness,
730 output_ordering,
731 }
732 }
733
734 /// Overwrite output partitioning with its new value.
735 pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
736 self.partitioning = partitioning;
737 self
738 }
739
740 /// Overwrite equivalence properties with its new value.
741 pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
742 // Changing equivalence properties also changes output ordering, so
743 // make sure to overwrite it:
744 self.output_ordering = eq_properties.output_ordering();
745 self.eq_properties = eq_properties;
746 self
747 }
748
749 /// Overwrite boundedness with its new value.
750 pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
751 self.boundedness = boundedness;
752 self
753 }
754
755 /// Overwrite emission type with its new value.
756 pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
757 self.emission_type = emission_type;
758 self
759 }
760
761 /// Overwrite constraints with its new value.
762 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
763 self.eq_properties = self.eq_properties.with_constraints(constraints);
764 self
765 }
766
767 pub fn equivalence_properties(&self) -> &EquivalenceProperties {
768 &self.eq_properties
769 }
770
771 pub fn output_partitioning(&self) -> &Partitioning {
772 &self.partitioning
773 }
774
775 pub fn output_ordering(&self) -> Option<&LexOrdering> {
776 self.output_ordering.as_ref()
777 }
778
779 /// Get schema of the node.
780 pub(crate) fn schema(&self) -> &SchemaRef {
781 self.eq_properties.schema()
782 }
783}
784
785/// Indicates whether a data exchange is needed for the input of `plan`, which is especially
786/// helpful for a distributed engine when judging whether it needs to deal with shuffling.
787/// Currently there are 3 kinds of execution plans that need data exchange:
788/// 1. `RepartitionExec` for changing the partition count between two `ExecutionPlan`s
789/// 2. `CoalescePartitionsExec` for collapsing all of the partitions into one without ordering guarantee
790/// 3. `SortPreservingMergeExec` for collapsing all of the sorted partitions into one with ordering guarantee
791pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
792 if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
793 !matches!(
794 repartition.properties().output_partitioning(),
795 Partitioning::RoundRobinBatch(_)
796 )
797 } else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>()
798 {
799 coalesce.input().output_partitioning().partition_count() > 1
800 } else if let Some(sort_preserving_merge) =
801 plan.as_any().downcast_ref::<SortPreservingMergeExec>()
802 {
803 sort_preserving_merge
804 .input()
805 .output_partitioning()
806 .partition_count()
807 > 1
808 } else {
809 false
810 }
811}
812
813/// Returns a copy of this plan with the new `children` if any child has changed, as determined by pointer comparison; otherwise returns the original plan.
814/// The length of `children` must be equal to the length of `ExecutionPlan::children()`.
815pub fn with_new_children_if_necessary(
816 plan: Arc<dyn ExecutionPlan>,
817 children: Vec<Arc<dyn ExecutionPlan>>,
818) -> Result<Arc<dyn ExecutionPlan>> {
819 let old_children = plan.children();
820 if children.len() != old_children.len() {
821 internal_err!("Wrong number of children")
822 } else if children.is_empty()
823 || children
824 .iter()
825 .zip(old_children.iter())
826 .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
827 {
828 plan.with_new_children(children)
829 } else {
830 Ok(plan)
831 }
832}
833
834/// Return a [`DisplayableExecutionPlan`] wrapper around an
835/// [`ExecutionPlan`] which can be displayed in various easier to
836/// understand ways.
837///
838/// See examples on [`DisplayableExecutionPlan`]
839pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
840 DisplayableExecutionPlan::new(plan)
841}
842
843/// Execute the [ExecutionPlan] and collect the results in memory
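///
/// A usage sketch; `plan` and `context` are assumed to already exist:
///
/// ```ignore
/// let batches: Vec<RecordBatch> = collect(plan, context).await?;
/// ```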
844pub async fn collect(
845 plan: Arc<dyn ExecutionPlan>,
846 context: Arc<TaskContext>,
847) -> Result<Vec<RecordBatch>> {
848 let stream = execute_stream(plan, context)?;
849 crate::common::collect(stream).await
850}
851
852/// Execute the [ExecutionPlan] and return a single stream of `RecordBatch`es.
853///
854/// See [collect] to buffer the `RecordBatch`es in memory.
855///
856/// # Aborting Execution
857///
858/// Dropping the stream will abort the execution of the query, and free up
859/// any allocated resources
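///
/// A usage sketch; `plan` and `context` are assumed to already exist:
///
/// ```ignore
/// use futures::StreamExt;
///
/// let mut stream = execute_stream(plan, context)?;
/// // Batches are produced incrementally as the stream is polled
/// while let Some(batch) = stream.next().await {
///     println!("got a batch with {} rows", batch?.num_rows());
/// }
/// ```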
860pub fn execute_stream(
861 plan: Arc<dyn ExecutionPlan>,
862 context: Arc<TaskContext>,
863) -> Result<SendableRecordBatchStream> {
864 match plan.output_partitioning().partition_count() {
865 0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
866 1 => plan.execute(0, context),
867 2.. => {
868 // merge into a single partition
869 let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
870 // CoalescePartitionsExec must produce a single partition
871 assert_eq!(1, plan.properties().output_partitioning().partition_count());
872 plan.execute(0, context)
873 }
874 }
875}
876
877/// Execute the [ExecutionPlan] and collect the results in memory, with one `Vec<RecordBatch>` per output partition
878pub async fn collect_partitioned(
879 plan: Arc<dyn ExecutionPlan>,
880 context: Arc<TaskContext>,
881) -> Result<Vec<Vec<RecordBatch>>> {
882 let streams = execute_stream_partitioned(plan, context)?;
883
884 let mut join_set = JoinSet::new();
885 // Execute the plan and collect the results into batches.
886 streams.into_iter().enumerate().for_each(|(idx, stream)| {
887 join_set.spawn(async move {
888 let result: Result<Vec<RecordBatch>> = stream.try_collect().await;
889 (idx, result)
890 });
891 });
892
893 let mut batches = vec![];
894 // Note that currently this doesn't identify the thread that panicked
895 //
896 // TODO: Replace with [join_next_with_id](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html#method.join_next_with_id)
897 // once it is stable
898 while let Some(result) = join_set.join_next().await {
899 match result {
900 Ok((idx, res)) => batches.push((idx, res?)),
901 Err(e) => {
902 if e.is_panic() {
903 std::panic::resume_unwind(e.into_panic());
904 } else {
905 unreachable!();
906 }
907 }
908 }
909 }
910
911 batches.sort_by_key(|(idx, _)| *idx);
912 let batches = batches.into_iter().map(|(_, batch)| batch).collect();
913
914 Ok(batches)
915}
916
917/// Execute the [ExecutionPlan] and return a vec with one stream per output
918/// partition
919///
920/// # Aborting Execution
921///
922/// Dropping the stream will abort the execution of the query, and free up
923/// any allocated resources
924pub fn execute_stream_partitioned(
925 plan: Arc<dyn ExecutionPlan>,
926 context: Arc<TaskContext>,
927) -> Result<Vec<SendableRecordBatchStream>> {
928 let num_partitions = plan.output_partitioning().partition_count();
929 let mut streams = Vec::with_capacity(num_partitions);
930 for i in 0..num_partitions {
931 streams.push(plan.execute(i, Arc::clone(&context))?);
932 }
933 Ok(streams)
934}
935
936/// Executes an input stream and ensures that the resulting stream adheres to
937/// the `not null` constraints specified in the `sink_schema`.
938///
939/// # Arguments
940///
941/// * `input` - An execution plan
942/// * `sink_schema` - The schema to be applied to the output stream
943/// * `partition` - The partition index to be executed
944/// * `context` - The task context
945///
946/// # Returns
947///
948/// * `Result<SendableRecordBatchStream>` - A stream of `RecordBatch`es if successful
949///
950/// This function first executes the given input plan for the specified partition
951/// and context. It then checks if there are any columns in the input that might
952/// violate the `not null` constraints specified in the `sink_schema`. If there are
953/// such columns, it wraps the resulting stream to enforce the `not null` constraints
954/// by invoking the [`check_not_null_constraints`] function on each batch of the stream.
955pub fn execute_input_stream(
956 input: Arc<dyn ExecutionPlan>,
957 sink_schema: SchemaRef,
958 partition: usize,
959 context: Arc<TaskContext>,
960) -> Result<SendableRecordBatchStream> {
961 let input_stream = input.execute(partition, context)?;
962
963 debug_assert_eq!(sink_schema.fields().len(), input.schema().fields().len());
964
965 // Find input columns that may violate the not null constraint.
966 let risky_columns: Vec<_> = sink_schema
967 .fields()
968 .iter()
969 .zip(input.schema().fields().iter())
970 .enumerate()
971 .filter_map(|(idx, (sink_field, input_field))| {
972 (!sink_field.is_nullable() && input_field.is_nullable()).then_some(idx)
973 })
974 .collect();
975
976 if risky_columns.is_empty() {
977 Ok(input_stream)
978 } else {
979 // Check not null constraint on the input stream
980 Ok(Box::pin(RecordBatchStreamAdapter::new(
981 sink_schema,
982 input_stream
983 .map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
984 )))
985 }
986}
987
988/// Checks a `RecordBatch` for `not null` constraints on specified columns.
989///
990/// # Arguments
991///
992/// * `batch` - The `RecordBatch` to be checked
993/// * `column_indices` - A vector of column indices that should be checked for
994/// `not null` constraints.
995///
996/// # Returns
997///
998/// * `Result<RecordBatch>` - The original `RecordBatch` if all constraints are met
999///
1000/// This function iterates over the specified column indices and ensures that none
1001/// of the columns contain null values. If any column contains null values, an error
1002/// is returned.
1003pub fn check_not_null_constraints(
1004 batch: RecordBatch,
1005 column_indices: &Vec<usize>,
1006) -> Result<RecordBatch> {
1007 for &index in column_indices {
1008 if batch.num_columns() <= index {
1009 return exec_err!(
1010 "Invalid batch column count {} expected > {}",
1011 batch.num_columns(),
1012 index
1013 );
1014 }
1015
1016 if batch
1017 .column(index)
1018 .logical_nulls()
1019 .map(|nulls| nulls.null_count())
1020 .unwrap_or_default()
1021 > 0
1022 {
1023 return exec_err!(
1024 "Invalid batch column at '{}' has null but schema specifies non-nullable",
1025 index
1026 );
1027 }
1028 }
1029
1030 Ok(batch)
1031}
1032
1033/// Utility function yielding a string representation of the given [`ExecutionPlan`].
1034pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
1035 let formatted = displayable(plan.as_ref()).indent(true).to_string();
1036 let actual: Vec<&str> = formatted.trim().lines().collect();
1037 actual.iter().map(|elem| elem.to_string()).collect()
1038}
1039
1040/// Indicates the effect an execution plan operator will have on the cardinality
1041/// of its input stream
1042pub enum CardinalityEffect {
1043 /// Unknown effect. This is the default
1044 Unknown,
1045 /// The operator is guaranteed to produce exactly one row for
1046 /// each input row
1047 Equal,
1048 /// The operator may produce fewer output rows than it receives input rows
1049 LowerEqual,
1050 /// The operator may produce more output rows than it receives input rows
1051 GreaterEqual,
1052}
1053
1054#[cfg(test)]
1055mod tests {
1056 use super::*;
1057 use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
1058 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1059 use std::any::Any;
1060 use std::sync::Arc;
1061
1062 use datafusion_common::{Result, Statistics};
1063 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
1064
1065 use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
1066
1067 #[derive(Debug)]
1068 pub struct EmptyExec;
1069
1070 impl EmptyExec {
1071 pub fn new(_schema: SchemaRef) -> Self {
1072 Self
1073 }
1074 }
1075
1076 impl DisplayAs for EmptyExec {
1077 fn fmt_as(
1078 &self,
1079 _t: DisplayFormatType,
1080 _f: &mut std::fmt::Formatter,
1081 ) -> std::fmt::Result {
1082 unimplemented!()
1083 }
1084 }
1085
1086 impl ExecutionPlan for EmptyExec {
1087 fn name(&self) -> &'static str {
1088 Self::static_name()
1089 }
1090
1091 fn as_any(&self) -> &dyn Any {
1092 self
1093 }
1094
1095 fn properties(&self) -> &PlanProperties {
1096 unimplemented!()
1097 }
1098
1099 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1100 vec![]
1101 }
1102
1103 fn with_new_children(
1104 self: Arc<Self>,
1105 _: Vec<Arc<dyn ExecutionPlan>>,
1106 ) -> Result<Arc<dyn ExecutionPlan>> {
1107 unimplemented!()
1108 }
1109
1110 fn execute(
1111 &self,
1112 _partition: usize,
1113 _context: Arc<TaskContext>,
1114 ) -> Result<SendableRecordBatchStream> {
1115 unimplemented!()
1116 }
1117
1118 fn statistics(&self) -> Result<Statistics> {
1119 unimplemented!()
1120 }
1121 }
1122
1123 #[derive(Debug)]
1124 pub struct RenamedEmptyExec;
1125
1126 impl RenamedEmptyExec {
1127 pub fn new(_schema: SchemaRef) -> Self {
1128 Self
1129 }
1130 }
1131
1132 impl DisplayAs for RenamedEmptyExec {
1133 fn fmt_as(
1134 &self,
1135 _t: DisplayFormatType,
1136 _f: &mut std::fmt::Formatter,
1137 ) -> std::fmt::Result {
1138 unimplemented!()
1139 }
1140 }
1141
1142 impl ExecutionPlan for RenamedEmptyExec {
1143 fn name(&self) -> &'static str {
1144 Self::static_name()
1145 }
1146
1147 fn static_name() -> &'static str
1148 where
1149 Self: Sized,
1150 {
1151 "MyRenamedEmptyExec"
1152 }
1153
1154 fn as_any(&self) -> &dyn Any {
1155 self
1156 }
1157
1158 fn properties(&self) -> &PlanProperties {
1159 unimplemented!()
1160 }
1161
1162 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1163 vec![]
1164 }
1165
1166 fn with_new_children(
1167 self: Arc<Self>,
1168 _: Vec<Arc<dyn ExecutionPlan>>,
1169 ) -> Result<Arc<dyn ExecutionPlan>> {
1170 unimplemented!()
1171 }
1172
1173 fn execute(
1174 &self,
1175 _partition: usize,
1176 _context: Arc<TaskContext>,
1177 ) -> Result<SendableRecordBatchStream> {
1178 unimplemented!()
1179 }
1180
1181 fn statistics(&self) -> Result<Statistics> {
1182 unimplemented!()
1183 }
1184 }
1185
1186 #[test]
1187 fn test_execution_plan_name() {
1188 let schema1 = Arc::new(Schema::empty());
1189 let default_name_exec = EmptyExec::new(schema1);
1190 assert_eq!(default_name_exec.name(), "EmptyExec");
1191
1192 let schema2 = Arc::new(Schema::empty());
1193 let renamed_exec = RenamedEmptyExec::new(schema2);
1194 assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec");
1195 assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
1196 }
1197
1198 /// A compilation test to ensure that the `ExecutionPlan::name()` method can
1199 /// be called from a trait object.
1200 /// Related ticket: https://github.com/apache/datafusion/pull/11047
1201 #[allow(dead_code)]
1202 fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
1203 let _ = plan.name();
1204 }
1205
1206 #[test]
1207 fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
1208 check_not_null_constraints(
1209 RecordBatch::try_new(
1210 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1211 vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
1212 )?,
1213 &vec![0],
1214 )?;
1215 Ok(())
1216 }
1217
1218 #[test]
1219 fn test_check_not_null_constraints_reject_null() -> Result<()> {
1220 let result = check_not_null_constraints(
1221 RecordBatch::try_new(
1222 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
1223 vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
1224 )?,
1225 &vec![0],
1226 );
1227 assert!(result.is_err());
1228 assert_eq!(
1229 result.err().unwrap().strip_backtrace(),
1230 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1231 );
1232 Ok(())
1233 }
1234
1235 #[test]
1236 fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
1237 // some null value inside REE array
1238 let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
1239 let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
1240 let run_end_array = RunArray::try_new(&run_ends, &values)?;
1241 let result = check_not_null_constraints(
1242 RecordBatch::try_new(
1243 Arc::new(Schema::new(vec![Field::new(
1244 "a",
1245 run_end_array.data_type().to_owned(),
1246 true,
1247 )])),
1248 vec![Arc::new(run_end_array)],
1249 )?,
1250 &vec![0],
1251 );
1252 assert!(result.is_err());
1253 assert_eq!(
1254 result.err().unwrap().strip_backtrace(),
1255 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1256 );
1257 Ok(())
1258 }
1259
1260 #[test]
1261 fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
1262 let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
1263 let keys = Int32Array::from(vec![0, 1, 2, 3]);
1264 let dictionary = DictionaryArray::new(keys, values);
1265 let result = check_not_null_constraints(
1266 RecordBatch::try_new(
1267 Arc::new(Schema::new(vec![Field::new(
1268 "a",
1269 dictionary.data_type().to_owned(),
1270 true,
1271 )])),
1272 vec![Arc::new(dictionary)],
1273 )?,
1274 &vec![0],
1275 );
1276 assert!(result.is_err());
1277 assert_eq!(
1278 result.err().unwrap().strip_backtrace(),
1279 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1280 );
1281 Ok(())
1282 }
1283
1284 #[test]
1285 fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
1286 // some null value marked out by dictionary array
1287 let values = Arc::new(Int32Array::from(vec![
1288 Some(1),
1289 None, // this null value is masked by dictionary keys
1290 Some(3),
1291 Some(4),
1292 ]));
1293 let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
1294 let dictionary = DictionaryArray::new(keys, values);
1295 check_not_null_constraints(
1296 RecordBatch::try_new(
1297 Arc::new(Schema::new(vec![Field::new(
1298 "a",
1299 dictionary.data_type().to_owned(),
1300 true,
1301 )])),
1302 vec![Arc::new(dictionary)],
1303 )?,
1304 &vec![0],
1305 )?;
1306 Ok(())
1307 }
1308
1309 #[test]
1310 fn test_check_not_null_constraints_on_null_type() -> Result<()> {
1311 // null value of Null type
1312 let result = check_not_null_constraints(
1313 RecordBatch::try_new(
1314 Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)])),
1315 vec![Arc::new(NullArray::new(3))],
1316 )?,
1317 &vec![0],
1318 );
1319 assert!(result.is_err());
1320 assert_eq!(
1321 result.err().unwrap().strip_backtrace(),
1322 "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
1323 );
1324 Ok(())
1325 }
1326}