datafusion_physical_plan/sorts/
streaming_merge.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Merge that deals with an arbitrary size of streaming inputs.
19//! This is an order-preserving merge.
20
21use crate::metrics::BaselineMetrics;
22use crate::sorts::{
23    merge::SortPreservingMergeStream,
24    stream::{FieldCursorStream, RowCursorStream},
25};
26use crate::SendableRecordBatchStream;
27use arrow::array::*;
28use arrow::datatypes::{DataType, SchemaRef};
29use datafusion_common::{internal_err, Result};
30use datafusion_execution::memory_pool::MemoryReservation;
31use datafusion_physical_expr_common::sort_expr::LexOrdering;
32
/// Adapter used as the callback of `downcast_primitive!`: wraps the primitive
/// type `$native` in `PrimitiveArray` and forwards the remaining identifiers,
/// unchanged and in the same order, to `merge_helper!`.
macro_rules! primitive_merge_helper {
    ($native:ty, $($arg:ident),+) => {
        merge_helper!(PrimitiveArray<$native>, $($arg),+)
    };
}
38
/// Builds the single-column merge for the array type `$array` and returns it
/// from the enclosing function.
///
/// The expansion ends with `return Ok(..)`, so it can only be used inside a
/// function whose return type accepts the pinned, boxed merge stream. All
/// arguments after the type are identifiers of local variables that the
/// expansion consumes.
macro_rules! merge_helper {
    ($array:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident, $enable_round_robin_tie_breaker:ident) => {{
        // The cursor stream is given the reservation produced by
        // `new_empty()`; `$reservation` itself is moved into the merge stream.
        let cursors =
            FieldCursorStream::<$array>::new($sort, $streams, $reservation.new_empty());
        let merged = SortPreservingMergeStream::new(
            Box::new(cursors),
            $schema,
            $tracking_metrics,
            $batch_size,
            $fetch,
            $reservation,
            $enable_round_robin_tie_breaker,
        );
        return Ok(Box::pin(merged));
    }};
}
54
/// Builder for an order-preserving merge of several record batch streams.
///
/// Configure it with the `with_*` methods and finish with `build`, which
/// requires the streams, sort expressions, schema, metrics, batch size and
/// memory reservation to have been provided.
pub struct StreamingMergeBuilder<'a> {
    /// Input streams to merge; `build` rejects an empty list.
    streams: Vec<SendableRecordBatchStream>,
    /// Schema handed to the merge stream and used to resolve the data type of
    /// a single sort expression; must be set before `build`.
    schema: Option<SchemaRef>,
    /// Sort expressions used for the merge; `build` rejects an empty ordering.
    expressions: &'a LexOrdering,
    /// Baseline metrics handed to the merge stream; must be set before `build`.
    metrics: Option<BaselineMetrics>,
    /// Batch size handed to the merge stream; must be set before `build`.
    batch_size: Option<usize>,
    /// Optional `fetch` value handed to the merge stream
    /// (presumably a row limit — confirm against `SortPreservingMergeStream`).
    fetch: Option<usize>,
    /// Memory reservation handed to the merge stream; the cursor stream gets
    /// the one returned by its `new_empty()`. Must be set before `build`.
    reservation: Option<MemoryReservation>,
    /// Flag forwarded to the merge stream. `new()` turns it on, while the
    /// `Default` implementation leaves it off.
    enable_round_robin_tie_breaker: bool,
}
65
66impl Default for StreamingMergeBuilder<'_> {
67    fn default() -> Self {
68        Self {
69            streams: vec![],
70            schema: None,
71            expressions: LexOrdering::empty(),
72            metrics: None,
73            batch_size: None,
74            fetch: None,
75            reservation: None,
76            enable_round_robin_tie_breaker: false,
77        }
78    }
79}
80
81impl<'a> StreamingMergeBuilder<'a> {
82    pub fn new() -> Self {
83        Self {
84            enable_round_robin_tie_breaker: true,
85            ..Default::default()
86        }
87    }
88
89    pub fn with_streams(mut self, streams: Vec<SendableRecordBatchStream>) -> Self {
90        self.streams = streams;
91        self
92    }
93
94    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
95        self.schema = Some(schema);
96        self
97    }
98
99    pub fn with_expressions(mut self, expressions: &'a LexOrdering) -> Self {
100        self.expressions = expressions;
101        self
102    }
103
104    pub fn with_metrics(mut self, metrics: BaselineMetrics) -> Self {
105        self.metrics = Some(metrics);
106        self
107    }
108
109    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
110        self.batch_size = Some(batch_size);
111        self
112    }
113
114    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
115        self.fetch = fetch;
116        self
117    }
118
119    pub fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
120        self.reservation = Some(reservation);
121        self
122    }
123
124    /// See [SortPreservingMergeExec::with_round_robin_repartition] for more
125    /// information.
126    ///
127    /// [SortPreservingMergeExec::with_round_robin_repartition]: crate::sorts::sort_preserving_merge::SortPreservingMergeExec::with_round_robin_repartition
128    pub fn with_round_robin_tie_breaker(
129        mut self,
130        enable_round_robin_tie_breaker: bool,
131    ) -> Self {
132        self.enable_round_robin_tie_breaker = enable_round_robin_tie_breaker;
133        self
134    }
135
136    pub fn build(self) -> Result<SendableRecordBatchStream> {
137        let Self {
138            streams,
139            schema,
140            metrics,
141            batch_size,
142            reservation,
143            fetch,
144            expressions,
145            enable_round_robin_tie_breaker,
146        } = self;
147
148        // Early return if streams or expressions are empty
149        let checks = [
150            (
151                streams.is_empty(),
152                "Streams cannot be empty for streaming merge",
153            ),
154            (
155                expressions.is_empty(),
156                "Sort expressions cannot be empty for streaming merge",
157            ),
158        ];
159
160        if let Some((_, error_message)) = checks.iter().find(|(condition, _)| *condition)
161        {
162            return internal_err!("{}", error_message);
163        }
164
165        // Unwrapping mandatory fields
166        let schema = schema.expect("Schema cannot be empty for streaming merge");
167        let metrics = metrics.expect("Metrics cannot be empty for streaming merge");
168        let batch_size =
169            batch_size.expect("Batch size cannot be empty for streaming merge");
170        let reservation =
171            reservation.expect("Reservation cannot be empty for streaming merge");
172
173        // Special case single column comparisons with optimized cursor implementations
174        if expressions.len() == 1 {
175            let sort = expressions[0].clone();
176            let data_type = sort.expr.data_type(schema.as_ref())?;
177            downcast_primitive! {
178                data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
179                DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
180                DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
181                DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
182                DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
183                _ => {}
184            }
185        }
186
187        let streams = RowCursorStream::try_new(
188            schema.as_ref(),
189            expressions,
190            streams,
191            reservation.new_empty(),
192        )?;
193        Ok(Box::pin(SortPreservingMergeStream::new(
194            Box::new(streams),
195            schema,
196            metrics,
197            batch_size,
198            fetch,
199            reservation,
200            enable_round_robin_tie_breaker,
201        )))
202    }
203}