datafusion_common/utils/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module provides assorted utility functions, including the bisect function, which implements binary search.
19
20pub mod expr;
21pub mod memory;
22pub mod proxy;
23pub mod string_utils;
24
25use crate::error::{_exec_datafusion_err, _internal_datafusion_err, _internal_err};
26use crate::{DataFusionError, Result, ScalarValue};
27use arrow::array::{
28    cast::AsArray, Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray,
29    OffsetSizeTrait,
30};
31use arrow::buffer::OffsetBuffer;
32use arrow::compute::{partition, SortColumn, SortOptions};
33use arrow::datatypes::{DataType, Field, SchemaRef};
34use sqlparser::ast::Ident;
35use sqlparser::dialect::GenericDialect;
36use sqlparser::parser::Parser;
37use std::borrow::{Borrow, Cow};
38use std::cmp::{min, Ordering};
39use std::collections::HashSet;
40use std::num::NonZero;
41use std::ops::Range;
42use std::sync::Arc;
43use std::thread::available_parallelism;
44
45/// Applies an optional projection to a [`SchemaRef`], returning the
46/// projected schema
47///
48/// Example:
49/// ```
50/// use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
51/// use datafusion_common::project_schema;
52///
53/// // Schema with columns 'a', 'b', and 'c'
54/// let schema = SchemaRef::new(Schema::new(vec![
55///   Field::new("a", DataType::Int32, true),
56///   Field::new("b", DataType::Int64, true),
57///   Field::new("c", DataType::Utf8, true),
58/// ]));
59///
60/// // Pick columns 'c' and 'b'
61/// let projection = Some(vec![2,1]);
62/// let projected_schema = project_schema(
63///    &schema,
64///    projection.as_ref()
65///  ).unwrap();
66///
67/// let expected_schema = SchemaRef::new(Schema::new(vec![
68///   Field::new("c", DataType::Utf8, true),
69///   Field::new("b", DataType::Int64, true),
70/// ]));
71///
72/// assert_eq!(projected_schema, expected_schema);
73/// ```
74pub fn project_schema(
75    schema: &SchemaRef,
76    projection: Option<&Vec<usize>>,
77) -> Result<SchemaRef> {
78    let schema = match projection {
79        Some(columns) => Arc::new(schema.project(columns)?),
80        None => Arc::clone(schema),
81    };
82    Ok(schema)
83}
84
85/// Given column vectors, returns row at `idx`.
86pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> {
87    columns
88        .iter()
89        .map(|arr| ScalarValue::try_from_array(arr, idx))
90        .collect()
91}
92
93/// This function compares two tuples depending on the given sort options.
94pub fn compare_rows(
95    x: &[ScalarValue],
96    y: &[ScalarValue],
97    sort_options: &[SortOptions],
98) -> Result<Ordering> {
99    let zip_it = x.iter().zip(y.iter()).zip(sort_options.iter());
100    // Preserving lexical ordering.
101    for ((lhs, rhs), sort_options) in zip_it {
102        // Consider all combinations of NULLS FIRST/LAST and ASC/DESC configurations.
103        let result = match (lhs.is_null(), rhs.is_null(), sort_options.nulls_first) {
104            (true, false, false) | (false, true, true) => Ordering::Greater,
105            (true, false, true) | (false, true, false) => Ordering::Less,
106            (false, false, _) => if sort_options.descending {
107                rhs.partial_cmp(lhs)
108            } else {
109                lhs.partial_cmp(rhs)
110            }
111            .ok_or_else(|| {
112                _internal_datafusion_err!("Column array shouldn't be empty")
113            })?,
114            (true, true, _) => continue,
115        };
116        if result != Ordering::Equal {
117            return Ok(result);
118        }
119    }
120    Ok(Ordering::Equal)
121}
122
123/// This function searches for a tuple of given values (`target`) among the given
124/// rows (`item_columns`) using the bisection algorithm. It assumes that `item_columns`
125/// is sorted according to `sort_options` and returns the insertion index of `target`.
126/// Template argument `SIDE` being `true`/`false` means left/right insertion.
127pub fn bisect<const SIDE: bool>(
128    item_columns: &[ArrayRef],
129    target: &[ScalarValue],
130    sort_options: &[SortOptions],
131) -> Result<usize> {
132    let low: usize = 0;
133    let high: usize = item_columns
134        .first()
135        .ok_or_else(|| {
136            DataFusionError::Internal("Column array shouldn't be empty".to_string())
137        })?
138        .len();
139    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
140        let cmp = compare_rows(current, target, sort_options)?;
141        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
142    };
143    find_bisect_point(item_columns, target, compare_fn, low, high)
144}
145
146/// This function searches for a tuple of given values (`target`) among a slice of
147/// the given rows (`item_columns`) using the bisection algorithm. The slice starts
148/// at the index `low` and ends at the index `high`. The boolean-valued function
149/// `compare_fn` specifies whether we bisect on the left (by returning `false`),
150/// or on the right (by returning `true`) when we compare the target value with
151/// the current value as we iteratively bisect the input.
152pub fn find_bisect_point<F>(
153    item_columns: &[ArrayRef],
154    target: &[ScalarValue],
155    compare_fn: F,
156    mut low: usize,
157    mut high: usize,
158) -> Result<usize>
159where
160    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
161{
162    while low < high {
163        let mid = ((high - low) / 2) + low;
164        let val = get_row_at_idx(item_columns, mid)?;
165        if compare_fn(&val, target)? {
166            low = mid + 1;
167        } else {
168            high = mid;
169        }
170    }
171    Ok(low)
172}
173
174/// This function searches for a tuple of given values (`target`) among the given
175/// rows (`item_columns`) via a linear scan. It assumes that `item_columns` is sorted
176/// according to `sort_options` and returns the insertion index of `target`.
177/// Template argument `SIDE` being `true`/`false` means left/right insertion.
178pub fn linear_search<const SIDE: bool>(
179    item_columns: &[ArrayRef],
180    target: &[ScalarValue],
181    sort_options: &[SortOptions],
182) -> Result<usize> {
183    let low: usize = 0;
184    let high: usize = item_columns
185        .first()
186        .ok_or_else(|| {
187            DataFusionError::Internal("Column array shouldn't be empty".to_string())
188        })?
189        .len();
190    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
191        let cmp = compare_rows(current, target, sort_options)?;
192        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
193    };
194    search_in_slice(item_columns, target, compare_fn, low, high)
195}
196
197/// This function searches for a tuple of given values (`target`) among a slice of
198/// the given rows (`item_columns`) via a linear scan. The slice starts at the index
199/// `low` and ends at the index `high`. The boolean-valued function `compare_fn`
200/// specifies the stopping criterion.
201pub fn search_in_slice<F>(
202    item_columns: &[ArrayRef],
203    target: &[ScalarValue],
204    compare_fn: F,
205    mut low: usize,
206    high: usize,
207) -> Result<usize>
208where
209    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
210{
211    while low < high {
212        let val = get_row_at_idx(item_columns, low)?;
213        if !compare_fn(&val, target)? {
214            break;
215        }
216        low += 1;
217    }
218    Ok(low)
219}
220
221/// Given a list of 0 or more already sorted columns, finds the
222/// partition ranges that would partition equally across columns.
223///
224/// See [`partition`] for more details.
225pub fn evaluate_partition_ranges(
226    num_rows: usize,
227    partition_columns: &[SortColumn],
228) -> Result<Vec<Range<usize>>> {
229    Ok(if partition_columns.is_empty() {
230        vec![Range {
231            start: 0,
232            end: num_rows,
233        }]
234    } else {
235        let cols: Vec<_> = partition_columns
236            .iter()
237            .map(|x| Arc::clone(&x.values))
238            .collect();
239        partition(&cols)?.ranges()
240    })
241}
242
243/// Wraps identifier string in double quotes, escaping any double quotes in
244/// the identifier by replacing it with two double quotes
245///
246/// e.g. identifier `tab.le"name` becomes `"tab.le""name"`
247pub fn quote_identifier(s: &str) -> Cow<str> {
248    if needs_quotes(s) {
249        Cow::Owned(format!("\"{}\"", s.replace('"', "\"\"")))
250    } else {
251        Cow::Borrowed(s)
252    }
253}
254
/// Returns `true` if the identifier `s` must be quoted.
///
/// An identifier can stay unquoted only if its first character is an ASCII
/// lowercase letter or `_`, and all remaining characters are ASCII lowercase
/// letters, ASCII digits or `_`. The empty string is reported as not needing
/// quotes.
fn needs_quotes(s: &str) -> bool {
    let is_plain_start = |c: char| c.is_ascii_lowercase() || c == '_';
    let is_plain_rest =
        |c: char| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_';

    match s.chars().next() {
        None => false,
        // first char can not be a number unless escaped
        Some(first) if !is_plain_start(first) => true,
        Some(_) => s.chars().skip(1).any(|c| !is_plain_rest(c)),
    }
}
268
269pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
270    let dialect = GenericDialect;
271    let mut parser = Parser::new(&dialect).try_with_sql(s)?;
272    let idents = parser.parse_multipart_identifier()?;
273    Ok(idents)
274}
275
276pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
277    parse_identifiers(s)
278        .unwrap_or_default()
279        .into_iter()
280        .map(|id| match id.quote_style {
281            Some(_) => id.value,
282            None if ignore_case => id.value,
283            _ => id.value.to_ascii_lowercase(),
284        })
285        .collect::<Vec<_>>()
286}
287
288/// This function "takes" the elements at `indices` from the slice `items`.
289pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
290    items: &[T],
291    indices: impl IntoIterator<Item = I>,
292) -> Result<Vec<T>> {
293    indices
294        .into_iter()
295        .map(|idx| items.get(*idx.borrow()).cloned())
296        .collect::<Option<Vec<T>>>()
297        .ok_or_else(|| {
298            DataFusionError::Execution(
299                "Expects indices to be in the range of searched vector".to_string(),
300            )
301        })
302}
303
/// Returns the length of the longest prefix of `sequence` that reads
/// 0, 1, 2, ... (each element equal to its own position). Examples:
/// - For 0, 1, 2, 4, 5; the result is 3, because 0, 1, 2 is the longest
///   such prefix.
/// - For 1, 2, 3, 4; the result is 0, because the first element is not 0.
pub fn longest_consecutive_prefix<T: Borrow<usize>>(
    sequence: impl IntoIterator<Item = T>,
) -> usize {
    sequence
        .into_iter()
        .enumerate()
        .take_while(|(expected, item)| {
            let actual: &usize = item.borrow();
            expected == actual
        })
        .count()
}
321
/// Creates single element [`ListArray`], [`LargeListArray`] and
/// [`FixedSizeListArray`] from other arrays
///
/// For example this builder can convert `[1, 2, 3]` into `[[1, 2, 3]]`
///
/// The wrapped array becomes the values of the single list element; the
/// element field's name and nullability are configurable via the `with_*`
/// methods.
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use arrow::array::{Array, ListArray};
/// # use arrow::array::types::Int64Type;
/// # use datafusion_common::utils::SingleRowListArrayBuilder;
/// // The input is itself a one-row list array: [[1, 2, 3]]
/// let arr = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
///       Some(vec![Some(1), Some(2), Some(3)]),
/// ]);
/// // Wrap it as the only element of a new list array: [[[1, 2, 3]]]
/// let list_arr = SingleRowListArrayBuilder::new(Arc::new(arr)).build_list_array();
/// assert_eq!(list_arr.len(), 1);
/// ```
#[derive(Debug, Clone)]
pub struct SingleRowListArrayBuilder {
    /// array to be wrapped
    arr: ArrayRef,
    /// Should the resulting array be nullable? Defaults to `true`.
    nullable: bool,
    /// Specify the field name for the resulting array. Defaults to value used in
    /// [`Field::new_list_field`]
    field_name: Option<String>,
}
351
352impl SingleRowListArrayBuilder {
353    /// Create a new instance of [`SingleRowListArrayBuilder`]
354    pub fn new(arr: ArrayRef) -> Self {
355        Self {
356            arr,
357            nullable: true,
358            field_name: None,
359        }
360    }
361
362    /// Set the nullable flag
363    pub fn with_nullable(mut self, nullable: bool) -> Self {
364        self.nullable = nullable;
365        self
366    }
367
368    /// sets the field name for the resulting array
369    pub fn with_field_name(mut self, field_name: Option<String>) -> Self {
370        self.field_name = field_name;
371        self
372    }
373
374    /// Copies field name and nullable from the specified field
375    pub fn with_field(self, field: &Field) -> Self {
376        self.with_field_name(Some(field.name().to_owned()))
377            .with_nullable(field.is_nullable())
378    }
379
380    /// Build a single element [`ListArray`]
381    pub fn build_list_array(self) -> ListArray {
382        let (field, arr) = self.into_field_and_arr();
383        let offsets = OffsetBuffer::from_lengths([arr.len()]);
384        ListArray::new(field, offsets, arr, None)
385    }
386
387    /// Build a single element [`ListArray`] and wrap as [`ScalarValue::List`]
388    pub fn build_list_scalar(self) -> ScalarValue {
389        ScalarValue::List(Arc::new(self.build_list_array()))
390    }
391
392    /// Build a single element [`LargeListArray`]
393    pub fn build_large_list_array(self) -> LargeListArray {
394        let (field, arr) = self.into_field_and_arr();
395        let offsets = OffsetBuffer::from_lengths([arr.len()]);
396        LargeListArray::new(field, offsets, arr, None)
397    }
398
399    /// Build a single element [`LargeListArray`] and wrap as [`ScalarValue::LargeList`]
400    pub fn build_large_list_scalar(self) -> ScalarValue {
401        ScalarValue::LargeList(Arc::new(self.build_large_list_array()))
402    }
403
404    /// Build a single element [`FixedSizeListArray`]
405    pub fn build_fixed_size_list_array(self, list_size: usize) -> FixedSizeListArray {
406        let (field, arr) = self.into_field_and_arr();
407        FixedSizeListArray::new(field, list_size as i32, arr, None)
408    }
409
410    /// Build a single element [`FixedSizeListArray`] and wrap as [`ScalarValue::FixedSizeList`]
411    pub fn build_fixed_size_list_scalar(self, list_size: usize) -> ScalarValue {
412        ScalarValue::FixedSizeList(Arc::new(self.build_fixed_size_list_array(list_size)))
413    }
414
415    /// Helper function: convert this builder into a tuple of field and array
416    fn into_field_and_arr(self) -> (Arc<Field>, ArrayRef) {
417        let Self {
418            arr,
419            nullable,
420            field_name,
421        } = self;
422        let data_type = arr.data_type().to_owned();
423        let field = match field_name {
424            Some(name) => Field::new(name, data_type, nullable),
425            None => Field::new_list_field(data_type, nullable),
426        };
427        (Arc::new(field), arr)
428    }
429}
430
431/// Wrap an array into a single element `ListArray`.
432/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
433/// The field in the list array is nullable.
434#[deprecated(
435    since = "44.0.0",
436    note = "please use `SingleRowListArrayBuilder` instead"
437)]
438pub fn array_into_list_array_nullable(arr: ArrayRef) -> ListArray {
439    SingleRowListArrayBuilder::new(arr)
440        .with_nullable(true)
441        .build_list_array()
442}
443
444/// Wrap an array into a single element `ListArray`.
445/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
446#[deprecated(
447    since = "44.0.0",
448    note = "please use `SingleRowListArrayBuilder` instead"
449)]
450pub fn array_into_list_array(arr: ArrayRef, nullable: bool) -> ListArray {
451    SingleRowListArrayBuilder::new(arr)
452        .with_nullable(nullable)
453        .build_list_array()
454}
455
456#[deprecated(
457    since = "44.0.0",
458    note = "please use `SingleRowListArrayBuilder` instead"
459)]
460pub fn array_into_list_array_with_field_name(
461    arr: ArrayRef,
462    nullable: bool,
463    field_name: &str,
464) -> ListArray {
465    SingleRowListArrayBuilder::new(arr)
466        .with_nullable(nullable)
467        .with_field_name(Some(field_name.to_string()))
468        .build_list_array()
469}
470
471/// Wrap an array into a single element `LargeListArray`.
472/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
473#[deprecated(
474    since = "44.0.0",
475    note = "please use `SingleRowListArrayBuilder` instead"
476)]
477pub fn array_into_large_list_array(arr: ArrayRef) -> LargeListArray {
478    SingleRowListArrayBuilder::new(arr).build_large_list_array()
479}
480
481#[deprecated(
482    since = "44.0.0",
483    note = "please use `SingleRowListArrayBuilder` instead"
484)]
485pub fn array_into_large_list_array_with_field_name(
486    arr: ArrayRef,
487    field_name: &str,
488) -> LargeListArray {
489    SingleRowListArrayBuilder::new(arr)
490        .with_field_name(Some(field_name.to_string()))
491        .build_large_list_array()
492}
493
494#[deprecated(
495    since = "44.0.0",
496    note = "please use `SingleRowListArrayBuilder` instead"
497)]
498pub fn array_into_fixed_size_list_array(
499    arr: ArrayRef,
500    list_size: usize,
501) -> FixedSizeListArray {
502    SingleRowListArrayBuilder::new(arr).build_fixed_size_list_array(list_size)
503}
504
505#[deprecated(
506    since = "44.0.0",
507    note = "please use `SingleRowListArrayBuilder` instead"
508)]
509pub fn array_into_fixed_size_list_array_with_field_name(
510    arr: ArrayRef,
511    list_size: usize,
512    field_name: &str,
513) -> FixedSizeListArray {
514    SingleRowListArrayBuilder::new(arr)
515        .with_field_name(Some(field_name.to_string()))
516        .build_fixed_size_list_array(list_size)
517}
518
519/// Wrap arrays into a single element `ListArray`.
520///
521/// Example:
522/// ```
523/// use arrow::array::{Int32Array, ListArray, ArrayRef};
524/// use arrow::datatypes::{Int32Type, Field};
525/// use std::sync::Arc;
526///
527/// let arr1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
528/// let arr2 = Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef;
529///
530/// let list_arr = datafusion_common::utils::arrays_into_list_array([arr1, arr2]).unwrap();
531///
532/// let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(
533///    vec![
534///     Some(vec![Some(1), Some(2), Some(3)]),
535///     Some(vec![Some(4), Some(5), Some(6)]),
536///    ]
537/// );
538///
539/// assert_eq!(list_arr, expected);
540pub fn arrays_into_list_array(
541    arr: impl IntoIterator<Item = ArrayRef>,
542) -> Result<ListArray> {
543    let arr = arr.into_iter().collect::<Vec<_>>();
544    if arr.is_empty() {
545        return _internal_err!("Cannot wrap empty array into list array");
546    }
547
548    let lens = arr.iter().map(|x| x.len()).collect::<Vec<_>>();
549    // Assume data type is consistent
550    let data_type = arr[0].data_type().to_owned();
551    let values = arr.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
552    Ok(ListArray::new(
553        Arc::new(Field::new_list_field(data_type, true)),
554        OffsetBuffer::from_lengths(lens),
555        arrow::compute::concat(values.as_slice())?,
556        None,
557    ))
558}
559
560/// Helper function to convert a ListArray into a vector of ArrayRefs.
561pub fn list_to_arrays<O: OffsetSizeTrait>(a: &ArrayRef) -> Vec<ArrayRef> {
562    a.as_list::<O>().iter().flatten().collect::<Vec<_>>()
563}
564
565/// Helper function to convert a FixedSizeListArray into a vector of ArrayRefs.
566pub fn fixed_size_list_to_arrays(a: &ArrayRef) -> Vec<ArrayRef> {
567    a.as_fixed_size_list().iter().flatten().collect::<Vec<_>>()
568}
569
570/// Get the base type of a data type.
571///
572/// Example
573/// ```
574/// use arrow::datatypes::{DataType, Field};
575/// use datafusion_common::utils::base_type;
576/// use std::sync::Arc;
577///
578/// let data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
579/// assert_eq!(base_type(&data_type), DataType::Int32);
580///
581/// let data_type = DataType::Int32;
582/// assert_eq!(base_type(&data_type), DataType::Int32);
583/// ```
584pub fn base_type(data_type: &DataType) -> DataType {
585    match data_type {
586        DataType::List(field)
587        | DataType::LargeList(field)
588        | DataType::FixedSizeList(field, _) => base_type(field.data_type()),
589        _ => data_type.to_owned(),
590    }
591}
592
/// Information about how to coerce lists.
///
/// Accepted as an optional argument by [`coerced_type_with_base_type_only`]
/// to select how list types are rewritten during coercion.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum ListCoercion {
    /// [`DataType::FixedSizeList`] should be coerced to [`DataType::List`].
    FixedSizedListToList,
}
599
600/// A helper function to coerce base type in List.
601///
602/// Example
603/// ```
604/// use arrow::datatypes::{DataType, Field};
605/// use datafusion_common::utils::coerced_type_with_base_type_only;
606/// use std::sync::Arc;
607///
608/// let data_type = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
609/// let base_type = DataType::Float64;
610/// let coerced_type = coerced_type_with_base_type_only(&data_type, &base_type, None);
611/// assert_eq!(coerced_type, DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))));
612pub fn coerced_type_with_base_type_only(
613    data_type: &DataType,
614    base_type: &DataType,
615    array_coercion: Option<&ListCoercion>,
616) -> DataType {
617    match (data_type, array_coercion) {
618        (DataType::List(field), _)
619        | (DataType::FixedSizeList(field, _), Some(ListCoercion::FixedSizedListToList)) =>
620        {
621            let field_type = coerced_type_with_base_type_only(
622                field.data_type(),
623                base_type,
624                array_coercion,
625            );
626
627            DataType::List(Arc::new(Field::new(
628                field.name(),
629                field_type,
630                field.is_nullable(),
631            )))
632        }
633        (DataType::FixedSizeList(field, len), _) => {
634            let field_type = coerced_type_with_base_type_only(
635                field.data_type(),
636                base_type,
637                array_coercion,
638            );
639
640            DataType::FixedSizeList(
641                Arc::new(Field::new(field.name(), field_type, field.is_nullable())),
642                *len,
643            )
644        }
645        (DataType::LargeList(field), _) => {
646            let field_type = coerced_type_with_base_type_only(
647                field.data_type(),
648                base_type,
649                array_coercion,
650            );
651
652            DataType::LargeList(Arc::new(Field::new(
653                field.name(),
654                field_type,
655                field.is_nullable(),
656            )))
657        }
658
659        _ => base_type.clone(),
660    }
661}
662
663/// Recursively coerce and `FixedSizeList` elements to `List`
664pub fn coerced_fixed_size_list_to_list(data_type: &DataType) -> DataType {
665    match data_type {
666        DataType::List(field) | DataType::FixedSizeList(field, _) => {
667            let field_type = coerced_fixed_size_list_to_list(field.data_type());
668
669            DataType::List(Arc::new(Field::new(
670                field.name(),
671                field_type,
672                field.is_nullable(),
673            )))
674        }
675        DataType::LargeList(field) => {
676            let field_type = coerced_fixed_size_list_to_list(field.data_type());
677
678            DataType::LargeList(Arc::new(Field::new(
679                field.name(),
680                field_type,
681                field.is_nullable(),
682            )))
683        }
684
685        _ => data_type.clone(),
686    }
687}
688
689/// Compute the number of dimensions in a list data type.
690pub fn list_ndims(data_type: &DataType) -> u64 {
691    match data_type {
692        DataType::List(field)
693        | DataType::LargeList(field)
694        | DataType::FixedSizeList(field, _) => 1 + list_ndims(field.data_type()),
695        _ => 0,
696    }
697}
698
/// String similarity metrics, adopted from strsim-rs
pub mod datafusion_strsim {
    // Source: https://github.com/dguo/strsim-rs/blob/master/src/lib.rs
    // License: https://github.com/dguo/strsim-rs/blob/master/LICENSE
    use std::str::Chars;

