// datafusion_physical_plan/sorts/streaming_merge.rs
use crate::metrics::BaselineMetrics;
use crate::sorts::{
merge::SortPreservingMergeStream,
stream::{FieldCursorStream, RowCursorStream},
};
use crate::SendableRecordBatchStream;
use arrow::datatypes::{DataType, SchemaRef};
use arrow_array::*;
use datafusion_common::{internal_err, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
// Adapter used as the `downcast_primitive!` dispatch target: takes the matched
// Arrow primitive type, wraps it in `PrimitiveArray<..>`, and forwards every
// remaining identifier argument untouched to `merge_helper!`.
macro_rules! primitive_merge_helper {
    ($prim:ty, $($arg:ident),+) => {
        merge_helper!(PrimitiveArray<$prim>, $($arg),+)
    };
}
// Builds a `SortPreservingMergeStream` on top of a `FieldCursorStream`
// specialised for the array type `$array`, and `return`s it from the enclosing
// function wrapped as `Ok(Box::pin(..))`.
//
// The `return` is intentional. In the visible code this macro is expanded only
// inside the type-dispatch arms of `StreamingMergeBuilder::build`, so a matched
// column type exits `build` early. Unmatched types fall through to the generic
// row-based path.
macro_rules! merge_helper {
    (
        $array:ty,
        $sort:ident,
        $inputs:ident,
        $schema:ident,
        $metrics:ident,
        $batch_size:ident,
        $fetch:ident,
        $reservation:ident,
        $round_robin:ident
    ) => {{
        let cursor_stream = FieldCursorStream::<$array>::new($sort, $inputs);
        let merged = SortPreservingMergeStream::new(
            Box::new(cursor_stream),
            $schema,
            $metrics,
            $batch_size,
            $fetch,
            $reservation,
            $round_robin,
        );
        return Ok(Box::pin(merged));
    }};
}
/// Builder for a stream that merges several input streams into a single output
/// stream ordered by `expressions`; call `build` to produce the stream.
///
/// NOTE(review): the inputs are presumably expected to already be sorted by
/// `expressions`; this builder does not check that.
///
/// The derived `Default` leaves `enable_round_robin_tie_breaker` as `false`.
/// `StreamingMergeBuilder::new` sets it to `true`, so the two constructors
/// differ in that one flag.
#[derive(Default)]
pub struct StreamingMergeBuilder<'a> {
    /// Input streams to merge; `build` rejects an empty list.
    streams: Vec<SendableRecordBatchStream>,
    /// Schema used to resolve sort-expression types and passed to the merge
    /// stream; `build` requires it to be set.
    schema: Option<SchemaRef>,
    /// Sort expressions defining the merge order. `build` rejects an empty
    /// slice and takes a single-column fast path when there is exactly one.
    expressions: LexOrderingRef<'a>,
    /// Baseline metrics handed to the merge stream; `build` requires it.
    metrics: Option<BaselineMetrics>,
    /// Batch size handed to the merge stream; `build` requires it.
    batch_size: Option<usize>,
    /// Optional fetch value passed through to the merge stream (presumably a
    /// row limit, with `None` meaning unlimited — confirm in the merge stream).
    fetch: Option<usize>,
    /// Memory reservation handed to the merge stream. `build` also derives an
    /// empty reservation from it for the row-based cursor path. Required.
    reservation: Option<MemoryReservation>,
    /// Forwarded to the merge stream; `new` enables it, `Default` does not.
    enable_round_robin_tie_breaker: bool,
}
impl<'a> StreamingMergeBuilder<'a> {
    /// Creates a builder with every field at its `Default` value except the
    /// round-robin tie breaker, which is enabled.
    ///
    /// Note that the derived `StreamingMergeBuilder::default()` leaves the tie
    /// breaker disabled, so the two constructors are not equivalent.
    pub fn new() -> Self {
        Self {
            enable_round_robin_tie_breaker: true,
            ..Default::default()
        }
    }

