datafusion_physical_plan/coalesce/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{
19 builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch,
20 RecordBatchOptions,
21};
22use arrow::compute::concat_batches;
23use arrow::datatypes::SchemaRef;
24use std::sync::Arc;
25
26/// Concatenate multiple [`RecordBatch`]es
27///
28/// `BatchCoalescer` concatenates multiple small [`RecordBatch`]es, produced by
29/// operations such as `FilterExec` and `RepartitionExec`, into larger ones for
30/// more efficient processing by subsequent operations.
31///
32/// # Background
33///
34/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
35/// than smaller record batches (until the CPU cache is exceeded) because there
36/// is fixed processing overhead per batch. DataFusion tries to operate on
37/// batches of `target_batch_size` rows to amortize this overhead
38///
39/// ```text
40/// ┌────────────────────┐
41/// │ RecordBatch │
42/// │ num_rows = 23 │
43/// └────────────────────┘ ┌────────────────────┐
44/// │ │
45/// ┌────────────────────┐ Coalesce │ │
46/// │ │ Batches │ │
47/// │ RecordBatch │ │ │
48/// │ num_rows = 50 │ ─ ─ ─ ─ ─ ─ ▶ │ │
49/// │ │ │ RecordBatch │
50/// │ │ │ num_rows = 106 │
51/// └────────────────────┘ │ │
52/// │ │
53/// ┌────────────────────┐ │ │
54/// │ │ │ │
55/// │ RecordBatch │ │ │
56/// │ num_rows = 33 │ └────────────────────┘
57/// │ │
58/// └────────────────────┘
59/// ```
60///
61/// # Notes:
62///
63/// 1. Output rows are produced in the same order as the input rows
64///
65/// 2. The output is a sequence of batches, with all but the last being at least
66/// `target_batch_size` rows.
67///
68/// 3. Eventually this may also be able to handle other optimizations such as a
69/// combined filter/coalesce operation.
70///
#[derive(Debug)]
pub struct BatchCoalescer {
    /// The schema of the input batches, also used for the concatenated output
    schema: SchemaRef,
    /// Minimum number of rows for coalesced batches
    target_batch_size: usize,
    /// Total number of rows accepted so far (whether still buffered or
    /// already emitted); this is the counter compared against `fetch`
    total_rows: usize,
    /// Batches accumulated since the last call to `finish_batch`
    buffer: Vec<RecordBatch>,
    /// Number of rows currently held in `buffer`
    buffered_rows: usize,
    /// Limit: maximum number of rows to fetch, `None` means fetch all rows
    fetch: Option<usize>,
}
86
87impl BatchCoalescer {
88 /// Create a new `BatchCoalescer`
89 ///
90 /// # Arguments
91 /// - `schema` - the schema of the output batches
92 /// - `target_batch_size` - the minimum number of rows for each
93 /// output batch (until limit reached)
94 /// - `fetch` - the maximum number of rows to fetch, `None` means fetch all rows
95 pub fn new(
96 schema: SchemaRef,
97 target_batch_size: usize,
98 fetch: Option<usize>,
99 ) -> Self {
100 Self {
101 schema,
102 target_batch_size,
103 total_rows: 0,
104 buffer: vec![],
105 buffered_rows: 0,
106 fetch,
107 }
108 }
109
110 /// Return the schema of the output batches
111 pub fn schema(&self) -> SchemaRef {
112 Arc::clone(&self.schema)
113 }
114
115 /// Push next batch, and returns [`CoalescerState`] indicating the current
116 /// state of the buffer.
117 pub fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
118 let batch = gc_string_view_batch(&batch);
119 if self.limit_reached(&batch) {
120 CoalescerState::LimitReached
121 } else if self.target_reached(batch) {
122 CoalescerState::TargetReached
123 } else {
124 CoalescerState::Continue
125 }
126 }
127
128 /// Return true if the there is no data buffered
129 pub fn is_empty(&self) -> bool {
130 self.buffer.is_empty()
131 }
132
133 /// Checks if the buffer will reach the specified limit after getting
134 /// `batch`.
135 ///
136 /// If fetch would be exceeded, slices the received batch, updates the
137 /// buffer with it, and returns `true`.
138 ///
139 /// Otherwise: does nothing and returns `false`.
140 fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
141 match self.fetch {
142 Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
143 // Limit is reached
144 let remaining_rows = fetch - self.total_rows;
145 debug_assert!(remaining_rows > 0);
146
147 let batch = batch.slice(0, remaining_rows);
148 self.buffered_rows += batch.num_rows();
149 self.total_rows = fetch;
150 self.buffer.push(batch);
151 true
152 }
153 _ => false,
154 }
155 }
156
157 /// Updates the buffer with the given batch.
158 ///
159 /// If the target batch size is reached, returns `true`. Otherwise, returns
160 /// `false`.
161 fn target_reached(&mut self, batch: RecordBatch) -> bool {
162 if batch.num_rows() == 0 {
163 false
164 } else {
165 self.total_rows += batch.num_rows();
166 self.buffered_rows += batch.num_rows();
167 self.buffer.push(batch);
168 self.buffered_rows >= self.target_batch_size
169 }
170 }
171
172 /// Concatenates and returns all buffered batches, and clears the buffer.
173 pub fn finish_batch(&mut self) -> datafusion_common::Result<RecordBatch> {
174 let batch = concat_batches(&self.schema, &self.buffer)?;
175 self.buffer.clear();
176 self.buffered_rows = 0;
177 Ok(batch)
178 }
179}
180
/// Indicates the state of the [`BatchCoalescer`] buffer after the
/// [`BatchCoalescer::push_batch()`] operation.
///
/// The caller should take different actions, depending on the variant returned.
///
/// The enum carries no data, so it is `Copy` and can be compared and printed
/// directly (for example in assertions and log messages).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CoalescerState {
    /// Neither the limit nor the target batch size is reached.
    ///
    /// Action: continue pushing batches.
    Continue,
    /// The limit has been reached.
    ///
    /// Action: call [`BatchCoalescer::finish_batch()`] to get the final
    /// buffered results as a batch and finish the query.
    LimitReached,
    /// The specified minimum number of rows a batch should have is reached.
    ///
    /// Action: call [`BatchCoalescer::finish_batch()`] to get the current
    /// buffered results as a batch and then continue pushing batches.
    TargetReached,
}
201
202/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed
203///
204/// Decides when to consolidate the StringView into a new buffer to reduce
205/// memory usage and improve string locality for better performance.
206///
207/// This differs from `StringViewArray::gc` because:
208/// 1. It may not compact the array depending on a heuristic.
209/// 2. It uses a precise block size to reduce the number of buffers to track.
210///
211/// # Heuristic
212///
213/// If the average size of each view is larger than 32 bytes, we compact the array.
214///
215/// `StringViewArray` include pointers to buffer that hold the underlying data.
216/// One of the great benefits of `StringViewArray` is that many operations
217/// (e.g., `filter`) can be done without copying the underlying data.
218///
219/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
220/// `StringViewArray` may only refer to a small portion of the buffer,
221/// significantly increasing memory usage.
222fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
223 let new_columns: Vec<ArrayRef> = batch
224 .columns()
225 .iter()
226 .map(|c| {
227 // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
228 let Some(s) = c.as_string_view_opt() else {
229 return Arc::clone(c);
230 };
231 let ideal_buffer_size: usize = s
232 .views()
233 .iter()
234 .map(|v| {
235 let len = (*v as u32) as usize;
236 if len > 12 {
237 len
238 } else {
239 0
240 }
241 })
242 .sum();
243 let actual_buffer_size = s.get_buffer_memory_size();
244
245 // Re-creating the array copies data and can be time consuming.
246 // We only do it if the array is sparse
247 if actual_buffer_size > (ideal_buffer_size * 2) {
248 // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
249 // See https://github.com/apache/arrow-rs/issues/6094 for more details.
250 let mut builder = StringViewBuilder::with_capacity(s.len());
251 if ideal_buffer_size > 0 {
252 builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
253 }
254
255 for v in s.iter() {
256 builder.append_option(v);
257 }
258
259 let gc_string = builder.finish();
260
261 debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
262
263 Arc::new(gc_string)
264 } else {
265 Arc::clone(c)
266 }
267 })
268 .collect();
269 let mut options = RecordBatchOptions::new();
270 options = options.with_row_count(Some(batch.num_rows()));
271 RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
272 .expect("Failed to re-create the gc'ed record batch")
273}
274
#[cfg(test)]
mod tests {
    use std::ops::Range;

