use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::ptr::NonNull;

use arrow::array::ArrayData;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
use arrow::record_batch::RecordBatch;
use log::debug;
use tokio::sync::mpsc::Sender;

use datafusion_common::{exec_datafusion_err, HashSet, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::SendableRecordBatchStream;

use crate::stream::RecordBatchReceiverStream;

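/// Reads a spill file back as a [`SendableRecordBatchStream`].
///
/// The file is read on a separate blocking task so the async consumer is
/// never blocked on disk I/O; `buffer` controls the capacity of the
/// intermediate channel.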
pub(crate) fn read_spill_as_stream(
    path: RefCountedTempFile,
    schema: SchemaRef,
    buffer: usize,
) -> Result<SendableRecordBatchStream> {
    let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
    let sender = builder.tx();

    builder.spawn_blocking(move || read_spill(sender, path.path()));

    Ok(builder.build())
}

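/// Spills `batches` to `path` as an Arrow IPC stream, returning the number
/// of rows written and the approximate in-memory size of those batches.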
pub(crate) fn spill_record_batches(
    batches: &[RecordBatch],
    path: PathBuf,
    schema: SchemaRef,
) -> Result<(usize, usize)> {
    let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?;
    for batch in batches {
        writer.write(batch)?;
    }
    writer.finish()?;
    debug!(
        "Spilled {} batches of total {} rows to disk, memory released {}",
        writer.num_batches,
        writer.num_rows,
        human_readable_size(writer.num_bytes),
    );
    Ok((writer.num_rows, writer.num_bytes))
}

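/// Reads the IPC-encoded spill file at `path` and forwards each batch to
/// `sender`, blocking when the channel is full.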
fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
    let file = BufReader::new(File::open(path)?);
    let reader = StreamReader::try_new(file, None)?;
    for batch in reader {
        sender
            .blocking_send(batch.map_err(Into::into))
            .map_err(|e| exec_datafusion_err!("{e}"))?;
    }
    Ok(())
}

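/// Spills a single `batch` to `path`, re-slicing it into chunks of at most
/// `batch_size_rows` rows so no oversized batch has to be read back into
/// memory at once.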
pub fn spill_record_batch_by_size(
    batch: &RecordBatch,
    path: PathBuf,
    schema: SchemaRef,
    batch_size_rows: usize,
) -> Result<()> {
    let mut offset = 0;
    let total_rows = batch.num_rows();
    let mut writer = IPCStreamWriter::new(&path, schema.as_ref())?;

    while offset < total_rows {
        let length = std::cmp::min(total_rows - offset, batch_size_rows);
        let batch = batch.slice(offset, length);
        offset += batch.num_rows();
        writer.write(&batch)?;
    }
    writer.finish()?;

    Ok(())
}

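/// Returns the total number of bytes of memory referenced by `batch`.
///
/// Unlike [`RecordBatch::get_array_memory_size`], each underlying buffer is
/// counted only once, so arrays that share buffers (for example, slices of
/// the same array) are not double-counted.
///
/// A minimal usage sketch (the batch construction below is illustrative,
/// not part of this module):
///
/// ```ignore
/// let array = Int32Array::from(vec![1, 2, 3, 4, 5]);
/// let schema = Arc::new(Schema::new(vec![Field::new("c", DataType::Int32, false)]));
/// let batch = RecordBatch::try_new(schema, vec![Arc::new(array)])?;
/// let bytes = get_record_batch_memory_size(&batch);
/// ```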
pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
    // Store the start pointer of every buffer seen so far, so that buffers
    // shared between arrays are counted only once.
    let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
    let mut total_size = 0;

    for array in batch.columns() {
        let array_data = array.to_data();
        count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size);
    }

    total_size
}

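/// Recursively accumulates the memory used by `array_data` into
/// `total_size`, skipping any buffer whose pointer is already present in
/// `counted_buffers`.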
fn count_array_data_memory_size(
    array_data: &ArrayData,
    counted_buffers: &mut HashSet<NonNull<u8>>,
    total_size: &mut usize,
) {
    // Count the data buffers of this array, deduplicated by pointer.
    for buffer in array_data.buffers() {
        if counted_buffers.insert(buffer.data_ptr()) {
            *total_size += buffer.capacity();
        }
    }

    // The null (validity) buffer is tracked separately from the data buffers.
    if let Some(null_buffer) = array_data.nulls() {
        if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) {
            *total_size += null_buffer.inner().inner().capacity();
        }
    }

    // Recurse into the children of nested types (e.g. List, Struct).
    for child in array_data.child_data() {
        count_array_data_memory_size(child, counted_buffers, total_size);
    }
}

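/// Wrapper around [`StreamWriter`] that tracks the number of batches, rows,
/// and (in-memory) bytes written to the underlying file.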
struct IPCStreamWriter {
    /// Inner writer
    pub writer: StreamWriter<File>,
    /// Batches written
    pub num_batches: usize,
    /// Rows written
    pub num_rows: usize,
    /// Bytes written
    pub num_bytes: usize,
}

impl IPCStreamWriter {
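    /// Creates the file at `path` and an IPC [`StreamWriter`] for `schema`.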
    pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
        let file = File::create(path).map_err(|e| {
            exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
        })?;
        Ok(Self {
            num_batches: 0,
            num_rows: 0,
            num_bytes: 0,
            writer: StreamWriter::try_new(file, schema)?,
        })
    }

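    /// Writes a single batch and updates the running statistics.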
    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.writer.write(batch)?;
        self.num_batches += 1;
        self.num_rows += batch.num_rows();
        let num_bytes: usize = batch.get_array_memory_size();
        self.num_bytes += num_bytes;
        Ok(())
    }

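    /// Finishes the stream, writing the IPC end-of-stream marker.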
    pub fn finish(&mut self) -> Result<()> {
        self.writer.finish().map_err(Into::into)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::spill::{spill_record_batch_by_size, spill_record_batches};
    use crate::test::build_table_i32;
    use arrow::array::{Float64Array, Int32Array, ListArray};
    use arrow::compute::cast;
    use arrow::datatypes::{DataType, Field, Int32Type, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_common::Result;
    use datafusion_execution::disk_manager::DiskManagerConfig;
    use datafusion_execution::DiskManager;
    use itertools::Itertools;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;

    #[test]
    fn test_batch_spill_and_read() -> Result<()> {
        let batch1 = build_table_i32(
            ("a2", &vec![0, 1, 2]),
            ("b2", &vec![3, 4, 5]),
            ("c2", &vec![4, 5, 6]),
        );

        let batch2 = build_table_i32(
            ("a2", &vec![10, 11, 12]),
            ("b2", &vec![13, 14, 15]),
            ("c2", &vec![14, 15, 16]),
        );

        let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
        let schema = batch1.schema();
        let num_rows = batch1.num_rows() + batch2.num_rows();
        let (spilled_rows, _) = spill_record_batches(
            &[batch1, batch2],
            spill_file.path().into(),
            Arc::clone(&schema),
        )?;
        assert_eq!(spilled_rows, num_rows);

        let file = BufReader::new(File::open(spill_file.path())?);
        let reader = StreamReader::try_new(file, None)?;

        assert_eq!(reader.schema(), schema);

        let batches = reader.collect_vec();
        assert_eq!(batches.len(), 2);

        Ok(())
    }

    #[test]
    fn test_batch_spill_and_read_dictionary_arrays() -> Result<()> {
        // Build plain Int32 tables first...
        let batch1 = build_table_i32(
            ("a2", &vec![0, 1, 2]),
            ("b2", &vec![3, 4, 5]),
            ("c2", &vec![4, 5, 6]),
        );

        let batch2 = build_table_i32(
            ("a2", &vec![10, 11, 12]),
            ("b2", &vec![13, 14, 15]),
            ("c2", &vec![14, 15, 16]),
        );

        // ...then cast every column to Dictionary<Int32, Int32> so the spill
        // round-trip is exercised with dictionary-encoded arrays.
        let dict_type =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let dict_schema = Arc::new(Schema::new(vec![
            Field::new("a2", dict_type.clone(), true),
            Field::new("b2", dict_type.clone(), true),
            Field::new("c2", dict_type.clone(), true),
        ]));

        let batch1 = RecordBatch::try_new(
            Arc::clone(&dict_schema),
            batch1
                .columns()
                .iter()
                .map(|array| cast(array, &dict_type))
                .collect::<Result<_, _>>()?,
        )?;

        let batch2 = RecordBatch::try_new(
            Arc::clone(&dict_schema),
            batch2
                .columns()
                .iter()
                .map(|array| cast(array, &dict_type))
                .collect::<Result<_, _>>()?,
        )?;

        let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
        let num_rows = batch1.num_rows() + batch2.num_rows();
        let (spilled_rows, _) = spill_record_batches(
            &[batch1, batch2],
            spill_file.path().into(),
            Arc::clone(&dict_schema),
        )?;
        assert_eq!(spilled_rows, num_rows);

        let file = BufReader::new(File::open(spill_file.path())?);
        let reader = StreamReader::try_new(file, None)?;

        assert_eq!(reader.schema(), dict_schema);

        let batches = reader.collect_vec();
        assert_eq!(batches.len(), 2);

        Ok(())
    }

    #[test]
    fn test_batch_spill_by_size() -> Result<()> {
        let batch1 = build_table_i32(
            ("a2", &vec![0, 1, 2, 3]),
            ("b2", &vec![3, 4, 5, 6]),
            ("c2", &vec![4, 5, 6, 7]),
        );

        let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

        let spill_file = disk_manager.create_tmp_file("Test Spill")?;
        let schema = batch1.schema();
        spill_record_batch_by_size(
            &batch1,
            spill_file.path().into(),
            Arc::clone(&schema),
            1,
        )?;

        let file = BufReader::new(File::open(spill_file.path())?);
        let reader = StreamReader::try_new(file, None)?;

        assert_eq!(reader.schema(), schema);

        let batches = reader.collect_vec();
        assert_eq!(batches.len(), 4);

        Ok(())
    }

    #[test]
    fn test_get_record_batch_memory_size() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ints", DataType::Int32, true),
            Field::new("float64", DataType::Float64, false),
        ]));

        let int_array =
            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);

        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(int_array), Arc::new(float64_array)],
        )
        .unwrap();

        let size = get_record_batch_memory_size(&batch);
        assert_eq!(size, 60);
    }

    #[test]
    fn test_get_record_batch_memory_size_with_null() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("ints", DataType::Int32, true),
            Field::new("float64", DataType::Float64, false),
        ]));

        let int_array = Int32Array::from(vec![None, Some(2), Some(3)]);
        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0]);

        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(int_array), Arc::new(float64_array)],
        )
        .unwrap();

        let size = get_record_batch_memory_size(&batch);
        assert_eq!(size, 100);
    }

    #[test]
    fn test_get_record_batch_memory_size_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ints",
            DataType::Int32,
            false,
        )]));

        let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap();

        let size = get_record_batch_memory_size(&batch);
        assert_eq!(size, 0, "Empty batch should have 0 memory size");
    }

    #[test]
    fn test_get_record_batch_memory_size_shared_buffer() {
        // Create two slices that share the original array's buffer.
        let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let slice1 = original.slice(0, 3);
        let slice2 = original.slice(2, 3);

        let schema_origin = Arc::new(Schema::new(vec![Field::new(
            "origin_col",
            DataType::Int32,
            false,
        )]));
        let batch_origin =
            RecordBatch::try_new(schema_origin, vec![Arc::new(original)]).unwrap();

        // A batch made of the two overlapping slices.
        let schema = Arc::new(Schema::new(vec![
            Field::new("slice1", DataType::Int32, false),
            Field::new("slice2", DataType::Int32, false),
        ]));

        let batch_sliced =
            RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])
                .unwrap();

        // Because the shared buffer is counted only once, both batches
        // report the same size.
        let size_origin = get_record_batch_memory_size(&batch_origin);
        let size_sliced = get_record_batch_memory_size(&batch_sliced);

        assert_eq!(size_origin, size_sliced);
    }

    #[test]
    fn test_get_record_batch_memory_size_nested_array() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "nested_int",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                false,
            ),
            Field::new(
                "nested_int2",
                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
                false,
            ),
        ]));

        let int_list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3)]),
        ]);

        let int_list_array2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(4), Some(5), Some(6)]),
        ]);

        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(int_list_array), Arc::new(int_list_array2)],
        )
        .unwrap();

        let size = get_record_batch_memory_size(&batch);
        assert_eq!(size, 8320);
    }
}