datafusion_physical_plan/
coalesce_partitions.rs1use std::any::Any;
22use std::sync::Arc;
23
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::stream::{ObservedStream, RecordBatchReceiverStream};
26use super::{
27 DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
28 Statistics,
29};
30use crate::execution_plan::CardinalityEffect;
31use crate::projection::{make_with_child, ProjectionExec};
32use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
33
34use datafusion_common::{internal_err, Result};
35use datafusion_execution::TaskContext;
36
37#[derive(Debug, Clone)]
40pub struct CoalescePartitionsExec {
41 input: Arc<dyn ExecutionPlan>,
43 metrics: ExecutionPlanMetricsSet,
45 cache: PlanProperties,
46 pub(crate) fetch: Option<usize>,
48}
49
50impl CoalescePartitionsExec {
51 pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
53 let cache = Self::compute_properties(&input);
54 CoalescePartitionsExec {
55 input,
56 metrics: ExecutionPlanMetricsSet::new(),
57 cache,
58 fetch: None,
59 }
60 }
61
62 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
64 &self.input
65 }
66
67 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
69 let mut eq_properties = input.equivalence_properties().clone();
71 eq_properties.clear_orderings();
72 eq_properties.clear_per_partition_constants();
73 PlanProperties::new(
74 eq_properties, Partitioning::UnknownPartitioning(1), input.pipeline_behavior(),
77 input.boundedness(),
78 )
79 }
80}
81
82impl DisplayAs for CoalescePartitionsExec {
83 fn fmt_as(
84 &self,
85 t: DisplayFormatType,
86 f: &mut std::fmt::Formatter,
87 ) -> std::fmt::Result {
88 match t {
89 DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
90 Some(fetch) => {
91 write!(f, "CoalescePartitionsExec: fetch={fetch}")
92 }
93 None => write!(f, "CoalescePartitionsExec"),
94 },
95 }
96 }
97}
98
99impl ExecutionPlan for CoalescePartitionsExec {
100 fn name(&self) -> &'static str {
101 "CoalescePartitionsExec"
102 }
103
104 fn as_any(&self) -> &dyn Any {
106 self
107 }
108
109 fn properties(&self) -> &PlanProperties {
110 &self.cache
111 }
112
113 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
114 vec![&self.input]
115 }
116
117 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
118 vec![false]
119 }
120
121 fn with_new_children(
122 self: Arc<Self>,
123 children: Vec<Arc<dyn ExecutionPlan>>,
124 ) -> Result<Arc<dyn ExecutionPlan>> {
125 let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
126 plan.fetch = self.fetch;
127 Ok(Arc::new(plan))
128 }
129
130 fn execute(
131 &self,
132 partition: usize,
133 context: Arc<TaskContext>,
134 ) -> Result<SendableRecordBatchStream> {
135 if 0 != partition {
137 return internal_err!("CoalescePartitionsExec invalid partition {partition}");
138 }
139
140 let input_partitions = self.input.output_partitioning().partition_count();
141 match input_partitions {
142 0 => internal_err!(
143 "CoalescePartitionsExec requires at least one input partition"
144 ),
145 1 => {
146 self.input.execute(0, context)
148 }
149 _ => {
150 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
151 let elapsed_compute = baseline_metrics.elapsed_compute().clone();
154 let _timer = elapsed_compute.timer();
155
156 let mut builder =
160 RecordBatchReceiverStream::builder(self.schema(), input_partitions);
161
162 for part_i in 0..input_partitions {
165 builder.run_input(
166 Arc::clone(&self.input),
167 part_i,
168 Arc::clone(&context),
169 );
170 }
171
172 let stream = builder.build();
173 Ok(Box::pin(ObservedStream::new(
174 stream,
175 baseline_metrics,
176 self.fetch,
177 )))
178 }
179 }
180 }
181
182 fn metrics(&self) -> Option<MetricsSet> {
183 Some(self.metrics.clone_inner())
184 }
185
186 fn statistics(&self) -> Result<Statistics> {
187 Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
188 }
189
190 fn supports_limit_pushdown(&self) -> bool {
191 true
192 }
193
194 fn cardinality_effect(&self) -> CardinalityEffect {
195 CardinalityEffect::Equal
196 }
197
198 fn try_swapping_with_projection(
202 &self,
203 projection: &ProjectionExec,
204 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
205 if projection.expr().len() >= projection.input().schema().fields().len() {
207 return Ok(None);
208 }
209 make_with_child(projection, projection.input().children()[0]).map(|e| {
211 if self.fetch.is_some() {
212 let mut plan = CoalescePartitionsExec::new(e);
213 plan.fetch = self.fetch;
214 Some(Arc::new(plan) as _)
215 } else {
216 Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
217 }
218 })
219 }
220
221 fn fetch(&self) -> Option<usize> {
222 self.fetch
223 }
224
225 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
226 Some(Arc::new(CoalescePartitionsExec {
227 input: Arc::clone(&self.input),
228 fetch: limit,
229 metrics: self.metrics.clone(),
230 cache: self.cache.clone(),
231 }))
232 }
233}
234
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::exec::{
        assert_strong_count_converges_to_zero, BlockingExec, PanicExec,
    };
    use crate::test::{self, assert_is_pending};
    use crate::{collect, common};

    use arrow::datatypes::{DataType, Field, Schema};

    use futures::FutureExt;

    /// Schema with a single nullable `Float32` column named `a`.
    fn float_column_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]))
    }

    /// Coalescing a four-partition scan yields one output partition that
    /// carries every batch and row of the input.
    #[tokio::test]
    async fn merge() -> Result<()> {
        let num_partitions = 4;
        let task_ctx = Arc::new(TaskContext::default());

        // The input must really be partitioned for the test to be meaningful.
        let scan = test::scan_partitioned(num_partitions);
        assert_eq!(scan.output_partitioning().partition_count(), num_partitions);

        let coalesce = CoalescePartitionsExec::new(scan);

        // The operator itself reports a single output partition.
        assert_eq!(
            coalesce.properties().output_partitioning().partition_count(),
            1
        );

        // Run the only partition and gather everything it produces.
        let stream = coalesce.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;
        assert_eq!(batches.len(), num_partitions);

        // No rows may be lost or duplicated while merging.
        let row_count = batches.iter().map(|b| b.num_rows()).sum::<usize>();
        assert_eq!(row_count, 400);

        Ok(())
    }

    /// Dropping the collecting future releases all references held on the
    /// blocking input.
    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let blocking_exec = Arc::new(BlockingExec::new(float_column_schema(), 2));
        let refs = blocking_exec.refs();
        let plan = Arc::new(CoalescePartitionsExec::new(blocking_exec));

        let mut pending = collect(plan, task_ctx).boxed();

        assert_is_pending(&mut pending);
        drop(pending);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    /// A panic raised by an input partition surfaces to the caller.
    #[tokio::test]
    #[should_panic(expected = "PanickingStream did panic")]
    async fn test_panic() {
        let task_ctx = Arc::new(TaskContext::default());

        let panicking_exec = Arc::new(PanicExec::new(float_column_schema(), 2));
        let plan = Arc::new(CoalescePartitionsExec::new(panicking_exec));

        collect(plan, task_ctx).await.unwrap();
    }
}