datafusion_physical_plan/coalesce/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{
19    builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch,
20    RecordBatchOptions,
21};
22use arrow::compute::concat_batches;
23use arrow::datatypes::SchemaRef;
24use std::sync::Arc;
25
26/// Concatenate multiple [`RecordBatch`]es
27///
28/// `BatchCoalescer` concatenates multiple small [`RecordBatch`]es, produced by
29/// operations such as `FilterExec` and `RepartitionExec`, into larger ones for
30/// more efficient processing by subsequent operations.
31///
32/// # Background
33///
34/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
35/// than smaller record batches (until the CPU cache is exceeded) because there
36/// is fixed processing overhead per batch. DataFusion tries to operate on
37/// batches of `target_batch_size` rows to amortize this overhead
38///
39/// ```text
40/// ┌────────────────────┐
41/// │    RecordBatch     │
42/// │   num_rows = 23    │
43/// └────────────────────┘                 ┌────────────────────┐
44///                                        │                    │
45/// ┌────────────────────┐     Coalesce    │                    │
46/// │                    │      Batches    │                    │
47/// │    RecordBatch     │                 │                    │
48/// │   num_rows = 50    │  ─ ─ ─ ─ ─ ─ ▶  │                    │
49/// │                    │                 │    RecordBatch     │
50/// │                    │                 │   num_rows = 106   │
51/// └────────────────────┘                 │                    │
52///                                        │                    │
53/// ┌────────────────────┐                 │                    │
54/// │                    │                 │                    │
55/// │    RecordBatch     │                 │                    │
56/// │   num_rows = 33    │                 └────────────────────┘
57/// │                    │
58/// └────────────────────┘
59/// ```
60///
61/// # Notes:
62///
63/// 1. Output rows are produced in the same order as the input rows
64///
65/// 2. The output is a sequence of batches, with all but the last being at least
66///    `target_batch_size` rows.
67///
68/// 3. Eventually this may also be able to handle other optimizations such as a
69///    combined filter/coalesce operation.
70///
71#[derive(Debug)]
72pub struct BatchCoalescer {
73    /// The input schema
74    schema: SchemaRef,
75    /// Minimum number of rows for coalesces batches
76    target_batch_size: usize,
77    /// Total number of rows returned so far
78    total_rows: usize,
79    /// Buffered batches
80    buffer: Vec<RecordBatch>,
81    /// Buffered row count
82    buffered_rows: usize,
83    /// Limit: maximum number of rows to fetch, `None` means fetch all rows
84    fetch: Option<usize>,
85}
86
87impl BatchCoalescer {
88    /// Create a new `BatchCoalescer`
89    ///
90    /// # Arguments
91    /// - `schema` - the schema of the output batches
92    /// - `target_batch_size` - the minimum number of rows for each
93    ///    output batch (until limit reached)
94    /// - `fetch` - the maximum number of rows to fetch, `None` means fetch all rows
95    pub fn new(
96        schema: SchemaRef,
97        target_batch_size: usize,
98        fetch: Option<usize>,
99    ) -> Self {
100        Self {
101            schema,
102            target_batch_size,
103            total_rows: 0,
104            buffer: vec![],
105            buffered_rows: 0,
106            fetch,
107        }
108    }
109
110    /// Return the schema of the output batches
111    pub fn schema(&self) -> SchemaRef {
112        Arc::clone(&self.schema)
113    }
114
115    /// Push next batch, and returns [`CoalescerState`] indicating the current
116    /// state of the buffer.
117    pub fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
118        let batch = gc_string_view_batch(&batch);
119        if self.limit_reached(&batch) {
120            CoalescerState::LimitReached
121        } else if self.target_reached(batch) {
122            CoalescerState::TargetReached
123        } else {
124            CoalescerState::Continue
125        }
126    }
127
128    /// Return true if the there is no data buffered
129    pub fn is_empty(&self) -> bool {
130        self.buffer.is_empty()
131    }
132
133    /// Checks if the buffer will reach the specified limit after getting
134    /// `batch`.
135    ///
136    /// If fetch would be exceeded, slices the received batch, updates the
137    /// buffer with it, and returns `true`.
138    ///
139    /// Otherwise: does nothing and returns `false`.
140    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
141        match self.fetch {
142            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
143                // Limit is reached
144                let remaining_rows = fetch - self.total_rows;
145                debug_assert!(remaining_rows > 0);
146
147                let batch = batch.slice(0, remaining_rows);
148                self.buffered_rows += batch.num_rows();
149                self.total_rows = fetch;
150                self.buffer.push(batch);
151                true
152            }
153            _ => false,
154        }
155    }
156
157    /// Updates the buffer with the given batch.
158    ///
159    /// If the target batch size is reached, returns `true`. Otherwise, returns
160    /// `false`.
161    fn target_reached(&mut self, batch: RecordBatch) -> bool {
162        if batch.num_rows() == 0 {
163            false
164        } else {
165            self.total_rows += batch.num_rows();
166            self.buffered_rows += batch.num_rows();
167            self.buffer.push(batch);
168            self.buffered_rows >= self.target_batch_size
169        }
170    }
171
172    /// Concatenates and returns all buffered batches, and clears the buffer.
173    pub fn finish_batch(&mut self) -> datafusion_common::Result<RecordBatch> {
174        let batch = concat_batches(&self.schema, &self.buffer)?;
175        self.buffer.clear();
176        self.buffered_rows = 0;
177        Ok(batch)
178    }
179}
180
/// Indicates the state of the [`BatchCoalescer`] buffer after the
/// [`BatchCoalescer::push_batch()`] operation.
///
/// The caller should take different actions, depending on the variant returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CoalescerState {
    /// Neither the limit nor the target batch size is reached.
    ///
    /// Action: continue pushing batches.
    Continue,
    /// The limit has been reached.
    ///
    /// Action: call [`BatchCoalescer::finish_batch()`] to get the final
    /// buffered results as a batch and finish the query.
    LimitReached,
    /// The specified minimum number of rows a batch should have is reached.
    ///
    /// Action: call [`BatchCoalescer::finish_batch()`] to get the current
    /// buffered results as a batch and then continue pushing batches.
    TargetReached,
}
201
202/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed
203///
204/// Decides when to consolidate the StringView into a new buffer to reduce
205/// memory usage and improve string locality for better performance.
206///
207/// This differs from `StringViewArray::gc` because:
208/// 1. It may not compact the array depending on a heuristic.
209/// 2. It uses a precise block size to reduce the number of buffers to track.
210///
211/// # Heuristic
212///
213/// If the average size of each view is larger than 32 bytes, we compact the array.
214///
215/// `StringViewArray` include pointers to buffer that hold the underlying data.
216/// One of the great benefits of `StringViewArray` is that many operations
217/// (e.g., `filter`) can be done without copying the underlying data.
218///
219/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
220/// `StringViewArray` may only refer to a small portion of the buffer,
221/// significantly increasing memory usage.
222fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
223    let new_columns: Vec<ArrayRef> = batch
224        .columns()
225        .iter()
226        .map(|c| {
227            // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
228            let Some(s) = c.as_string_view_opt() else {
229                return Arc::clone(c);
230            };
231            let ideal_buffer_size: usize = s
232                .views()
233                .iter()
234                .map(|v| {
235                    let len = (*v as u32) as usize;
236                    if len > 12 {
237                        len
238                    } else {
239                        0
240                    }
241                })
242                .sum();
243            let actual_buffer_size = s.get_buffer_memory_size();
244
245            // Re-creating the array copies data and can be time consuming.
246            // We only do it if the array is sparse
247            if actual_buffer_size > (ideal_buffer_size * 2) {
248                // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
249                // See https://github.com/apache/arrow-rs/issues/6094 for more details.
250                let mut builder = StringViewBuilder::with_capacity(s.len());
251                if ideal_buffer_size > 0 {
252                    builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
253                }
254
255                for v in s.iter() {
256                    builder.append_option(v);
257                }
258
259                let gc_string = builder.finish();
260
261                debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
262
263                Arc::new(gc_string)
264            } else {
265                Arc::clone(c)
266            }
267        })
268        .collect();
269    let mut options = RecordBatchOptions::new();
270    options = options.with_row_count(Some(batch.num_rows()));
271    RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
272        .expect("Failed to re-create the gc'ed record batch")
273}
274
#[cfg(test)]
mod tests {
    use std::ops::Range;

