datafusion_physical_plan/coalesce_partitions.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the merge plan for executing partitions in parallel and then merging the results
//! into a single partition
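//!
//! A sketch of the data flow: the batches of `N` input partitions are polled
//! concurrently and interleaved, in no guaranteed order, into one output
//! partition:
//!
//! ```text
//!   input partition 0 ──┐
//!   input partition 1 ──┼──> CoalescePartitionsExec ──> single output partition
//!   input partition N ──┘
//! ```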

use std::any::Any;
use std::sync::Arc;

use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::stream::{ObservedStream, RecordBatchReceiverStream};
use super::{
    DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
    Statistics,
};
use crate::execution_plan::CardinalityEffect;
use crate::projection::{make_with_child, ProjectionExec};
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;

/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
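///
/// # Example (sketch)
///
/// A minimal usage sketch; `input` stands for any plan with multiple output
/// partitions and is assumed to exist in the surrounding code:
///
/// ```text
/// let coalesce = CoalescePartitionsExec::new(input);
/// // The wrapped plan always reports exactly one output partition.
/// assert_eq!(
///     coalesce.properties().output_partitioning().partition_count(),
///     1
/// );
/// ```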
#[derive(Debug, Clone)]
pub struct CoalescePartitionsExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (schema, partitioning, etc.)
    cache: PlanProperties,
    /// Optional number of rows to fetch. Stops producing rows after `fetch` rows are emitted
    pub(crate) fetch: Option<usize>,
}

impl CoalescePartitionsExec {
    /// Create a new CoalescePartitionsExec
    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
        let cache = Self::compute_properties(&input);
        CoalescePartitionsExec {
            input,
            metrics: ExecutionPlanMetricsSet::new(),
            cache,
            fetch: None,
        }
    }

    /// Input execution plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// This function creates the cache object that stores the plan properties such as
    /// schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
        // Coalescing partitions loses existing orderings:
        let mut eq_properties = input.equivalence_properties().clone();
        eq_properties.clear_orderings();
        eq_properties.clear_per_partition_constants();
        PlanProperties::new(
            eq_properties,                        // Equivalence Properties
            Partitioning::UnknownPartitioning(1), // Output Partitioning
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }
}

impl DisplayAs for CoalescePartitionsExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
                Some(fetch) => {
                    write!(f, "CoalescePartitionsExec: fetch={fetch}")
                }
                None => write!(f, "CoalescePartitionsExec"),
            },
        }
    }
}

impl ExecutionPlan for CoalescePartitionsExec {
    fn name(&self) -> &'static str {
        "CoalescePartitionsExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
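        // The merge into one partition does not itself get faster with more
        // input partitions, so report no benefit from added partitioning.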
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
        plan.fetch = self.fetch;
        Ok(Arc::new(plan))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // CoalescePartitionsExec produces a single partition
        if partition != 0 {
            return internal_err!("CoalescePartitionsExec invalid partition {partition}");
        }

        let input_partitions = self.input.output_partitioning().partition_count();
        match input_partitions {
            0 => internal_err!(
                "CoalescePartitionsExec requires at least one input partition"
            ),
            1 => {
                // bypass any threading / metrics if there is a single partition
                self.input.execute(0, context)
            }
            _ => {
                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                // record the (very) minimal work done so that
                // elapsed_compute is not reported as 0
                let elapsed_compute = baseline_metrics.elapsed_compute().clone();
                let _timer = elapsed_compute.timer();

                // Use a stream whose channel lets each sender buffer at
                // least one result, to maximize parallelism.
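                // The channel capacity equals `input_partitions`, so across
                // the senders there is room for roughly one buffered batch
                // each before backpressure applies.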
                let mut builder =
                    RecordBatchReceiverStream::builder(self.schema(), input_partitions);

                // spawn independent tasks whose resulting streams (of batches)
                // are sent to the channel for consumption.
                for part_i in 0..input_partitions {
                    builder.run_input(
                        Arc::clone(&self.input),
                        part_i,
                        Arc::clone(&context),
                    );
                }

                let stream = builder.build();
                Ok(Box::pin(ObservedStream::new(
                    stream,
                    baseline_metrics,
                    self.fetch,
                )))
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
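        // Input statistics carry over, adjusted for the optional `fetch` limit.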
        Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
    }

    fn supports_limit_pushdown(&self) -> bool {
        true
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }

    /// Tries to swap `projection` with its input, which is known to be a
    /// [`CoalescePartitionsExec`]. If possible, performs the swap and returns
    /// [`CoalescePartitionsExec`] as the top plan. Otherwise, returns `None`.
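    ///
    /// A sketch of the rewrite, for a projection that narrows the schema:
    ///
    /// ```text
    /// ProjectionExec                    CoalescePartitionsExec
    ///   CoalescePartitionsExec   =>       ProjectionExec
    ///     <input>                           <input>
    /// ```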
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // If the projection does not narrow the schema, we should not try to push it down:
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }
        // CoalescePartitionsExec always has a single child, so indexing its
        // first (and only) child is safe.
        make_with_child(projection, projection.input().children()[0]).map(|e| {
            if self.fetch.is_some() {
                let mut plan = CoalescePartitionsExec::new(e);
                plan.fetch = self.fetch;
                Some(Arc::new(plan) as _)
            } else {
                Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
            }
        })
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(CoalescePartitionsExec {
            input: Arc::clone(&self.input),
            fetch: limit,
            metrics: self.metrics.clone(),
            cache: self.cache.clone(),
        }))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::exec::{
        assert_strong_count_converges_to_zero, BlockingExec, PanicExec,
    };
    use crate::test::{self, assert_is_pending};
    use crate::{collect, common};

    use arrow::datatypes::{DataType, Field, Schema};

    use futures::FutureExt;

    #[tokio::test]
    async fn merge() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        // input should have 4 partitions
        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let merge = CoalescePartitionsExec::new(csv);

        // output of CoalescePartitionsExec should have a single partition
        assert_eq!(
            merge.properties().output_partitioning().partition_count(),
            1
        );

        // the result should contain 4 batches (one per input partition)
        let iter = merge.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;
        assert_eq!(batches.len(), num_partitions);
        // there should be a total of 400 rows (100 per partition)
        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(row_count, 400);

        Ok(())
    }

    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 2));
        let refs = blocking_exec.refs();
        let coalesce_partitions_exec =
            Arc::new(CoalescePartitionsExec::new(blocking_exec));

        let fut = collect(coalesce_partitions_exec, task_ctx);
        let mut fut = fut.boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    #[tokio::test]
    #[should_panic(expected = "PanickingStream did panic")]
    async fn test_panic() {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let panicking_exec = Arc::new(PanicExec::new(Arc::clone(&schema), 2));
        let coalesce_partitions_exec =
            Arc::new(CoalescePartitionsExec::new(panicking_exec));

        collect(coalesce_partitions_exec, task_ctx).await.unwrap();
    }
}