use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::ptr::NonNull;
use arrow::array::ArrayData;
use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use log::debug;
use tokio::sync::mpsc::Sender;
use datafusion_common::{exec_datafusion_err, HashSet, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::SendableRecordBatchStream;
use crate::common::IPCWriter;
use crate::stream::RecordBatchReceiverStream;
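/// Reads a spill file back as a stream of record batches.
///
/// The file is read on a separate blocking task, and batches are forwarded to
/// the returned stream through a channel with capacity `buffer`. The
/// `RefCountedTempFile` is moved into that task, so the underlying temp file
/// stays alive until the read completes.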
pub(crate) fn read_spill_as_stream(
path: RefCountedTempFile,
schema: SchemaRef,
buffer: usize,
) -> Result<SendableRecordBatchStream> {
let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
let sender = builder.tx();
builder.spawn_blocking(move || read_spill(sender, path.path()));
Ok(builder.build())
}
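/// Spills the in-memory `batches` to disk at `path` in Arrow IPC format.
///
/// Returns the total number of rows spilled.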
pub(crate) fn spill_record_batches(
batches: Vec<RecordBatch>,
path: PathBuf,
schema: SchemaRef,
) -> Result<usize> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
for batch in batches {
writer.write(&batch)?;
}
writer.finish()?;
debug!(
"Spilled {} batches of total {} rows to disk, memory released {}",
writer.num_batches,
writer.num_rows,
human_readable_size(writer.num_bytes),
);
Ok(writer.num_rows)
}
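/// Reads batches from the Arrow IPC file at `path` and forwards each one
/// through `sender`, blocking when the channel is full.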
fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
let file = BufReader::new(File::open(path)?);
let reader = FileReader::try_new(file, None)?;
for batch in reader {
sender
.blocking_send(batch.map_err(Into::into))
.map_err(|e| exec_datafusion_err!("{e}"))?;
}
Ok(())
}
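/// Spills `batch` to disk at `path`, splitting it into smaller batches of at
/// most `batch_size_rows` rows each.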
pub fn spill_record_batch_by_size(
batch: &RecordBatch,
path: PathBuf,
schema: SchemaRef,
batch_size_rows: usize,
) -> Result<()> {
let mut offset = 0;
let total_rows = batch.num_rows();
let mut writer = IPCWriter::new(&path, schema.as_ref())?;
while offset < total_rows {
let length = std::cmp::min(total_rows - offset, batch_size_rows);
let batch = batch.slice(offset, length);
offset += batch.num_rows();
writer.write(&batch)?;
}
writer.finish()?;
Ok(())
}
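/// Estimates the physical memory usage of `batch`.
///
/// Only the data `Buffer`s are counted; metadata such as the schema and
/// pointers is ignored. Each distinct buffer is counted once, keyed by its
/// data pointer, so arrays that slice or otherwise share the same underlying
/// buffer do not inflate the total.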
pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
let mut total_size = 0;
for array in batch.columns() {
let array_data = array.to_data();
count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size);
}
total_size
}
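/// Recursively accumulates the capacities of all buffers reachable from
/// `array_data` into `total_size`, using `counted_buffers` to ensure each
/// distinct allocation is counted only once.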
fn count_array_data_memory_size(
array_data: &ArrayData,
counted_buffers: &mut HashSet<NonNull<u8>>,
total_size: &mut usize,
) {
for buffer in array_data.buffers() {
if counted_buffers.insert(buffer.data_ptr()) {
*total_size += buffer.capacity();
        }
    }
if let Some(null_buffer) = array_data.nulls() {
if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) {
*total_size += null_buffer.inner().inner().capacity();
}
}
for child in array_data.child_data() {
count_array_data_memory_size(child, counted_buffers, total_size);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::spill::{spill_record_batch_by_size, spill_record_batches};
use crate::test::build_table_i32;
use arrow::array::{Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;
use arrow_array::ListArray;
use datafusion_common::Result;
use datafusion_execution::disk_manager::DiskManagerConfig;
use datafusion_execution::DiskManager;
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
#[test]
fn test_batch_spill_and_read() -> Result<()> {
let batch1 = build_table_i32(
("a2", &vec![0, 1, 2]),
("b2", &vec![3, 4, 5]),
("c2", &vec![4, 5, 6]),
);
let batch2 = build_table_i32(
("a2", &vec![10, 11, 12]),
("b2", &vec![13, 14, 15]),
("c2", &vec![14, 15, 16]),
);
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
let spill_file = disk_manager.create_tmp_file("Test Spill")?;
let schema = batch1.schema();
let num_rows = batch1.num_rows() + batch2.num_rows();
let cnt = spill_record_batches(
vec![batch1, batch2],
spill_file.path().into(),
Arc::clone(&schema),
);
assert_eq!(cnt.unwrap(), num_rows);
let file = BufReader::new(File::open(spill_file.path())?);
let reader = FileReader::try_new(file, None)?;
assert_eq!(reader.num_batches(), 2);
assert_eq!(reader.schema(), schema);
Ok(())
}
#[test]
fn test_batch_spill_by_size() -> Result<()> {
let batch1 = build_table_i32(
("a2", &vec![0, 1, 2, 3]),
("b2", &vec![3, 4, 5, 6]),
("c2", &vec![4, 5, 6, 7]),
);
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
let spill_file = disk_manager.create_tmp_file("Test Spill")?;
let schema = batch1.schema();
spill_record_batch_by_size(
&batch1,
spill_file.path().into(),
Arc::clone(&schema),
1,
)?;
let file = BufReader::new(File::open(spill_file.path())?);
let reader = FileReader::try_new(file, None)?;
assert_eq!(reader.num_batches(), 4);
assert_eq!(reader.schema(), schema);
Ok(())
}
#[test]
fn test_get_record_batch_memory_size() {
let schema = Arc::new(Schema::new(vec![
Field::new("ints", DataType::Int32, true),
Field::new("float64", DataType::Float64, false),
]));
let int_array =
Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
let batch = RecordBatch::try_new(
schema,
vec![Arc::new(int_array), Arc::new(float64_array)],
)
.unwrap();
let size = get_record_batch_memory_size(&batch);
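        // 5 * 4 = 20 bytes of Int32 data plus 5 * 8 = 40 bytes of Float64 data.
        // All values are valid, so no validity bitmap is allocated (Arrow
        // allocates the bitmap lazily, only once a null is appended).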
assert_eq!(size, 60);
}
#[test]
fn test_get_record_batch_memory_size_with_null() {
let schema = Arc::new(Schema::new(vec![
Field::new("ints", DataType::Int32, true),
Field::new("float64", DataType::Float64, false),
]));
let int_array = Int32Array::from(vec![None, Some(2), Some(3)]);
let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0]);
let batch = RecordBatch::try_new(
schema,
vec![Arc::new(int_array), Arc::new(float64_array)],
)
.unwrap();
let size = get_record_batch_memory_size(&batch);
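        // 3 * 4 = 12 bytes of Int32 data, a validity bitmap whose allocation
        // is rounded up to 64 bytes, and 3 * 8 = 24 bytes of Float64 data.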
assert_eq!(size, 100);
}
#[test]
fn test_get_record_batch_memory_size_empty() {
let schema = Arc::new(Schema::new(vec![Field::new(
"ints",
DataType::Int32,
false,
)]));
        let int_array = Int32Array::from(Vec::<i32>::new());
let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap();
let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 0, "Empty batch should have 0 memory size");
}
#[test]
fn test_get_record_batch_memory_size_shared_buffer() {
let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
let slice1 = original.slice(0, 3);
let slice2 = original.slice(2, 3);
let schema_origin = Arc::new(Schema::new(vec![Field::new(
"origin_col",
DataType::Int32,
false,
)]));
let batch_origin =
RecordBatch::try_new(schema_origin, vec![Arc::new(original)]).unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("slice1", DataType::Int32, false),
Field::new("slice2", DataType::Int32, false),
]));
let batch_sliced =
RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])
.unwrap();
let size_origin = get_record_batch_memory_size(&batch_origin);
let size_sliced = get_record_batch_memory_size(&batch_sliced);
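        // Both slices point into the buffer owned by `original`, so the
        // sliced batch must report the same memory size as the original.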
assert_eq!(size_origin, size_sliced);
}
#[test]
fn test_get_record_batch_memory_size_nested_array() {
let schema = Arc::new(Schema::new(vec![
Field::new(
"nested_int",
DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
false,
),
Field::new(
"nested_int2",
DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
false,
),
]));
let int_list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(1), Some(2), Some(3)]),
]);
let int_list_array2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(4), Some(5), Some(6)]),
]);
let batch = RecordBatch::try_new(
schema,
vec![Arc::new(int_list_array), Arc::new(int_list_array2)],
)
.unwrap();
let size = get_record_batch_memory_size(&batch);
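        // The expected size reflects Arrow's builder over-allocation: the
        // child values and offsets buffers are pre-allocated well beyond the
        // three elements each list actually stores.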
assert_eq!(size, 8320);
}
}