datafusion_physical_plan/
coalesce_batches.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`CoalesceBatchesExec`] combines small batches into larger batches.
19
20use std::any::Any;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
26use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
27use crate::{
28    DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
29};
30
31use arrow::datatypes::SchemaRef;
32use arrow::record_batch::RecordBatch;
33use datafusion_common::Result;
34use datafusion_execution::TaskContext;
35
36use crate::coalesce::{BatchCoalescer, CoalescerState};
37use crate::execution_plan::CardinalityEffect;
38use futures::ready;
39use futures::stream::{Stream, StreamExt};
40
41/// `CoalesceBatchesExec` combines small batches into larger batches for more
42/// efficient vectorized processing by later operators.
43///
44/// The operator buffers batches until it collects `target_batch_size` rows and
45/// then emits a single concatenated batch. When only a limited number of rows
46/// are necessary (specified by the `fetch` parameter), the operator will stop
47/// buffering and returns the final batch once the number of collected rows
48/// reaches the `fetch` value.
49///
50/// See [`BatchCoalescer`] for more information
51#[derive(Debug, Clone)]
52pub struct CoalesceBatchesExec {
53    /// The input plan
54    input: Arc<dyn ExecutionPlan>,
55    /// Minimum number of rows for coalescing batches
56    target_batch_size: usize,
57    /// Maximum number of rows to fetch, `None` means fetching all rows
58    fetch: Option<usize>,
59    /// Execution metrics
60    metrics: ExecutionPlanMetricsSet,
61    cache: PlanProperties,
62}
63
64impl CoalesceBatchesExec {
65    /// Create a new CoalesceBatchesExec
66    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
67        let cache = Self::compute_properties(&input);
68        Self {
69            input,
70            target_batch_size,
71            fetch: None,
72            metrics: ExecutionPlanMetricsSet::new(),
73            cache,
74        }
75    }
76
77    /// Update fetch with the argument
78    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
79        self.fetch = fetch;
80        self
81    }
82
83    /// The input plan
84    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
85        &self.input
86    }
87
88    /// Minimum number of rows for coalesces batches
89    pub fn target_batch_size(&self) -> usize {
90        self.target_batch_size
91    }
92
93    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
94    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
95        // The coalesce batches operator does not make any changes to the
96        // partitioning of its input.
97        PlanProperties::new(
98            input.equivalence_properties().clone(), // Equivalence Properties
99            input.output_partitioning().clone(),    // Output Partitioning
100            input.pipeline_behavior(),
101            input.boundedness(),
102        )
103    }
104}
105
106impl DisplayAs for CoalesceBatchesExec {
107    fn fmt_as(
108        &self,
109        t: DisplayFormatType,
110        f: &mut std::fmt::Formatter,
111    ) -> std::fmt::Result {
112        match t {
113            DisplayFormatType::Default | DisplayFormatType::Verbose => {
114                write!(
115                    f,
116                    "CoalesceBatchesExec: target_batch_size={}",
117                    self.target_batch_size,
118                )?;
119                if let Some(fetch) = self.fetch {
120                    write!(f, ", fetch={fetch}")?;
121                };
122
123                Ok(())
124            }
125        }
126    }
127}
128
129impl ExecutionPlan for CoalesceBatchesExec {
130    fn name(&self) -> &'static str {
131        "CoalesceBatchesExec"
132    }
133
134    /// Return a reference to Any that can be used for downcasting
135    fn as_any(&self) -> &dyn Any {
136        self
137    }
138
139    fn properties(&self) -> &PlanProperties {
140        &self.cache
141    }
142
143    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
144        vec![&self.input]
145    }
146
147    fn maintains_input_order(&self) -> Vec<bool> {
148        vec![true]
149    }
150
151    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
152        vec![false]
153    }
154
155    fn with_new_children(
156        self: Arc<Self>,
157        children: Vec<Arc<dyn ExecutionPlan>>,
158    ) -> Result<Arc<dyn ExecutionPlan>> {
159        Ok(Arc::new(
160            CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size)
161                .with_fetch(self.fetch),
162        ))
163    }
164
165    fn execute(
166        &self,
167        partition: usize,
168        context: Arc<TaskContext>,
169    ) -> Result<SendableRecordBatchStream> {
170        Ok(Box::pin(CoalesceBatchesStream {
171            input: self.input.execute(partition, context)?,
172            coalescer: BatchCoalescer::new(
173                self.input.schema(),
174                self.target_batch_size,
175                self.fetch,
176            ),
177            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
178            // Start by pulling data
179            inner_state: CoalesceBatchesStreamState::Pull,
180        }))
181    }
182
183    fn metrics(&self) -> Option<MetricsSet> {
184        Some(self.metrics.clone_inner())
185    }
186
187    fn statistics(&self) -> Result<Statistics> {
188        Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
189    }
190
191    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
192        Some(Arc::new(CoalesceBatchesExec {
193            input: Arc::clone(&self.input),
194            target_batch_size: self.target_batch_size,
195            fetch: limit,
196            metrics: self.metrics.clone(),
197            cache: self.cache.clone(),
198        }))
199    }
200
201    fn fetch(&self) -> Option<usize> {
202        self.fetch
203    }
204
205    fn cardinality_effect(&self) -> CardinalityEffect {
206        CardinalityEffect::Equal
207    }
208}
209
/// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details.
///
/// Pulls batches from `input`, hands them to `coalescer`, and emits coalesced
/// batches according to `inner_state` (see `CoalesceBatchesStreamState`).
struct CoalesceBatchesStream {
    /// The input stream produced by executing the child plan
    input: SendableRecordBatchStream,
    /// Buffer for combining batches
    coalescer: BatchCoalescer,
    /// Execution metrics; `poll_next` records every poll result here and
    /// `elapsed_compute` is timed inside `poll_next_inner`
    baseline_metrics: BaselineMetrics,
    /// The current inner state of the stream. This state dictates the current
    /// action or operation to be performed in the streaming process.
    inner_state: CoalesceBatchesStreamState,
}
222
223impl Stream for CoalesceBatchesStream {
224    type Item = Result<RecordBatch>;
225
226    fn poll_next(
227        mut self: Pin<&mut Self>,
228        cx: &mut Context<'_>,
229    ) -> Poll<Option<Self::Item>> {
230        let poll = self.poll_next_inner(cx);
231        self.baseline_metrics.record_poll(poll)
232    }
233
234    fn size_hint(&self) -> (usize, Option<usize>) {
235        // we can't predict the size of incoming batches so re-use the size hint from the input
236        self.input.size_hint()
237    }
238}
239
/// Enumeration of possible states for `CoalesceBatchesStream`.
/// It represents different stages in the lifecycle of a stream of record batches.
///
/// An example of state transitions.
///
/// Notation:
/// - `[3000]`: a batch with 3000 rows
/// - `{[2000], [3000]}`: the internal buffer of `CoalesceBatchesStream` holding 2 batches
///
/// The input of `CoalesceBatchesStream` produces three batches `[2000], [3000], [4000]`
/// and the coalescing target is 4096 rows. The stream goes through these steps:
///
/// 1. Read the first batch and buffer it; 2000 rows is below the target.
///    - initial state: `Pull`
///    - initial buffer: `{}`
///    - updated buffer: `{[2000]}`
///    - next state: `Pull`
/// 2. Read the second batch; the target is reached since 2000 + 3000 = 5000 > 4096.
///    - initial state: `Pull`
///    - initial buffer: `{[2000]}`
///    - updated buffer: `{[2000], [3000]}`
///    - next state: `ReturnBuffer`
/// 3. The two buffered batches are merged into one `[5000]` batch, which is
///    returned to the consumer of this stream.
///    - initial state: `ReturnBuffer`
///    - initial buffer: `{[2000], [3000]}`
///    - updated buffer: `{}`
///    - next state: `Pull`
/// 4. Read the third batch and buffer it; 4000 rows is below the target.
///    - initial state: `Pull`
///    - initial buffer: `{}`
///    - updated buffer: `{[4000]}`
///    - next state: `Pull`
/// 5. The input has ended. Move to the exhausted state to flush the remaining data.
///    - initial state: `Pull`
///    - initial buffer: `{[4000]}`
///    - updated buffer: `{[4000]}`
///    - next state: `Exhausted`
/// 6. The buffer is not empty, so its contents are returned as a final `[4000]` batch.
///    - initial state: `Exhausted`
///    - initial buffer: `{[4000]}`
///    - updated buffer: `{}`
///    - next state: `Exhausted`
/// 7. The buffer is empty, so the stream returns `None`.
///
/// If a `fetch` limit is reached while pulling, the stream moves directly from
/// `Pull` to `Exhausted`. The buffered rows are then flushed as in steps 6-7.
#[derive(Debug, Clone, Eq, PartialEq)]
enum CoalesceBatchesStreamState {
    /// Pull the next batch from the input stream and buffer it.
    Pull,
    /// The coalescing target was reached; emit the buffered rows as one batch.
    ReturnBuffer,
    /// The input ended or the fetch limit was reached; flush whatever is
    /// still buffered, then end the stream.
    Exhausted,
}
283
284impl CoalesceBatchesStream {
285    fn poll_next_inner(
286        self: &mut Pin<&mut Self>,
287        cx: &mut Context<'_>,
288    ) -> Poll<Option<Result<RecordBatch>>> {
289        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
290        loop {
291            match &self.inner_state {
292                CoalesceBatchesStreamState::Pull => {
293                    // Attempt to pull the next batch from the input stream.
294                    let input_batch = ready!(self.input.poll_next_unpin(cx));
295                    // Start timing the operation. The timer records time upon being dropped.
296                    let _timer = cloned_time.timer();
297
298                    match input_batch {
299                        Some(Ok(batch)) => match self.coalescer.push_batch(batch) {
300                            CoalescerState::Continue => {}
301                            CoalescerState::LimitReached => {
302                                self.inner_state = CoalesceBatchesStreamState::Exhausted;
303                            }
304                            CoalescerState::TargetReached => {
305                                self.inner_state =
306                                    CoalesceBatchesStreamState::ReturnBuffer;
307                            }
308                        },
309                        None => {
310                            // End of input stream, but buffered batches might still be present.
311                            self.inner_state = CoalesceBatchesStreamState::Exhausted;
312                        }
313                        other => return Poll::Ready(other),
314                    }
315                }
316                CoalesceBatchesStreamState::ReturnBuffer => {
317                    // Combine buffered batches into one batch and return it.
318                    let batch = self.coalescer.finish_batch()?;
319                    // Set to pull state for the next iteration.
320                    self.inner_state = CoalesceBatchesStreamState::Pull;
321                    return Poll::Ready(Some(Ok(batch)));
322                }
323                CoalesceBatchesStreamState::Exhausted => {
324                    // Handle the end of the input stream.
325                    return if self.coalescer.is_empty() {
326                        // If buffer is empty, return None indicating the stream is fully consumed.
327                        Poll::Ready(None)
328                    } else {
329                        // If the buffer still contains batches, prepare to return them.
330                        let batch = self.coalescer.finish_batch()?;
331                        Poll::Ready(Some(Ok(batch)))
332                    };
333                }
334            }
335        }
336    }
337}
338
339impl RecordBatchStream for CoalesceBatchesStream {
340    fn schema(&self) -> SchemaRef {
341        self.coalescer.schema()
342    }
343}