    /// Adapter that lets a borrowed `&str` be iterated repeatedly, by
    /// reference, as a sequence of `char`s.
    struct StringWrapper<'a>(&'a str);

    impl<'b> IntoIterator for &StringWrapper<'b> {
        type Item = char;
        type IntoIter = Chars<'b>;

        fn into_iter(self) -> Self::IntoIter {
            self.0.chars()
        }
    }

    /// Computes the Levenshtein distance between two re-iterable sequences:
    /// the minimum number of insertions, deletions, and substitutions needed
    /// to turn one into the other.
    fn generic_levenshtein<'a, 'b, Iter1, Iter2, Elem1, Elem2>(
        a: &'a Iter1,
        b: &'b Iter2,
    ) -> usize
    where
        &'a Iter1: IntoIterator<Item = Elem1>,
        &'b Iter2: IntoIterator<Item = Elem2>,
        Elem1: PartialEq<Elem2>,
    {
        let b_len = b.into_iter().count();

        // Turning an empty `a` into `b` takes one insertion per element.
        if a.into_iter().next().is_none() {
            return b_len;
        }

        // Single-row dynamic programming table: `row[j]` holds the distance
        // between the processed prefix of `a` and the first `j + 1` elements
        // of `b`.
        let mut row: Vec<usize> = (1..=b_len).collect();
        let mut distance = 0;

        for (i, a_elem) in a.into_iter().enumerate() {
            distance = i + 1;
            // Value of the diagonal cell (previous row, previous column).
            let mut diagonal = i;

            for (b_elem, cell) in b.into_iter().zip(row.iter_mut()) {
                let substitution = diagonal + usize::from(a_elem != b_elem);
                diagonal = *cell;
                distance = (distance + 1).min(substitution).min(diagonal + 1);
                *cell = distance;
            }
        }

        distance
    }

    /// Returns the Levenshtein distance between two strings, i.e. the minimum
    /// number of character insertions, deletions, and substitutions required
    /// to change one string into the other.
    ///
    /// ```
    /// use datafusion_common::utils::datafusion_strsim::levenshtein;
    ///
    /// assert_eq!(3, levenshtein("kitten", "sitting"));
    /// ```
    pub fn levenshtein(a: &str, b: &str) -> usize {
        let (left, right) = (StringWrapper(a), StringWrapper(b));
        generic_levenshtein(&left, &right)
    }

    /// Returns the Levenshtein distance scaled by the character count of the
    /// longer string and inverted, producing a similarity between 0.0 and 1.0
    /// where 1.0 means the strings are identical and 0.0 means no similarity.
    /// Two empty strings are considered identical.
    ///
    /// ```
    /// use datafusion_common::utils::datafusion_strsim::normalized_levenshtein;
    ///
    /// assert!((normalized_levenshtein("kitten", "sitting") - 0.57142).abs() < 0.00001);
    ///
    /// assert!(normalized_levenshtein("", "second").abs() < 0.00001);
    ///
    /// assert!((normalized_levenshtein("kitten", "sitten") - 0.833).abs() < 0.001);
    /// ```
    pub fn normalized_levenshtein(a: &str, b: &str) -> f64 {
        if a.is_empty() && b.is_empty() {
            return 1.0;
        }
        let longest = a.chars().count().max(b.chars().count());
        let distance = levenshtein(a, b);
        1.0 - (distance as f64) / (longest as f64)
    }
}
787
/// Returns the sorted union of the indices in `first` and `second`: every
/// index appearing in either collection, in ascending order, without
/// duplicates, as a [`Vec`].
pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
    first: impl IntoIterator<Item = T>,
    second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
    let mut merged: Vec<usize> = first.into_iter().map(|idx| *idx.borrow()).collect();
    merged.extend(second.into_iter().map(|idx| *idx.borrow()));
    // After sorting, duplicates are adjacent and can be dropped in one pass.
    merged.sort_unstable();
    merged.dedup();
    merged
}
804
/// Returns the elements of `first` that do not occur in `second`, as a
/// [`Vec`]. The relative order of `first` is kept, and duplicates within
/// `first` are retained.
pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
    first: impl IntoIterator<Item = T>,
    second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
    let excluded = second
        .into_iter()
        .map(|idx| *idx.borrow())
        .collect::<HashSet<usize>>();
    let mut remaining = Vec::new();
    for item in first {
        let value: usize = *item.borrow();
        if !excluded.contains(&value) {
            remaining.push(value);
        }
    }
    remaining
}
818
/// Returns `true` if every index in `sequence` is greater than or equal to
/// the one before it (monotonically non-decreasing). An empty sequence is
/// considered sorted.
#[deprecated(since = "45.0.0", note = "Use std::Iterator::is_sorted instead")]
pub fn is_sorted<T: Borrow<usize>>(sequence: impl IntoIterator<Item = T>) -> bool {
    // TODO: Remove this function when `is_sorted` graduates from Rust nightly.
    // The fold carries the previous index and aborts with `None` at the first
    // decrease.
    sequence
        .into_iter()
        .try_fold(0usize, |previous, item| {
            let current: usize = *item.borrow();
            if current < previous {
                None
            } else {
                Some(current)
            }
        })
        .is_some()
}
833
834/// Find indices of each element in `targets` inside `items`. If one of the
835/// elements is absent in `items`, returns an error.
836pub fn find_indices<T: PartialEq, S: Borrow<T>>(
837    items: &[T],
838    targets: impl IntoIterator<Item = S>,
839) -> Result<Vec<usize>> {
840    targets
841        .into_iter()
842        .map(|target| items.iter().position(|e| target.borrow().eq(e)))
843        .collect::<Option<_>>()
844        .ok_or_else(|| DataFusionError::Execution("Target not found".to_string()))
845}
846
/// Transposes a vector of rows into a vector of columns.
///
/// The number of output vectors equals the length of the first row. Elements
/// of longer rows beyond that length are dropped, and shorter rows simply
/// contribute to fewer columns. An empty input yields an empty output.
pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
    let Some(width) = original.first().map(Vec::len) else {
        return Vec::new();
    };
    let mut columns: Vec<Vec<T>> = Vec::new();
    columns.resize_with(width, Vec::new);
    for row in original {
        for (column, value) in columns.iter_mut().zip(row) {
            column.push(value);
        }
    }
    columns
}
862
/// Merges two consecutive limits (a parent limit applied on top of a child
/// limit) into the `skip` and `fetch` parameters of one equivalent limit.
///
/// The combined skip is the (saturating) sum of both skips. The combined
/// fetch is the parent's fetch, capped by what remains of the child's fetch
/// after the parent's skip; a missing fetch on either side imposes no cap.
///
/// There are multiple cases to consider:
///
/// # Case 0: Parent and child are disjoint (`child_fetch <= skip`).
///
///   Before merging:
/// ```text
///                     |........skip........|---fetch-->|     Parent limit
///    |...child_skip...|---child_fetch-->|                    Child limit
/// ```
///
///   After merging:
/// ```text
///    |.........(child_skip + skip).........|
/// ```
///
/// # Case 1: Parent is beyond child's range (`skip < child_fetch <= skip + fetch`).
///
///   Before merging:
/// ```text
///                     |...skip...|------------fetch------------>|   Parent limit
///    |...child_skip...|-------------child_fetch------------>|       Child limit
/// ```
///
///   After merging:
/// ```text
///    |....(child_skip + skip)....|---(child_fetch - skip)-->|
/// ```
///
/// # Case 2: Parent is within child's range (`skip + fetch < child_fetch`).
///
///   Before merging:
/// ```text
///                     |...skip...|---fetch-->|                   Parent limit
///    |...child_skip...|-------------child_fetch------------>|    Child limit
/// ```
///
///   After merging:
/// ```text
///    |....(child_skip + skip)....|---fetch-->|
/// ```
pub fn combine_limit(
    parent_skip: usize,
    parent_fetch: Option<usize>,
    child_skip: usize,
    child_fetch: Option<usize>,
) -> (usize, Option<usize>) {
    // Rows of the child's output that are still available once the parent has
    // skipped its share.
    let child_remaining = child_fetch.map(|fetch| fetch.saturating_sub(parent_skip));

    let combined_fetch = match (parent_fetch, child_remaining) {
        (Some(parent), Some(child)) => Some(min(parent, child)),
        (parent, None) => parent,
        (None, child) => child,
    };

    (child_skip.saturating_add(parent_skip), combined_fetch)
}
925
/// Reports how many threads are estimated to be usable for parallel work.
///
/// Delegates to [`std::thread::available_parallelism`]; when that call fails
/// to determine a value, `1` is returned instead.
pub fn get_available_parallelism() -> usize {
    available_parallelism().map_or(1, |threads: NonZero<usize>| threads.get())
}
935
936/// Converts a collection of function arguments into an fixed-size array of length N
937/// producing a reasonable error message in case of unexpected number of arguments.
938///
939/// # Example
940/// ```
941/// # use datafusion_common::Result;
942/// # use datafusion_common::utils::take_function_args;
943/// # use datafusion_common::ScalarValue;
944/// fn my_function(args: &[ScalarValue]) -> Result<()> {
945///   // function expects 2 args, so create a 2-element array
946///   let [arg1, arg2] = take_function_args("my_function", args)?;
947///   // ... do stuff..
948///   Ok(())
949/// }
950///
951/// // Calling the function with 1 argument produces an error:
952/// let args = vec![ScalarValue::Int32(Some(10))];
953/// let err = my_function(&args).unwrap_err();
954/// assert_eq!(err.to_string(), "Execution error: my_function function requires 2 arguments, got 1");
955/// // Calling the function with 2 arguments works great
956/// let args = vec![ScalarValue::Int32(Some(10)), ScalarValue::Int32(Some(20))];
957/// my_function(&args).unwrap();
958/// ```
959pub fn take_function_args<const N: usize, T>(
960    function_name: &str,
961    args: impl IntoIterator<Item = T>,
962) -> Result<[T; N]> {
963    let args = args.into_iter().collect::<Vec<_>>();
964    args.try_into().map_err(|v: Vec<T>| {
965        _exec_datafusion_err!(
966            "{} function requires {} {}, got {}",
967            function_name,
968            N,
969            if N == 1 { "argument" } else { "arguments" },
970            v.len()
971        )
972    })
973}
974
975#[cfg(test)]
976mod tests {
977    use super::*;
978    use crate::ScalarValue::Null;
979    use arrow::array::Float64Array;
980    use sqlparser::tokenizer::Span;
981
982    #[test]
983    fn test_bisect_linear_left_and_right() -> Result<()> {
984        let arrays: Vec<ArrayRef> = vec![
985            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.])),
986            Arc::new(Float64Array::from(vec![2.0, 3.0, 3.0, 4.0, 5.0])),
987            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 10., 11.0])),
988            Arc::new(Float64Array::from(vec![15.0, 13.0, 8.0, 5., 0.0])),
989        ];
990        let search_tuple: Vec<ScalarValue> = vec![
991            ScalarValue::Float64(Some(8.0)),
992            ScalarValue::Float64(Some(3.0)),
993            ScalarValue::Float64(Some(8.0)),
994            ScalarValue::Float64(Some(8.0)),
995        ];
996        let ords = [
997            SortOptions {
998                descending: false,
999                nulls_first: true,
1000            },
1001            SortOptions {
1002                descending: false,
1003                nulls_first: true,
1004            },
1005            SortOptions {
1006                descending: false,
1007                nulls_first: true,
1008            },
1009            SortOptions {
1010                descending: true,
1011                nulls_first: true,
1012            },
1013        ];
1014        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1015        assert_eq!(res, 2);
1016        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1017        assert_eq!(res, 3);
1018        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1019        assert_eq!(res, 2);
1020        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1021        assert_eq!(res, 3);
1022        Ok(())
1023    }
1024
1025    #[test]
1026    fn vector_ord() {
1027        assert!(vec![1, 0, 0, 0, 0, 0, 0, 1] < vec![1, 0, 0, 0, 0, 0, 0, 2]);
1028        assert!(vec![1, 0, 0, 0, 0, 0, 1, 1] > vec![1, 0, 0, 0, 0, 0, 0, 2]);
1029        assert!(
1030            vec![
1031                ScalarValue::Int32(Some(2)),
1032                Null,
1033                ScalarValue::Int32(Some(0)),
1034            ] < vec![
1035                ScalarValue::Int32(Some(2)),
1036                Null,
1037                ScalarValue::Int32(Some(1)),
1038            ]
1039        );
1040        assert!(
1041            vec![
1042                ScalarValue::Int32(Some(2)),
1043                ScalarValue::Int32(None),
1044                ScalarValue::Int32(Some(0)),
1045            ] < vec![
1046                ScalarValue::Int32(Some(2)),
1047                ScalarValue::Int32(None),
1048                ScalarValue::Int32(Some(1)),
1049            ]
1050        );
1051    }
1052
1053    #[test]
1054    fn ord_same_type() {
1055        assert!((ScalarValue::Int32(Some(2)) < ScalarValue::Int32(Some(3))));
1056    }
1057
1058    #[test]
1059    fn test_bisect_linear_left_and_right_diff_sort() -> Result<()> {
1060        // Descending, left
1061        let arrays: Vec<ArrayRef> =
1062            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
1063        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
1064        let ords = [SortOptions {
1065            descending: true,
1066            nulls_first: true,
1067        }];
1068        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1069        assert_eq!(res, 0);
1070        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1071        assert_eq!(res, 0);
1072
1073        // Descending, right
1074        let arrays: Vec<ArrayRef> =
1075            vec![Arc::new(Float64Array::from(vec![4.0, 3.0, 2.0, 1.0, 0.0]))];
1076        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(4.0))];
1077        let ords = [SortOptions {
1078            descending: true,
1079            nulls_first: true,
1080        }];
1081        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1082        assert_eq!(res, 1);
1083        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1084        assert_eq!(res, 1);
1085
1086        // Ascending, left
1087        let arrays: Vec<ArrayRef> =
1088            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
1089        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
1090        let ords = [SortOptions {
1091            descending: false,
1092            nulls_first: true,
1093        }];
1094        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1095        assert_eq!(res, 1);
1096        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1097        assert_eq!(res, 1);
1098
1099        // Ascending, right
1100        let arrays: Vec<ArrayRef> =
1101            vec![Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.]))];
1102        let search_tuple: Vec<ScalarValue> = vec![ScalarValue::Float64(Some(7.0))];
1103        let ords = [SortOptions {
1104            descending: false,
1105            nulls_first: true,
1106        }];
1107        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1108        assert_eq!(res, 2);
1109        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1110        assert_eq!(res, 2);
1111
1112        let arrays: Vec<ArrayRef> = vec![
1113            Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 8.0, 9., 10.])),
1114            Arc::new(Float64Array::from(vec![10.0, 9.0, 8.0, 7.5, 7., 6.])),
1115        ];
1116        let search_tuple: Vec<ScalarValue> = vec![
1117            ScalarValue::Float64(Some(8.0)),
1118            ScalarValue::Float64(Some(8.0)),
1119        ];
1120        let ords = [
1121            SortOptions {
1122                descending: false,
1123                nulls_first: true,
1124            },
1125            SortOptions {
1126                descending: true,
1127                nulls_first: true,
1128            },
1129        ];
1130        let res = bisect::<false>(&arrays, &search_tuple, &ords)?;
1131        assert_eq!(res, 3);
1132        let res = linear_search::<false>(&arrays, &search_tuple, &ords)?;
1133        assert_eq!(res, 3);
1134
1135        let res = bisect::<true>(&arrays, &search_tuple, &ords)?;
1136        assert_eq!(res, 2);
1137        let res = linear_search::<true>(&arrays, &search_tuple, &ords)?;
1138        assert_eq!(res, 2);
1139        Ok(())
1140    }
1141
1142    #[test]
1143    fn test_evaluate_partition_ranges() -> Result<()> {
1144        let arrays: Vec<ArrayRef> = vec![
1145            Arc::new(Float64Array::from(vec![1.0, 1.0, 1.0, 2.0, 2.0, 2.0])),
1146            Arc::new(Float64Array::from(vec![4.0, 4.0, 3.0, 2.0, 1.0, 1.0])),
1147        ];
1148        let n_row = arrays[0].len();
1149        let options: Vec<SortOptions> = vec![
1150            SortOptions {
1151                descending: false,
1152                nulls_first: false,
1153            },
1154            SortOptions {
1155                descending: true,
1156                nulls_first: false,
1157            },
1158        ];
1159        let sort_columns = arrays
1160            .into_iter()
1161            .zip(options)
1162            .map(|(values, options)| SortColumn {
1163                values,
1164                options: Some(options),
1165            })
1166            .collect::<Vec<_>>();
1167        let ranges = evaluate_partition_ranges(n_row, &sort_columns)?;
1168        assert_eq!(ranges.len(), 4);
1169        assert_eq!(ranges[0], Range { start: 0, end: 2 });
1170        assert_eq!(ranges[1], Range { start: 2, end: 3 });
1171        assert_eq!(ranges[2], Range { start: 3, end: 4 });
1172        assert_eq!(ranges[3], Range { start: 4, end: 6 });
1173        Ok(())
1174    }
1175
1176    #[test]
1177    fn test_quote_identifier() -> Result<()> {
1178        let cases = vec![
1179            ("foo", r#"foo"#),
1180            ("_foo", r#"_foo"#),
1181            ("foo_bar", r#"foo_bar"#),
1182            ("foo-bar", r#""foo-bar""#),
1183            // name itself has a period, needs to be quoted
1184            ("foo.bar", r#""foo.bar""#),
1185            ("Foo", r#""Foo""#),
1186            ("Foo.Bar", r#""Foo.Bar""#),
1187            // name starting with a number needs to be quoted
1188            ("test1", r#"test1"#),
1189            ("1test", r#""1test""#),
1190        ];
1191
1192        for (identifier, quoted_identifier) in cases {
1193            println!("input: \n{identifier}\nquoted_identifier:\n{quoted_identifier}");
1194
1195            assert_eq!(quote_identifier(identifier), quoted_identifier);
1196
1197            // When parsing the quoted identifier, it should be a
1198            // a single identifier without normalization, and not in multiple parts
1199            let quote_style = if quoted_identifier.starts_with('"') {
1200                Some('"')
1201            } else {
1202                None
1203            };
1204
1205            let expected_parsed = vec![Ident {
1206                value: identifier.to_string(),
1207                quote_style,
1208                span: Span::empty(),
1209            }];
1210
1211            assert_eq!(
1212                parse_identifiers(quoted_identifier).unwrap(),
1213                expected_parsed
1214            );
1215        }
1216
1217        Ok(())
1218    }
1219
1220    #[test]
1221    fn test_get_at_indices() -> Result<()> {
1222        let in_vec = vec![1, 2, 3, 4, 5, 6, 7];
1223        assert_eq!(get_at_indices(&in_vec, [0, 2])?, vec![1, 3]);
1224        assert_eq!(get_at_indices(&in_vec, [4, 2])?, vec![5, 3]);
1225        // 7 is outside the range
1226        assert!(get_at_indices(&in_vec, [7]).is_err());
1227        Ok(())
1228    }
1229
1230    #[test]
1231    fn test_longest_consecutive_prefix() {
1232        assert_eq!(longest_consecutive_prefix([0, 3, 4]), 1);
1233        assert_eq!(longest_consecutive_prefix([0, 1, 3, 4]), 2);
1234        assert_eq!(longest_consecutive_prefix([0, 1, 2, 3, 4]), 5);
1235        assert_eq!(longest_consecutive_prefix([1, 2, 3, 4]), 0);
1236    }
1237
1238    #[test]
1239    fn test_merge_and_order_indices() {
1240        assert_eq!(
1241            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
1242            vec![0, 1, 3, 4, 5]
1243        );
1244        // Result should be ordered, even if inputs are not
1245        assert_eq!(
1246            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
1247            vec![0, 1, 3, 4, 5]
1248        );
1249    }
1250
1251    #[test]
1252    fn test_set_difference() {
1253        assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
1254        assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
1255        // return value should have same ordering with the in1
1256        assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
1257        assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
1258        assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
1259    }
1260
1261    #[test]
1262    #[expect(deprecated)]
1263    fn test_is_sorted() {
1264        assert!(is_sorted::<usize>([]));
1265        assert!(is_sorted([0]));
1266        assert!(is_sorted([0, 3, 4]));
1267        assert!(is_sorted([0, 1, 2]));
1268        assert!(is_sorted([0, 1, 4]));
1269        assert!(is_sorted([0usize; 0]));
1270        assert!(is_sorted([1, 2]));
1271        assert!(!is_sorted([3, 2]));
1272    }
1273
1274    #[test]
1275    fn test_find_indices() -> Result<()> {
1276        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
1277        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
1278        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
1279        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
1280        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
1281        Ok(())
1282    }
1283
1284    #[test]
1285    fn test_transpose() -> Result<()> {
1286        let in_data = vec![vec![1, 2, 3], vec![4, 5, 6]];
1287        let transposed = transpose(in_data);
1288        let expected = vec![vec![1, 4], vec![2, 5], vec![3, 6]];
1289        assert_eq!(expected, transposed);
1290        Ok(())
1291    }
1292}