datafusion_physical_expr/intervals/
utils.rs1use std::sync::Arc;
21
22use crate::{
23 expressions::{BinaryExpr, CastExpr, Column, Literal, NegativeExpr},
24 PhysicalExpr,
25};
26
27use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
28use arrow::datatypes::{DataType, SchemaRef};
29use datafusion_common::{internal_err, Result, ScalarValue};
30use datafusion_expr::interval_arithmetic::Interval;
31use datafusion_expr::Operator;
32
33pub fn check_support(expr: &Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> bool {
39 let expr_any = expr.as_any();
40 if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>() {
41 is_operator_supported(binary_expr.op())
42 && check_support(binary_expr.left(), schema)
43 && check_support(binary_expr.right(), schema)
44 } else if let Some(column) = expr_any.downcast_ref::<Column>() {
45 if let Ok(field) = schema.field_with_name(column.name()) {
46 is_datatype_supported(field.data_type())
47 } else {
48 return false;
49 }
50 } else if let Some(literal) = expr_any.downcast_ref::<Literal>() {
51 if let Ok(dt) = literal.data_type(schema) {
52 is_datatype_supported(&dt)
53 } else {
54 return false;
55 }
56 } else if let Some(cast) = expr_any.downcast_ref::<CastExpr>() {
57 check_support(cast.expr(), schema)
58 } else if let Some(negative) = expr_any.downcast_ref::<NegativeExpr>() {
59 check_support(negative.arg(), schema)
60 } else {
61 false
62 }
63}
64
65pub fn get_inverse_op(op: Operator) -> Result<Operator> {
67 match op {
68 Operator::Plus => Ok(Operator::Minus),
69 Operator::Minus => Ok(Operator::Plus),
70 Operator::Multiply => Ok(Operator::Divide),
71 Operator::Divide => Ok(Operator::Multiply),
72 _ => internal_err!("Interval arithmetic does not support the operator {}", op),
73 }
74}
75
76pub fn is_operator_supported(op: &Operator) -> bool {
78 matches!(
79 op,
80 &Operator::Plus
81 | &Operator::Minus
82 | &Operator::And
83 | &Operator::Gt
84 | &Operator::GtEq
85 | &Operator::Lt
86 | &Operator::LtEq
87 | &Operator::Eq
88 | &Operator::Multiply
89 | &Operator::Divide
90 )
91}
92
93pub fn is_datatype_supported(data_type: &DataType) -> bool {
95 matches!(
96 data_type,
97 &DataType::Int64
98 | &DataType::Int32
99 | &DataType::Int16
100 | &DataType::Int8
101 | &DataType::UInt64
102 | &DataType::UInt32
103 | &DataType::UInt16
104 | &DataType::UInt8
105 | &DataType::Float64
106 | &DataType::Float32
107 )
108}
109
110pub fn convert_interval_type_to_duration(interval: &Interval) -> Option<Interval> {
112 if let (Some(lower), Some(upper)) = (
113 convert_interval_bound_to_duration(interval.lower()),
114 convert_interval_bound_to_duration(interval.upper()),
115 ) {
116 Interval::try_new(lower, upper).ok()
117 } else {
118 None
119 }
120}
121
122fn convert_interval_bound_to_duration(
124 interval_bound: &ScalarValue,
125) -> Option<ScalarValue> {
126 match interval_bound {
127 ScalarValue::IntervalMonthDayNano(Some(mdn)) => interval_mdn_to_duration_ns(mdn)
128 .ok()
129 .map(|duration| ScalarValue::DurationNanosecond(Some(duration))),
130 ScalarValue::IntervalDayTime(Some(dt)) => interval_dt_to_duration_ms(dt)
131 .ok()
132 .map(|duration| ScalarValue::DurationMillisecond(Some(duration))),
133 _ => None,
134 }
135}
136
137pub fn convert_duration_type_to_interval(interval: &Interval) -> Option<Interval> {
139 if let (Some(lower), Some(upper)) = (
140 convert_duration_bound_to_interval(interval.lower()),
141 convert_duration_bound_to_interval(interval.upper()),
142 ) {
143 Interval::try_new(lower, upper).ok()
144 } else {
145 None
146 }
147}
148
149fn convert_duration_bound_to_interval(
151 interval_bound: &ScalarValue,
152) -> Option<ScalarValue> {
153 match interval_bound {
154 ScalarValue::DurationNanosecond(Some(duration)) => {
155 Some(ScalarValue::new_interval_mdn(0, 0, *duration))
156 }
157 ScalarValue::DurationMicrosecond(Some(duration)) => {
158 Some(ScalarValue::new_interval_mdn(0, 0, *duration * 1000))
159 }
160 ScalarValue::DurationMillisecond(Some(duration)) => {
161 Some(ScalarValue::new_interval_dt(0, *duration as i32))
162 }
163 ScalarValue::DurationSecond(Some(duration)) => {
164 Some(ScalarValue::new_interval_dt(0, *duration as i32 * 1000))
165 }
166 _ => None,
167 }
168}
169
170fn interval_mdn_to_duration_ns(mdn: &IntervalMonthDayNano) -> Result<i64> {
173 if mdn.months == 0 && mdn.days == 0 {
174 Ok(mdn.nanoseconds)
175 } else {
176 internal_err!(
177 "The interval cannot have a non-zero month or day value for duration convertibility"
178 )
179 }
180}
181
182fn interval_dt_to_duration_ms(dt: &IntervalDayTime) -> Result<i64> {
185 if dt.days == 0 {
186 Ok(dt.milliseconds as i64)
188 } else {
189 internal_err!(
190 "The interval cannot have a non-zero day value for duration convertibility"
191 )
192 }
193}