datafusion_physical_expr/intervals/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utility functions for the interval arithmetic library
19
20use std::sync::Arc;
21
22use crate::{
23    expressions::{BinaryExpr, CastExpr, Column, Literal, NegativeExpr},
24    PhysicalExpr,
25};
26
27use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
28use arrow::datatypes::{DataType, SchemaRef};
29use datafusion_common::{internal_err, Result, ScalarValue};
30use datafusion_expr::interval_arithmetic::Interval;
31use datafusion_expr::Operator;
32
33/// Indicates whether interval arithmetic is supported for the given expression.
34/// Currently, we do not support all [`PhysicalExpr`]s for interval calculations.
35/// We do not support every type of [`Operator`]s either. Over time, this check
36/// will relax as more types of `PhysicalExpr`s and `Operator`s are supported.
37/// Currently, [`CastExpr`], [`NegativeExpr`], [`BinaryExpr`], [`Column`] and [`Literal`] are supported.
38pub fn check_support(expr: &Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> bool {
39    let expr_any = expr.as_any();
40    if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>() {
41        is_operator_supported(binary_expr.op())
42            && check_support(binary_expr.left(), schema)
43            && check_support(binary_expr.right(), schema)
44    } else if let Some(column) = expr_any.downcast_ref::<Column>() {
45        if let Ok(field) = schema.field_with_name(column.name()) {
46            is_datatype_supported(field.data_type())
47        } else {
48            return false;
49        }
50    } else if let Some(literal) = expr_any.downcast_ref::<Literal>() {
51        if let Ok(dt) = literal.data_type(schema) {
52            is_datatype_supported(&dt)
53        } else {
54            return false;
55        }
56    } else if let Some(cast) = expr_any.downcast_ref::<CastExpr>() {
57        check_support(cast.expr(), schema)
58    } else if let Some(negative) = expr_any.downcast_ref::<NegativeExpr>() {
59        check_support(negative.arg(), schema)
60    } else {
61        false
62    }
63}
64
65// This function returns the inverse operator of the given operator.
66pub fn get_inverse_op(op: Operator) -> Result<Operator> {
67    match op {
68        Operator::Plus => Ok(Operator::Minus),
69        Operator::Minus => Ok(Operator::Plus),
70        Operator::Multiply => Ok(Operator::Divide),
71        Operator::Divide => Ok(Operator::Multiply),
72        _ => internal_err!("Interval arithmetic does not support the operator {}", op),
73    }
74}
75
76/// Indicates whether interval arithmetic is supported for the given operator.
77pub fn is_operator_supported(op: &Operator) -> bool {
78    matches!(
79        op,
80        &Operator::Plus
81            | &Operator::Minus
82            | &Operator::And
83            | &Operator::Gt
84            | &Operator::GtEq
85            | &Operator::Lt
86            | &Operator::LtEq
87            | &Operator::Eq
88            | &Operator::Multiply
89            | &Operator::Divide
90    )
91}
92
93/// Indicates whether interval arithmetic is supported for the given data type.
94pub fn is_datatype_supported(data_type: &DataType) -> bool {
95    matches!(
96        data_type,
97        &DataType::Int64
98            | &DataType::Int32
99            | &DataType::Int16
100            | &DataType::Int8
101            | &DataType::UInt64
102            | &DataType::UInt32
103            | &DataType::UInt16
104            | &DataType::UInt8
105            | &DataType::Float64
106            | &DataType::Float32
107    )
108}
109
110/// Converts an [`Interval`] of time intervals to one of `Duration`s, if applicable. Otherwise, returns [`None`].
111pub fn convert_interval_type_to_duration(interval: &Interval) -> Option<Interval> {
112    if let (Some(lower), Some(upper)) = (
113        convert_interval_bound_to_duration(interval.lower()),
114        convert_interval_bound_to_duration(interval.upper()),
115    ) {
116        Interval::try_new(lower, upper).ok()
117    } else {
118        None
119    }
120}
121
122/// Converts an [`ScalarValue`] containing a time interval to one containing a `Duration`, if applicable. Otherwise, returns [`None`].
123fn convert_interval_bound_to_duration(
124    interval_bound: &ScalarValue,
125) -> Option<ScalarValue> {
126    match interval_bound {
127        ScalarValue::IntervalMonthDayNano(Some(mdn)) => interval_mdn_to_duration_ns(mdn)
128            .ok()
129            .map(|duration| ScalarValue::DurationNanosecond(Some(duration))),
130        ScalarValue::IntervalDayTime(Some(dt)) => interval_dt_to_duration_ms(dt)
131            .ok()
132            .map(|duration| ScalarValue::DurationMillisecond(Some(duration))),
133        _ => None,
134    }
135}
136
137/// Converts an [`Interval`] of `Duration`s to one of time intervals, if applicable. Otherwise, returns [`None`].
138pub fn convert_duration_type_to_interval(interval: &Interval) -> Option<Interval> {
139    if let (Some(lower), Some(upper)) = (
140        convert_duration_bound_to_interval(interval.lower()),
141        convert_duration_bound_to_interval(interval.upper()),
142    ) {
143        Interval::try_new(lower, upper).ok()
144    } else {
145        None
146    }
147}
148
149/// Converts a [`ScalarValue`] containing a `Duration` to one containing a time interval, if applicable. Otherwise, returns [`None`].
150fn convert_duration_bound_to_interval(
151    interval_bound: &ScalarValue,
152) -> Option<ScalarValue> {
153    match interval_bound {
154        ScalarValue::DurationNanosecond(Some(duration)) => {
155            Some(ScalarValue::new_interval_mdn(0, 0, *duration))
156        }
157        ScalarValue::DurationMicrosecond(Some(duration)) => {
158            Some(ScalarValue::new_interval_mdn(0, 0, *duration * 1000))
159        }
160        ScalarValue::DurationMillisecond(Some(duration)) => {
161            Some(ScalarValue::new_interval_dt(0, *duration as i32))
162        }
163        ScalarValue::DurationSecond(Some(duration)) => {
164            Some(ScalarValue::new_interval_dt(0, *duration as i32 * 1000))
165        }
166        _ => None,
167    }
168}
169
170/// If both the month and day fields of [`ScalarValue::IntervalMonthDayNano`] are zero, this function returns the nanoseconds part.
171/// Otherwise, it returns an error.
172fn interval_mdn_to_duration_ns(mdn: &IntervalMonthDayNano) -> Result<i64> {
173    if mdn.months == 0 && mdn.days == 0 {
174        Ok(mdn.nanoseconds)
175    } else {
176        internal_err!(
177            "The interval cannot have a non-zero month or day value for duration convertibility"
178        )
179    }
180}
181
182/// If the day field of the [`ScalarValue::IntervalDayTime`] is zero, this function returns the milliseconds part.
183/// Otherwise, it returns an error.
184fn interval_dt_to_duration_ms(dt: &IntervalDayTime) -> Result<i64> {
185    if dt.days == 0 {
186        // Safe to cast i32 to i64
187        Ok(dt.milliseconds as i64)
188    } else {
189        internal_err!(
190            "The interval cannot have a non-zero day value for duration convertibility"
191        )
192    }
193}