    use super::*;

    use arrow::array::{StringViewArray, UInt32Array};
    use arrow::datatypes::{DataType, Field, Schema};

    #[test]
    fn test_coalesce() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat(batch).take(10))
            // expected output is batches of at least 21 rows (except for the final batch)
            .with_target_batch_size(21)
            .with_expected_output_sizes(vec![24, 24, 24, 8])
            .run()
    }

    #[test]
    fn test_coalesce_with_fetch_larger_than_input_size() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat(batch).take(10))
            // input is 10 batches x 8 rows (80 rows) with fetch limit of 100
            // expected to behave the same as `test_coalesce`
            .with_target_batch_size(21)
            .with_fetch(Some(100))
            .with_expected_output_sizes(vec![24, 24, 24, 8])
            .run();
    }

    #[test]
    fn test_coalesce_with_fetch_less_than_input_size() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat(batch).take(10))
            // input is 10 batches x 8 rows (80 rows) with fetch limit of 50
            .with_target_batch_size(21)
            .with_fetch(Some(50))
            .with_expected_output_sizes(vec![24, 24, 2])
            .run();
    }

    #[test]
    fn test_coalesce_with_fetch_less_than_target_and_no_remaining_rows() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat(batch).take(10))
            // input is 10 batches x 8 rows (80 rows) with fetch limit of 48
            .with_target_batch_size(21)
            .with_fetch(Some(48))
            .with_expected_output_sizes(vec![24, 24])
            .run();
    }

    #[test]
    fn test_coalesce_with_fetch_less_target_batch_size() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat(batch).take(10))
            // input is 10 batches x 8 rows (80 rows) with fetch limit of 10
            .with_target_batch_size(21)
            .with_fetch(Some(10))
            .with_expected_output_sizes(vec![10])
            .run();
    }

    #[test]
    fn test_coalesce_single_large_batch_over_fetch() {
        // a single 100 row batch with a fetch limit of 7 must be cut to 7 rows
        let large_batch = uint32_batch(0..100);
        Test::new()
            .with_batch(large_batch)
            .with_target_batch_size(20)
            .with_fetch(Some(7))
            .with_expected_output_sizes(vec![7])
            .run()
    }

    /// Test for [`BatchCoalescer`]
    ///
    /// Pushes the input batches to the coalescer and verifies that the resulting
    /// batches have the expected number of rows and contents.
    #[derive(Debug, Clone, Default)]
    struct Test {
        /// Batches to feed to the coalescer. Tests must have at least one
        /// batch, as the schema of the first batch is used for the coalescer
        input_batches: Vec<RecordBatch>,
        /// Expected output sizes of the resulting batches
        expected_output_sizes: Vec<usize>,
        /// target batch size
        target_batch_size: usize,
        /// Fetch (limit)
        fetch: Option<usize>,
    }

    impl Test {
        fn new() -> Self {
            Self::default()
        }

        /// Set the target batch size
        fn with_target_batch_size(mut self, target_batch_size: usize) -> Self {
            self.target_batch_size = target_batch_size;
            self
        }

        /// Set the fetch (limit)
        fn with_fetch(mut self, fetch: Option<usize>) -> Self {
            self.fetch = fetch;
            self
        }

        /// Extend the input batches with `batch`
        fn with_batch(mut self, batch: RecordBatch) -> Self {
            self.input_batches.push(batch);
            self
        }

        /// Extends the input batches with `batches`
        fn with_batches(
            mut self,
            batches: impl IntoIterator<Item = RecordBatch>,
        ) -> Self {
            self.input_batches.extend(batches);
            self
        }

        /// Extends the expected output sizes with `sizes`
        fn with_expected_output_sizes(
            mut self,
            sizes: impl IntoIterator<Item = usize>,
        ) -> Self {
            self.expected_output_sizes.extend(sizes);
            self
        }

        /// Runs the test -- see documentation on [`Test`] for details
        fn run(self) {
            let Self {
                input_batches,
                target_batch_size,
                fetch,
                expected_output_sizes,
            } = self;

            let schema = input_batches
                .first()
                .expect("Test requires at least one input batch")
                .schema();

            // create a single large input batch for output comparison
            let single_input_batch = concat_batches(&schema, &input_batches).unwrap();

            let mut coalescer =
                BatchCoalescer::new(Arc::clone(&schema), target_batch_size, fetch);

            // Drive the coalescer only through its public API, the same way a
            // real caller would, so that the bookkeeping done by
            // `finish_batch` is actually exercised.
            let mut output_batches = vec![];
            for batch in input_batches {
                match coalescer.push_batch(batch) {
                    CoalescerState::Continue => {}
                    CoalescerState::LimitReached => {
                        output_batches.push(coalescer.finish_batch().unwrap());
                        break;
                    }
                    CoalescerState::TargetReached => {
                        output_batches.push(coalescer.finish_batch().unwrap());
                    }
                }
            }
            // flush whatever is left as one final (possibly undersized) batch
            if !coalescer.is_empty() {
                output_batches.push(coalescer.finish_batch().unwrap());
            }

            // make sure we got the expected number of output batches and content
            let mut starting_idx = 0;
            assert_eq!(expected_output_sizes.len(), output_batches.len());
            for (i, (expected_size, batch)) in
                expected_output_sizes.iter().zip(output_batches).enumerate()
            {
                assert_eq!(
                    *expected_size,
                    batch.num_rows(),
                    "Unexpected number of rows in Batch {i}"
                );

                // compare the contents of the batch (using `==` compares the
                // underlying memory layout too)
                let expected_batch =
                    single_input_batch.slice(starting_idx, *expected_size);
                let batch_strings = batch_to_pretty_strings(&batch);
                let expected_batch_strings = batch_to_pretty_strings(&expected_batch);
                let batch_strings = batch_strings.lines().collect::<Vec<_>>();
                let expected_batch_strings =
                    expected_batch_strings.lines().collect::<Vec<_>>();
                assert_eq!(
                    expected_batch_strings, batch_strings,
                    "Unexpected content in Batch {i}:\
                     \n\nExpected:\n{expected_batch_strings:#?}\n\nActual:\n{batch_strings:#?}"
                );
                starting_idx += *expected_size;
            }
        }
    }

    /// Return a batch of UInt32 with the specified range
    fn uint32_batch(range: Range<u32>) -> RecordBatch {
        let schema =
            Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(UInt32Array::from_iter_values(range))],
        )
        .unwrap()
    }

    #[test]
    fn test_gc_string_view_batch_small_no_compact() {
        // view with only short strings (no buffers) --> no need to compact
        let array = StringViewTest {
            rows: 1000,
            strings: vec![Some("a"), Some("b"), Some("c")],
        }
        .build();

        let gc_array = do_gc(array.clone());
        compare_string_array_values(&array, &gc_array);
        assert_eq!(array.data_buffers().len(), 0);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
    }

    #[test]
    fn test_gc_string_view_test_batch_empty() {
        // a batch without columns or rows must pass through unchanged
        let schema = Schema::empty();
        let batch = RecordBatch::new_empty(schema.into());
        let output_batch = gc_string_view_batch(&batch);
        assert_eq!(batch.num_columns(), output_batch.num_columns());
        assert_eq!(batch.num_rows(), output_batch.num_rows());
    }

    #[test]
    fn test_gc_string_view_batch_large_no_compact() {
        // view with large strings (has buffers) but full --> no need to compact
        let array = StringViewTest {
            rows: 1000,
            strings: vec![Some("This string is longer than 12 bytes")],
        }
        .build();

        let gc_array = do_gc(array.clone());
        compare_string_array_values(&array, &gc_array);
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
    }

    #[test]
    fn test_gc_string_view_batch_large_slice_compact() {
        // view with large strings (has buffers) and only partially used --> compact
        let array = StringViewTest {
            rows: 1000,
            strings: vec![Some("this string is longer than 12 bytes")],
        }
        .build();

        // keep only 22 rows (starting at offset 11), so most of the buffer is not used
        let array = array.slice(11, 22);

        let gc_array = do_gc(array.clone());
        compare_string_array_values(&array, &gc_array);
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer
    }

    /// Compares the values of two string view arrays
    fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) {
        assert_eq!(arr1.len(), arr2.len());
        for (s1, s2) in arr1.iter().zip(arr2.iter()) {
            assert_eq!(s1, s2);
        }
    }

    /// runs garbage collection on string view array
    /// and ensures the number of rows are the same
    fn do_gc(array: StringViewArray) -> StringViewArray {
        let batch =
            RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
        let gc_batch = gc_string_view_batch(&batch);
        assert_eq!(batch.num_rows(), gc_batch.num_rows());
        assert_eq!(batch.schema(), gc_batch.schema());
        gc_batch
            .column(0)
            .as_any()
            .downcast_ref::<StringViewArray>()
            .unwrap()
            .clone()
    }

    /// Describes parameters for creating a `StringViewArray`
    struct StringViewTest {
        /// The number of rows in the array
        rows: usize,
        /// The strings to use in the array (repeated over and over)
        strings: Vec<Option<&'static str>>,
    }

    impl StringViewTest {
        /// Create a `StringViewArray` with the parameters specified in this struct
        ///
        /// The values of `strings` are cycled until exactly `rows` values have
        /// been appended. If `strings` is empty the resulting array is empty.
        fn build(self) -> StringViewArray {
            let mut builder =
                StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
            for &v in self.strings.iter().cycle().take(self.rows) {
                builder.append_option(v);
            }
            builder.finish()
        }
    }

    /// Formats `batch` as a pretty printed table, for readable assertion output
    fn batch_to_pretty_strings(batch: &RecordBatch) -> String {
        arrow::util::pretty::pretty_format_batches(&[batch.clone()])
            .unwrap()
            .to_string()
    }
}