datafusion_physical_plan/sorts/
streaming_merge.rs1use crate::metrics::BaselineMetrics;
22use crate::sorts::{
23 merge::SortPreservingMergeStream,
24 stream::{FieldCursorStream, RowCursorStream},
25};
26use crate::SendableRecordBatchStream;
27use arrow::array::*;
28use arrow::datatypes::{DataType, SchemaRef};
29use datafusion_common::{internal_err, Result};
30use datafusion_execution::memory_pool::MemoryReservation;
31use datafusion_physical_expr_common::sort_expr::LexOrdering;
32
/// Callback used with `downcast_primitive!`: receives an Arrow primitive type
/// `$prim` plus the remaining identifiers, and forwards everything to
/// `merge_helper!` with the cursor's array type set to `PrimitiveArray<$prim>`.
macro_rules! primitive_merge_helper {
    ($prim:ty, $($forwarded:ident),+) => {
        merge_helper!(PrimitiveArray<$prim>, $($forwarded),+)
    };
}
38
/// Builds a `SortPreservingMergeStream` driven by a single-column
/// `FieldCursorStream` over arrays of type `$array`, and returns it from the
/// *enclosing* function.
///
/// The expansion contains a `return`, so it may only be invoked inside a
/// function whose return type is `Result<SendableRecordBatchStream>`. All the
/// identifier arguments are moved into the new stream.
macro_rules! merge_helper {
    (
        $array:ty,
        $sort:ident,
        $streams:ident,
        $schema:ident,
        $tracking_metrics:ident,
        $batch_size:ident,
        $fetch:ident,
        $reservation:ident,
        $enable_round_robin_tie_breaker:ident
    ) => {{
        // The cursor stream gets a fresh reservation from `new_empty()`; the
        // original reservation is moved into the merge stream below.
        let cursor_reservation = $reservation.new_empty();
        let cursor_streams =
            FieldCursorStream::<$array>::new($sort, $streams, cursor_reservation);
        let merged = SortPreservingMergeStream::new(
            Box::new(cursor_streams),
            $schema,
            $tracking_metrics,
            $batch_size,
            $fetch,
            $reservation,
            $enable_round_robin_tie_breaker,
        );
        return Ok(Box::pin(merged));
    }};
}
54
55pub struct StreamingMergeBuilder<'a> {
56 streams: Vec<SendableRecordBatchStream>,
57 schema: Option<SchemaRef>,
58 expressions: &'a LexOrdering,
59 metrics: Option<BaselineMetrics>,
60 batch_size: Option<usize>,
61 fetch: Option<usize>,
62 reservation: Option<MemoryReservation>,
63 enable_round_robin_tie_breaker: bool,
64}
65
66impl Default for StreamingMergeBuilder<'_> {
67 fn default() -> Self {
68 Self {
69 streams: vec![],
70 schema: None,
71 expressions: LexOrdering::empty(),
72 metrics: None,
73 batch_size: None,
74 fetch: None,
75 reservation: None,
76 enable_round_robin_tie_breaker: false,
77 }
78 }
79}
80
81impl<'a> StreamingMergeBuilder<'a> {
82 pub fn new() -> Self {
83 Self {
84 enable_round_robin_tie_breaker: true,
85 ..Default::default()
86 }
87 }
88
89 pub fn with_streams(mut self, streams: Vec<SendableRecordBatchStream>) -> Self {
90 self.streams = streams;
91 self
92 }
93
94 pub fn with_schema(mut self, schema: SchemaRef) -> Self {
95 self.schema = Some(schema);
96 self
97 }
98
99 pub fn with_expressions(mut self, expressions: &'a LexOrdering) -> Self {
100 self.expressions = expressions;
101 self
102 }
103
104 pub fn with_metrics(mut self, metrics: BaselineMetrics) -> Self {
105 self.metrics = Some(metrics);
106 self
107 }
108
109 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
110 self.batch_size = Some(batch_size);
111 self
112 }
113
114 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
115 self.fetch = fetch;
116 self
117 }
118
119 pub fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
120 self.reservation = Some(reservation);
121 self
122 }
123
124 pub fn with_round_robin_tie_breaker(
129 mut self,
130 enable_round_robin_tie_breaker: bool,
131 ) -> Self {
132 self.enable_round_robin_tie_breaker = enable_round_robin_tie_breaker;
133 self
134 }
135
136 pub fn build(self) -> Result<SendableRecordBatchStream> {
137 let Self {
138 streams,
139 schema,
140 metrics,
141 batch_size,
142 reservation,
143 fetch,
144 expressions,
145 enable_round_robin_tie_breaker,
146 } = self;
147
148 let checks = [
150 (
151 streams.is_empty(),
152 "Streams cannot be empty for streaming merge",
153 ),
154 (
155 expressions.is_empty(),
156 "Sort expressions cannot be empty for streaming merge",
157 ),
158 ];
159
160 if let Some((_, error_message)) = checks.iter().find(|(condition, _)| *condition)
161 {
162 return internal_err!("{}", error_message);
163 }
164
165 let schema = schema.expect("Schema cannot be empty for streaming merge");
167 let metrics = metrics.expect("Metrics cannot be empty for streaming merge");
168 let batch_size =
169 batch_size.expect("Batch size cannot be empty for streaming merge");
170 let reservation =
171 reservation.expect("Reservation cannot be empty for streaming merge");
172
173 if expressions.len() == 1 {
175 let sort = expressions[0].clone();
176 let data_type = sort.expr.data_type(schema.as_ref())?;
177 downcast_primitive! {
178 data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
179 DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
180 DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
181 DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
182 DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
183 _ => {}
184 }
185 }
186
187 let streams = RowCursorStream::try_new(
188 schema.as_ref(),
189 expressions,
190 streams,
191 reservation.new_empty(),
192 )?;
193 Ok(Box::pin(SortPreservingMergeStream::new(
194 Box::new(streams),
195 schema,
196 metrics,
197 batch_size,
198 fetch,
199 reservation,
200 enable_round_robin_tie_breaker,
201 )))
202 }
203}