use crate::metrics::BaselineMetrics;
use crate::sorts::{
merge::SortPreservingMergeStream,
stream::{FieldCursorStream, RowCursorStream},
};
use crate::{PhysicalSortExpr, SendableRecordBatchStream};
use arrow::datatypes::{DataType, SchemaRef};
use arrow_array::*;
use datafusion_common::{internal_err, Result};
use datafusion_execution::memory_pool::MemoryReservation;
/// Adapter invoked by `downcast_primitive!` with a primitive type as its first
/// argument: wraps that type as `PrimitiveArray<_>` and forwards it, together
/// with every trailing identifier, to `merge_helper!`.
macro_rules! primitive_merge_helper {
    ($native:ty, $($arg:ident),+) => {
        merge_helper!(PrimitiveArray<$native>, $($arg),+)
    };
}
/// Builds a `FieldCursorStream` typed to the given array type from the sort
/// expression and input streams, wraps it in a `SortPreservingMergeStream`,
/// and immediately `return`s the pinned, boxed stream from the *enclosing*
/// function.
///
/// Every argument after the array type must be an identifier naming a local
/// in the calling function; they are moved into the constructed streams.
macro_rules! merge_helper {
    (
        $array:ty,
        $sort:ident,
        $inputs:ident,
        $schema:ident,
        $metrics:ident,
        $batch_size:ident,
        $fetch:ident,
        $reservation:ident
    ) => {{
        let cursors = FieldCursorStream::<$array>::new($sort, $inputs);
        let merged = SortPreservingMergeStream::new(
            Box::new(cursors),
            $schema,
            $metrics,
            $batch_size,
            $fetch,
            $reservation,
        );
        // Deliberate early exit: this leaves the calling function, not just the macro block.
        return Ok(Box::pin(merged));
    }};
}
/// Combines several input [`SendableRecordBatchStream`]s into a single
/// [`SortPreservingMergeStream`] driven by `expressions`.
///
/// With exactly one sort expression whose data type is a primitive, `Utf8`,
/// `LargeUtf8`, `Binary` or `LargeBinary`, the merge is built on a
/// [`FieldCursorStream`] typed to that column's array type. Every other case
/// (several expressions, or an unlisted single-column type) is built on a
/// [`RowCursorStream`].
///
/// # Errors
///
/// * An internal error when `expressions` is empty.
/// * Any error from resolving the single sort expression's data type against
///   `schema`.
/// * Any error from [`RowCursorStream::try_new`].
pub fn streaming_merge(
    streams: Vec<SendableRecordBatchStream>,
    schema: SchemaRef,
    expressions: &[PhysicalSortExpr],
    metrics: BaselineMetrics,
    batch_size: usize,
    fetch: Option<usize>,
    reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
    if expressions.is_empty() {
        return internal_err!("Sort expressions cannot be empty for streaming merge");
    }

    // Single-column case: each matching arm below `return`s from this function
    // via `merge_helper!`; only the `_` arm falls through to the row-based path.
    if let [single] = expressions {
        let sort = single.clone();
        let data_type = sort.expr.data_type(schema.as_ref())?;
        downcast_primitive! {
            data_type => (
                primitive_merge_helper,
                sort, streams, schema, metrics, batch_size, fetch, reservation
            ),
            DataType::Utf8 => merge_helper!(
                StringArray,
                sort, streams, schema, metrics, batch_size, fetch, reservation
            )
            DataType::LargeUtf8 => merge_helper!(
                LargeStringArray,
                sort, streams, schema, metrics, batch_size, fetch, reservation
            )
            DataType::Binary => merge_helper!(
                BinaryArray,
                sort, streams, schema, metrics, batch_size, fetch, reservation
            )
            DataType::LargeBinary => merge_helper!(
                LargeBinaryArray,
                sort, streams, schema, metrics, batch_size, fetch, reservation
            )
            _ => {}
        }
    }

    // General case. The row cursor stream receives its own reservation obtained
    // from `new_empty()`; the original `reservation` is handed to the merge stream.
    let row_cursors = RowCursorStream::try_new(
        schema.as_ref(),
        expressions,
        streams,
        reservation.new_empty(),
    )?;

    let merged = SortPreservingMergeStream::new(
        Box::new(row_cursors),
        schema,
        metrics,
        batch_size,
        fetch,
        reservation,
    );
    Ok(Box::pin(merged))
}