datafusion_physical_plan/spill.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the spilling functions

use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::ptr::NonNull;

use arrow::array::ArrayData;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
use arrow::record_batch::RecordBatch;
use log::debug;
use tokio::sync::mpsc::Sender;

use datafusion_common::{exec_datafusion_err, HashSet, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::SendableRecordBatchStream;

use crate::stream::RecordBatchReceiverStream;

/// Read spilled batches from the disk as a stream.
///
/// `path` - temp file holding the spilled batches
/// `schema` - schema of the batches, which must be the same for all batches
/// `buffer` - capacity, in batches, of the stream's internal buffer
pub(crate) fn read_spill_as_stream(
    path: RefCountedTempFile,
    schema: SchemaRef,
    buffer: usize,
) -> Result<SendableRecordBatchStream> {
    let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
    let sender = builder.tx();

    builder.spawn_blocking(move || read_spill(sender, path.path()));

    Ok(builder.build())
}
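
// A minimal usage sketch (not from the original source; `spill_file` and
// `schema` are assumed bindings, and an async context is assumed):
//
//     use futures::StreamExt;
//
//     let mut stream = read_spill_as_stream(spill_file, Arc::clone(&schema), 2)?;
//     while let Some(batch) = stream.next().await {
//         let batch = batch?; // each stream item is a Result<RecordBatch>
//     }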

/// Spills in-memory `batches` to disk.
///
/// Returns the number of rows and the number of bytes spilled to disk.
pub(crate) fn spill_record_batches(
    batches: &[RecordBatch],
    path: PathBuf,
    schema: SchemaRef,
) -> Result<(usize, usize)> {
    let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.finish()?;
    debug!(
        "Spilled {} batches of total {} rows to disk, memory released {}",
        writer.num_batches,
        writer.num_rows,
        human_readable_size(writer.num_bytes),
    );
    Ok((writer.num_rows, writer.num_bytes))
}
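
// A hedged usage sketch (assumed bindings: `disk_manager`, `batches`, and
// `schema`), spilling to a `DiskManager`-provided temp file:
//
//     let spill_file = disk_manager.create_tmp_file("Sort spill")?;
//     let (rows, bytes) = spill_record_batches(
//         &batches,
//         spill_file.path().into(),
//         Arc::clone(&schema),
//     )?;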

fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
    let file = BufReader::new(File::open(path)?);
    let reader = StreamReader::try_new(file, None)?;
    for batch in reader {
        sender
            .blocking_send(batch.map_err(Into::into))
            .map_err(|e| exec_datafusion_err!("{e}"))?;
    }
    Ok(())
}

/// Spill the `RecordBatch` to disk as smaller batches of at most
/// `batch_size_rows` rows each.
pub fn spill_record_batch_by_size(
    batch: &RecordBatch,
    path: PathBuf,
    schema: SchemaRef,
    batch_size_rows: usize,
) -> Result<()> {
    let mut offset = 0;
    let total_rows = batch.num_rows();
    let mut writer = IPCStreamWriter::new(&path, schema.as_ref())?;

    while offset < total_rows {
        let length = std::cmp::min(total_rows - offset, batch_size_rows);
        let batch = batch.slice(offset, length);
        offset += batch.num_rows();
        writer.write(&batch)?;
    }
    writer.finish()?;

    Ok(())
}
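
// For illustration (an assumption, not from the original source): spilling a
// 10-row batch with `batch_size_rows == 4` writes three slices of 4, 4, and 2
// rows, because each slice length is clamped to `total_rows - offset`:
//
//     spill_record_batch_by_size(&batch, spill_file.path().into(), schema, 4)?;
//     // slices written: batch.slice(0, 4), batch.slice(4, 4), batch.slice(8, 2)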

/// Calculate the total memory used by this batch.
///
/// This function estimates the physical memory usage of the `RecordBatch`. It
/// only counts the memory of large data `Buffer`s, and ignores metadata like
/// types and pointers.
/// The implementation sums the memory size of each unique `Buffer`, because:
/// - The data pointers inside `Buffer`s are memory regions returned by the
///   global memory allocator, so those regions cannot overlap.
/// - The actually used ranges of the `ArrayRef`s inside a `RecordBatch` can
///   overlap or reuse the same `Buffer`, for example when taking a slice of an
///   `Array`.
///
/// Example:
/// For a `RecordBatch` with two columns, `col1` and `col2`, both pointing to
/// sub-regions of the same buffer:
///
/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
///       ^    ^  ^    ^
///       |    |  |    |
/// col1->{    }  |    |
/// col2--------->{    }
///
/// In this case, `get_record_batch_memory_size` returns the size of the
/// buffer, instead of the sum of `col1`'s and `col2`'s actual memory sizes.
///
/// Note: the current `RecordBatch::get_array_memory_size()` double counts a
/// buffer's memory size if multiple arrays within the batch share the same
/// `Buffer`. This method provides a temporary fix until the issue is resolved:
/// <https://github.com/apache/arrow-rs/issues/6439>
pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
    // Store pointers to each `Buffer`'s start memory address (instead of the
    // actually used data region's pointer represented by the current `Array`)
    let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
    let mut total_size = 0;

    for array in batch.columns() {
        let array_data = array.to_data();
        count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size);
    }

    total_size
}
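
// A hedged sketch of the shared-buffer behavior described above (`schema` is an
// assumed binding; see also the shared-buffer test at the bottom of this file):
//
//     let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
//     let slice1 = original.slice(0, 3);
//     let slice2 = original.slice(2, 3);
//     let batch =
//         RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])?;
//     // The shared buffer is counted once here, while
//     // `batch.get_array_memory_size()` may count it once per column
//     // (see https://github.com/apache/arrow-rs/issues/6439):
//     assert!(get_record_batch_memory_size(&batch) <= batch.get_array_memory_size());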

/// Count the memory usage of `array_data` and its children recursively.
fn count_array_data_memory_size(
    array_data: &ArrayData,
    counted_buffers: &mut HashSet<NonNull<u8>>,
    total_size: &mut usize,
) {
    // Count memory usage for `array_data`
    for buffer in array_data.buffers() {
        if counted_buffers.insert(buffer.data_ptr()) {
            *total_size += buffer.capacity();
        } // Otherwise the buffer's memory is already counted
    }

    if let Some(null_buffer) = array_data.nulls() {
        if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) {
            *total_size += null_buffer.inner().inner().capacity();
        }
    }

    // Count all children `ArrayData` recursively
    for child in array_data.child_data() {
        count_array_data_memory_size(child, counted_buffers, total_size);
    }
}

/// Write in Arrow IPC Stream format to a file.
///
/// The stream format is used for spilling because, unlike the IPC file format,
/// it supports dictionary replacement; the random access that the file format
/// provides is not needed here.
struct IPCStreamWriter {
    /// Inner writer
    pub writer: StreamWriter<File>,
    /// Batches written
    pub num_batches: usize,
    /// Rows written
    pub num_rows: usize,
    /// Bytes written
    pub num_bytes: usize,
}

impl IPCStreamWriter {
    /// Create a new writer
    pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
        let file = File::create(path).map_err(|e| {
            exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
        })?;
        Ok(Self {
            num_batches: 0,
            num_rows: 0,
            num_bytes: 0,
            writer: StreamWriter::try_new(file, schema)?,
        })
    }

    /// Write one single batch
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.writer.write(batch)?;
        self.num_batches += 1;
        self.num_rows += batch.num_rows();
        let num_bytes: usize = batch.get_array_memory_size();
        self.num_bytes += num_bytes;
        Ok(())
    }

    /// Finish the writer
    pub fn finish(&mut self) -> Result<()> {
        self.writer.finish().map_err(Into::into)
    }
}
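
// A minimal usage sketch of the writer (assumed bindings: `path`, `schema`,
// and `batches`); the writer tracks batches, rows, and in-memory bytes:
//
//     let mut writer = IPCStreamWriter::new(&path, schema.as_ref())?;
//     for batch in &batches {
//         writer.write(batch)?;
//     }
//     writer.finish()?;
//     debug!("wrote {} batches / {} rows", writer.num_batches, writer.num_rows);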

#[cfg(test)]
mod tests {
    use super::*;
    use crate::spill::{spill_record_batch_by_size, spill_record_batches};
    use crate::test::build_table_i32;
    use arrow::array::{Float64Array, Int32Array, ListArray};
    use arrow::compute::cast;
    use arrow::datatypes::{DataType, Field, Int32Type, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_common::Result;
    use datafusion_execution::disk_manager::DiskManagerConfig;
    use datafusion_execution::DiskManager;
    use itertools::Itertools;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;

    #[test]
    fn test_batch_spill_and_read() -> Result<()> {
        let batch1 = build_table_i32(
            ("a2", &vec![0, 1, 2]),
            ("b2", &vec![3, 4, 5]),
            ("c2", &vec![4, 5, 6]),
        );

        let batch2 = build_table_i32(
            ("a2", &vec![10, 11, 12]),
            ("b2", &vec![13, 14, 15]),
            ("c2", &vec![14, 15, 16]),
        );

        let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
        let schema = batch1.schema();
        let num_rows = batch1.num_rows() + batch2.num_rows();
        let (spilled_rows, _) = spill_record_batches(
            &[batch1, batch2],
            spill_file.path().into(),
            Arc::clone(&schema),
        )?;
        assert_eq!(spilled_rows, num_rows);

        let file = BufReader::new(File::open(spill_file.path())?);
        let reader = StreamReader::try_new(file, None)?;

        assert_eq!(reader.schema(), schema);

        let batches = reader.collect_vec();
        assert!(batches.len() == 2);

        Ok(())
    }

    #[test]
    fn test_batch_spill_and_read_dictionary_arrays() -> Result<()> {
        // See https://github.com/apache/datafusion/issues/4658

        let batch1 = build_table_i32(
            ("a2", &vec![0, 1, 2]),
            ("b2", &vec![3, 4, 5]),
            ("c2", &vec![4, 5, 6]),
        );

        let batch2 = build_table_i32(
            ("a2", &vec![10, 11, 12]),
            ("b2", &vec![13, 14, 15]),
            ("c2", &vec![14, 15, 16]),
        );

        // Dictionary encode the arrays
        let dict_type =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let dict_schema = Arc::new(Schema::new(vec![
            Field::new("a2", dict_type.clone(), true),
            Field::new("b2", dict_type.clone(), true),
            Field::new("c2", dict_type.clone(), true),
        ]));

        let batch1 = RecordBatch::try_new(
            Arc::clone(&dict_schema),
            batch1
                .columns()
                .iter()
                .map(|array| cast(array, &dict_type))
                .collect::<Result<_, _>>()?,
        )?;

        let batch2 = RecordBatch::try_new(
            Arc::clone(&dict_schema),
            batch2
                .columns()
                .iter()
                .map(|array| cast(array, &dict_type))
                .collect::<Result<_, _>>()?,
        )?;

        let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
        let num_rows = batch1.num_rows() + batch2.num_rows();
        let (spilled_rows, _) = spill_record_batches(
            &[batch1, batch2],
            spill_file.path().into(),
            Arc::clone(&dict_schema),
        )?;
        assert_eq!(spilled_rows, num_rows);

        let file = BufReader::new(File::open(spill_file.path())?);
        let reader = StreamReader::try_new(file, None)?;

        assert_eq!(reader.schema(), dict_schema);

        let batches = reader.collect_vec();
        assert!(batches.len() == 2);

        Ok(())
    }

    #[test]
    fn test_batch_spill_by_size() -> Result<()> {
        let batch1 = build_table_i32(
            ("a2", &vec![0, 1, 2, 3]),
            ("b2", &vec![3, 4, 5, 6]),
            ("c2", &vec![4, 5, 6, 7]),
        );

        let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
        let schema = batch1.schema();
        spill_record_batch_by_size(
            &batch1,
            spill_file.path().into(),
            Arc::clone(&schema),
            1,
        )?;

        let file = BufReader::new(File::open(spill_file.path())?);
        let reader = StreamReader::try_new(file, None)?;

        assert_eq!(reader.schema(), schema);

        let batches = reader.collect_vec();
        assert!(batches.len() == 4);

        Ok(())
    }

    #[test]
    fn test_get_record_batch_memory_size() {
        // Create a simple record batch with two columns
        let schema = Arc::new(Schema::new(vec![
            Field::new("ints", DataType::Int32, true),
            Field::new("float64", DataType::Float64, false),
        ]));

        let int_array =
            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);

        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(int_array), Arc::new(float64_array)],
        )
        .unwrap();

        let size = get_record_batch_memory_size(&batch);
        assert_eq!(size, 60);
    }

    #[test]
    fn test_get_record_batch_memory_size_with_null() {
        // Create a simple record batch with two columns
        let schema = Arc::new(Schema::new(vec![
            Field::new("ints", DataType::Int32, true),
            Field::new("float64", DataType::Float64, false),
        ]));

        let int_array = Int32Array::from(vec![None, Some(2), Some(3)]);
        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0]);

        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(int_array), Arc::new(float64_array)],
        )
        .unwrap();

        let size = get_record_batch_memory_size(&batch);
        assert_eq!(size, 100);
    }

    #[test]
    fn test_get_record_batch_memory_size_empty() {
        // Test with an empty record batch
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ints",
            DataType::Int32,
            false,
        )]));

        let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap();

        let size = get_record_batch_memory_size(&batch);
        assert_eq!(size, 0, "Empty batch should have 0 memory size");
    }

    #[test]
    fn test_get_record_batch_memory_size_shared_buffer() {
        // Test with slices that share the same underlying buffer
        let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let slice1 = original.slice(0, 3);
        let slice2 = original.slice(2, 3);

        // `RecordBatch` with the `original` array
        // ----
        let schema_origin = Arc::new(Schema::new(vec![Field::new(
            "origin_col",
            DataType::Int32,
            false,
        )]));
        let batch_origin =
            RecordBatch::try_new(schema_origin, vec![Arc::new(original)]).unwrap();

        // `RecordBatch` whose columns are all references to the `original` array
        // ----
        let schema = Arc::new(Schema::new(vec![
            Field::new("slice1", DataType::Int32, false),
            Field::new("slice2", DataType::Int32, false),
        ]));

        let batch_sliced =
            RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])
                .unwrap();

        // Both sizes should only count the buffer in the `original` array
        let size_origin = get_record_batch_memory_size(&batch_origin);
        let size_sliced = get_record_batch_memory_size(&batch_sliced);

        assert_eq!(size_origin, size_sliced);
    }

    #[test]
    fn test_get_record_batch_memory_size_nested_array() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "nested_int",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                false,
            ),
            Field::new(
                "nested_int2",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                false,
            ),
        ]));

        let int_list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
        ]);

        let int_list_array2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(4), Some(5), Some(6)]),
        ]);

        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(int_list_array), Arc::new(int_list_array2)],
        )
        .unwrap();

        let size = get_record_batch_memory_size(&batch);
        assert_eq!(size, 8320);
    }
}
501}