    /// Sets the input streams to merge (must be non-empty for `build`).
    pub fn with_streams(mut self, streams: Vec<SendableRecordBatchStream>) -> Self {
        self.streams = streams;
        self
    }

    /// Sets the schema of the input/output batches (required by `build`).
    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Sets the sort expressions defining the merge order (must be non-empty).
    pub fn with_expressions(mut self, expressions: LexOrderingRef<'a>) -> Self {
        self.expressions = expressions;
        self
    }

    /// Sets the metrics handed to the merge stream (required by `build`).
    pub fn with_metrics(mut self, metrics: BaselineMetrics) -> Self {
        self.metrics = Some(metrics);
        self
    }

    /// Sets the batch size handed to the merge stream (required by `build`).
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = Some(batch_size);
        self
    }

    /// Sets the optional fetch value passed through to the merge stream.
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    /// Sets the memory reservation for the merge (required by `build`).
    pub fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
        self.reservation = Some(reservation);
        self
    }

    /// Enables or disables the round-robin tie breaker of the merge stream.
    pub fn with_round_robin_tie_breaker(
        mut self,
        enable_round_robin_tie_breaker: bool,
    ) -> Self {
        self.enable_round_robin_tie_breaker = enable_round_robin_tie_breaker;
        self
    }

    /// Consumes the builder and returns a stream that merges `streams` in the
    /// order given by `expressions`.
    ///
    /// With exactly one sort expression whose type is an Arrow primitive,
    /// `Utf8`, `LargeUtf8`, `Binary` or `LargeBinary`, a specialised
    /// `FieldCursorStream` is used. Otherwise rows are compared via a
    /// `RowCursorStream`, which gets a fresh empty reservation obtained from
    /// `reservation.new_empty()`.
    ///
    /// # Errors
    ///
    /// Returns an internal error if no input streams or no sort expressions
    /// were supplied, or if the schema, metrics, batch size or memory
    /// reservation was never set. It also propagates errors from resolving the
    /// sort expression's data type and from creating the row cursor stream.
    pub fn build(self) -> Result<SendableRecordBatchStream> {
        let Self {
            streams,
            schema,
            metrics,
            batch_size,
            reservation,
            fetch,
            expressions,
            enable_round_robin_tie_breaker,
        } = self;

        // Validate in a fixed order so the first problem found is the one
        // reported. These are caller mistakes, but `build` already returns a
        // `Result`, so report them as internal errors instead of panicking.
        if streams.is_empty() {
            return internal_err!("Streams cannot be empty for streaming merge");
        }
        if expressions.is_empty() {
            return internal_err!("Sort expressions cannot be empty for streaming merge");
        }
        let Some(schema) = schema else {
            return internal_err!("Schema cannot be empty for streaming merge");
        };
        let Some(metrics) = metrics else {
            return internal_err!("Metrics cannot be empty for streaming merge");
        };
        let Some(batch_size) = batch_size else {
            return internal_err!("Batch size cannot be empty for streaming merge");
        };
        let Some(reservation) = reservation else {
            return internal_err!("Reservation cannot be empty for streaming merge");
        };

        // Single-column fast path: each matched arm `return`s from `build`
        // (see `merge_helper!`); unsupported types fall through to row cursors.
        if expressions.len() == 1 {
            let sort = expressions[0].clone();
            let data_type = sort.expr.data_type(schema.as_ref())?;
            downcast_primitive! {
                data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
                DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
                _ => {}
            }
        }

        // General path: compare encoded rows across all sort expressions.
        let streams = RowCursorStream::try_new(
            schema.as_ref(),
            expressions,
            streams,
            reservation.new_empty(),
        )?;
        Ok(Box::pin(SortPreservingMergeStream::new(
            Box::new(streams),
            schema,
            metrics,
            batch_size,
            fetch,
            reservation,
            enable_round_robin_tie_breaker,
        )))
    }
}