    use super::*;

    use arrow::array::{StringViewArray, UInt32Array};
    use arrow::datatypes::{DataType, Field, Schema};

    #[test]
    fn test_coalesce() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat(batch).take(10))
            // expected output is batches of at least 21 rows (except for the final batch)
            .with_target_batch_size(21)
            .with_expected_output_sizes(vec![24, 24, 24, 8])
            .run()
    }

    #[test]
    fn test_coalesce_multiple_remaining_batches() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat(batch).take(10))
            // the target is never reached, so all 10 input batches stay buffered
            // and must be emitted as one final coalesced batch
            .with_target_batch_size(100)
            .with_expected_output_sizes(vec![80])
            .run()
    }

    #[test]
    fn test_coalesce_skips_empty_batches() {
        Test::new()
            .with_batch(uint32_batch(0..8))
            .with_batch(uint32_batch(0..0))
            .with_batch(uint32_batch(8..16))
            .with_batch(uint32_batch(0..0))
            // empty input batches must neither produce output nor count toward the target
            .with_target_batch_size(10)
            .with_expected_output_sizes(vec![16])
            .run()
    }

    #[test]
    fn test_coalesce_with_fetch_larger_than_input_size() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat(batch).take(10))
            // input is 10 batches x 8 rows (80 rows) with fetch limit of 100
            // expected to behave the same as `test_coalesce`
            .with_target_batch_size(21)
            .with_fetch(Some(100))
            .with_expected_output_sizes(vec![24, 24, 24, 8])
            .run();
    }

    #[test]
    fn test_coalesce_with_fetch_less_than_input_size() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat(batch).take(10))
            // input is 10 batches x 8 rows (80 rows) with fetch limit of 50
            .with_target_batch_size(21)
            .with_fetch(Some(50))
            .with_expected_output_sizes(vec![24, 24, 2])
            .run();
    }

    #[test]
    fn test_coalesce_with_fetch_less_than_target_and_no_remaining_rows() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat(batch).take(10))
            // input is 10 batches x 8 rows (80 rows) with fetch limit of 48
            .with_target_batch_size(21)
            .with_fetch(Some(48))
            .with_expected_output_sizes(vec![24, 24])
            .run();
    }

    #[test]
    fn test_coalesce_with_fetch_less_target_batch_size() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat(batch).take(10))
            // input is 10 batches x 8 rows (80 rows) with fetch limit of 10
            .with_target_batch_size(21)
            .with_fetch(Some(10))
            .with_expected_output_sizes(vec![10])
            .run();
    }

    #[test]
    fn test_coalesce_single_large_batch_over_fetch() {
        let large_batch = uint32_batch(0..100);
        Test::new()
            .with_batch(large_batch)
            .with_target_batch_size(20)
            .with_fetch(Some(7))
            .with_expected_output_sizes(vec![7])
            .run()
    }

    /// Test for [`BatchCoalescer`]
    ///
    /// Pushes the input batches to the coalescer and verifies that the resulting
    /// batches have the expected number of rows and contents.
    #[derive(Debug, Clone, Default)]
    struct Test {
        /// Batches to feed to the coalescer. Tests must have at least one
        /// batch; the schema of the first batch is used for the coalescer
        input_batches: Vec<RecordBatch>,
        /// Expected output sizes of the resulting batches
        expected_output_sizes: Vec<usize>,
        /// target batch size
        target_batch_size: usize,
        /// Fetch (limit)
        fetch: Option<usize>,
    }

    impl Test {
        fn new() -> Self {
            Self::default()
        }

        /// Set the target batch size
        fn with_target_batch_size(mut self, target_batch_size: usize) -> Self {
            self.target_batch_size = target_batch_size;
            self
        }

        /// Set the fetch (limit)
        fn with_fetch(mut self, fetch: Option<usize>) -> Self {
            self.fetch = fetch;
            self
        }

        /// Extend the input batches with `batch`
        fn with_batch(mut self, batch: RecordBatch) -> Self {
            self.input_batches.push(batch);
            self
        }

        /// Extends the input batches with `batches`
        fn with_batches(
            mut self,
            batches: impl IntoIterator<Item = RecordBatch>,
        ) -> Self {
            self.input_batches.extend(batches);
            self
        }

        /// Extends `sizes` to expected output sizes
        fn with_expected_output_sizes(
            mut self,
            sizes: impl IntoIterator<Item = usize>,
        ) -> Self {
            self.expected_output_sizes.extend(sizes);
            self
        }

        /// Runs the test -- see documentation on [`Test`] for details
        fn run(self) {
            let Self {
                input_batches,
                target_batch_size,
                fetch,
                expected_output_sizes,
            } = self;

            let schema = input_batches[0].schema();

            // create a single large input batch for output comparison
            let single_input_batch = concat_batches(&schema, &input_batches).unwrap();

            let mut coalescer =
                BatchCoalescer::new(Arc::clone(&schema), target_batch_size, fetch);

            let mut output_batches = vec![];
            for batch in input_batches {
                match coalescer.push_batch(batch) {
                    CoalescerState::Continue => {}
                    CoalescerState::LimitReached => {
                        output_batches.push(coalescer.finish_batch().unwrap());
                        break;
                    }
                    CoalescerState::TargetReached => {
                        output_batches.push(coalescer.finish_batch().unwrap());
                    }
                }
            }
            // Flush whatever is still buffered as one final coalesced batch,
            // exactly like a real caller would (rather than emitting each
            // buffered input batch separately).
            if !coalescer.is_empty() {
                output_batches.push(coalescer.finish_batch().unwrap());
            }

            // make sure we got the expected number of output batches and content
            let mut starting_idx = 0;
            assert_eq!(expected_output_sizes.len(), output_batches.len());
            for (i, (expected_size, batch)) in
                expected_output_sizes.iter().zip(output_batches).enumerate()
            {
                assert_eq!(
                    *expected_size,
                    batch.num_rows(),
                    "Unexpected number of rows in Batch {i}"
                );

                // compare the contents of the batch via their pretty-printed
                // form, because using `==` compares the underlying memory
                // layout too
                let expected_batch =
                    single_input_batch.slice(starting_idx, *expected_size);
                let batch_strings = batch_to_pretty_strings(&batch);
                let expected_batch_strings = batch_to_pretty_strings(&expected_batch);
                let batch_strings = batch_strings.lines().collect::<Vec<_>>();
                let expected_batch_strings =
                    expected_batch_strings.lines().collect::<Vec<_>>();
                assert_eq!(
                    expected_batch_strings, batch_strings,
                    "Unexpected content in Batch {i}:\
                    \n\nExpected:\n{expected_batch_strings:#?}\n\nActual:\n{batch_strings:#?}"
                );
                starting_idx += *expected_size;
            }
        }
    }

    /// Return a batch of UInt32 with the specified range
    fn uint32_batch(range: Range<u32>) -> RecordBatch {
        let schema =
            Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(UInt32Array::from_iter_values(range))],
        )
        .unwrap()
    }

    #[test]
    fn test_gc_string_view_batch_small_no_compact() {
        // view with only short strings (no buffers) --> no need to compact
        let array = StringViewTest {
            rows: 1000,
            strings: vec![Some("a"), Some("b"), Some("c")],
        }
        .build();

        let gc_array = do_gc(array.clone());
        compare_string_array_values(&array, &gc_array);
        assert_eq!(array.data_buffers().len(), 0);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
    }

    #[test]
    fn test_gc_string_view_test_batch_empty() {
        let schema = Schema::empty();
        let batch = RecordBatch::new_empty(schema.into());
        let output_batch = gc_string_view_batch(&batch);
        assert_eq!(batch.num_columns(), output_batch.num_columns());
        assert_eq!(batch.num_rows(), output_batch.num_rows());
    }

    #[test]
    fn test_gc_string_view_batch_large_no_compact() {
        // view with large strings (has buffers) but full --> no need to compact
        let array = StringViewTest {
            rows: 1000,
            strings: vec![Some("This string is longer than 12 bytes")],
        }
        .build();

        let gc_array = do_gc(array.clone());
        compare_string_array_values(&array, &gc_array);
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
    }

    #[test]
    fn test_gc_string_view_batch_large_slice_compact() {
        // view with large strings (has buffers) and only partially used --> needs to compact
        let array = StringViewTest {
            rows: 1000,
            strings: vec![Some("this string is longer than 12 bytes")],
        }
        .build();

        // slice 22 rows (starting at row 11), so most of the buffer is not used
        let array = array.slice(11, 22);

        let gc_array = do_gc(array.clone());
        compare_string_array_values(&array, &gc_array);
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer
    }

    /// Compares the values of two string view arrays
    fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) {
        assert_eq!(arr1.len(), arr2.len());
        for (s1, s2) in arr1.iter().zip(arr2.iter()) {
            assert_eq!(s1, s2);
        }
    }

    /// runs garbage collection on string view array
    /// and ensures the number of rows are the same
    fn do_gc(array: StringViewArray) -> StringViewArray {
        let batch =
            RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
        let gc_batch = gc_string_view_batch(&batch);
        assert_eq!(batch.num_rows(), gc_batch.num_rows());
        assert_eq!(batch.schema(), gc_batch.schema());
        gc_batch
            .column(0)
            .as_any()
            .downcast_ref::<StringViewArray>()
            .unwrap()
            .clone()
    }

    /// Describes parameters for creating a `StringViewArray`
    struct StringViewTest {
        /// The number of rows in the array
        rows: usize,
        /// The strings to use in the array (repeated over and over)
        strings: Vec<Option<&'static str>>,
    }

    impl StringViewTest {
        /// Create a `StringViewArray` of exactly `rows` rows, cycling through
        /// `strings` in order
        ///
        /// # Panics
        ///
        /// Panics if `strings` is empty (the original loop would never finish)
        fn build(self) -> StringViewArray {
            assert!(
                !self.strings.is_empty(),
                "StringViewTest requires at least one string"
            );
            let mut builder =
                StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
            for &v in self.strings.iter().cycle().take(self.rows) {
                builder.append_option(v);
            }
            builder.finish()
        }
    }

    /// Pretty-prints `batch` as a table, for content comparison in tests
    fn batch_to_pretty_strings(batch: &RecordBatch) -> String {
        arrow::util::pretty::pretty_format_batches(&[batch.clone()])
            .unwrap()
            .to_string()
    }
}