datafusion_physical_plan/coalesce_batches.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`CoalesceBatchesExec`] combines small batches into larger batches.
19
20use std::any::Any;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
26use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
27use crate::{
28 DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
29};
30
31use arrow::datatypes::SchemaRef;
32use arrow::record_batch::RecordBatch;
33use datafusion_common::Result;
34use datafusion_execution::TaskContext;
35
36use crate::coalesce::{BatchCoalescer, CoalescerState};
37use crate::execution_plan::CardinalityEffect;
38use futures::ready;
39use futures::stream::{Stream, StreamExt};
40
41/// `CoalesceBatchesExec` combines small batches into larger batches for more
42/// efficient vectorized processing by later operators.
43///
44/// The operator buffers batches until it collects `target_batch_size` rows and
45/// then emits a single concatenated batch. When only a limited number of rows
46/// are necessary (specified by the `fetch` parameter), the operator will stop
47/// buffering and returns the final batch once the number of collected rows
48/// reaches the `fetch` value.
49///
50/// See [`BatchCoalescer`] for more information
51#[derive(Debug, Clone)]
52pub struct CoalesceBatchesExec {
53 /// The input plan
54 input: Arc<dyn ExecutionPlan>,
55 /// Minimum number of rows for coalescing batches
56 target_batch_size: usize,
57 /// Maximum number of rows to fetch, `None` means fetching all rows
58 fetch: Option<usize>,
59 /// Execution metrics
60 metrics: ExecutionPlanMetricsSet,
61 cache: PlanProperties,
62}
63
64impl CoalesceBatchesExec {
65 /// Create a new CoalesceBatchesExec
66 pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
67 let cache = Self::compute_properties(&input);
68 Self {
69 input,
70 target_batch_size,
71 fetch: None,
72 metrics: ExecutionPlanMetricsSet::new(),
73 cache,
74 }
75 }
76
77 /// Update fetch with the argument
78 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
79 self.fetch = fetch;
80 self
81 }
82
83 /// The input plan
84 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
85 &self.input
86 }
87
88 /// Minimum number of rows for coalesces batches
89 pub fn target_batch_size(&self) -> usize {
90 self.target_batch_size
91 }
92
93 /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
94 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
95 // The coalesce batches operator does not make any changes to the
96 // partitioning of its input.
97 PlanProperties::new(
98 input.equivalence_properties().clone(), // Equivalence Properties
99 input.output_partitioning().clone(), // Output Partitioning
100 input.pipeline_behavior(),
101 input.boundedness(),
102 )
103 }
104}
105
106impl DisplayAs for CoalesceBatchesExec {
107 fn fmt_as(
108 &self,
109 t: DisplayFormatType,
110 f: &mut std::fmt::Formatter,
111 ) -> std::fmt::Result {
112 match t {
113 DisplayFormatType::Default | DisplayFormatType::Verbose => {
114 write!(
115 f,
116 "CoalesceBatchesExec: target_batch_size={}",
117 self.target_batch_size,
118 )?;
119 if let Some(fetch) = self.fetch {
120 write!(f, ", fetch={fetch}")?;
121 };
122
123 Ok(())
124 }
125 }
126 }
127}
128
129impl ExecutionPlan for CoalesceBatchesExec {
130 fn name(&self) -> &'static str {
131 "CoalesceBatchesExec"
132 }
133
134 /// Return a reference to Any that can be used for downcasting
135 fn as_any(&self) -> &dyn Any {
136 self
137 }
138
139 fn properties(&self) -> &PlanProperties {
140 &self.cache
141 }
142
143 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
144 vec![&self.input]
145 }
146
147 fn maintains_input_order(&self) -> Vec<bool> {
148 vec![true]
149 }
150
151 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
152 vec![false]
153 }
154
155 fn with_new_children(
156 self: Arc<Self>,
157 children: Vec<Arc<dyn ExecutionPlan>>,
158 ) -> Result<Arc<dyn ExecutionPlan>> {
159 Ok(Arc::new(
160 CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size)
161 .with_fetch(self.fetch),
162 ))
163 }
164
165 fn execute(
166 &self,
167 partition: usize,
168 context: Arc<TaskContext>,
169 ) -> Result<SendableRecordBatchStream> {
170 Ok(Box::pin(CoalesceBatchesStream {
171 input: self.input.execute(partition, context)?,
172 coalescer: BatchCoalescer::new(
173 self.input.schema(),
174 self.target_batch_size,
175 self.fetch,
176 ),
177 baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
178 // Start by pulling data
179 inner_state: CoalesceBatchesStreamState::Pull,
180 }))
181 }
182
183 fn metrics(&self) -> Option<MetricsSet> {
184 Some(self.metrics.clone_inner())
185 }
186
187 fn statistics(&self) -> Result<Statistics> {
188 Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
189 }
190
191 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
192 Some(Arc::new(CoalesceBatchesExec {
193 input: Arc::clone(&self.input),
194 target_batch_size: self.target_batch_size,
195 fetch: limit,
196 metrics: self.metrics.clone(),
197 cache: self.cache.clone(),
198 }))
199 }
200
201 fn fetch(&self) -> Option<usize> {
202 self.fetch
203 }
204
205 fn cardinality_effect(&self) -> CardinalityEffect {
206 CardinalityEffect::Equal
207 }
208}
209
210/// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details.
211struct CoalesceBatchesStream {
212 /// The input plan
213 input: SendableRecordBatchStream,
214 /// Buffer for combining batches
215 coalescer: BatchCoalescer,
216 /// Execution metrics
217 baseline_metrics: BaselineMetrics,
218 /// The current inner state of the stream. This state dictates the current
219 /// action or operation to be performed in the streaming process.
220 inner_state: CoalesceBatchesStreamState,
221}
222
223impl Stream for CoalesceBatchesStream {
224 type Item = Result<RecordBatch>;
225
226 fn poll_next(
227 mut self: Pin<&mut Self>,
228 cx: &mut Context<'_>,
229 ) -> Poll<Option<Self::Item>> {
230 let poll = self.poll_next_inner(cx);
231 self.baseline_metrics.record_poll(poll)
232 }
233
234 fn size_hint(&self) -> (usize, Option<usize>) {
235 // we can't predict the size of incoming batches so re-use the size hint from the input
236 self.input.size_hint()
237 }
238}
239
/// Enumeration of possible states for `CoalesceBatchesStream`.
/// It represents different stages in the lifecycle of a stream of record batches.
///
/// An example of state transition:
/// Notation:
/// `[3000]`: A batch with size 3000
/// `{[2000], [3000]}`: `CoalesceBatchesStream`'s internal buffer with 2 batches buffered
/// The input of `CoalesceBatchesStream` generates three batches `[2000], [3000], [4000]`.
/// With a coalescing threshold of 4096, the coalescing procedure goes through
/// the following steps:
/// 1. Read the first batch and buffer it.
///    - initial state: `Pull`
///    - initial buffer: `{}`
///    - updated buffer: `{[2000]}`
///    - next state: `Pull`
/// 2. Read the second batch. The coalescing target is reached since 2000 + 3000 > 4096.
///    - initial state: `Pull`
///    - initial buffer: `{[2000]}`
///    - updated buffer: `{[2000], [3000]}`
///    - next state: `ReturnBuffer`
/// 3. The two buffered batches are merged into one batch and returned to the
///    consumer of the stream.
///    - initial state: `ReturnBuffer`
///    - initial buffer: `{[2000], [3000]}`
///    - updated buffer: `{}`
///    - next state: `Pull`
/// 4. Read the third input batch.
///    - initial state: `Pull`
///    - initial buffer: `{}`
///    - updated buffer: `{[4000]}`
///    - next state: `Pull`
/// 5. The input has ended. Move to the exhausted state to flush the remaining data.
///    - initial state: `Pull`
///    - initial buffer: `{[4000]}`
///    - updated buffer: `{[4000]}`
///    - next state: `Exhausted`
/// 6. The remaining buffered batch `[4000]` is returned. Any later poll finds
///    the buffer empty and returns `None`.
///    - initial state: `Exhausted`
///    - initial buffer: `{[4000]}`
///    - updated buffer: `{}`
///    - next state: `Exhausted`
///
/// If a `fetch` limit is reached while pulling, the stream moves straight from
/// `Pull` to `Exhausted` without reading further input.
#[derive(Debug, Clone, Eq, PartialEq)]
enum CoalesceBatchesStreamState {
    /// State to pull a new batch from the input stream.
    Pull,
    /// State to return a buffered batch.
    ReturnBuffer,
    /// State indicating that the stream is exhausted.
    Exhausted,
}
283
284impl CoalesceBatchesStream {
285 fn poll_next_inner(
286 self: &mut Pin<&mut Self>,
287 cx: &mut Context<'_>,
288 ) -> Poll<Option<Result<RecordBatch>>> {
289 let cloned_time = self.baseline_metrics.elapsed_compute().clone();
290 loop {
291 match &self.inner_state {
292 CoalesceBatchesStreamState::Pull => {
293 // Attempt to pull the next batch from the input stream.
294 let input_batch = ready!(self.input.poll_next_unpin(cx));
295 // Start timing the operation. The timer records time upon being dropped.
296 let _timer = cloned_time.timer();
297
298 match input_batch {
299 Some(Ok(batch)) => match self.coalescer.push_batch(batch) {
300 CoalescerState::Continue => {}
301 CoalescerState::LimitReached => {
302 self.inner_state = CoalesceBatchesStreamState::Exhausted;
303 }
304 CoalescerState::TargetReached => {
305 self.inner_state =
306 CoalesceBatchesStreamState::ReturnBuffer;
307 }
308 },
309 None => {
310 // End of input stream, but buffered batches might still be present.
311 self.inner_state = CoalesceBatchesStreamState::Exhausted;
312 }
313 other => return Poll::Ready(other),
314 }
315 }
316 CoalesceBatchesStreamState::ReturnBuffer => {
317 // Combine buffered batches into one batch and return it.
318 let batch = self.coalescer.finish_batch()?;
319 // Set to pull state for the next iteration.
320 self.inner_state = CoalesceBatchesStreamState::Pull;
321 return Poll::Ready(Some(Ok(batch)));
322 }
323 CoalesceBatchesStreamState::Exhausted => {
324 // Handle the end of the input stream.
325 return if self.coalescer.is_empty() {
326 // If buffer is empty, return None indicating the stream is fully consumed.
327 Poll::Ready(None)
328 } else {
329 // If the buffer still contains batches, prepare to return them.
330 let batch = self.coalescer.finish_batch()?;
331 Poll::Ready(Some(Ok(batch)))
332 };
333 }
334 }
335 }
336 }
337}
338
339impl RecordBatchStream for CoalesceBatchesStream {
340 fn schema(&self) -> SchemaRef {
341 self.coalescer.schema()
342 }
343}