1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Merge that deals with an arbitrary size of streaming inputs.
//! This is an order-preserving merge.

use crate::metrics::BaselineMetrics;
use crate::sorts::{
    merge::SortPreservingMergeStream,
    stream::{FieldCursorStream, RowCursorStream},
};
use crate::{PhysicalSortExpr, SendableRecordBatchStream};
use arrow::datatypes::{DataType, SchemaRef};
use arrow_array::*;
use datafusion_common::{internal_err, Result};
use datafusion_execution::memory_pool::MemoryReservation;

/// Forwards to `merge_helper!` using `PrimitiveArray<$primitive>` as the array
/// type, passing every remaining identifier through unchanged.
macro_rules! primitive_merge_helper {
    ($primitive:ty, $($arg:ident),+) => {
        merge_helper!(PrimitiveArray<$primitive>, $($arg),+)
    };
}

/// Builds a [`FieldCursorStream`] over the array type `$array` from the given
/// sort expression and input streams, wraps it in a
/// [`SortPreservingMergeStream`], and hands the pinned, boxed result back to the
/// caller.
///
/// Note that this expands to a `return` statement, so it exits the *enclosing
/// function* with `Ok(..)` rather than evaluating to a value.
macro_rules! merge_helper {
    ($array:ty, $sort_expr:ident, $inputs:ident, $schema:ident, $metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{
        let field_cursors = FieldCursorStream::<$array>::new($sort_expr, $inputs);
        let merged = SortPreservingMergeStream::new(
            Box::new(field_cursors),
            $schema,
            $metrics,
            $batch_size,
            $fetch,
            $reservation,
        );
        return Ok(Box::pin(merged));
    }};
}

/// Combines the given [`SendableRecordBatchStream`]s into a single stream using
/// an order-preserving merge driven by `expressions`.
///
/// When there is exactly one sort expression and its data type is a primitive
/// type, `Utf8`, `LargeUtf8`, `Binary` or `LargeBinary`, the merge runs over a
/// typed [`FieldCursorStream`]. Every other case falls back to a
/// [`RowCursorStream`] built over all of the expressions.
///
/// # Errors
///
/// Returns an internal error when `expressions` is empty, and propagates any
/// error raised while resolving the sort expression's data type or while
/// constructing the [`RowCursorStream`].
pub fn streaming_merge(
    streams: Vec<SendableRecordBatchStream>,
    schema: SchemaRef,
    expressions: &[PhysicalSortExpr],
    metrics: BaselineMetrics,
    batch_size: usize,
    fetch: Option<usize>,
    reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
    // Without sort expressions "preserving the order" has no meaning
    // (and would result in infinite loops), so reject that case up front.
    if expressions.is_empty() {
        return internal_err!("Sort expressions cannot be empty for streaming merge");
    }

    // Exactly one sort column: try the optimized, typed cursor implementations.
    // Every matching arm below expands to an early `return`; unsupported data
    // types fall through to the row-based path.
    if let [only] = expressions {
        let sort = only.clone();
        let data_type = sort.expr.data_type(schema.as_ref())?;
        downcast_primitive! {
            data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation),
            DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
            DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
            DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
            DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
            _ => {}
        }
    }

    // General path: row-based cursors over all sort expressions. The cursor
    // stream receives its own empty reservation derived from `reservation`,
    // which is then handed to the merge stream itself.
    let row_cursors = RowCursorStream::try_new(
        schema.as_ref(),
        expressions,
        streams,
        reservation.new_empty(),
    )?;

    let merged = SortPreservingMergeStream::new(
        Box::new(row_cursors),
        schema,
        metrics,
        batch_size,
        fetch,
        reservation,
    );
    Ok(Box::pin(merged))
}