mod adapter;
mod kernels;
mod kernels_arrow;
use std::{any::Any, sync::Arc};
use chrono::NaiveDateTime;
use arrow::array::*;
use arrow::compute::kernels::arithmetic::{
add_dyn, add_scalar_dyn as add_dyn_scalar, divide_dyn_opt,
divide_scalar_dyn as divide_dyn_scalar, modulus_dyn,
modulus_scalar_dyn as modulus_dyn_scalar, multiply_dyn,
multiply_scalar_dyn as multiply_dyn_scalar, subtract_dyn,
subtract_scalar_dyn as subtract_dyn_scalar,
};
use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
use arrow::compute::kernels::comparison::regexp_is_match_utf8;
use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar;
use arrow::compute::kernels::comparison::{
eq_dyn_binary_scalar, gt_dyn_binary_scalar, gt_eq_dyn_binary_scalar,
lt_dyn_binary_scalar, lt_eq_dyn_binary_scalar, neq_dyn_binary_scalar,
};
use arrow::compute::kernels::comparison::{
eq_dyn_bool_scalar, gt_dyn_bool_scalar, gt_eq_dyn_bool_scalar, lt_dyn_bool_scalar,
lt_eq_dyn_bool_scalar, neq_dyn_bool_scalar,
};
use arrow::compute::kernels::comparison::{
eq_dyn_scalar, gt_dyn_scalar, gt_eq_dyn_scalar, lt_dyn_scalar, lt_eq_dyn_scalar,
neq_dyn_scalar,
};
use arrow::compute::kernels::comparison::{
eq_dyn_utf8_scalar, gt_dyn_utf8_scalar, gt_eq_dyn_utf8_scalar, lt_dyn_utf8_scalar,
lt_eq_dyn_utf8_scalar, neq_dyn_utf8_scalar,
};
use arrow::compute::{cast, try_unary, unary, CastOptions};
use arrow::datatypes::*;
use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
use arrow::compute::kernels::concat_elements::concat_elements_utf8;
use datafusion_common::scalar::{
calculate_naives, microseconds_add, microseconds_sub, milliseconds_add,
milliseconds_sub, nanoseconds_add, nanoseconds_sub, op_dt, op_dt_mdn, op_mdn, op_ym,
op_ym_dt, op_ym_mdn, parse_timezones, seconds_add, seconds_sub, MILLISECOND_MODE,
NANOSECOND_MODE,
};
use datafusion_expr::type_coercion::{is_decimal, is_timestamp, is_utf8_or_large_utf8};
use kernels::{
bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn,
bitwise_shift_right_dyn_scalar, bitwise_xor_dyn, bitwise_xor_dyn_scalar,
};
use kernels_arrow::{
add_decimal_dyn_scalar, add_dyn_decimal, add_dyn_temporal, add_dyn_temporal_scalar,
divide_decimal_dyn_scalar, divide_dyn_opt_decimal, is_distinct_from,
is_distinct_from_bool, is_distinct_from_decimal, is_distinct_from_f32,
is_distinct_from_f64, is_distinct_from_null, is_distinct_from_utf8,
is_not_distinct_from, is_not_distinct_from_bool, is_not_distinct_from_decimal,
is_not_distinct_from_f32, is_not_distinct_from_f64, is_not_distinct_from_null,
is_not_distinct_from_utf8, modulus_decimal_dyn_scalar, modulus_dyn_decimal,
multiply_decimal_dyn_scalar, multiply_dyn_decimal, subtract_decimal_dyn_scalar,
subtract_dyn_decimal, subtract_dyn_temporal, subtract_dyn_temporal_scalar,
};
use arrow::datatypes::{DataType, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use super::column::Column;
use crate::expressions::cast_column;
use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
use crate::intervals::{apply_operator, Interval};
use crate::physical_expr::down_cast_any_ref;
use crate::{analysis_expect, AnalysisContext, ExprBoundaries, PhysicalExpr};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::cast::{
as_interval_dt_array, as_interval_mdn_array, as_interval_ym_array,
as_timestamp_microsecond_array, as_timestamp_millisecond_array,
as_timestamp_nanosecond_array, as_timestamp_second_array,
};
use datafusion_common::scalar::*;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::type_coercion::binary::{
coercion_decimal_mathematics_type, get_result_type,
};
use datafusion_expr::{ColumnarValue, Operator};
/// Binary physical expression: two child expressions combined by an [`Operator`].
#[derive(Debug)]
pub struct BinaryExpr {
/// Expression producing the left-hand operand.
left: Arc<dyn PhysicalExpr>,
/// Operator applied to the two operands.
op: Operator,
/// Expression producing the right-hand operand.
right: Arc<dyn PhysicalExpr>,
}
impl BinaryExpr {
pub fn new(
left: Arc<dyn PhysicalExpr>,
op: Operator,
right: Arc<dyn PhysicalExpr>,
) -> Self {
Self { left, op, right }
}
pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
&self.left
}
pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
&self.right
}
pub fn op(&self) -> &Operator {
&self.op
}
}
impl std::fmt::Display for BinaryExpr {
    /// Renders the expression as `<left> <op> <right>`.
    ///
    /// A child that is itself a `BinaryExpr` is wrapped in parentheses when its
    /// operator precedence is `0` or lower than this expression's precedence, so
    /// that the printed form keeps the grouping of the expression tree.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        /// Writes one operand, parenthesizing nested binary expressions when needed.
        fn write_child(
            f: &mut std::fmt::Formatter,
            expr: &dyn PhysicalExpr,
            precedence: u8,
        ) -> std::fmt::Result {
            match expr.as_any().downcast_ref::<BinaryExpr>() {
                // Not a binary expression: print it unchanged.
                None => write!(f, "{expr}"),
                Some(child) => {
                    let child_precedence = child.op.precedence();
                    let needs_parens =
                        child_precedence == 0 || child_precedence < precedence;
                    if needs_parens {
                        write!(f, "({child})")
                    } else {
                        write!(f, "{child}")
                    }
                }
            }
        }

        let own_precedence = self.op.precedence();
        write_child(f, self.left.as_ref(), own_precedence)?;
        write!(f, " {} ", self.op)?;
        write_child(f, self.right.as_ref(), own_precedence)
    }
}
/// Applies the `<$OP>_dyn_scalar` kernel to array `$LEFT` and a `Decimal128` scalar.
///
/// When `$RIGHT` is not `ScalarValue::Decimal128(Some(_), _, _)` (for example a
/// null decimal) the result is an all-null array of type `$OP_TYPE` with the
/// same length as `$LEFT`.
macro_rules! compute_decimal_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{
        match $RIGHT {
            ScalarValue::Decimal128(Some(decimal_value), _, _) => Ok(Arc::new(
                paste::expr! {[<$OP _dyn_scalar>]}($LEFT, decimal_value)?,
            )),
            _ => Ok(Arc::new(new_null_array($OP_TYPE, $LEFT.len()))),
        }
    }};
}
/// Downcasts both operands to `$DT` and applies the `<$OP>_decimal` kernel.
///
/// # Panics
///
/// Panics if either operand is not a `$DT`. The panic message names the
/// offending side, matching the other `compute_*_op` macros in this file
/// (previously a bare `unwrap()` panicked without any context).
macro_rules! compute_decimal_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        let ll = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast left side array");
        let rr = $RIGHT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast right side array");
        Ok(Arc::new(paste::expr! {[<$OP _decimal>]}(ll, rr)?))
    }};
}
/// Downcasts both operands to `$DT` and applies the `<$OP>_f32` kernel.
///
/// # Panics
///
/// Panics if either operand cannot be downcast to `$DT`.
macro_rules! compute_f32_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        let lhs_values = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast left side array");
        let rhs_values = $RIGHT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast right side array");
        let kernel_output = paste::expr! {[<$OP _f32>]}(lhs_values, rhs_values)?;
        Ok(Arc::new(kernel_output))
    }};
}
/// Downcasts both operands to `$DT` and applies the `<$OP>_f64` kernel.
///
/// # Panics
///
/// Panics if either operand cannot be downcast to `$DT`.
macro_rules! compute_f64_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        let lhs_values = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast left side array");
        let rhs_values = $RIGHT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast right side array");
        let kernel_output = paste::expr! {[<$OP _f64>]}(lhs_values, rhs_values)?;
        Ok(Arc::new(kernel_output))
    }};
}
/// Downcasts both operands to `$DT` and applies the `<$OP>_null` kernel.
///
/// # Panics
///
/// Panics if either operand cannot be downcast to `$DT`.
macro_rules! compute_null_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        let lhs_values = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast left side array");
        let rhs_values = $RIGHT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast right side array");
        let kernel_output = paste::expr! {[<$OP _null>]}(&lhs_values, &rhs_values)?;
        Ok(Arc::new(kernel_output))
    }};
}
/// Downcasts both operands to the string array type `$DT` and applies the
/// `<$OP>_utf8` kernel.
///
/// # Panics
///
/// Panics if either operand cannot be downcast to `$DT`.
macro_rules! compute_utf8_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        let lhs_strings = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast left side array");
        let rhs_strings = $RIGHT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast right side array");
        let kernel_output = paste::expr! {[<$OP _utf8>]}(&lhs_strings, &rhs_strings)?;
        Ok(Arc::new(kernel_output))
    }};
}
/// Applies the `<$OP>_utf8_scalar` kernel to a string array and a string scalar.
///
/// * `$RIGHT` is `Utf8(Some(_))` / `LargeUtf8(Some(_))`: the kernel is invoked.
/// * `$RIGHT.is_null()`: an all-null array of type `$OP_TYPE` is produced.
/// * anything else: a `DataFusionError::Internal` is returned.
///
/// # Panics
///
/// Panics if `$LEFT` cannot be downcast to `$DT`.
macro_rules! compute_utf8_op_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident, $OP_TYPE:expr) => {{
        let lhs_strings = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast left side array");
        if let ScalarValue::Utf8(Some(literal))
        | ScalarValue::LargeUtf8(Some(literal)) = $RIGHT
        {
            let kernel_output =
                paste::expr! {[<$OP _utf8_scalar>]}(&lhs_strings, &literal)?;
            Ok(Arc::new(kernel_output))
        } else if $RIGHT.is_null() {
            // A null literal yields null for every row.
            Ok(Arc::new(new_null_array($OP_TYPE, $LEFT.len())))
        } else {
            Err(DataFusionError::Internal(format!(
                "compute_utf8_op_scalar for '{}' failed to cast literal value {}",
                stringify!($OP),
                $RIGHT
            )))
        }
    }};
}
/// Applies the `<$OP>_dyn_utf8_scalar` kernel to `$LEFT` and an optional string.
///
/// `$RIGHT` is an `Option`; `None` produces an all-null array of type
/// `$OP_TYPE` with the length of `$LEFT`.
macro_rules! compute_utf8_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{
        match $RIGHT {
            Some(literal) => Ok(Arc::new(paste::expr! {[<$OP _dyn_utf8_scalar>]}(
                $LEFT, &literal,
            )?)),
            None => Ok(Arc::new(new_null_array($OP_TYPE, $LEFT.len()))),
        }
    }};
}
/// Applies the `<$OP>_dyn_binary_scalar` kernel to `$LEFT` and an optional
/// binary value.
///
/// `$RIGHT` is an `Option`; `None` produces an all-null array of type
/// `$OP_TYPE` with the length of `$LEFT`.
macro_rules! compute_binary_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{
        match $RIGHT {
            Some(bytes) => Ok(Arc::new(paste::expr! {[<$OP _dyn_binary_scalar>]}(
                $LEFT, &bytes,
            )?)),
            None => Ok(Arc::new(new_null_array($OP_TYPE, $LEFT.len()))),
        }
    }};
}
/// Applies the `<$OP>_dyn_bool_scalar` kernel to `$LEFT` and an optional bool.
///
/// `$RIGHT` is an `Option`; `None` produces an all-null array of type
/// `$OP_TYPE` with the length of `$LEFT`.
macro_rules! compute_bool_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{
        match $RIGHT {
            Some(flag) => Ok(Arc::new(paste::expr! {[<$OP _dyn_bool_scalar>]}(
                $LEFT, flag,
            )?)),
            None => Ok(Arc::new(new_null_array($OP_TYPE, $LEFT.len()))),
        }
    }};
}
/// Invokes the `<$OP>_bool` kernel on boolean-like arrays.
///
/// * Four-argument form: binary kernel over `$LEFT` and `$RIGHT`.
/// * Three-argument form: unary kernel over `$OPERAND`.
///
/// # Panics
///
/// Panics if an operand cannot be downcast to `$DT`.
macro_rules! compute_bool_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        let lhs_values = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast left side array");
        let rhs_values = $RIGHT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast right side array");
        let kernel_output = paste::expr! {[<$OP _bool>]}(&lhs_values, &rhs_values)?;
        Ok(Arc::new(kernel_output))
    }};
    ($OPERAND:expr, $OP:ident, $DT:ident) => {{
        let values = $OPERAND
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast operant array");
        let kernel_output = paste::expr! {[<$OP _bool>]}(&values)?;
        Ok(Arc::new(kernel_output))
    }};
}
/// Applies the `<$OP>_dyn_scalar` kernel to `$LEFT` and an optional scalar.
///
/// `$RIGHT` is an `Option`; `None` produces an all-null array of type
/// `$OP_TYPE` with the length of `$LEFT`.
macro_rules! compute_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{
        match $RIGHT {
            Some(scalar_value) => Ok(Arc::new(paste::expr! {[<$OP _dyn_scalar>]}(
                $LEFT,
                scalar_value,
            )?)),
            None => Ok(Arc::new(new_null_array($OP_TYPE, $LEFT.len()))),
        }
    }};
}
/// Applies the `<$OP>_dyn_scalar::<$SCALAR_TYPE>` arithmetic kernel to `$LEFT`
/// and an optional primitive scalar.
///
/// `$RIGHT` is an `Option`; `None` produces an all-null array of type
/// `$OP_TYPE` with the length of `$LEFT`.
macro_rules! compute_primitive_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr, $SCALAR_TYPE:ident) => {{
        match $RIGHT {
            Some(scalar_value) => Ok(Arc::new(
                paste::expr! {[<$OP _dyn_scalar>]::<$SCALAR_TYPE>}($LEFT, scalar_value)?,
            )),
            None => Ok(Arc::new(new_null_array($OP_TYPE, $LEFT.len()))),
        }
    }};
}
/// Applies the `<$OP>_decimal_dyn_scalar` kernel to `$LEFT` and an optional
/// decimal value, passing `$RET_TYPE` through to the kernel.
///
/// `$RIGHT` is an `Option`; `None` produces an all-null array of type
/// `$OP_TYPE` with the length of `$LEFT`. Unlike the sibling macros, the kernel
/// result is returned without an additional `Arc::new` wrapper.
macro_rules! compute_primitive_decimal_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr, $RET_TYPE:expr) => {{
        match $RIGHT {
            Some(decimal_value) => Ok(paste::expr! {[<$OP _decimal_dyn_scalar>]}(
                $LEFT,
                decimal_value,
                $RET_TYPE,
            )?),
            None => Ok(Arc::new(new_null_array($OP_TYPE, $LEFT.len()))),
        }
    }};
}
/// Invokes kernel `$OP` on arrays downcast to `$DT`.
///
/// * Four-argument form: binary kernel over `$LEFT` and `$RIGHT`.
/// * Three-argument form: unary kernel over `$OPERAND`.
///
/// # Panics
///
/// Panics if an operand cannot be downcast to `$DT`.
macro_rules! compute_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        let lhs_values = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast left side array");
        let rhs_values = $RIGHT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast right side array");
        let kernel_output = $OP(&lhs_values, &rhs_values)?;
        Ok(Arc::new(kernel_output))
    }};
    ($OPERAND:expr, $OP:ident, $DT:ident) => {{
        let values = $OPERAND
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_op failed to downcast array");
        let kernel_output = $OP(&values)?;
        Ok(Arc::new(kernel_output))
    }};
}
/// Dispatches a string kernel (`<$OP>_utf8`) on the data type of `$LEFT`.
///
/// Only `Utf8` and `LargeUtf8` are handled; any other type yields a
/// `DataFusionError::Internal`.
macro_rules! binary_string_array_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        match $LEFT.data_type() {
            DataType::LargeUtf8 => {
                compute_utf8_op!($LEFT, $RIGHT, $OP, LargeStringArray)
            }
            DataType::Utf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, StringArray),
            unsupported => Err(DataFusionError::Internal(format!(
                "Data type {:?} not supported for binary operation '{}' on string arrays",
                unsupported,
                stringify!($OP)
            ))),
        }
    }};
}
/// Applies an arithmetic `dyn` kernel to two arrays.
///
/// Decimal arrays, and dictionary arrays whose value type is decimal, are
/// routed to the `<$OP>_decimal` kernel together with `$RET_TYPE`. Everything
/// else goes straight to `$OP`, with Arrow errors wrapped in
/// `DataFusionError::ArrowError`.
macro_rules! binary_primitive_array_op_dyn {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $RET_TYPE:expr) => {{
        match $LEFT.data_type() {
            DataType::Decimal128(_, _) => {
                let decimal_output =
                    paste::expr! {[<$OP _decimal>]}(&$LEFT, &$RIGHT, $RET_TYPE)?;
                Ok(decimal_output)
            }
            DataType::Dictionary(_, value_type)
                if matches!(value_type.as_ref(), &DataType::Decimal128(_, _)) =>
            {
                let decimal_output =
                    paste::expr! {[<$OP _decimal>]}(&$LEFT, &$RIGHT, $RET_TYPE)?;
                Ok(decimal_output)
            }
            _ => {
                let output =
                    $OP(&$LEFT, &$RIGHT).map_err(DataFusionError::ArrowError)?;
                Ok(Arc::new(output))
            }
        }
    }};
}
/// Applies an arithmetic kernel to array `$LEFT` and scalar `$RIGHT`.
///
/// The scalar is first stripped of any dictionary wrapping, then dispatched on
/// its variant to the matching `<$OP>_dyn_scalar` (or, for decimals,
/// `<$OP>_decimal_dyn_scalar`) kernel. A null scalar of a supported type yields
/// an all-null array typed like `$LEFT`. Unsupported scalar variants produce a
/// `DataFusionError::Internal`.
///
/// The macro always evaluates to `Some(result)`.
macro_rules! binary_primitive_array_op_dyn_scalar {
($LEFT:expr, $RIGHT:expr, $OP:ident, $RET_TYPE:expr) => {{
// Dictionary-encoded scalars are reduced to their underlying value.
let right = unwrap_dict_value($RIGHT);
// Type used for the all-null result when the scalar itself is null.
let op_type = $LEFT.data_type();
let result: Result<Arc<dyn Array>> = match right {
// Decimal kernels additionally need the expected result type.
ScalarValue::Decimal128(v, _, _) => compute_primitive_decimal_op_dyn_scalar!($LEFT, v, $OP, op_type, $RET_TYPE),
ScalarValue::Int8(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Int8Type),
ScalarValue::Int16(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Int16Type),
ScalarValue::Int32(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Int32Type),
ScalarValue::Int64(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Int64Type),
ScalarValue::UInt8(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, UInt8Type),
ScalarValue::UInt16(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, UInt16Type),
ScalarValue::UInt32(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, UInt32Type),
ScalarValue::UInt64(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, UInt64Type),
ScalarValue::Float32(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Float32Type),
ScalarValue::Float64(v) => compute_primitive_op_dyn_scalar!($LEFT, v, $OP, op_type, Float64Type),
// Any other scalar variant has no arithmetic kernel here.
other => Err(DataFusionError::Internal(format!(
"Data type {:?} not supported for scalar operation '{}' on dyn array",
other, stringify!($OP)))
)
};
Some(result)
}}
}
/// Invokes a binary array kernel, selecting the concrete array type from the
/// data type of `$LEFT`.
///
/// Both operands are downcast to the array type that corresponds to
/// `$LEFT.data_type()`, so `$RIGHT` must hold the same type; the `compute_*`
/// helper macros panic on a failed downcast. Null, decimal, float, string and
/// boolean types are routed to suffixed kernels (`<$OP>_null`, `<$OP>_decimal`,
/// `<$OP>_f32`, `<$OP>_f64`, `<$OP>_utf8`, `<$OP>_bool`); the remaining
/// supported types call `$OP` directly. Unsupported data types produce a
/// `DataFusionError::Internal`.
#[macro_export]
macro_rules! binary_array_op {
($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
match $LEFT.data_type() {
// Types with dedicated, suffixed kernels.
DataType::Null => compute_null_op!($LEFT, $RIGHT, $OP, NullArray),
DataType::Decimal128(_,_) => compute_decimal_op!($LEFT, $RIGHT, $OP, Decimal128Array),
// Integer types use the generic kernel `$OP`.
DataType::Int8 => compute_op!($LEFT, $RIGHT, $OP, Int8Array),
DataType::Int16 => compute_op!($LEFT, $RIGHT, $OP, Int16Array),
DataType::Int32 => compute_op!($LEFT, $RIGHT, $OP, Int32Array),
DataType::Int64 => compute_op!($LEFT, $RIGHT, $OP, Int64Array),
DataType::UInt8 => compute_op!($LEFT, $RIGHT, $OP, UInt8Array),
DataType::UInt16 => compute_op!($LEFT, $RIGHT, $OP, UInt16Array),
DataType::UInt32 => compute_op!($LEFT, $RIGHT, $OP, UInt32Array),
DataType::UInt64 => compute_op!($LEFT, $RIGHT, $OP, UInt64Array),
// Floats and strings have their own suffixed kernels.
DataType::Float32 => compute_f32_op!($LEFT, $RIGHT, $OP, Float32Array),
DataType::Float64 => compute_f64_op!($LEFT, $RIGHT, $OP, Float64Array),
DataType::Utf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, StringArray),
DataType::LargeUtf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, LargeStringArray),
// Temporal types: the time zone of a timestamp is ignored for dispatch.
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
}
DataType::Timestamp(TimeUnit::Millisecond, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
}
DataType::Timestamp(TimeUnit::Second, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampSecondArray)
}
DataType::Date32 => {
compute_op!($LEFT, $RIGHT, $OP, Date32Array)
}
DataType::Date64 => {
compute_op!($LEFT, $RIGHT, $OP, Date64Array)
}
DataType::Time32(TimeUnit::Second) => {
compute_op!($LEFT, $RIGHT, $OP, Time32SecondArray)
}
DataType::Time32(TimeUnit::Millisecond) => {
compute_op!($LEFT, $RIGHT, $OP, Time32MillisecondArray)
}
DataType::Time64(TimeUnit::Microsecond) => {
compute_op!($LEFT, $RIGHT, $OP, Time64MicrosecondArray)
}
DataType::Time64(TimeUnit::Nanosecond) => {
compute_op!($LEFT, $RIGHT, $OP, Time64NanosecondArray)
}
DataType::Boolean => compute_bool_op!($LEFT, $RIGHT, $OP, BooleanArray),
// Everything else is reported as an internal error rather than a panic.
other => Err(DataFusionError::Internal(format!(
"Data type {:?} not supported for binary operation '{}' on dyn arrays",
other, stringify!($OP)
))),
}
}};
}
/// Invokes the boolean kernel `$OP` on two arrays viewed as boolean arrays.
///
/// # Panics
///
/// Panics if either operand is not a boolean array.
macro_rules! boolean_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        let lhs_bools =
            as_boolean_array($LEFT).expect("boolean_op failed to downcast array");
        let rhs_bools =
            as_boolean_array($RIGHT).expect("boolean_op failed to downcast array");
        let kernel_output = $OP(&lhs_bools, &rhs_bools)?;
        Ok(Arc::new(kernel_output))
    }};
}
/// Dispatches a flag-taking string kernel (see `compute_utf8_flag_op!`) on the
/// data type of `$LEFT`.
///
/// Only `Utf8` and `LargeUtf8` are handled; any other type yields a
/// `DataFusionError::Internal`.
macro_rules! binary_string_array_flag_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $NOT:expr, $FLAG:expr) => {{
        match $LEFT.data_type() {
            DataType::LargeUtf8 => {
                compute_utf8_flag_op!($LEFT, $RIGHT, $OP, LargeStringArray, $NOT, $FLAG)
            }
            DataType::Utf8 => {
                compute_utf8_flag_op!($LEFT, $RIGHT, $OP, StringArray, $NOT, $FLAG)
            }
            unsupported => Err(DataFusionError::Internal(format!(
                "Data type {:?} not supported for binary_string_array_flag_op operation '{}' on string array",
                unsupported,
                stringify!($OP)
            ))),
        }
    }};
}
/// Applies the `<$OP>_utf8` kernel to two string arrays with an optional flag.
///
/// * `$FLAG`: when true, a flag array holding `"i"` for every row of the left
///   operand is passed to the kernel; otherwise no flag array is passed.
/// * `$NOT`: when true, the boolean result is negated.
///
/// Errors from both the kernel and the negation are propagated with `?`
/// (the negation previously used `unwrap()`, which would have panicked instead
/// of surfacing the error to the caller).
///
/// # Panics
///
/// Panics if either operand cannot be downcast to `$ARRAYTYPE`.
macro_rules! compute_utf8_flag_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
        let ll = $LEFT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("compute_utf8_flag_op failed to downcast array");
        let rr = $RIGHT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("compute_utf8_flag_op failed to downcast array");
        // The array kernel takes one flag string per row.
        let flag = if $FLAG {
            Some($ARRAYTYPE::from(vec!["i"; ll.len()]))
        } else {
            None
        };
        let mut array = paste::expr! {[<$OP _utf8>]}(&ll, &rr, flag.as_ref())?;
        if $NOT {
            array = not(&array)?;
        }
        Ok(Arc::new(array))
    }};
}
/// Dispatches a flag-taking string/scalar kernel (see
/// `compute_utf8_flag_op_scalar!`) on the data type of `$LEFT`.
///
/// Only `Utf8` and `LargeUtf8` are handled; any other type yields a
/// `DataFusionError::Internal`. The macro always evaluates to `Some(result)`.
macro_rules! binary_string_array_flag_op_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $NOT:expr, $FLAG:expr) => {{
        let outcome: Result<Arc<dyn Array>> = match $LEFT.data_type() {
            DataType::LargeUtf8 => {
                compute_utf8_flag_op_scalar!($LEFT, $RIGHT, $OP, LargeStringArray, $NOT, $FLAG)
            }
            DataType::Utf8 => {
                compute_utf8_flag_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $NOT, $FLAG)
            }
            unsupported => Err(DataFusionError::Internal(format!(
                "Data type {:?} not supported for binary_string_array_flag_op_scalar operation '{}' on string array",
                unsupported,
                stringify!($OP)
            ))),
        };
        Some(outcome)
    }};
}
/// Applies the `<$OP>_utf8_scalar` kernel to a string array and a string
/// scalar with an optional flag.
///
/// * `$FLAG`: when true, the flag `"i"` is passed to the kernel.
/// * `$NOT`: when true, the boolean result is negated.
///
/// A `$RIGHT` that is not `Utf8(Some(_))` / `LargeUtf8(Some(_))` yields a
/// `DataFusionError::Internal`. Errors from both the kernel and the negation
/// are propagated with `?` (the negation previously used `unwrap()`, which
/// would have panicked instead of surfacing the error to the caller).
///
/// # Panics
///
/// Panics if `$LEFT` cannot be downcast to `$ARRAYTYPE`.
macro_rules! compute_utf8_flag_op_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
        let ll = $LEFT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("compute_utf8_flag_op_scalar failed to downcast array");
        if let ScalarValue::Utf8(Some(string_value))
        | ScalarValue::LargeUtf8(Some(string_value)) = $RIGHT
        {
            let flag = if $FLAG { Some("i") } else { None };
            let mut array =
                paste::expr! {[<$OP _utf8_scalar>]}(&ll, &string_value, flag)?;
            if $NOT {
                array = not(&array)?;
            }
            Ok(Arc::new(array))
        } else {
            Err(DataFusionError::Internal(format!(
                "compute_utf8_flag_op_scalar failed to cast literal value {} for operation '{}'",
                $RIGHT, stringify!($OP)
            )))
        }
    }};
}
/// `PhysicalExpr` implementation: typing, evaluation, statistics analysis and
/// interval propagation for a binary expression.
impl PhysicalExpr for BinaryExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}
/// Result type of the expression, derived from the operand types and the
/// operator via `get_result_type`.
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
get_result_type(
&self.left.data_type(input_schema)?,
&self.op,
&self.right.data_type(input_schema)?,
)
}
/// The expression is reported nullable when either operand is nullable.
fn nullable(&self, input_schema: &Schema) -> Result<bool> {
Ok(self.left.nullable(input_schema)? || self.right.nullable(input_schema)?)
}
/// Evaluates both operands against `batch` and applies the operator.
///
/// Array/scalar operand pairs are first offered to the scalar kernels; when no
/// scalar kernel applies, both operands are expanded to arrays of
/// `batch.num_rows()` rows and the array kernels are used. The result is
/// always returned as `ColumnarValue::Array`.
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let left_value = self.left.evaluate(batch)?;
let right_value = self.right.evaluate(batch)?;
// Captured before any coercion below; these are the types later passed to
// `evaluate_with_resolved_args`.
let left_data_type = left_value.data_type();
let right_data_type = right_value.data_type();
let schema = batch.schema();
let input_schema = schema.as_ref();
// Ask whether the operands need a common decimal type for this operator.
let coerced_type = coercion_decimal_mathematics_type(
&self.op,
&left_data_type,
&right_data_type,
);
let (left_value, right_value) = if let Some(coerced_type) = coerced_type {
// Cast both sides to the common type using `safe: true` cast options.
let options = CastOptions { safe: true };
let left_value = cast_column(&left_value, &coerced_type, &options)?;
let right_value = cast_column(&right_value, &coerced_type, &options)?;
(left_value, right_value)
} else {
(left_value, right_value)
};
let result_type = self.data_type(input_schema)?;
// Attempt the specialised kernels when exactly one operand is a scalar.
let scalar_result = match (&left_value, &right_value) {
(ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) => {
// array <op> scalar: use the scalar kernels, then convert dictionary
// results to the expected result type for numerical operators.
self.evaluate_array_scalar(array, scalar.clone(), &result_type)?
.map(|r| {
r.and_then(|a| to_result_type_array(&self.op, a, &result_type))
})
}
(ColumnarValue::Scalar(scalar), ColumnarValue::Array(array)) => {
// scalar <op> array: handled by mirroring the comparison operator.
self.evaluate_scalar_array(scalar.clone(), array)?
}
// Any other combination falls through to the array implementation.
(_, _) => None, };
if let Some(result) = scalar_result {
return result.map(|a| ColumnarValue::Array(a));
}
// Fallback: materialise both operands as arrays with one value per row.
let (left, right) = (
left_value.into_array(batch.num_rows()),
right_value.into_array(batch.num_rows()),
);
self.evaluate_with_resolved_args(
left,
&left_data_type,
right,
&right_data_type,
&result_type,
)
.map(|a| ColumnarValue::Array(a))
}
/// Returns the two operands, left first.
fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.left.clone(), self.right.clone()]
}
/// Rebuilds the expression with the same operator and new operands.
///
/// `children[0]` becomes the left operand and `children[1]` the right one;
/// indexing panics if fewer than two children are supplied.
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(BinaryExpr::new(
children[0].clone(),
self.op,
children[1].clone(),
)))
}
/// Return the boundaries of this binary expression's result.
///
/// Only `=`, `>`, `<`, `<=` and `>=` are analysed, and only when one side
/// reduces to a single known value; in every other case the returned context
/// carries no boundaries.
fn analyze(&self, context: AnalysisContext) -> AnalysisContext {
match &self.op {
Operator::Eq
| Operator::Gt
| Operator::Lt
| Operator::LtEq
| Operator::GtEq => {
// NOTE(review): `analysis_expect!` is defined elsewhere; it presumably
// bails out of the analysis when the value is absent - confirm.
let context = self.left.analyze(context);
let left_boundaries =
analysis_expect!(context, context.boundaries()).clone();
let context = self.right.analyze(context);
let right_boundaries =
analysis_expect!(context, context.boundaries.clone());
match (left_boundaries.reduce(), right_boundaries.reduce()) {
(_, Some(right_value)) => {
// The right side reduces to one value: use the operator as written.
analyze_expr_scalar_comparison(
context,
&self.op,
&self.left,
right_value,
)
}
(Some(left_value), _) => {
// Only the left side reduces to one value: swap the operator and
// the operands so the known value ends up on the right.
let swapped_op = analysis_expect!(context, self.op.swap());
analyze_expr_scalar_comparison(
context,
&swapped_op,
&self.right,
left_value,
)
}
_ => {
// Neither side reduces to a single value: give up.
context.with_boundaries(None)
}
}
}
_ => context.with_boundaries(None),
}
}
/// Computes the result interval by applying the operator to the operand
/// intervals; `children[0]` is the left interval and `children[1]` the right.
fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
let left_interval = children[0];
let right_interval = children[1];
apply_operator(&self.op, left_interval, right_interval)
}
/// Propagates the constraint `interval` on this expression down to its
/// operands.
///
/// Returns an empty vector for logic operators and for comparisons whose
/// interval is `Interval::CERTAINLY_FALSE`; otherwise returns the propagated
/// left and right intervals, in that order.
fn propagate_constraints(
&self,
interval: &Interval,
children: &[&Interval],
) -> Result<Vec<Option<Interval>>> {
let left_interval = children[0];
let right_interval = children[1];
let (left, right) = if self.op.is_logic_operator() {
// No propagation is performed for logic operators.
return Ok(vec![]);
} else if self.op.is_comparison_operator() {
if interval == &Interval::CERTAINLY_FALSE {
// Certainly-false comparisons are not propagated.
return Ok(vec![]);
}
// Note that `interval` itself is not passed to `propagate_comparison`.
propagate_comparison(&self.op, left_interval, right_interval)?
} else {
propagate_arithmetic(&self.op, interval, left_interval, right_interval)?
};
Ok(vec![left, right])
}
}
impl PartialEq<dyn Any> for BinaryExpr {
    /// Two binary expressions are equal when `other` is also a `BinaryExpr`
    /// and the left operand, operator and right operand all compare equal.
    fn eq(&self, other: &dyn Any) -> bool {
        match down_cast_any_ref(other).downcast_ref::<Self>() {
            Some(that) => {
                self.left.eq(&that.left)
                    && self.op == that.op
                    && self.right.eq(&that.right)
            }
            // A different expression type is never equal.
            None => false,
        }
    }
}
/// Analyses the comparison `left <op> right`, where `right` is a known scalar.
///
/// Estimates the selectivity of the predicate from the `[min, max]` boundaries
/// of `left`, narrows the boundaries recorded for `left` when it is a plain
/// `Column`, and returns a context whose boundaries describe the boolean
/// result of the predicate.
///
/// # Panics
///
/// Reaches `unreachable!()` if `op` is not one of `Lt`, `LtEq`, `Gt`, `GtEq`
/// or `Eq`, and asserts that the predicate is not classified as both
/// always-selecting and never-selecting.
fn analyze_expr_scalar_comparison(
context: AnalysisContext,
op: &Operator,
left: &Arc<dyn PhysicalExpr>,
right: ScalarValue,
) -> AnalysisContext {
// NOTE(review): `analysis_expect!` is defined elsewhere; it presumably returns
// early when the value is absent - confirm.
let left_bounds = analysis_expect!(context, left.analyze(context.clone()).boundaries);
let left_min = left_bounds.min_value;
let left_max = left_bounds.max_value;
// Decide whether the comparison is known to hold for every value in
// [left_min, left_max] or for none of them.
let (always_selects, never_selects) = match op {
Operator::Lt => (right > left_max, right <= left_min),
Operator::LtEq => (right >= left_max, right < left_min),
Operator::Gt => (right < left_min, right >= left_max),
Operator::GtEq => (right <= left_min, right > left_max),
Operator::Eq => (
// Equality is never treated as always-selecting; it is never-selecting
// when the scalar lies outside [left_min, left_max].
false,
right < left_min || right > left_max,
),
_ => unreachable!(),
};
// The two classifications are mutually exclusive.
assert!(!(always_selects && never_selects));
let selectivity = match (always_selects, never_selects) {
(true, _) => 1.0,
(_, true) => 0.0,
(false, false) => {
// Partial overlap: selectivity is the ratio of the overlapping range to
// the total range. The `+ 1` presumably accounts for inclusive bounds.
let total_range = analysis_expect!(context, left_max.distance(&left_min)) + 1;
let overlap_between_boundaries = analysis_expect!(
context,
match op {
Operator::Lt => right.distance(&left_min),
Operator::Gt => left_max.distance(&right),
Operator::LtEq => right.distance(&left_min).map(|dist| dist + 1),
Operator::GtEq => left_max.distance(&right).map(|dist| dist + 1),
Operator::Eq => Some(1),
_ => None,
}
);
overlap_between_boundaries as f64 / total_range as f64
}
};
// When the left side is a plain column, record its narrowed boundaries.
let context = match left.as_any().downcast_ref::<Column>() {
Some(column_expr) => {
let (left_min, left_max) = match op {
Operator::Lt | Operator::LtEq => {
// Lower the upper bound only if the scalar is actually smaller than
// it and the predicate can select something.
if right < left_max && selectivity > 0.0 {
(left_min, right)
} else {
(left_min, left_max)
}
}
Operator::Gt | Operator::GtEq => {
// Mirror image of the case above, for the lower bound.
if right > left_min && selectivity > 0.0 {
(right, left_max)
} else {
(left_min, left_max)
}
}
// Equality pins both bounds to the scalar, regardless of selectivity.
Operator::Eq => (right.clone(), right),
_ => unreachable!(),
};
let left_bounds =
ExprBoundaries::new(left_min, left_max, left_bounds.distinct_count);
context.with_column_update(column_expr.index(), left_bounds)
}
None => context,
};
assert!(selectivity <= 1.0);
// Boundaries of the boolean result: a single value when the outcome is
// certain, otherwise both `false` and `true` are possible.
let (pred_min, pred_max, pred_distinct) = match (always_selects, never_selects) {
(false, true) => (false, false, 1),
(true, false) => (true, true, 1),
_ => (false, true, 2),
};
let result_boundaries = Some(ExprBoundaries::new_with_selectivity(
ScalarValue::Boolean(Some(pred_min)),
ScalarValue::Boolean(Some(pred_max)),
Some(pred_distinct),
Some(selectivity),
));
context.with_boundaries(result_boundaries)
}
/// Strips every layer of `ScalarValue::Dictionary` wrapping and returns the
/// innermost value; non-dictionary scalars are returned unchanged.
fn unwrap_dict_value(v: ScalarValue) -> ScalarValue {
    let mut current = v;
    loop {
        match current {
            // Peel one dictionary layer and look again.
            ScalarValue::Dictionary(_, inner) => current = *inner,
            plain => return plain,
        }
    }
}
/// Applies a comparison-style kernel to array `$LEFT` and scalar `$RIGHT`.
///
/// The scalar is first stripped of any dictionary wrapping, then dispatched on
/// its variant to the matching `<$OP>_dyn_*scalar` kernel. A null scalar of a
/// supported type yields an all-null array of type `$OP_TYPE`. Unsupported
/// scalar variants produce a `DataFusionError::Internal`.
///
/// The macro always evaluates to `Some(result)`.
#[macro_export]
macro_rules! binary_array_op_dyn_scalar {
($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{
// Dictionary-encoded scalars are reduced to their underlying value.
let right = unwrap_dict_value($RIGHT);
let result: Result<Arc<dyn Array>> = match right {
ScalarValue::Boolean(b) => compute_bool_op_dyn_scalar!($LEFT, b, $OP, $OP_TYPE),
// The decimal helper receives the whole scalar and destructures it itself.
ScalarValue::Decimal128(..) => compute_decimal_op_dyn_scalar!($LEFT, right, $OP, $OP_TYPE),
ScalarValue::Utf8(v) => compute_utf8_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::LargeUtf8(v) => compute_utf8_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Binary(v) => compute_binary_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::LargeBinary(v) => compute_binary_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Int8(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Int16(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Int32(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Int64(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::UInt8(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::UInt16(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::UInt32(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::UInt64(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Float32(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Float64(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Date32(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Date64(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Time32Second(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Time32Millisecond(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Time64Microsecond(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::Time64Nanosecond(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
// For timestamps only the value is used; the second field is ignored.
ScalarValue::TimestampSecond(v, _) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::TimestampMillisecond(v, _) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::TimestampMicrosecond(v, _) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
ScalarValue::TimestampNanosecond(v, _) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE),
// Any other scalar variant has no kernel here.
other => Err(DataFusionError::Internal(format!(
"Data type {:?} not supported for scalar operation '{}' on dyn array",
other, stringify!($OP)))
)
};
Some(result)
}}
}
/// Compares every element of `lhs` for equality with the scalar `rhs`.
///
/// The comparison is delegated to `binary_array_op_dyn_scalar!` with the `eq`
/// kernel and a boolean result type. An internal error is returned when the
/// macro yields no result for the given array/scalar combination.
pub(crate) fn array_eq_scalar(lhs: &dyn Array, rhs: &ScalarValue) -> Result<ArrayRef> {
    let comparison =
        binary_array_op_dyn_scalar!(lhs, rhs.clone(), eq, &DataType::Boolean);
    match comparison {
        Some(outcome) => outcome,
        None => Err(DataFusionError::Internal(format!(
            "Data type {:?} and scalar {:?} not supported for array_eq_scalar",
            lhs.data_type(),
            rhs.get_datatype()
        ))),
    }
}
/// Converts the output of a numerical operator to the expected result type.
///
/// For numerical operators, a dictionary array whose value type equals
/// `result_type` is cast to `result_type`; a dictionary with any other value
/// type is an internal error. Non-dictionary arrays, and the results of
/// non-numerical operators, are returned untouched.
fn to_result_type_array(
    op: &Operator,
    array: ArrayRef,
    result_type: &DataType,
) -> Result<ArrayRef> {
    if !op.is_numerical_operators() {
        return Ok(array);
    }
    if let DataType::Dictionary(_, value_type) = array.data_type() {
        let values_match_result = value_type.as_ref() == result_type;
        return if values_match_result {
            Ok(cast(&array, result_type)?)
        } else {
            Err(DataFusionError::Internal(format!(
                "Incompatible Dictionary value type {:?} with result type {:?} of Binary operator {:?}",
                value_type, result_type, op
            )))
        };
    }
    Ok(array)
}
/// Kernel dispatch helpers used by `BinaryExpr::evaluate`.
impl BinaryExpr {
/// Evaluate the expression of the left input is an array and
/// right is literal - use scalar operations
///
/// Returns `Ok(None)` when the operator has no array/scalar kernel, in which
/// case the caller is expected to fall back to the array implementation.
fn evaluate_array_scalar(
&self,
array: &dyn Array,
scalar: ScalarValue,
result_type: &DataType,
) -> Result<Option<Result<ArrayRef>>> {
// Comparison kernels always produce boolean output.
let bool_type = &DataType::Boolean;
let scalar_result = match &self.op {
Operator::Lt => {
binary_array_op_dyn_scalar!(array, scalar, lt, bool_type)
}
Operator::LtEq => {
binary_array_op_dyn_scalar!(array, scalar, lt_eq, bool_type)
}
Operator::Gt => {
binary_array_op_dyn_scalar!(array, scalar, gt, bool_type)
}
Operator::GtEq => {
binary_array_op_dyn_scalar!(array, scalar, gt_eq, bool_type)
}
Operator::Eq => {
binary_array_op_dyn_scalar!(array, scalar, eq, bool_type)
}
Operator::NotEq => {
binary_array_op_dyn_scalar!(array, scalar, neq, bool_type)
}
// Arithmetic kernels receive the expected result type.
Operator::Plus => {
binary_primitive_array_op_dyn_scalar!(array, scalar, add, result_type)
}
Operator::Minus => {
binary_primitive_array_op_dyn_scalar!(
array,
scalar,
subtract,
result_type
)
}
Operator::Multiply => {
binary_primitive_array_op_dyn_scalar!(
array,
scalar,
multiply,
result_type
)
}
Operator::Divide => {
binary_primitive_array_op_dyn_scalar!(array, scalar, divide, result_type)
}
Operator::Modulo => {
binary_primitive_array_op_dyn_scalar!(array, scalar, modulus, result_type)
}
// Regex operators: the two trailing booleans are (negate result, pass the
// "i" flag to the kernel).
Operator::RegexMatch => binary_string_array_flag_op_scalar!(
array,
scalar,
regexp_is_match,
false,
false
),
Operator::RegexIMatch => binary_string_array_flag_op_scalar!(
array,
scalar,
regexp_is_match,
false,
true
),
Operator::RegexNotMatch => binary_string_array_flag_op_scalar!(
array,
scalar,
regexp_is_match,
true,
false
),
Operator::RegexNotIMatch => binary_string_array_flag_op_scalar!(
array,
scalar,
regexp_is_match,
true,
true
),
Operator::BitwiseAnd => bitwise_and_dyn_scalar(array, scalar),
Operator::BitwiseOr => bitwise_or_dyn_scalar(array, scalar),
Operator::BitwiseXor => bitwise_xor_dyn_scalar(array, scalar),
Operator::BitwiseShiftRight => bitwise_shift_right_dyn_scalar(array, scalar),
Operator::BitwiseShiftLeft => bitwise_shift_left_dyn_scalar(array, scalar),
// No scalar kernel for this operator: fall back to the array path.
_ => None,
};
Ok(scalar_result)
}
/// Evaluate the expression if the left input is a literal and the
/// right is an array - reverse operator and parameters
///
/// Only comparison operators are handled; every other operator returns
/// `Ok(None)`.
fn evaluate_scalar_array(
&self,
scalar: ScalarValue,
array: &ArrayRef,
) -> Result<Option<Result<ArrayRef>>> {
let bool_type = &DataType::Boolean;
// The kernels take the array as their first argument, so `scalar < array`
// is computed as `array > scalar`, and likewise for the other orderings.
let scalar_result = match &self.op {
Operator::Lt => {
binary_array_op_dyn_scalar!(array, scalar, gt, bool_type)
}
Operator::LtEq => {
binary_array_op_dyn_scalar!(array, scalar, gt_eq, bool_type)
}
Operator::Gt => {
binary_array_op_dyn_scalar!(array, scalar, lt, bool_type)
}
Operator::GtEq => {
binary_array_op_dyn_scalar!(array, scalar, lt_eq, bool_type)
}
// Equality and inequality are symmetric and need no mirroring.
Operator::Eq => {
binary_array_op_dyn_scalar!(array, scalar, eq, bool_type)
}
Operator::NotEq => {
binary_array_op_dyn_scalar!(array, scalar, neq, bool_type)
}
_ => None,
};
Ok(scalar_result)
}
/// Applies the operator to two fully materialised arrays.
///
/// `left_data_type` and `right_data_type` are the operand types supplied by the
/// caller; `result_type` is forwarded to the arithmetic kernels.
fn evaluate_with_resolved_args(
&self,
left: Arc<dyn Array>,
left_data_type: &DataType,
right: Arc<dyn Array>,
right_data_type: &DataType,
result_type: &DataType,
) -> Result<ArrayRef> {
match &self.op {
Operator::Lt => lt_dyn(&left, &right),
Operator::LtEq => lt_eq_dyn(&left, &right),
Operator::Gt => gt_dyn(&left, &right),
Operator::GtEq => gt_eq_dyn(&left, &right),
Operator::Eq => eq_dyn(&left, &right),
Operator::NotEq => neq_dyn(&left, &right),
Operator::IsDistinctFrom => {
match (left_data_type, right_data_type) {
// Swap the operands when the left type is Null, because
// `binary_array_op!` selects the array type to downcast to from its
// first argument.
(DataType::Null, _) => {
binary_array_op!(right, left, is_distinct_from)
}
_ => binary_array_op!(left, right, is_distinct_from),
}
}
// NOTE(review): unlike `IsDistinctFrom` above, the operands are not swapped
// here when the left type is Null - confirm this asymmetry is intended.
Operator::IsNotDistinctFrom => {
binary_array_op!(left, right, is_not_distinct_from)
}
Operator::Plus => {
binary_primitive_array_op_dyn!(left, right, add_dyn, result_type)
}
Operator::Minus => {
binary_primitive_array_op_dyn!(left, right, subtract_dyn, result_type)
}
Operator::Multiply => {
binary_primitive_array_op_dyn!(left, right, multiply_dyn, result_type)
}
Operator::Divide => {
binary_primitive_array_op_dyn!(left, right, divide_dyn_opt, result_type)
}
Operator::Modulo => {
binary_primitive_array_op_dyn!(left, right, modulus_dyn, result_type)
}
// AND / OR are only evaluated when the left operand type is Boolean.
Operator::And => {
if left_data_type == &DataType::Boolean {
boolean_op!(&left, &right, and_kleene)
} else {
Err(DataFusionError::Internal(format!(
"Cannot evaluate binary expression {:?} with types {:?} and {:?}",
self.op,
left.data_type(),
right.data_type()
)))
}
}
Operator::Or => {
if left_data_type == &DataType::Boolean {
boolean_op!(&left, &right, or_kleene)
} else {
Err(DataFusionError::Internal(format!(
"Cannot evaluate binary expression {:?} with types {:?} and {:?}",
self.op, left_data_type, right_data_type
)))
}
}
// Regex operators: the two trailing booleans are (negate result, pass the
// "i" flag to the kernel).
Operator::RegexMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, false, false)
}
Operator::RegexIMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, false, true)
}
Operator::RegexNotMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, true, false)
}
Operator::RegexNotIMatch => {
binary_string_array_flag_op!(left, right, regexp_is_match, true, true)
}
Operator::BitwiseAnd => bitwise_and_dyn(left, right),
Operator::BitwiseOr => bitwise_or_dyn(left, right),
Operator::BitwiseXor => bitwise_xor_dyn(left, right),
Operator::BitwiseShiftRight => bitwise_shift_right_dyn(left, right),
Operator::BitwiseShiftLeft => bitwise_shift_left_dyn(left, right),
Operator::StringConcat => {
binary_string_array_op!(left, right, concat_elements)
}
}
}
}
/// Create a binary expression whose arguments are correctly coerced.
///
/// # Errors
///
/// * `DataFusionError::Plan` when one operand is a (large) UTF-8 string and
///   the other a timestamp.
/// * `DataFusionError::Internal` when the operand types differ and neither of
///   them is a decimal type.
pub fn binary(
    lhs: Arc<dyn PhysicalExpr>,
    op: Operator,
    rhs: Arc<dyn PhysicalExpr>,
    input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
    let lhs_type = &lhs.data_type(input_schema)?;
    let rhs_type = &rhs.data_type(input_schema)?;

    let string_versus_timestamp = (is_utf8_or_large_utf8(lhs_type)
        && is_timestamp(rhs_type))
        || (is_timestamp(lhs_type) && is_utf8_or_large_utf8(rhs_type));
    if string_versus_timestamp {
        return Err(DataFusionError::Plan(format!(
            "The type of {lhs_type} {op:?} {rhs_type} of binary physical should be same"
        )));
    }

    // Differing types are tolerated only when at least one side is decimal.
    let same_type = lhs_type.eq(rhs_type);
    let involves_decimal = is_decimal(lhs_type) || is_decimal(rhs_type);
    if !(same_type || involves_decimal) {
        return Err(DataFusionError::Internal(format!(
            "The type of {lhs_type} {op:?} {rhs_type} of binary physical should be same"
        )));
    }

    let expr = BinaryExpr::new(lhs, op, rhs);
    Ok(Arc::new(expr))
}
// Combines every element of a timestamp array with one timestamp scalar.
//
// `$caster` downcasts `$array`; each element and `$rhs` are scaled by
// `$multiplier`, handed to `calculate_naives::<$unit_sub>` together with the
// parsed time zones, and the two resulting naive values are reduced with
// `$counter` before `$naive_sub_fn` produces the `$interval_type` output value.
// Expands to an `ArrayRef`; failures are propagated with `?`.
macro_rules! sub_timestamp_macro {
    ($array:expr, $rhs:expr, $caster:expr, $interval_type:ty, $opt_tz_lhs:expr, $multiplier:expr,
     $opt_tz_rhs:expr, $unit_sub:expr, $naive_sub_fn:expr, $counter:expr) => {{
        let timestamps = $caster(&$array)?;
        let difference: PrimitiveArray<$interval_type> =
            try_unary(timestamps, |lhs_value| {
                // Left time zone is parsed first, exactly like the right one after it.
                let lhs_tz = parse_timezones($opt_tz_lhs)?;
                let rhs_tz = parse_timezones($opt_tz_rhs)?;
                let (lhs_naive, rhs_naive) = calculate_naives::<$unit_sub>(
                    lhs_value.mul_wrapping($multiplier),
                    lhs_tz,
                    $rhs.mul_wrapping($multiplier),
                    rhs_tz,
                )?;
                Ok($naive_sub_fn($counter(&lhs_naive), $counter(&rhs_naive)))
            })?;
        Arc::new(difference) as ArrayRef
    }};
}
/// Dispatches an array/array temporal operation according to `sign`.
///
/// `1` routes to `add_dyn_temporal`, `-1` routes to `subtract_dyn_temporal`.
///
/// # Errors
///
/// Returns `DataFusionError::Internal` for any other `sign`, and forwards
/// whatever error the selected kernel reports.
pub fn resolve_temporal_op(
    lhs: &ArrayRef,
    sign: i32,
    rhs: &ArrayRef,
) -> Result<ArrayRef> {
    if sign == 1 {
        return add_dyn_temporal(lhs, rhs);
    }
    if sign == -1 {
        return subtract_dyn_temporal(lhs, rhs);
    }
    let other = sign;
    Err(DataFusionError::Internal(format!(
        "Undefined operation for temporal types {other}"
    )))
}
/// Dispatches an array/scalar temporal operation according to `sign`.
///
/// `1` routes to `add_dyn_temporal_scalar`, `-1` routes to
/// `subtract_dyn_temporal_scalar`.
///
/// # Errors
///
/// Returns `DataFusionError::Internal` for any other `sign`, and forwards
/// whatever error the selected kernel reports.
pub fn resolve_temporal_op_scalar(
    lhs: &ArrayRef,
    sign: i32,
    rhs: &ScalarValue,
) -> Result<ColumnarValue> {
    if sign == 1 {
        return add_dyn_temporal_scalar(lhs, rhs);
    }
    if sign == -1 {
        return subtract_dyn_temporal_scalar(lhs, rhs);
    }
    let other = sign;
    Err(DataFusionError::Internal(format!(
        "Undefined operation for temporal types {other}"
    )))
}
/// Combines every element of a timestamp `array` (left operand) with a
/// timestamp `scalar` (right operand) through the `*_sub` helpers and returns
/// the result as `ColumnarValue::Array`.
///
/// Only pairs with the same time unit and a non-null (`Some`) scalar are
/// handled. Second and millisecond inputs produce `IntervalDayTimeType`
/// values; microsecond and nanosecond inputs produce
/// `IntervalMonthDayNanoType` values.
///
/// # Errors
///
/// Returns `DataFusionError::Internal` for every other array/scalar
/// combination (including a `None` scalar), and propagates errors raised
/// inside `sub_timestamp_macro!`.
pub fn ts_scalar_ts_op(array: &ArrayRef, scalar: &ScalarValue) -> Result<ColumnarValue> {
let ret = match (array.data_type(), scalar) {
// Seconds: values are multiplied by 1000 before MILLISECOND_MODE processing
// (presumably a seconds -> milliseconds conversion).
(
DataType::Timestamp(TimeUnit::Second, opt_tz_lhs),
ScalarValue::TimestampSecond(Some(rhs), opt_tz_rhs),
) => {
sub_timestamp_macro!(
array,
rhs,
as_timestamp_second_array,
IntervalDayTimeType,
opt_tz_lhs.as_deref(),
1000,
opt_tz_rhs.as_deref(),
MILLISECOND_MODE,
seconds_sub,
NaiveDateTime::timestamp
)
}
// Milliseconds: multiplier 1, i.e. values are used unscaled.
(
DataType::Timestamp(TimeUnit::Millisecond, opt_tz_lhs),
ScalarValue::TimestampMillisecond(Some(rhs), opt_tz_rhs),
) => {
sub_timestamp_macro!(
array,
rhs,
as_timestamp_millisecond_array,
IntervalDayTimeType,
opt_tz_lhs.as_deref(),
1,
opt_tz_rhs.as_deref(),
MILLISECOND_MODE,
milliseconds_sub,
NaiveDateTime::timestamp_millis
)
}
// Microseconds: values are multiplied by 1000 before NANOSECOND_MODE
// processing (presumably a microseconds -> nanoseconds conversion).
(
DataType::Timestamp(TimeUnit::Microsecond, opt_tz_lhs),
ScalarValue::TimestampMicrosecond(Some(rhs), opt_tz_rhs),
) => {
sub_timestamp_macro!(
array,
rhs,
as_timestamp_microsecond_array,
IntervalMonthDayNanoType,
opt_tz_lhs.as_deref(),
1000,
opt_tz_rhs.as_deref(),
NANOSECOND_MODE,
microseconds_sub,
NaiveDateTime::timestamp_micros
)
}
// Nanoseconds: multiplier 1, i.e. values are used unscaled.
(
DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_lhs),
ScalarValue::TimestampNanosecond(Some(rhs), opt_tz_rhs),
) => {
sub_timestamp_macro!(
array,
rhs,
as_timestamp_nanosecond_array,
IntervalMonthDayNanoType,
opt_tz_lhs.as_deref(),
1,
opt_tz_rhs.as_deref(),
NANOSECOND_MODE,
nanoseconds_sub,
NaiveDateTime::timestamp_nanos
)
}
// Mismatched units, non-timestamp operands, or a null scalar.
(_, _) => {
return Err(DataFusionError::Internal(format!(
"Invalid array - scalar types for Timestamp subtraction: {:?} - {:?}",
array.data_type(),
scalar.get_datatype()
)));
}
};
Ok(ColumnarValue::Array(ret))
}
// Applies `$fn_op(element, $scalar, $sign)` to every element of a timestamp
// array downcast with `$as_timestamp`, producing a `PrimitiveArray<$ts_type>`
// that carries a clone of `$tz` as its time zone. Expands to an `ArrayRef`;
// failures are propagated with `?`.
macro_rules! sub_timestamp_interval_macro {
    ($array:expr, $as_timestamp:expr, $ts_type:ty, $fn_op:expr, $scalar:expr, $sign:expr, $tz:expr) => {{
        let timestamps = $as_timestamp(&$array)?;
        let shifted: PrimitiveArray<$ts_type> =
            try_unary::<$ts_type, _, $ts_type>(timestamps, |element| {
                let updated = $fn_op(element, $scalar, $sign)?;
                Ok(updated)
            })?;
        let with_zone = shifted.with_timezone_opt($tz.clone());
        Arc::new(with_zone) as ArrayRef
    }};
}
/// Applies an interval `scalar` to every element of a timestamp `array` and
/// returns the result as `ColumnarValue::Array`.
///
/// The time unit of `array` selects the `*_add` helper; `sign` is forwarded to
/// that helper unchanged (presumably +1 for addition and -1 for subtraction;
/// confirm against the helpers). The output keeps the time unit and time zone
/// of `array`.
///
/// # Errors
///
/// Returns `DataFusionError::Internal` when `array` is not a timestamp array,
/// and propagates errors raised by the downcast or by the helper.
pub fn ts_scalar_interval_op(
array: &ArrayRef,
sign: i32,
scalar: &ScalarValue,
) -> Result<ColumnarValue> {
let ret = match array.data_type() {
// Second-resolution timestamps.
DataType::Timestamp(TimeUnit::Second, tz) => {
sub_timestamp_interval_macro!(
array,
as_timestamp_second_array,
TimestampSecondType,
seconds_add,
scalar,
sign,
tz
)
}
// Millisecond-resolution timestamps.
DataType::Timestamp(TimeUnit::Millisecond, tz) => {
sub_timestamp_interval_macro!(
array,
as_timestamp_millisecond_array,
TimestampMillisecondType,
milliseconds_add,
scalar,
sign,
tz
)
}
// Microsecond-resolution timestamps.
DataType::Timestamp(TimeUnit::Microsecond, tz) => {
sub_timestamp_interval_macro!(
array,
as_timestamp_microsecond_array,
TimestampMicrosecondType,
microseconds_add,
scalar,
sign,
tz
)
}
// Nanosecond-resolution timestamps.
DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
sub_timestamp_interval_macro!(
array,
as_timestamp_nanosecond_array,
TimestampNanosecondType,
nanoseconds_add,
scalar,
sign,
tz
)
}
// Any non-timestamp left-hand side: `?` on the `Err` returns early.
_ => Err(DataFusionError::Internal(format!(
"Invalid lhs type for Timestamp vs Interval operations: {}",
array.data_type()
)))?,
};
Ok(ColumnarValue::Array(ret))
}
// Applies `$fn_op(element, *$scalar, $sign)` to every element of an interval
// array downcast with `$as_interval`, yielding a
// `PrimitiveArray<$interval_type>` wrapped as an `ArrayRef`. Only the downcast
// can fail; the element-wise operation is infallible.
macro_rules! sub_interval_macro {
    ($array:expr, $as_interval:expr, $interval_type:ty, $fn_op:expr, $scalar:expr, $sign:expr) => {{
        let intervals = $as_interval(&$array)?;
        let combined: PrimitiveArray<$interval_type> = unary(intervals, |element| {
            $fn_op(element, *$scalar, $sign)
        });
        Arc::new(combined) as ArrayRef
    }};
}
// Combines an interval array with an interval scalar of a different unit,
// always yielding `IntervalMonthDayNanoType` values wrapped as an `ArrayRef`.
//
// `$fn_op` takes its operands in a fixed (`$t1`, `$t2`) order. When `$commute`
// is true the scalar is passed in the `$t1` position and the array element in
// the `$t2` position; otherwise the element comes first. `$commute` is also
// forwarded to `$fn_op` as its last argument.
macro_rules! sub_interval_cross_macro {
    ($array:expr, $as_interval:expr, $commute:expr, $fn_op:expr, $scalar:expr, $sign:expr, $t1:ty, $t2:ty) => {{
        let intervals = $as_interval(&$array)?;
        let combined: PrimitiveArray<IntervalMonthDayNanoType> =
            unary(intervals, |element| {
                if $commute {
                    $fn_op(*$scalar as $t1, element as $t2, $sign, $commute)
                } else {
                    $fn_op(element as $t1, *$scalar as $t2, $sign, $commute)
                }
            });
        Arc::new(combined) as ArrayRef
    }};
}
/// Combines every element of an interval `array` with an interval `scalar`
/// and returns the result as `ColumnarValue::Array`.
///
/// All nine combinations of the three interval units are handled, provided
/// the scalar is non-null (`Some`). Same-unit pairs keep their unit (via
/// `sub_interval_macro!`); mixed-unit pairs go through
/// `sub_interval_cross_macro!`, which always produces
/// `IntervalMonthDayNanoType` values. `sign` is forwarded to the helper
/// unchanged (presumably +1 for addition and -1 for subtraction; confirm
/// against the helpers).
///
/// # Errors
///
/// Returns `DataFusionError::Internal` for any other operand combination
/// (including a `None` scalar), and propagates downcast errors.
pub fn interval_scalar_interval_op(
array: &ArrayRef,
sign: i32,
scalar: &ScalarValue,
) -> Result<ColumnarValue> {
let ret = match (array.data_type(), scalar) {
// YearMonth array, YearMonth scalar: same unit.
(
DataType::Interval(IntervalUnit::YearMonth),
ScalarValue::IntervalYearMonth(Some(rhs)),
) => {
sub_interval_macro!(
array,
as_interval_ym_array,
IntervalYearMonthType,
op_ym,
rhs,
sign
)
}
// YearMonth array, DayTime scalar: operands already in `op_ym_dt` order.
(
DataType::Interval(IntervalUnit::YearMonth),
ScalarValue::IntervalDayTime(Some(rhs)),
) => {
sub_interval_cross_macro!(
array,
as_interval_ym_array,
false,
op_ym_dt,
rhs,
sign,
i32,
i64
)
}
// YearMonth array, MonthDayNano scalar: operands already in `op_ym_mdn` order.
(
DataType::Interval(IntervalUnit::YearMonth),
ScalarValue::IntervalMonthDayNano(Some(rhs)),
) => {
sub_interval_cross_macro!(
array,
as_interval_ym_array,
false,
op_ym_mdn,
rhs,
sign,
i32,
i128
)
}
// DayTime array, YearMonth scalar: commute = true, so the macro passes the
// scalar first to `op_ym_dt`.
(
DataType::Interval(IntervalUnit::DayTime),
ScalarValue::IntervalYearMonth(Some(rhs)),
) => {
sub_interval_cross_macro!(
array,
as_interval_dt_array,
true,
op_ym_dt,
rhs,
sign,
i32,
i64
)
}
// DayTime array, DayTime scalar: same unit.
(
DataType::Interval(IntervalUnit::DayTime),
ScalarValue::IntervalDayTime(Some(rhs)),
) => {
sub_interval_macro!(
array,
as_interval_dt_array,
IntervalDayTimeType,
op_dt,
rhs,
sign
)
}
// DayTime array, MonthDayNano scalar: operands already in `op_dt_mdn` order.
(
DataType::Interval(IntervalUnit::DayTime),
ScalarValue::IntervalMonthDayNano(Some(rhs)),
) => {
sub_interval_cross_macro!(
array,
as_interval_dt_array,
false,
op_dt_mdn,
rhs,
sign,
i64,
i128
)
}
// MonthDayNano array, YearMonth scalar: commute = true, scalar goes first
// to `op_ym_mdn`.
(
DataType::Interval(IntervalUnit::MonthDayNano),
ScalarValue::IntervalYearMonth(Some(rhs)),
) => {
sub_interval_cross_macro!(
array,
as_interval_mdn_array,
true,
op_ym_mdn,
rhs,
sign,
i32,
i128
)
}
// MonthDayNano array, DayTime scalar: commute = true, scalar goes first
// to `op_dt_mdn`.
(
DataType::Interval(IntervalUnit::MonthDayNano),
ScalarValue::IntervalDayTime(Some(rhs)),
) => {
sub_interval_cross_macro!(
array,
as_interval_mdn_array,
true,
op_dt_mdn,
rhs,
sign,
i64,
i128
)
}
// MonthDayNano array, MonthDayNano scalar: same unit.
(
DataType::Interval(IntervalUnit::MonthDayNano),
ScalarValue::IntervalMonthDayNano(Some(rhs)),
) => {
sub_interval_macro!(
array,
as_interval_mdn_array,
IntervalMonthDayNanoType,
op_mdn,
rhs,
sign
)
}
// Non-interval operands or a null scalar: `?` on the `Err` returns early.
_ => Err(DataFusionError::Internal(format!(
"Invalid operands for Interval vs Interval operations: {} - {}",
array.data_type(),
scalar.get_datatype(),
)))?,
};
Ok(ColumnarValue::Array(ret))
}
// Element-wise combination of two timestamp arrays of the same unit.
//
// Both inputs are downcast with `$caster`. Each pair of values is scaled by
// `$coef`, handed to `calculate_naives::<$mode>` together with the parsed time
// zones, reduced with `$ts_unit`, and finally combined by `$op` into a
// `$type_out` value. Expands to an `ArrayRef`; failures are propagated with `?`.
macro_rules! ts_sub_op {
    ($lhs:ident, $rhs:ident, $lhs_tz:ident, $rhs_tz:ident, $coef:expr, $caster:expr, $op:expr, $ts_unit:expr, $mode:expr, $type_out:ty) => {{
        let left_timestamps = $caster(&$lhs)?;
        let right_timestamps = $caster(&$rhs)?;
        let difference: PrimitiveArray<$type_out> = arrow::compute::try_binary(
            left_timestamps,
            right_timestamps,
            |left_value, right_value| {
                // Left time zone is parsed before the right one.
                let left_zone = parse_timezones($lhs_tz.as_deref())?;
                let right_zone = parse_timezones($rhs_tz.as_deref())?;
                let (left_naive, right_naive) = calculate_naives::<$mode>(
                    left_value.mul_wrapping($coef),
                    left_zone,
                    right_value.mul_wrapping($coef),
                    right_zone,
                )?;
                Ok($op($ts_unit(&left_naive), $ts_unit(&right_naive)))
            },
        )?;
        Arc::new(difference) as ArrayRef
    }};
}
// Element-wise `$op(left, right, $sign)` over two interval arrays of the same
// unit `$type_in`, both downcast with `$caster`. Expands to an `ArrayRef`;
// failures are propagated with `?`.
macro_rules! interval_op {
    ($lhs:ident, $rhs:ident, $caster:expr, $op:expr, $sign:ident, $type_in:ty) => {{
        let left_intervals = $caster(&$lhs)?;
        let right_intervals = $caster(&$rhs)?;
        let combined = arrow::compute::binary::<$type_in, $type_in, _, $type_in>(
            left_intervals,
            right_intervals,
            |left_value, right_value| $op(left_value, right_value, $sign),
        )?;
        Arc::new(combined) as ArrayRef
    }};
}
// Element-wise `$op(left, right, $sign, $commute)` over two interval arrays of
// different units (`$type_in1` downcast with `$caster1`, `$type_in2` with
// `$caster2`). The output is always `IntervalMonthDayNanoType`. Expands to an
// `ArrayRef`; failures are propagated with `?`.
macro_rules! interval_cross_op {
    ($lhs:ident, $rhs:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $commute:ident, $type_in1:ty, $type_in2:ty) => {{
        let first_intervals = $caster1(&$lhs)?;
        let second_intervals = $caster2(&$rhs)?;
        let combined = arrow::compute::binary::<
            $type_in1,
            $type_in2,
            _,
            IntervalMonthDayNanoType,
        >(
            first_intervals,
            second_intervals,
            |first_value, second_value| $op(first_value, second_value, $sign, $commute),
        )?;
        Arc::new(combined) as ArrayRef
    }};
}
// Element-wise `$op(timestamp, interval as i128, $sign)` over a timestamp array
// (`$caster1`) and an interval array (`$caster2`). The output keeps the
// timestamp type `$type_in1` and carries a clone of `$tz` as its time zone.
// Expands to an `ArrayRef`; failures are propagated with `?`.
macro_rules! ts_interval_op {
    ($lhs:ident, $rhs:ident, $tz:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $type_in1:ty, $type_in2:ty) => {{
        let timestamps = $caster1(&$lhs)?;
        let intervals = $caster2(&$rhs)?;
        let shifted: PrimitiveArray<$type_in1> = arrow::compute::try_binary(
            timestamps,
            intervals,
            |timestamp, interval| {
                let updated = $op(timestamp, interval as i128, $sign)?;
                Ok(updated)
            },
        )?;
        let with_zone = shifted.with_timezone_opt($tz.clone());
        Arc::new(with_zone) as ArrayRef
    }};
}
/// Combines two timestamp arrays of the same time unit element by element
/// through the `*_sub` helpers (`array_lhs` is the left operand).
///
/// Second and millisecond inputs produce `IntervalDayTimeType` values;
/// microsecond and nanosecond inputs produce `IntervalMonthDayNanoType`
/// values.
///
/// # Errors
///
/// Returns `DataFusionError::Execution` when the two arrays are not
/// timestamps of the same unit, and propagates errors raised inside
/// `ts_sub_op!`.
pub fn ts_array_op(array_lhs: &ArrayRef, array_rhs: &ArrayRef) -> Result<ArrayRef> {
match (array_lhs.data_type(), array_rhs.data_type()) {
// Seconds: values are multiplied by 1000 before MILLISECOND_MODE processing
// (presumably a seconds -> milliseconds conversion).
(
DataType::Timestamp(TimeUnit::Second, opt_tz_lhs),
DataType::Timestamp(TimeUnit::Second, opt_tz_rhs),
) => Ok(ts_sub_op!(
array_lhs,
array_rhs,
opt_tz_lhs,
opt_tz_rhs,
1000i64,
as_timestamp_second_array,
seconds_sub,
NaiveDateTime::timestamp,
MILLISECOND_MODE,
IntervalDayTimeType
)),
// Milliseconds: coefficient 1, values are used unscaled.
(
DataType::Timestamp(TimeUnit::Millisecond, opt_tz_lhs),
DataType::Timestamp(TimeUnit::Millisecond, opt_tz_rhs),
) => Ok(ts_sub_op!(
array_lhs,
array_rhs,
opt_tz_lhs,
opt_tz_rhs,
1i64,
as_timestamp_millisecond_array,
milliseconds_sub,
NaiveDateTime::timestamp_millis,
MILLISECOND_MODE,
IntervalDayTimeType
)),
// Microseconds: values are multiplied by 1000 before NANOSECOND_MODE
// processing (presumably a microseconds -> nanoseconds conversion).
(
DataType::Timestamp(TimeUnit::Microsecond, opt_tz_lhs),
DataType::Timestamp(TimeUnit::Microsecond, opt_tz_rhs),
) => Ok(ts_sub_op!(
array_lhs,
array_rhs,
opt_tz_lhs,
opt_tz_rhs,
1000i64,
as_timestamp_microsecond_array,
microseconds_sub,
NaiveDateTime::timestamp_micros,
NANOSECOND_MODE,
IntervalMonthDayNanoType
)),
// Nanoseconds: coefficient 1, values are used unscaled.
(
DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_lhs),
DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_rhs),
) => Ok(ts_sub_op!(
array_lhs,
array_rhs,
opt_tz_lhs,
opt_tz_rhs,
1i64,
as_timestamp_nanosecond_array,
nanoseconds_sub,
NaiveDateTime::timestamp_nanos,
NANOSECOND_MODE,
IntervalMonthDayNanoType
)),
// Mismatched units or non-timestamp operands.
(_, _) => Err(DataFusionError::Execution(format!(
"Invalid array types for Timestamp subtraction: {} - {}",
array_lhs.data_type(),
array_rhs.data_type()
))),
}
}
/// Combines two interval arrays element by element.
///
/// All nine combinations of the three interval units are handled. Same-unit
/// pairs keep their unit (via `interval_op!`); mixed-unit pairs go through
/// `interval_cross_op!`, which always produces `IntervalMonthDayNanoType`
/// values. `sign` is forwarded to the helper unchanged (presumably +1 for
/// addition and -1 for subtraction; confirm against the helpers).
///
/// The cross helpers take their operands in a fixed unit order. When the
/// inputs arrive in the opposite order, the arms below swap `array_lhs` and
/// `array_rhs` in the macro call and pass `true` as the commute flag.
///
/// # Errors
///
/// Returns `DataFusionError::Execution` when either input is not an interval
/// array, and propagates downcast/kernel errors.
pub fn interval_array_op(
array_lhs: &ArrayRef,
array_rhs: &ArrayRef,
sign: i32,
) -> Result<ArrayRef> {
match (array_lhs.data_type(), array_rhs.data_type()) {
// YearMonth, YearMonth: same unit.
(
DataType::Interval(IntervalUnit::YearMonth),
DataType::Interval(IntervalUnit::YearMonth),
) => Ok(interval_op!(
array_lhs,
array_rhs,
as_interval_ym_array,
op_ym,
sign,
IntervalYearMonthType
)),
// YearMonth, DayTime: operands already in `op_ym_dt` order.
(
DataType::Interval(IntervalUnit::YearMonth),
DataType::Interval(IntervalUnit::DayTime),
) => Ok(interval_cross_op!(
array_lhs,
array_rhs,
as_interval_ym_array,
as_interval_dt_array,
op_ym_dt,
sign,
false,
IntervalYearMonthType,
IntervalDayTimeType
)),
// YearMonth, MonthDayNano: operands already in `op_ym_mdn` order.
(
DataType::Interval(IntervalUnit::YearMonth),
DataType::Interval(IntervalUnit::MonthDayNano),
) => Ok(interval_cross_op!(
array_lhs,
array_rhs,
as_interval_ym_array,
as_interval_mdn_array,
op_ym_mdn,
sign,
false,
IntervalYearMonthType,
IntervalMonthDayNanoType
)),
// DayTime, YearMonth: arrays swapped and commute = true for `op_ym_dt`.
(
DataType::Interval(IntervalUnit::DayTime),
DataType::Interval(IntervalUnit::YearMonth),
) => Ok(interval_cross_op!(
array_rhs,
array_lhs,
as_interval_ym_array,
as_interval_dt_array,
op_ym_dt,
sign,
true,
IntervalYearMonthType,
IntervalDayTimeType
)),
// DayTime, DayTime: same unit.
(
DataType::Interval(IntervalUnit::DayTime),
DataType::Interval(IntervalUnit::DayTime),
) => Ok(interval_op!(
array_lhs,
array_rhs,
as_interval_dt_array,
op_dt,
sign,
IntervalDayTimeType
)),
// DayTime, MonthDayNano: operands already in `op_dt_mdn` order.
(
DataType::Interval(IntervalUnit::DayTime),
DataType::Interval(IntervalUnit::MonthDayNano),
) => Ok(interval_cross_op!(
array_lhs,
array_rhs,
as_interval_dt_array,
as_interval_mdn_array,
op_dt_mdn,
sign,
false,
IntervalDayTimeType,
IntervalMonthDayNanoType
)),
// MonthDayNano, YearMonth: arrays swapped and commute = true for `op_ym_mdn`.
(
DataType::Interval(IntervalUnit::MonthDayNano),
DataType::Interval(IntervalUnit::YearMonth),
) => Ok(interval_cross_op!(
array_rhs,
array_lhs,
as_interval_ym_array,
as_interval_mdn_array,
op_ym_mdn,
sign,
true,
IntervalYearMonthType,
IntervalMonthDayNanoType
)),
// MonthDayNano, DayTime: arrays swapped and commute = true for `op_dt_mdn`.
(
DataType::Interval(IntervalUnit::MonthDayNano),
DataType::Interval(IntervalUnit::DayTime),
) => Ok(interval_cross_op!(
array_rhs,
array_lhs,
as_interval_dt_array,
as_interval_mdn_array,
op_dt_mdn,
sign,
true,
IntervalDayTimeType,
IntervalMonthDayNanoType
)),
// MonthDayNano, MonthDayNano: same unit.
(
DataType::Interval(IntervalUnit::MonthDayNano),
DataType::Interval(IntervalUnit::MonthDayNano),
) => Ok(interval_op!(
array_lhs,
array_rhs,
as_interval_mdn_array,
op_mdn,
sign,
IntervalMonthDayNanoType
)),
// At least one operand is not an interval array.
(_, _) => Err(DataFusionError::Execution(format!(
"Invalid array types for Interval operation: {} {} {}",
array_lhs.data_type(),
sign,
array_rhs.data_type()
))),
}
}
/// Applies an interval array to a timestamp array element by element.
///
/// `array_lhs` must be a timestamp array and `array_rhs` an interval array;
/// every combination of the four time units and three interval units is
/// handled. The time unit selects the `*_add_array` helper and the interval
/// unit selects its mode parameter (`YM_MODE`, `DT_MODE` or `MDN_MODE`).
/// `sign` is forwarded to the helper unchanged (presumably +1 for addition
/// and -1 for subtraction; confirm against the helpers). The output keeps the
/// time unit and time zone of `array_lhs`.
///
/// # Errors
///
/// Returns `DataFusionError::Execution` for any other pair of array types,
/// and propagates errors raised inside `ts_interval_op!`.
pub fn ts_interval_array_op(
array_lhs: &ArrayRef,
sign: i32,
array_rhs: &ArrayRef,
) -> Result<ArrayRef> {
match (array_lhs.data_type(), array_rhs.data_type()) {
// ---- Second-resolution timestamps ----
(
DataType::Timestamp(TimeUnit::Second, tz),
DataType::Interval(IntervalUnit::YearMonth),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_second_array,
as_interval_ym_array,
seconds_add_array::<YM_MODE>,
sign,
TimestampSecondType,
IntervalYearMonthType
)),
(
DataType::Timestamp(TimeUnit::Second, tz),
DataType::Interval(IntervalUnit::DayTime),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_second_array,
as_interval_dt_array,
seconds_add_array::<DT_MODE>,
sign,
TimestampSecondType,
IntervalDayTimeType
)),
(
DataType::Timestamp(TimeUnit::Second, tz),
DataType::Interval(IntervalUnit::MonthDayNano),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_second_array,
as_interval_mdn_array,
seconds_add_array::<MDN_MODE>,
sign,
TimestampSecondType,
IntervalMonthDayNanoType
)),
// ---- Millisecond-resolution timestamps ----
(
DataType::Timestamp(TimeUnit::Millisecond, tz),
DataType::Interval(IntervalUnit::YearMonth),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_millisecond_array,
as_interval_ym_array,
milliseconds_add_array::<YM_MODE>,
sign,
TimestampMillisecondType,
IntervalYearMonthType
)),
(
DataType::Timestamp(TimeUnit::Millisecond, tz),
DataType::Interval(IntervalUnit::DayTime),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_millisecond_array,
as_interval_dt_array,
milliseconds_add_array::<DT_MODE>,
sign,
TimestampMillisecondType,
IntervalDayTimeType
)),
(
DataType::Timestamp(TimeUnit::Millisecond, tz),
DataType::Interval(IntervalUnit::MonthDayNano),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_millisecond_array,
as_interval_mdn_array,
milliseconds_add_array::<MDN_MODE>,
sign,
TimestampMillisecondType,
IntervalMonthDayNanoType
)),
// ---- Microsecond-resolution timestamps ----
(
DataType::Timestamp(TimeUnit::Microsecond, tz),
DataType::Interval(IntervalUnit::YearMonth),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_microsecond_array,
as_interval_ym_array,
microseconds_add_array::<YM_MODE>,
sign,
TimestampMicrosecondType,
IntervalYearMonthType
)),
(
DataType::Timestamp(TimeUnit::Microsecond, tz),
DataType::Interval(IntervalUnit::DayTime),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_microsecond_array,
as_interval_dt_array,
microseconds_add_array::<DT_MODE>,
sign,
TimestampMicrosecondType,
IntervalDayTimeType
)),
(
DataType::Timestamp(TimeUnit::Microsecond, tz),
DataType::Interval(IntervalUnit::MonthDayNano),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_microsecond_array,
as_interval_mdn_array,
microseconds_add_array::<MDN_MODE>,
sign,
TimestampMicrosecondType,
IntervalMonthDayNanoType
)),
// ---- Nanosecond-resolution timestamps ----
(
DataType::Timestamp(TimeUnit::Nanosecond, tz),
DataType::Interval(IntervalUnit::YearMonth),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_nanosecond_array,
as_interval_ym_array,
nanoseconds_add_array::<YM_MODE>,
sign,
TimestampNanosecondType,
IntervalYearMonthType
)),
(
DataType::Timestamp(TimeUnit::Nanosecond, tz),
DataType::Interval(IntervalUnit::DayTime),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_nanosecond_array,
as_interval_dt_array,
nanoseconds_add_array::<DT_MODE>,
sign,
TimestampNanosecondType,
IntervalDayTimeType
)),
(
DataType::Timestamp(TimeUnit::Nanosecond, tz),
DataType::Interval(IntervalUnit::MonthDayNano),
) => Ok(ts_interval_op!(
array_lhs,
array_rhs,
tz,
as_timestamp_nanosecond_array,
as_interval_mdn_array,
nanoseconds_add_array::<MDN_MODE>,
sign,
TimestampNanosecondType,
IntervalMonthDayNanoType
)),
// Anything that is not (timestamp, interval).
(_, _) => Err(DataFusionError::Execution(format!(
"Invalid array types for Timestamp Interval operation: {} {} {}",
array_lhs.data_type(),
sign,
array_rhs.data_type()
))),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::{col, lit};
use crate::expressions::{try_cast, Literal};
use arrow::datatypes::{
ArrowNumericType, Decimal128Type, Field, Int32Type, SchemaRef,
};
use datafusion_common::{ColumnStatistics, Result, Statistics};
use datafusion_expr::type_coercion::binary::{coerce_types, math_decimal_coercion};
/// Test helper: builds a binary expression with `binary` and panics if the
/// construction fails.
fn binary_simple(
    l: Arc<dyn PhysicalExpr>,
    op: Operator,
    r: Arc<dyn PhysicalExpr>,
    input_schema: &Schema,
) -> Arc<dyn PhysicalExpr> {
    let built = binary(l, op, r, input_schema);
    built.unwrap()
}
/// `a < b` over two Int32 columns yields the expected boolean column.
#[test]
fn binary_comparison() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let left_values = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let right_values = Int32Array::from(vec![1, 2, 4, 8, 16]);

    // The expression must be built while `schema` is still owned here.
    let less_than = binary_simple(
        col("a", &schema)?,
        Operator::Lt,
        col("b", &schema)?,
        &schema,
    );

    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(left_values), Arc::new(right_values)],
    )?;

    let evaluated = less_than.evaluate(&batch)?.into_array(batch.num_rows());
    assert_eq!(evaluated.len(), 5);

    let booleans =
        as_boolean_array(&evaluated).expect("failed to downcast to BooleanArray");
    let expected = vec![false, false, true, true, true];
    for (row, &want) in expected.iter().enumerate() {
        assert_eq!(booleans.value(row), want);
    }

    Ok(())
}
/// `(a < b) OR (a = b)` over two Int32 columns: checks both the display form
/// and the evaluated boolean column.
#[test]
fn binary_nested() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let left_values = Int32Array::from(vec![2, 4, 6, 8, 10]);
    let right_values = Int32Array::from(vec![2, 5, 4, 8, 8]);

    // Build the two comparisons first, then join them with OR.
    let less_than = binary_simple(
        col("a", &schema)?,
        Operator::Lt,
        col("b", &schema)?,
        &schema,
    );
    let equal = binary_simple(
        col("a", &schema)?,
        Operator::Eq,
        col("b", &schema)?,
        &schema,
    );
    let expr = binary_simple(less_than, Operator::Or, equal, &schema);

    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(left_values), Arc::new(right_values)],
    )?;

    assert_eq!("a@0 < b@1 OR a@0 = b@1", format!("{expr}"));

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows());
    assert_eq!(evaluated.len(), 5);

    let booleans =
        as_boolean_array(&evaluated).expect("failed to downcast to BooleanArray");
    let expected = vec![true, true, false, true, false];
    for (row, &want) in expected.iter().enumerate() {
        assert_eq!(booleans.value(row), want);
    }

    Ok(())
}
/// Builds columns `a` (`$A_ARRAY` of `$A_TYPE` from `$A_VEC`) and `b`
/// (`$B_ARRAY` of `$B_TYPE` from `$B_VEC`), casts both to the type returned by
/// `coerce_types` for `$OP`, evaluates `a $OP b`, and checks that:
///
/// * the expression's declared type and the result array's type are `$C_TYPE`;
/// * the result has exactly as many rows as `$VEC` has expected values;
/// * every row, read through a `$C_ARRAY` downcast, equals the matching
///   element of `$VEC`.
///
/// Must be expanded inside a function returning `Result`, because it uses `?`.
macro_rules! test_coercion {
    ($A_ARRAY:ident, $A_TYPE:expr, $A_VEC:expr, $B_ARRAY:ident, $B_TYPE:expr, $B_VEC:expr, $OP:expr, $C_ARRAY:ident, $C_TYPE:expr, $VEC:expr,) => {{
        let schema = Schema::new(vec![
            Field::new("a", $A_TYPE, false),
            Field::new("b", $B_TYPE, false),
        ]);
        let a = $A_ARRAY::from($A_VEC);
        let b = $B_ARRAY::from($B_VEC);
        // Evaluate the expected-values expression exactly once.
        let expected = $VEC;

        // Cast both sides to the common type before building the expression.
        let common_type = coerce_types(&$A_TYPE, &$OP, &$B_TYPE)?;
        let left = try_cast(col("a", &schema)?, &schema, common_type.clone())?;
        let right = try_cast(col("b", &schema)?, &schema, common_type)?;
        let expression = binary(left, $OP, right, &schema)?;

        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(a), Arc::new(b)],
        )?;

        // The declared output type must match the expected one.
        assert_eq!(expression.data_type(&schema)?, $C_TYPE);

        let result = expression.evaluate(&batch)?.into_array(batch.num_rows());

        // The runtime output type must match as well.
        assert_eq!(*result.data_type(), $C_TYPE);

        // Without this check, rows beyond the expected vector would be
        // silently ignored by the comparison loop below.
        assert_eq!(
            result.len(),
            expected.len(),
            "Unexpected number of output rows"
        );

        let result = result
            .as_any()
            .downcast_ref::<$C_ARRAY>()
            .expect("failed to downcast");

        for (i, x) in expected.iter().enumerate() {
            let v = result.value(i);
            assert_eq!(
                v,
                *x,
                "Unexpected output at position {i}:\n\nActual:\n{v}\n\nExpected:\n{x}"
            );
        }
    }};
}
/// Exercises `test_coercion!` over a range of operand type pairs and
/// operators. Each invocation lists: left array type / data type / values,
/// right array type / data type / values, the operator, and the expected
/// result array type / data type / values.
#[test]
fn test_type_coersion() -> Result<()> {
// Int32 + UInt32 -> Int32.
test_coercion!(
Int32Array,
DataType::Int32,
vec![1i32, 2i32],
UInt32Array,
DataType::UInt32,
vec![1u32, 2u32],
Operator::Plus,
Int32Array,
DataType::Int32,
vec![2i32, 4i32],
);
// Int32 + UInt16 -> Int32.
test_coercion!(
Int32Array,
DataType::Int32,
vec![1i32],
UInt16Array,
DataType::UInt16,
vec![1u16],
Operator::Plus,
Int32Array,
DataType::Int32,
vec![2i32],
);
// Float32 + UInt16 -> Float32.
test_coercion!(
Float32Array,
DataType::Float32,
vec![1f32],
UInt16Array,
DataType::UInt16,
vec![1u16],
Operator::Plus,
Float32Array,
DataType::Float32,
vec![2f32],
);
// Float32 * UInt16 -> Float32.
test_coercion!(
Float32Array,
DataType::Float32,
vec![2f32],
UInt16Array,
DataType::UInt16,
vec![1u16],
Operator::Multiply,
Float32Array,
DataType::Float32,
vec![2f32],
);
// Utf8 date strings = Date32 values -> Boolean (both rows equal).
test_coercion!(
StringArray,
DataType::Utf8,
vec!["1994-12-13", "1995-01-26"],
Date32Array,
DataType::Date32,
vec![9112, 9156],
Operator::Eq,
BooleanArray,
DataType::Boolean,
vec![true, true],
);
// Utf8 date strings < Date32 values -> Boolean (one day after / two days before).
test_coercion!(
StringArray,
DataType::Utf8,
vec!["1994-12-13", "1995-01-26"],
Date32Array,
DataType::Date32,
vec![9113, 9154],
Operator::Lt,
BooleanArray,
DataType::Boolean,
vec![true, false],
);
// Utf8 date-time strings = Date64 values -> Boolean (both rows equal).
test_coercion!(
StringArray,
DataType::Utf8,
vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
Date64Array,
DataType::Date64,
vec![787322096000, 791083425000],
Operator::Eq,
BooleanArray,
DataType::Boolean,
vec![true, true],
);
// Utf8 date-time strings < Date64 values -> Boolean (one unit above / one below).
test_coercion!(
StringArray,
DataType::Utf8,
vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
Date64Array,
DataType::Date64,
vec![787322096001, 791083424999],
Operator::Lt,
BooleanArray,
DataType::Boolean,
vec![true, false],
);
// Utf8: case-sensitive regex match.
test_coercion!(
StringArray,
DataType::Utf8,
vec!["abc"; 5],
StringArray,
DataType::Utf8,
vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
Operator::RegexMatch,
BooleanArray,
DataType::Boolean,
vec![true, false, true, false, false],
);
// Utf8: case-insensitive regex match.
test_coercion!(
StringArray,
DataType::Utf8,
vec!["abc"; 5],
StringArray,
DataType::Utf8,
vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
Operator::RegexIMatch,
BooleanArray,
DataType::Boolean,
vec![true, true, true, true, false],
);
// Utf8: negated case-sensitive regex match.
test_coercion!(
StringArray,
DataType::Utf8,
vec!["abc"; 5],
StringArray,
DataType::Utf8,
vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
Operator::RegexNotMatch,
BooleanArray,
DataType::Boolean,
vec![false, true, false, true, true],
);
// Utf8: negated case-insensitive regex match.
test_coercion!(
StringArray,
DataType::Utf8,
vec!["abc"; 5],
StringArray,
DataType::Utf8,
vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
Operator::RegexNotIMatch,
BooleanArray,
DataType::Boolean,
vec![false, false, false, false, true],
);
// LargeUtf8: case-sensitive regex match.
test_coercion!(
LargeStringArray,
DataType::LargeUtf8,
vec!["abc"; 5],
LargeStringArray,
DataType::LargeUtf8,
vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
Operator::RegexMatch,
BooleanArray,
DataType::Boolean,
vec![true, false, true, false, false],
);
// LargeUtf8: case-insensitive regex match.
test_coercion!(
LargeStringArray,
DataType::LargeUtf8,
vec!["abc"; 5],
LargeStringArray,
DataType::LargeUtf8,
vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
Operator::RegexIMatch,
BooleanArray,
DataType::Boolean,
vec![true, true, true, true, false],
);
// LargeUtf8: negated case-sensitive regex match.
test_coercion!(
LargeStringArray,
DataType::LargeUtf8,
vec!["abc"; 5],
LargeStringArray,
DataType::LargeUtf8,
vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
Operator::RegexNotMatch,
BooleanArray,
DataType::Boolean,
vec![false, true, false, true, true],
);
// LargeUtf8: negated case-insensitive regex match.
test_coercion!(
LargeStringArray,
DataType::LargeUtf8,
vec!["abc"; 5],
LargeStringArray,
DataType::LargeUtf8,
vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
Operator::RegexNotIMatch,
BooleanArray,
DataType::Boolean,
vec![false, false, false, false, true],
);
// Int16 & Int64 -> Int64.
test_coercion!(
Int16Array,
DataType::Int16,
vec![1i16, 2i16, 3i16],
Int64Array,
DataType::Int64,
vec![10i64, 4i64, 5i64],
Operator::BitwiseAnd,
Int64Array,
DataType::Int64,
vec![0i64, 0i64, 1i64],
);
// UInt16 & UInt64 -> UInt64.
test_coercion!(
UInt16Array,
DataType::UInt16,
vec![1u16, 2u16, 3u16],
UInt64Array,
DataType::UInt64,
vec![10u64, 4u64, 5u64],
Operator::BitwiseAnd,
UInt64Array,
DataType::UInt64,
vec![0u64, 0u64, 1u64],
);
// UInt16 | UInt64 -> UInt64.
test_coercion!(
UInt16Array,
DataType::UInt16,
vec![1u16, 2u16, 3u16],
UInt64Array,
DataType::UInt64,
vec![10u64, 4u64, 5u64],
Operator::BitwiseOr,
UInt64Array,
DataType::UInt64,
vec![11u64, 6u64, 7u64],
);
// Int16 ^ Int64 -> Int64.
test_coercion!(
Int16Array,
DataType::Int16,
vec![3i16, 2i16, 3i16],
Int64Array,
DataType::Int64,
vec![10i64, 6i64, 5i64],
Operator::BitwiseXor,
Int64Array,
DataType::Int64,
vec![9i64, 4i64, 6i64],
);
// UInt16 ^ UInt64 -> UInt64.
test_coercion!(
UInt16Array,
DataType::UInt16,
vec![3u16, 2u16, 3u16],
UInt64Array,
DataType::UInt64,
vec![10u64, 6u64, 5u64],
Operator::BitwiseXor,
UInt64Array,
DataType::UInt64,
vec![9u64, 4u64, 6u64],
);
Ok(())
}
/// Equality between a string dictionary column and a plain string column,
/// checked with the dictionary on either side of the operator.
#[test]
#[cfg(feature = "dictionary_expressions")]
fn test_dictionary_type_to_array_coersion() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let string_type = DataType::Utf8;

    // Dictionary column: "one", NULL, "three", "four".
    let mut builder = StringDictionaryBuilder::<Int32Type>::new();
    builder.append("one")?;
    builder.append_null();
    builder.append("three")?;
    builder.append("four")?;
    let dict_array = Arc::new(builder.finish()) as ArrayRef;

    // Plain string column: "not one", "two", NULL, "four".
    let str_array = Arc::new(StringArray::from(vec![
        Some("not one"),
        Some("two"),
        None,
        Some("four"),
    ])) as ArrayRef;

    // Dictionary on the left, string on the right.
    let dict_then_str = Arc::new(Schema::new(vec![
        Field::new("a", dict_type.clone(), true),
        Field::new("b", string_type.clone(), true),
    ]));
    let expected = BooleanArray::from(vec![Some(false), None, None, Some(true)]);
    apply_logic_op(&dict_then_str, &dict_array, &str_array, Operator::Eq, expected)?;

    // String on the left, dictionary on the right.
    let str_then_dict = Arc::new(Schema::new(vec![
        Field::new("a", string_type, true),
        Field::new("b", dict_type, true),
    ]));
    let expected = BooleanArray::from(vec![Some(false), None, None, Some(true)]);
    apply_logic_op(&str_then_dict, &str_array, &dict_array, Operator::Eq, expected)?;

    Ok(())
}
/// Element-wise addition of two Int32 columns.
#[test]
fn plus_op() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let left_values = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let right_values = Int32Array::from(vec![1, 2, 4, 8, 16]);
    let expected_sum = Int32Array::from(vec![2, 4, 7, 12, 21]);

    apply_arithmetic::<Int32Type>(
        Arc::new(schema),
        vec![Arc::new(left_values), Arc::new(right_values)],
        Operator::Plus,
        expected_sum,
    )?;

    Ok(())
}
/// Element-wise addition of two Int32 dictionary columns; a null key on the
/// left side yields a null output row.
#[test]
#[cfg(feature = "dictionary_expressions")]
fn plus_op_dict() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Schema::new(vec![
        Field::new("a", dict_type.clone(), true),
        Field::new("b", dict_type, true),
    ]);

    // Left column: keys (with nulls) into [1, 2, 3, 4, 5].
    let left_values = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let left_keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
    let left = DictionaryArray::try_new(left_keys, Arc::new(left_values))?;

    // Right column: keys into [1, 2, 4, 8, 16].
    let right_values = Int32Array::from(vec![1, 2, 4, 8, 16]);
    let right_keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
    let right = DictionaryArray::try_new(right_keys, Arc::new(right_values))?;

    let expected_sum = Int32Array::from(vec![Some(2), None, Some(4), Some(8), None]);
    apply_arithmetic::<Int32Type>(
        Arc::new(schema),
        vec![Arc::new(left), Arc::new(right)],
        Operator::Plus,
        expected_sum,
    )?;

    Ok(())
}
/// Element-wise addition of two Decimal128(10, 0) dictionary columns; the
/// expected result is a Decimal128(11, 0) array with nulls wherever either
/// side has a null key.
#[test]
#[cfg(feature = "dictionary_expressions")]
fn plus_op_dict_decimal() -> Result<()> {
    let dict_type = DataType::Dictionary(
        Box::new(DataType::Int8),
        Box::new(DataType::Decimal128(10, 0)),
    );
    let schema = Schema::new(vec![
        Field::new("a", dict_type.clone(), true),
        Field::new("b", dict_type, true),
    ]);

    let base = 123;

    // Left column.
    let left_values = Arc::new(create_decimal_array(
        &[Some(base), Some(base + 2), Some(base - 1), Some(base + 1)],
        10,
        0,
    ));
    let left_keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
    let left = DictionaryArray::try_new(left_keys, left_values)?;

    // Right column.
    let right_keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
    let right_values = Arc::new(create_decimal_array(
        &[Some(base + 1), Some(base + 3), Some(base), Some(base + 2)],
        10,
        0,
    ));
    let right = DictionaryArray::try_new(right_keys, right_values)?;

    let expected_sum =
        create_decimal_array(&[Some(247), None, None, Some(247), Some(246)], 11, 0);
    apply_arithmetic(
        Arc::new(schema),
        vec![Arc::new(left), Arc::new(right)],
        Operator::Plus,
        expected_sum,
    )?;

    Ok(())
}
/// Adds the Int32 scalar `1` to every element of an Int32 column.
#[test]
fn plus_op_scalar() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let column = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let expected_sum = Int32Array::from(vec![2, 3, 4, 5, 6]);

    apply_arithmetic_scalar(
        Arc::new(schema),
        vec![Arc::new(column)],
        Operator::Plus,
        ScalarValue::Int32(Some(1)),
        Arc::new(expected_sum),
    )?;

    Ok(())
}
/// Adds a dictionary-wrapped Int32 scalar `1` to an Int32 dictionary column;
/// the null entry stays null.
#[test]
fn plus_op_dict_scalar() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Schema::new(vec![Field::new("a", dict_type, true)]);

    // Column: 1, NULL, 2, 5.
    let mut builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
    builder.append(1)?;
    builder.append_null();
    builder.append(2)?;
    builder.append(5)?;
    let column = builder.finish();

    let expected_sum: PrimitiveArray<Int32Type> =
        PrimitiveArray::from(vec![Some(2), None, Some(3), Some(6)]);
    let one = ScalarValue::Dictionary(
        Box::new(DataType::Int8),
        Box::new(ScalarValue::Int32(Some(1))),
    );

    apply_arithmetic_scalar(
        Arc::new(schema),
        vec![Arc::new(column)],
        Operator::Plus,
        one,
        Arc::new(expected_sum),
    )?;

    Ok(())
}
/// Adds a dictionary-wrapped Decimal128(10, 0) scalar `1` to a decimal
/// dictionary column; the expected result is a Decimal128(11, 0) array.
#[test]
fn plus_op_dict_scalar_decimal() -> Result<()> {
    let dict_type = DataType::Dictionary(
        Box::new(DataType::Int8),
        Box::new(DataType::Decimal128(10, 0)),
    );
    let schema = Schema::new(vec![Field::new("a", dict_type, true)]);

    let base = 123;

    // Input column: dictionary values (one of them null) addressed by keys.
    let dict_values = Arc::new(create_decimal_array(
        &[Some(base), None, Some(base - 1), Some(base + 1)],
        10,
        0,
    ));
    let dict_keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
    let column = DictionaryArray::try_new(dict_keys, dict_values)?;

    // Expected output, row by row.
    let expected_sum = Arc::new(create_decimal_array(
        &[
            Some(base + 1),
            Some(base),
            None,
            Some(base + 2),
            Some(base + 1),
        ],
        11,
        0,
    ));

    let one = ScalarValue::Dictionary(
        Box::new(DataType::Int8),
        Box::new(ScalarValue::Decimal128(Some(1), 10, 0)),
    );

    apply_arithmetic_scalar(
        Arc::new(schema),
        vec![Arc::new(column)],
        Operator::Plus,
        one,
        expected_sum,
    )?;

    Ok(())
}
/// Element-wise subtraction of two Int32 columns, checked in both operand
/// orders.
#[test]
fn minus_op() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let powers = Arc::new(Int32Array::from(vec![1, 2, 4, 8, 16]));
    let counts = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));

    // powers - counts
    let forward = Int32Array::from(vec![0, 0, 1, 4, 11]);
    apply_arithmetic::<Int32Type>(
        schema.clone(),
        vec![powers.clone(), counts.clone()],
        Operator::Minus,
        forward,
    )?;

    // counts - powers
    let reversed = Int32Array::from(vec![0, 0, -1, -4, -11]);
    apply_arithmetic::<Int32Type>(
        schema,
        vec![counts, powers],
        Operator::Minus,
        reversed,
    )?;

    Ok(())
}
/// Element-wise subtraction of two Int32 dictionary columns; a null key on
/// the left side yields a null output row.
#[test]
#[cfg(feature = "dictionary_expressions")]
fn minus_op_dict() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Schema::new(vec![
        Field::new("a", dict_type.clone(), true),
        Field::new("b", dict_type, true),
    ]);

    // Left column: keys (with nulls) into [1, 2, 3, 4, 5].
    let left_values = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let left_keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
    let left = DictionaryArray::try_new(left_keys, Arc::new(left_values))?;

    // Right column: keys into [1, 2, 4, 8, 16].
    let right_values = Int32Array::from(vec![1, 2, 4, 8, 16]);
    let right_keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
    let right = DictionaryArray::try_new(right_keys, Arc::new(right_values))?;

    let expected_diff = Int32Array::from(vec![Some(0), None, Some(0), Some(0), None]);
    apply_arithmetic::<Int32Type>(
        Arc::new(schema),
        vec![Arc::new(left), Arc::new(right)],
        Operator::Minus,
        expected_diff,
    )?;

    Ok(())
}
/// Subtracts two dictionary-encoded `Decimal128(10, 0)` columns and compares
/// against a `Decimal128(11, 0)` array.
#[test]
#[cfg(feature = "dictionary_expressions")]
fn minus_op_dict_decimal() -> Result<()> {
    let dict_decimal = || {
        DataType::Dictionary(
            Box::new(DataType::Int8),
            Box::new(DataType::Decimal128(10, 0)),
        )
    };
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", dict_decimal(), true),
        Field::new("b", dict_decimal(), true),
    ]));

    let base = 123;
    let lhs_values = Arc::new(create_decimal_array(
        &[Some(base), Some(base + 2), Some(base - 1), Some(base + 1)],
        10,
        0,
    ));
    let lhs = DictionaryArray::try_new(
        Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]),
        lhs_values,
    )?;

    let rhs_values = Arc::new(create_decimal_array(
        &[Some(base + 1), Some(base + 3), Some(base), Some(base + 2)],
        10,
        0,
    ));
    let rhs = DictionaryArray::try_new(
        Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]),
        rhs_values,
    )?;

    // Decoded operands: lhs = [123, 122, NULL, 124, 123],
    //                   rhs = [124, NULL, 125, 123, 123].
    apply_arithmetic(
        schema,
        vec![Arc::new(lhs), Arc::new(rhs)],
        Operator::Minus,
        create_decimal_array(&[Some(-1), None, None, Some(1), Some(0)], 11, 0),
    )
}
/// Subtracts the literal `1` from every element of an `Int32` column.
#[test]
fn minus_op_scalar() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Int32,
        false,
    )]));
    let input: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let expected: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4]));

    apply_arithmetic_scalar(
        schema,
        vec![input],
        Operator::Minus,
        ScalarValue::Int32(Some(1)),
        expected,
    )
}
/// Subtracts a dictionary-wrapped `Int32` literal (`1`) from a
/// dictionary-encoded `Int32` column that contains one null entry.
#[test]
fn minus_op_dict_scalar() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, true)]));

    // Column contents, appended in order: 1, NULL, 2, 5.
    let mut builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
    for entry in [Some(1), None, Some(2), Some(5)] {
        match entry {
            Some(v) => {
                builder.append(v)?;
            }
            None => builder.append_null(),
        }
    }
    let input = builder.finish();

    let expected = Int32Array::from(vec![Some(0), None, Some(1), Some(4)]);
    let literal = ScalarValue::Dictionary(
        Box::new(DataType::Int8),
        Box::new(ScalarValue::Int32(Some(1))),
    );

    apply_arithmetic_scalar(
        schema,
        vec![Arc::new(input)],
        Operator::Minus,
        literal,
        Arc::new(expected),
    )
}
/// Subtracts a dictionary-wrapped decimal literal (`1`) from a
/// dictionary-encoded `Decimal128(10, 0)` column and compares against a
/// `Decimal128(11, 0)` array.
#[test]
fn minus_op_dict_scalar_decimal() -> Result<()> {
    let dict_type = DataType::Dictionary(
        Box::new(DataType::Int8),
        Box::new(DataType::Decimal128(10, 0)),
    );
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, true)]));

    let base = 123;
    // Dictionary values; the keys below select them as 0, 2, 1, 3, 0.
    let dict_values = Arc::new(create_decimal_array(
        &[Some(base), None, Some(base - 1), Some(base + 1)],
        10,
        0,
    ));
    let input =
        DictionaryArray::try_new(Int8Array::from(vec![0, 2, 1, 3, 0]), dict_values)?;

    let expected = Arc::new(create_decimal_array(
        &[
            Some(base - 1),
            Some(base - 2),
            None,
            Some(base),
            Some(base - 1),
        ],
        11,
        0,
    ));
    let literal = ScalarValue::Dictionary(
        Box::new(DataType::Int8),
        Box::new(ScalarValue::Decimal128(Some(1), 10, 0)),
    );

    apply_arithmetic_scalar(
        schema,
        vec![Arc::new(input)],
        Operator::Minus,
        literal,
        expected,
    )
}
/// Multiplies two non-nullable `Int32` columns element by element.
#[test]
fn multiply_op() -> Result<()> {
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ];
    let schema = Arc::new(Schema::new(fields));
    let lhs: ArrayRef = Arc::new(Int32Array::from(vec![4, 8, 16, 32, 64]));
    let rhs: ArrayRef = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
    let expected = Int32Array::from(vec![8, 32, 128, 512, 2048]);

    apply_arithmetic::<Int32Type>(
        schema,
        vec![lhs, rhs],
        Operator::Multiply,
        expected,
    )
}
/// Multiplies two dictionary-encoded `Int32` columns; rows whose key in `a`
/// is null are expected to be null in the result.
#[test]
#[cfg(feature = "dictionary_expressions")]
fn multiply_op_dict() -> Result<()> {
    let dict_i32 =
        || DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", dict_i32(), true),
        Field::new("b", dict_i32(), true),
    ]));

    let lhs = DictionaryArray::try_new(
        Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]),
        Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
    )?;
    let rhs = DictionaryArray::try_new(
        Int8Array::from(vec![0, 1, 1, 2, 1]),
        Arc::new(Int32Array::from(vec![1, 2, 4, 8, 16])),
    )?;

    apply_arithmetic::<Int32Type>(
        schema,
        vec![Arc::new(lhs), Arc::new(rhs)],
        Operator::Multiply,
        Int32Array::from(vec![Some(1), None, Some(4), Some(16), None]),
    )
}
/// Multiplies two dictionary-encoded `Decimal128(10, 0)` columns and compares
/// against a `Decimal128(21, 0)` array.
#[test]
#[cfg(feature = "dictionary_expressions")]
fn multiply_op_dict_decimal() -> Result<()> {
    let dict_decimal = || {
        DataType::Dictionary(
            Box::new(DataType::Int8),
            Box::new(DataType::Decimal128(10, 0)),
        )
    };
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", dict_decimal(), true),
        Field::new("b", dict_decimal(), true),
    ]));

    let base = 123;
    let lhs_values: ArrayRef = Arc::new(create_decimal_array(
        &[Some(base), Some(base + 2), Some(base - 1), Some(base + 1)],
        10,
        0,
    ));
    let lhs = DictionaryArray::try_new(
        Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]),
        lhs_values,
    )?;

    let rhs_values = Arc::new(create_decimal_array(
        &[Some(base + 1), Some(base + 3), Some(base), Some(base + 2)],
        10,
        0,
    ));
    let rhs = DictionaryArray::try_new(
        Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]),
        rhs_values,
    )?;

    // Decoded operands: lhs = [123, 122, NULL, 124, 123],
    //                   rhs = [124, NULL, 125, 123, 123].
    let expected = create_decimal_array(
        &[Some(15252), None, None, Some(15252), Some(15129)],
        21,
        0,
    );

    apply_arithmetic(
        schema,
        vec![Arc::new(lhs), Arc::new(rhs)],
        Operator::Multiply,
        expected,
    )
}
/// Multiplies every element of an `Int32` column by the literal `2`.
#[test]
fn multiply_op_scalar() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Int32,
        false,
    )]));
    let input: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let expected: ArrayRef = Arc::new(Int32Array::from(vec![2, 4, 6, 8, 10]));

    apply_arithmetic_scalar(
        schema,
        vec![input],
        Operator::Multiply,
        ScalarValue::Int32(Some(2)),
        expected,
    )
}
/// Multiplies a dictionary-encoded `Int32` column (with one null entry) by a
/// dictionary-wrapped `Int32` literal (`2`).
#[test]
fn multiply_op_dict_scalar() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, true)]));

    // Column contents, appended in order: 1, NULL, 2, 5.
    let mut builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
    for entry in [Some(1), None, Some(2), Some(5)] {
        match entry {
            Some(v) => {
                builder.append(v)?;
            }
            None => builder.append_null(),
        }
    }
    let input = builder.finish();

    let expected = Int32Array::from(vec![Some(2), None, Some(4), Some(10)]);
    let literal = ScalarValue::Dictionary(
        Box::new(DataType::Int8),
        Box::new(ScalarValue::Int32(Some(2))),
    );

    apply_arithmetic_scalar(
        schema,
        vec![Arc::new(input)],
        Operator::Multiply,
        literal,
        Arc::new(expected),
    )
}
/// Multiplies a dictionary-encoded `Decimal128(10, 0)` column by a
/// dictionary-wrapped decimal literal (`2`) and compares against a
/// `Decimal128(21, 0)` array.
#[test]
fn multiply_op_dict_scalar_decimal() -> Result<()> {
    let dict_type = DataType::Dictionary(
        Box::new(DataType::Int8),
        Box::new(DataType::Decimal128(10, 0)),
    );
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, true)]));

    let base = 123;
    // Dictionary values; the keys below select them as 0, 2, 1, 3, 0,
    // i.e. the decoded column is [123, 122, NULL, 124, 123].
    let dict_values = Arc::new(create_decimal_array(
        &[Some(base), None, Some(base - 1), Some(base + 1)],
        10,
        0,
    ));
    let input =
        DictionaryArray::try_new(Int8Array::from(vec![0, 2, 1, 3, 0]), dict_values)?;

    let expected = Arc::new(create_decimal_array(
        &[Some(246), Some(244), None, Some(248), Some(246)],
        21,
        0,
    ));
    let literal = ScalarValue::Dictionary(
        Box::new(DataType::Int8),
        Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
    );

    apply_arithmetic_scalar(
        schema,
        vec![Arc::new(input)],
        Operator::Multiply,
        literal,
        expected,
    )
}
/// Divides two non-nullable `Int32` columns element by element.
#[test]
fn divide_op() -> Result<()> {
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ];
    let schema = Arc::new(Schema::new(fields));
    let dividend: ArrayRef = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
    let divisor: ArrayRef = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
    let expected = Int32Array::from(vec![4, 8, 16, 32, 64]);

    apply_arithmetic::<Int32Type>(
        schema,
        vec![dividend, divisor],
        Operator::Divide,
        expected,
    )
}
/// Divides a builder-produced dictionary `Int32` column by a second
/// dictionary-encoded `Int32` column.
#[test]
#[cfg(feature = "dictionary_expressions")]
fn divide_op_dict() -> Result<()> {
    let dict_i32 =
        || DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", dict_i32(), true),
        Field::new("b", dict_i32(), true),
    ]));

    // Dividend contents, appended in order: 1, NULL, 2, 5, 0.
    let mut builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
    for entry in [Some(1), None, Some(2), Some(5), Some(0)] {
        match entry {
            Some(v) => {
                builder.append(v)?;
            }
            None => builder.append_null(),
        }
    }
    let dividend = builder.finish();

    let divisor = DictionaryArray::try_new(
        Int8Array::from(vec![0, 1, 1, 2, 1]),
        Arc::new(Int32Array::from(vec![1, 2, 4, 8, 16])),
    )?;

    apply_arithmetic::<Int32Type>(
        schema,
        vec![Arc::new(dividend), Arc::new(divisor)],
        Operator::Divide,
        Int32Array::from(vec![Some(1), None, Some(1), Some(1), Some(0)]),
    )
}
/// Divides two dictionary-encoded `Decimal128(10, 0)` columns and compares
/// against a `Decimal128(21, 11)` array.
#[test]
#[cfg(feature = "dictionary_expressions")]
fn divide_op_dict_decimal() -> Result<()> {
    let dict_decimal = || {
        DataType::Dictionary(
            Box::new(DataType::Int8),
            Box::new(DataType::Decimal128(10, 0)),
        )
    };
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", dict_decimal(), true),
        Field::new("b", dict_decimal(), true),
    ]));

    let base = 123;
    let lhs_values = Arc::new(create_decimal_array(
        &[Some(base), Some(base + 2), Some(base - 1), Some(base + 1)],
        10,
        0,
    ));
    let lhs = DictionaryArray::try_new(
        Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]),
        lhs_values,
    )?;

    let rhs_values = Arc::new(create_decimal_array(
        &[Some(base + 1), Some(base + 3), Some(base), Some(base + 2)],
        10,
        0,
    ));
    let rhs = DictionaryArray::try_new(
        Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]),
        rhs_values,
    )?;

    // Quotients at scale 11: 123/124, NULL, NULL, 124/123 and 123/123.
    let expected = create_decimal_array(
        &[
            Some(99193548387),
            None,
            None,
            Some(100813008130),
            Some(100000000000),
        ],
        21,
        11,
    );

    apply_arithmetic(
        schema,
        vec![Arc::new(lhs), Arc::new(rhs)],
        Operator::Divide,
        expected,
    )
}
/// Divides every element of an `Int32` column by the literal `2`; the
/// expected values are the truncated integer quotients.
#[test]
fn divide_op_scalar() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Int32,
        false,
    )]));
    let input: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let expected: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 1, 2, 2]));

    apply_arithmetic_scalar(
        schema,
        vec![input],
        Operator::Divide,
        ScalarValue::Int32(Some(2)),
        expected,
    )
}
/// Divides a dictionary-encoded `Int32` column (with one null entry) by a
/// dictionary-wrapped `Int32` literal (`2`).
#[test]
fn divide_op_dict_scalar() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, true)]));

    // Column contents, appended in order: 1, NULL, 2, 5.
    let mut builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
    for entry in [Some(1), None, Some(2), Some(5)] {
        match entry {
            Some(v) => {
                builder.append(v)?;
            }
            None => builder.append_null(),
        }
    }
    let input = builder.finish();

    let expected = Int32Array::from(vec![Some(0), None, Some(1), Some(2)]);
    let literal = ScalarValue::Dictionary(
        Box::new(DataType::Int8),
        Box::new(ScalarValue::Int32(Some(2))),
    );

    apply_arithmetic_scalar(
        schema,
        vec![Arc::new(input)],
        Operator::Divide,
        literal,
        Arc::new(expected),
    )
}
/// Divides a dictionary-encoded `Decimal128(10, 0)` column by a
/// dictionary-wrapped decimal literal (`2`) and compares against a
/// `Decimal128(21, 11)` array.
#[test]
fn divide_op_dict_scalar_decimal() -> Result<()> {
    let dict_type = DataType::Dictionary(
        Box::new(DataType::Int8),
        Box::new(DataType::Decimal128(10, 0)),
    );
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, true)]));

    let base = 123;
    // Dictionary values; the keys below select them as 0, 2, 1, 3, 0,
    // i.e. the decoded column is [123, 122, NULL, 124, 123].
    let dict_values = Arc::new(create_decimal_array(
        &[Some(base), None, Some(base - 1), Some(base + 1)],
        10,
        0,
    ));
    let input =
        DictionaryArray::try_new(Int8Array::from(vec![0, 2, 1, 3, 0]), dict_values)?;

    // Halves of the decoded column expressed at scale 11 (61.5, 61, NULL, 62, 61.5).
    let expected = Arc::new(create_decimal_array(
        &[
            Some(6150000000000),
            Some(6100000000000),
            None,
            Some(6200000000000),
            Some(6150000000000),
        ],
        21,
        11,
    ));
    let literal = ScalarValue::Dictionary(
        Box::new(DataType::Int8),
        Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
    );

    apply_arithmetic_scalar(
        schema,
        vec![Arc::new(input)],
        Operator::Divide,
        literal,
        expected,
    )
}
/// Computes `a % b` over two non-nullable `Int32` columns.
#[test]
fn modulus_op() -> Result<()> {
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ];
    let schema = Arc::new(Schema::new(fields));
    let dividend: ArrayRef = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
    let divisor: ArrayRef = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));
    let expected = Int32Array::from(vec![0, 0, 2, 8, 0]);

    apply_arithmetic::<Int32Type>(
        schema,
        vec![dividend, divisor],
        Operator::Modulo,
        expected,
    )
}
/// Computes `a % b` where `a` is a builder-produced dictionary `Int32`
/// column and `b` is a second dictionary-encoded `Int32` column.
#[test]
#[cfg(feature = "dictionary_expressions")]
fn modulus_op_dict() -> Result<()> {
    let dict_i32 =
        || DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", dict_i32(), true),
        Field::new("b", dict_i32(), true),
    ]));

    // Left operand contents, appended in order: 1, NULL, 2, 5, 0.
    let mut builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
    for entry in [Some(1), None, Some(2), Some(5), Some(0)] {
        match entry {
            Some(v) => {
                builder.append(v)?;
            }
            None => builder.append_null(),
        }
    }
    let lhs = builder.finish();

    let rhs = DictionaryArray::try_new(
        Int8Array::from(vec![0, 1, 1, 2, 1]),
        Arc::new(Int32Array::from(vec![1, 2, 4, 8, 16])),
    )?;

    apply_arithmetic::<Int32Type>(
        schema,
        vec![Arc::new(lhs), Arc::new(rhs)],
        Operator::Modulo,
        Int32Array::from(vec![Some(0), None, Some(0), Some(1), Some(0)]),
    )
}
/// Computes `a % b` over two dictionary-encoded `Decimal128(10, 0)` columns
/// and compares against a `Decimal128(10, 0)` array.
#[test]
#[cfg(feature = "dictionary_expressions")]
fn modulus_op_dict_decimal() -> Result<()> {
    let dict_decimal = || {
        DataType::Dictionary(
            Box::new(DataType::Int8),
            Box::new(DataType::Decimal128(10, 0)),
        )
    };
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", dict_decimal(), true),
        Field::new("b", dict_decimal(), true),
    ]));

    let base = 123;
    let lhs_values = Arc::new(create_decimal_array(
        &[Some(base), Some(base + 2), Some(base - 1), Some(base + 1)],
        10,
        0,
    ));
    let lhs = DictionaryArray::try_new(
        Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]),
        lhs_values,
    )?;

    let rhs_values = Arc::new(create_decimal_array(
        &[Some(base + 1), Some(base + 3), Some(base), Some(base + 2)],
        10,
        0,
    ));
    let rhs = DictionaryArray::try_new(
        Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]),
        rhs_values,
    )?;

    // Decoded operands: lhs = [123, 122, NULL, 124, 123],
    //                   rhs = [124, NULL, 125, 123, 123].
    apply_arithmetic(
        schema,
        vec![Arc::new(lhs), Arc::new(rhs)],
        Operator::Modulo,
        create_decimal_array(&[Some(123), None, None, Some(1), Some(0)], 10, 0),
    )
}
/// Computes `a % 2` for every element of an `Int32` column.
#[test]
fn modulus_op_scalar() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Int32,
        false,
    )]));
    let input: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let expected: ArrayRef = Arc::new(Int32Array::from(vec![1, 0, 1, 0, 1]));

    apply_arithmetic_scalar(
        schema,
        vec![input],
        Operator::Modulo,
        ScalarValue::Int32(Some(2)),
        expected,
    )
}
/// Computes `a % 2` where `a` is a dictionary-encoded `Int32` column (with one
/// null entry) and the literal is dictionary-wrapped.
// NOTE(review): the name looks like a typo for `modulus_op_dict_scalar`;
// left unchanged here so existing test filters keep matching.
#[test]
fn modules_op_dict_scalar() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, true)]));

    // Column contents, appended in order: 1, NULL, 2, 5.
    let mut builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
    for entry in [Some(1), None, Some(2), Some(5)] {
        match entry {
            Some(v) => {
                builder.append(v)?;
            }
            None => builder.append_null(),
        }
    }
    let input = builder.finish();

    let expected = Int32Array::from(vec![Some(1), None, Some(0), Some(1)]);
    let literal = ScalarValue::Dictionary(
        Box::new(DataType::Int8),
        Box::new(ScalarValue::Int32(Some(2))),
    );

    apply_arithmetic_scalar(
        schema,
        vec![Arc::new(input)],
        Operator::Modulo,
        literal,
        Arc::new(expected),
    )
}
/// Computes `a % 2` where `a` is a dictionary-encoded `Decimal128(10, 0)`
/// column and the literal is a dictionary-wrapped decimal.
#[test]
fn modulus_op_dict_scalar_decimal() -> Result<()> {
    let dict_type = DataType::Dictionary(
        Box::new(DataType::Int8),
        Box::new(DataType::Decimal128(10, 0)),
    );
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, true)]));

    let base = 123;
    // Dictionary values; the keys below select them as 0, 2, 1, 3, 0,
    // i.e. the decoded column is [123, 122, NULL, 124, 123].
    let dict_values = Arc::new(create_decimal_array(
        &[Some(base), None, Some(base - 1), Some(base + 1)],
        10,
        0,
    ));
    let input =
        DictionaryArray::try_new(Int8Array::from(vec![0, 2, 1, 3, 0]), dict_values)?;

    let expected = Arc::new(create_decimal_array(
        &[Some(1), Some(0), None, Some(0), Some(1)],
        10,
        0,
    ));
    let literal = ScalarValue::Dictionary(
        Box::new(DataType::Int8),
        Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
    );

    apply_arithmetic_scalar(
        schema,
        vec![Arc::new(input)],
        Operator::Modulo,
        literal,
        expected,
    )
}
/// Builds the expression `a <op> b` over columns "a" and "b" of `schema`,
/// evaluates it against a batch made from `data`, and asserts that the
/// resulting array equals `expected`.
///
/// Returns an error if a column lookup, batch construction or evaluation
/// fails; panics if the comparison fails.
fn apply_arithmetic<T: ArrowNumericType>(
    schema: SchemaRef,
    data: Vec<ArrayRef>,
    op: Operator,
    expected: PrimitiveArray<T>,
) -> Result<()> {
    let lhs = col("a", &schema)?;
    let rhs = col("b", &schema)?;
    let expr = binary_simple(lhs, op, rhs, &schema);

    let batch = RecordBatch::try_new(schema, data)?;
    let row_count = batch.num_rows();
    let actual = expr.evaluate(&batch)?.into_array(row_count);

    assert_eq!(actual.as_ref(), &expected);
    Ok(())
}
/// Builds the expression `a <op> literal` over column "a" of `schema`,
/// evaluates it against a batch made from `data`, and asserts that the
/// resulting array equals `expected`.
///
/// Returns an error if the column lookup, batch construction or evaluation
/// fails; panics if the comparison fails.
fn apply_arithmetic_scalar(
    schema: SchemaRef,
    data: Vec<ArrayRef>,
    op: Operator,
    literal: ScalarValue,
    expected: ArrayRef,
) -> Result<()> {
    let rhs = Arc::new(Literal::new(literal));
    let lhs = col("a", &schema)?;
    let expr = binary_simple(lhs, op, rhs, &schema);

    let batch = RecordBatch::try_new(schema, data)?;
    let row_count = batch.num_rows();
    let actual = expr.evaluate(&batch)?.into_array(row_count);

    assert_eq!(&actual, &expected);
    Ok(())
}
fn apply_logic_op(
schema: &SchemaRef,
left: &ArrayRef,
right: &ArrayRef,
op: Operator,
expected: BooleanArray,
) -> Result<()> {
let left_type = left.data_type();
let right_type = right.data_type();
let common_type = coerce_types(left_type, &op, right_type)?;
let left_expr = try_cast(col("a", schema)?, schema, common_type.clone())?;
let right_expr = try_cast(col("b", schema)?, schema, common_type)?;
let arithmetic_op = binary_simple(left_expr, op, right_expr, schema);
let data: Vec<ArrayRef> = vec![left.clone(), right.clone()];
let batch = RecordBatch::try_new(schema.clone(), data)?;
let result = arithmetic_op.evaluate(&batch)?.into_array(batch.num_rows());
assert_eq!(result.as_ref(), &expected);
Ok(())
}
/// Evaluates `scalar <op> a` (literal on the left, column "a" on the right)
/// and asserts the boolean result.
///
/// Each side is wrapped in `try_cast` only when its type differs from the
/// type chosen by `coerce_types`.
fn apply_logic_op_scalar_arr(
    schema: &SchemaRef,
    scalar: &ScalarValue,
    arr: &ArrayRef,
    op: Operator,
    expected: &BooleanArray,
) -> Result<()> {
    let literal = lit(scalar.clone());
    let literal_type = literal.data_type(schema)?;
    let op_type = coerce_types(&literal_type, &op, arr.data_type())?;

    let left_expr = if op_type == literal_type {
        literal
    } else {
        try_cast(literal, schema, op_type.clone())?
    };
    let right_expr = if &op_type == arr.data_type() {
        col("a", schema)?
    } else {
        try_cast(col("a", schema)?, schema, op_type)?
    };
    let expr = binary_simple(left_expr, op, right_expr, schema);

    let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
    let row_count = batch.num_rows();
    let actual = expr.evaluate(&batch)?.into_array(row_count);

    assert_eq!(actual.as_ref(), expected);
    Ok(())
}
/// Evaluates `a <op> scalar` (column "a" on the left, literal on the right)
/// and asserts the boolean result.
///
/// Each side is wrapped in `try_cast` only when its type differs from the
/// type chosen by `coerce_types`.
fn apply_logic_op_arr_scalar(
    schema: &SchemaRef,
    arr: &ArrayRef,
    scalar: &ScalarValue,
    op: Operator,
    expected: &BooleanArray,
) -> Result<()> {
    let literal = lit(scalar.clone());
    let literal_type = literal.data_type(schema)?;
    let op_type = coerce_types(arr.data_type(), &op, &literal_type)?;

    let right_expr = if op_type == literal_type {
        literal
    } else {
        try_cast(literal, schema, op_type.clone())?
    };
    let left_expr = if &op_type == arr.data_type() {
        col("a", schema)?
    } else {
        try_cast(col("a", schema)?, schema, op_type)?
    };
    let expr = binary_simple(left_expr, op, right_expr, schema);

    let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
    let row_count = batch.num_rows();
    let actual = expr.evaluate(&batch)?.into_array(row_count);

    assert_eq!(actual.as_ref(), expected);
    Ok(())
}
/// Checks `a AND b` over every combination of true / false / NULL operands.
#[test]
fn and_with_nulls_op() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Boolean, true),
        Field::new("b", DataType::Boolean, true),
    ]));
    let (t, f, n) = (Some(true), Some(false), None);

    let a: ArrayRef = Arc::new(BooleanArray::from(vec![t, f, n, t, f, n, t, f, n]));
    let b: ArrayRef = Arc::new(BooleanArray::from(vec![t, t, t, f, f, f, n, n, n]));
    // A false operand yields false even when the other operand is NULL.
    let expected = BooleanArray::from(vec![t, f, n, f, f, f, n, f, n]);

    apply_logic_op(&schema, &a, &b, Operator::And, expected)
}
/// Checks `a OR b` over every combination of true / false / NULL operands.
#[test]
fn or_with_nulls_op() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Boolean, true),
        Field::new("b", DataType::Boolean, true),
    ]));
    let (t, f, n) = (Some(true), Some(false), None);

    let a: ArrayRef = Arc::new(BooleanArray::from(vec![t, f, n, t, f, n, t, f, n]));
    let b: ArrayRef = Arc::new(BooleanArray::from(vec![t, t, t, f, f, f, n, n, n]));
    // A true operand yields true even when the other operand is NULL.
    let expected = BooleanArray::from(vec![t, t, t, t, f, n, t, n, n]);

    apply_logic_op(&schema, &a, &b, Operator::Or, expected)
}
fn bool_test_arrays() -> (SchemaRef, ArrayRef, ArrayRef) {
let schema = Schema::new(vec![
Field::new("a", DataType::Boolean, true),
Field::new("b", DataType::Boolean, true),
]);
let a: BooleanArray = [
Some(true),
Some(true),
Some(true),
None,
None,
None,
Some(false),
Some(false),
Some(false),
]
.iter()
.collect();
let b: BooleanArray = [
Some(true),
None,
Some(false),
Some(true),
None,
Some(false),
Some(true),
None,
Some(false),
]
.iter()
.collect();
(Arc::new(schema), Arc::new(a), Arc::new(b))
}
/// Returns a single-column nullable boolean schema ("a") together with the
/// array `[true, NULL, false]`, used by the scalar comparison tests.
fn scalar_bool_test_array() -> (SchemaRef, ArrayRef) {
    let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
    // A plain array is enough here; no need to heap-allocate a `Vec` just to
    // iterate over it by reference.
    let a: BooleanArray = [Some(true), None, Some(false)].iter().collect();
    (Arc::new(schema), Arc::new(a))
}
/// Checks `a = b` on the boolean fixture: any NULL operand yields NULL.
#[test]
fn eq_op_bool() {
    let (schema, a, b) = bool_test_arrays();
    let (t, f, n) = (Some(true), Some(false), None);
    // Built from a plain array (like the sibling `*_op_bool` tests) instead of
    // a throwaway `vec![..]` that is only iterated by reference.
    let expected: BooleanArray = [t, n, f, n, n, n, f, n, t].iter().collect();
    apply_logic_op(&schema, &a, &b, Operator::Eq, expected).unwrap();
}
/// Checks `=` between the `[true, NULL, false]` fixture and a boolean literal,
/// with the literal on either side of the operator.
#[test]
fn eq_op_bool_scalar() {
    let (schema, a) = scalar_bool_test_array();
    let (t, f, n) = (Some(true), Some(false), None);

    // (literal, expected result for both operand orders)
    let cases = [(true, [t, n, f]), (false, [f, n, t])];
    for (literal, outcome) in cases {
        let scalar = ScalarValue::from(literal);
        let expected = BooleanArray::from(outcome.to_vec());
        apply_logic_op_scalar_arr(&schema, &scalar, &a, Operator::Eq, &expected)
            .unwrap();
        apply_logic_op_arr_scalar(&schema, &a, &scalar, Operator::Eq, &expected)
            .unwrap();
    }
}
/// Checks `a != b` on the boolean fixture: any NULL operand yields NULL.
#[test]
fn neq_op_bool() {
    let (schema, a, b) = bool_test_arrays();
    let (t, f, n) = (Some(true), Some(false), None);
    let expected = BooleanArray::from(vec![f, n, t, n, n, n, t, n, f]);
    apply_logic_op(&schema, &a, &b, Operator::NotEq, expected).unwrap();
}
/// Checks `!=` between the `[true, NULL, false]` fixture and a boolean
/// literal, with the literal on either side of the operator.
#[test]
fn neq_op_bool_scalar() {
    let (schema, a) = scalar_bool_test_array();
    let (t, f, n) = (Some(true), Some(false), None);

    // (literal, expected result for both operand orders)
    let cases = [(true, [f, n, t]), (false, [t, n, f])];
    for (literal, outcome) in cases {
        let scalar = ScalarValue::from(literal);
        let expected = BooleanArray::from(outcome.to_vec());
        apply_logic_op_scalar_arr(&schema, &scalar, &a, Operator::NotEq, &expected)
            .unwrap();
        apply_logic_op_arr_scalar(&schema, &a, &scalar, Operator::NotEq, &expected)
            .unwrap();
    }
}
/// Checks `a < b` on the boolean fixture; only `false < true` is true.
#[test]
fn lt_op_bool() {
    let (schema, a, b) = bool_test_arrays();
    let (t, f, n) = (Some(true), Some(false), None);
    let expected = BooleanArray::from(vec![f, n, f, n, n, n, t, n, f]);
    apply_logic_op(&schema, &a, &b, Operator::Lt, expected).unwrap();
}
/// Checks `<` between the `[true, NULL, false]` fixture and a boolean literal
/// for both operand orders.
#[test]
fn lt_op_bool_scalar() {
    let (schema, a) = scalar_bool_test_array();
    let (t, f, n) = (Some(true), Some(false), None);

    // (literal, expected for `literal < a`, expected for `a < literal`)
    let cases = [
        (true, [f, n, f], [f, n, t]),
        (false, [t, n, f], [f, n, f]),
    ];
    for (literal, literal_first, column_first) in cases {
        let scalar = ScalarValue::from(literal);

        let expected = BooleanArray::from(literal_first.to_vec());
        apply_logic_op_scalar_arr(&schema, &scalar, &a, Operator::Lt, &expected)
            .unwrap();

        let expected = BooleanArray::from(column_first.to_vec());
        apply_logic_op_arr_scalar(&schema, &a, &scalar, Operator::Lt, &expected)
            .unwrap();
    }
}
/// Checks `a <= b` on the boolean fixture: any NULL operand yields NULL.
#[test]
fn lt_eq_op_bool() {
    let (schema, a, b) = bool_test_arrays();
    let (t, f, n) = (Some(true), Some(false), None);
    let expected = BooleanArray::from(vec![t, n, f, n, n, n, t, n, t]);
    apply_logic_op(&schema, &a, &b, Operator::LtEq, expected).unwrap();
}
/// Checks `<=` between the `[true, NULL, false]` fixture and a boolean literal
/// for both operand orders.
#[test]
fn lt_eq_op_bool_scalar() {
    let (schema, a) = scalar_bool_test_array();
    let (t, f, n) = (Some(true), Some(false), None);

    // (literal, expected for `literal <= a`, expected for `a <= literal`)
    let cases = [
        (true, [t, n, f], [t, n, t]),
        (false, [t, n, t], [f, n, t]),
    ];
    for (literal, literal_first, column_first) in cases {
        let scalar = ScalarValue::from(literal);

        let expected = BooleanArray::from(literal_first.to_vec());
        apply_logic_op_scalar_arr(&schema, &scalar, &a, Operator::LtEq, &expected)
            .unwrap();

        let expected = BooleanArray::from(column_first.to_vec());
        apply_logic_op_arr_scalar(&schema, &a, &scalar, Operator::LtEq, &expected)
            .unwrap();
    }
}
/// Checks `a > b` on the boolean fixture; only `true > false` is true.
#[test]
fn gt_op_bool() {
    let (schema, a, b) = bool_test_arrays();
    let (t, f, n) = (Some(true), Some(false), None);
    let expected = BooleanArray::from(vec![f, n, t, n, n, n, f, n, f]);
    apply_logic_op(&schema, &a, &b, Operator::Gt, expected).unwrap();
}
/// Checks `>` between the `[true, NULL, false]` fixture and a boolean literal
/// for both operand orders.
#[test]
fn gt_op_bool_scalar() {
    let (schema, a) = scalar_bool_test_array();
    let (t, f, n) = (Some(true), Some(false), None);

    // (literal, expected for `literal > a`, expected for `a > literal`)
    let cases = [
        (true, [f, n, t], [f, n, f]),
        (false, [f, n, f], [t, n, f]),
    ];
    for (literal, literal_first, column_first) in cases {
        let scalar = ScalarValue::from(literal);

        let expected = BooleanArray::from(literal_first.to_vec());
        apply_logic_op_scalar_arr(&schema, &scalar, &a, Operator::Gt, &expected)
            .unwrap();

        let expected = BooleanArray::from(column_first.to_vec());
        apply_logic_op_arr_scalar(&schema, &a, &scalar, Operator::Gt, &expected)
            .unwrap();
    }
}
/// Checks `a >= b` on the boolean fixture: any NULL operand yields NULL.
#[test]
fn gt_eq_op_bool() {
    let (schema, a, b) = bool_test_arrays();
    let (t, f, n) = (Some(true), Some(false), None);
    let expected = BooleanArray::from(vec![t, n, t, n, n, n, f, n, t]);
    apply_logic_op(&schema, &a, &b, Operator::GtEq, expected).unwrap();
}
/// Checks `>=` between the `[true, NULL, false]` fixture and a boolean literal
/// for both operand orders.
#[test]
fn gt_eq_op_bool_scalar() {
    let (schema, a) = scalar_bool_test_array();
    let (t, f, n) = (Some(true), Some(false), None);

    // (literal, expected for `literal >= a`, expected for `a >= literal`)
    let cases = [
        (true, [t, n, t], [t, n, f]),
        (false, [f, n, t], [t, n, t]),
    ];
    for (literal, literal_first, column_first) in cases {
        let scalar = ScalarValue::from(literal);

        let expected = BooleanArray::from(literal_first.to_vec());
        apply_logic_op_scalar_arr(&schema, &scalar, &a, Operator::GtEq, &expected)
            .unwrap();

        let expected = BooleanArray::from(column_first.to_vec());
        apply_logic_op_arr_scalar(&schema, &a, &scalar, Operator::GtEq, &expected)
            .unwrap();
    }
}
/// Checks `a IS DISTINCT FROM b` on the boolean fixture; the expected output
/// contains no NULLs, and NULL compared with NULL is `false`.
#[test]
fn is_distinct_from_op_bool() {
    let (schema, a, b) = bool_test_arrays();
    let (t, f) = (Some(true), Some(false));
    let expected = BooleanArray::from(vec![f, t, t, t, f, t, t, t, f]);
    apply_logic_op(&schema, &a, &b, Operator::IsDistinctFrom, expected).unwrap();
}
/// Checks `a IS NOT DISTINCT FROM b` on the boolean fixture; the expected
/// output contains no NULLs, and NULL compared with NULL is `true`.
#[test]
fn is_not_distinct_from_op_bool() {
    let (schema, a, b) = bool_test_arrays();
    let (t, f) = (Some(true), Some(false));
    let expected = BooleanArray::from(vec![t, f, f, f, t, f, f, f, t]);
    apply_logic_op(&schema, &a, &b, Operator::IsNotDistinctFrom, expected).unwrap();
}
/// Builds a chain of `tree_depth` references to column "a" joined with `+`
/// and checks that evaluating it yields `a * tree_depth` for every row.
#[test]
fn relatively_deeply_nested() {
    let values: [i32; 5] = [1, 2, 3, 4, 5];
    let column: Int32Array = values.iter().copied().map(Some).collect();
    let batch =
        RecordBatch::try_from_iter(vec![("a", Arc::new(column) as ArrayRef)]).unwrap();
    let schema = batch.schema();

    let tree_depth: i32 = 100;
    // Fold the column references into one nested binary expression.
    let mut operands = (0..tree_depth).map(|_| col("a", schema.as_ref()).unwrap());
    let first = operands.next().unwrap();
    let expr = operands.fold(first, |acc, next| {
        binary_simple(acc, Operator::Plus, next, &schema)
    });

    let row_count = batch.num_rows();
    let result = expr
        .evaluate(&batch)
        .expect("evaluation")
        .into_array(row_count);

    let expected: Int32Array = values.iter().map(|v| Some(v * tree_depth)).collect();
    assert_eq!(result.as_ref(), &expected);
}
/// Builds a `Decimal128Array` holding `array` (with `None` as null) and tags
/// it with the given `precision` and `scale`.
///
/// Panics if `with_precision_and_scale` rejects the precision/scale pair.
fn create_decimal_array(
    array: &[Option<i128>],
    precision: u8,
    scale: i8,
) -> Decimal128Array {
    let mut builder = Decimal128Builder::with_capacity(array.len());
    array
        .iter()
        .for_each(|item| builder.append_option(*item));
    let values = builder.finish();
    values.with_precision_and_scale(precision, scale).unwrap()
}
/// Compares a dictionary-encoded `Decimal128(25, 3)` column against a
/// dictionary-wrapped decimal literal with each of the six comparison
/// operators.
#[test]
fn comparison_dict_decimal_scalar_expr_test() -> Result<()> {
    let base = 123;
    let decimal_scalar = ScalarValue::Dictionary(
        Box::new(DataType::Int8),
        Box::new(ScalarValue::Decimal128(Some(base), 25, 3)),
    );
    let dict_type = DataType::Dictionary(
        Box::new(DataType::Int8),
        Box::new(DataType::Decimal128(25, 3)),
    );
    let schema = Arc::new(Schema::new(vec![Field::new("a", dict_type, true)]));

    let dict_values = Arc::new(create_decimal_array(
        &[Some(base), None, Some(base - 1), Some(base + 1)],
        25,
        3,
    ));
    // Decoded column: [base, NULL, base - 1, base + 1].
    let keys = Int8Array::from(vec![Some(0), None, Some(2), Some(3)]);
    let dictionary: ArrayRef = Arc::new(DictionaryArray::try_new(keys, dict_values)?);

    let (t, f, n) = (Some(true), Some(false), None);
    // (operator, expected result of `column <op> literal`)
    let cases = [
        (Operator::Eq, [t, n, f, f]),
        (Operator::NotEq, [f, n, t, t]),
        (Operator::Lt, [f, n, t, f]),
        (Operator::LtEq, [t, n, t, f]),
        (Operator::Gt, [f, n, f, t]),
        (Operator::GtEq, [t, n, f, t]),
    ];
    for (op, outcome) in cases {
        apply_logic_op_arr_scalar(
            &schema,
            &dictionary,
            &decimal_scalar,
            op,
            &BooleanArray::from(outcome.to_vec()),
        )
        .unwrap();
    }
    Ok(())
}
/// Exercises the comparison operators when at least one operand is a
/// `Decimal128` value: decimal array vs. decimal scalar, `Int64`/`Float64`
/// arrays vs. decimal scalar, and array vs. array with decimal, `Int64` and
/// `Float64` operands.
#[test]
fn comparison_decimal_expr_test() -> Result<()> {
    // --- Decimal128(25, 3) column compared with a scalar of the same type ---
    let pivot = 123;
    let same_type_scalar = ScalarValue::Decimal128(Some(pivot), 25, 3);
    let decimal_schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Decimal128(25, 3),
        true,
    )]));
    let decimal_column = Arc::new(create_decimal_array(
        &[Some(pivot), None, Some(pivot - 1), Some(pivot + 1)],
        25,
        3,
    )) as ArrayRef;
    let same_type_cases = [
        (Operator::Eq, vec![Some(true), None, Some(false), Some(false)]),
        (Operator::NotEq, vec![Some(false), None, Some(true), Some(true)]),
        (Operator::Lt, vec![Some(false), None, Some(true), Some(false)]),
        (Operator::LtEq, vec![Some(true), None, Some(true), Some(false)]),
        (Operator::Gt, vec![Some(false), None, Some(false), Some(true)]),
        (Operator::GtEq, vec![Some(true), None, Some(false), Some(true)]),
    ];
    for (op, expected) in same_type_cases {
        apply_logic_op_arr_scalar(
            &decimal_schema,
            &decimal_column,
            &same_type_scalar,
            op,
            &BooleanArray::from(expected),
        )
        .unwrap();
    }

    // --- Int64 column compared with a Decimal128(10, 3) scalar ---
    let mixed_scalar = ScalarValue::Decimal128(Some(123_456), 10, 3);
    let int_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    // Scalar on the left-hand side, array on the right.
    let int_rhs = Arc::new(Int64Array::from(vec![Some(124), None])) as ArrayRef;
    apply_logic_op_scalar_arr(
        &int_schema,
        &mixed_scalar,
        &int_rhs,
        Operator::Eq,
        &BooleanArray::from(vec![Some(false), None]),
    )
    .unwrap();
    // Array on the left-hand side, scalar on the right.
    let int_cases = [
        (
            vec![Some(123), None, Some(1)],
            Operator::NotEq,
            vec![Some(true), None, Some(true)],
        ),
        (
            vec![Some(123), None, Some(124)],
            Operator::Lt,
            vec![Some(true), None, Some(false)],
        ),
        (
            vec![Some(123), None, Some(124)],
            Operator::Gt,
            vec![Some(false), None, Some(true)],
        ),
    ];
    for (values, op, expected) in int_cases {
        let column = Arc::new(Int64Array::from(values)) as ArrayRef;
        apply_logic_op_arr_scalar(
            &int_schema,
            &column,
            &mixed_scalar,
            op,
            &BooleanArray::from(expected),
        )
        .unwrap();
    }

    // --- Float64 column compared with the same Decimal128(10, 3) scalar ---
    let float_schema =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
    let float_cases = [
        (
            vec![Some(123.456), None, Some(123.457)],
            Operator::Eq,
            vec![Some(true), None, Some(false)],
        ),
        (
            vec![Some(123.456), None, Some(123.457), Some(123.45)],
            Operator::LtEq,
            vec![Some(true), None, Some(false), Some(true)],
        ),
        (
            vec![Some(123.456), None, Some(123.457), Some(123.45)],
            Operator::GtEq,
            vec![Some(true), None, Some(true), Some(false)],
        ),
    ];
    for (values, op, expected) in float_cases {
        let column = Arc::new(Float64Array::from(values)) as ArrayRef;
        apply_logic_op_arr_scalar(
            &float_schema,
            &column,
            &mixed_scalar,
            op,
            &BooleanArray::from(expected),
        )
        .unwrap();
    }

    // --- Decimal128(10, 0) column compared with another Decimal128(10, 0) column ---
    let base: i128 = 123;
    let left_decimals = Arc::new(create_decimal_array(
        &[Some(base), None, Some(base - 1), Some(base + 1)],
        10,
        0,
    )) as ArrayRef;
    let decimal_pair_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Decimal128(10, 0), true),
        Field::new("b", DataType::Decimal128(10, 0), true),
    ]));
    let right_decimals = Arc::new(create_decimal_array(
        &[Some(base - 1), Some(base), Some(base + 1), Some(base + 1)],
        10,
        0,
    )) as ArrayRef;
    let decimal_pair_cases = [
        (Operator::Eq, vec![Some(false), None, Some(false), Some(true)]),
        (Operator::NotEq, vec![Some(true), None, Some(true), Some(false)]),
        (Operator::Lt, vec![Some(false), None, Some(true), Some(false)]),
        (Operator::LtEq, vec![Some(false), None, Some(true), Some(true)]),
        (Operator::Gt, vec![Some(true), None, Some(false), Some(false)]),
        (Operator::GtEq, vec![Some(true), None, Some(false), Some(true)]),
    ];
    for (op, expected) in decimal_pair_cases {
        apply_logic_op(
            &decimal_pair_schema,
            &left_decimals,
            &right_decimals,
            op,
            BooleanArray::from(expected),
        )
        .unwrap();
    }

    // --- Int64 column compared with the Decimal128(10, 0) column built above ---
    let int_base: i64 = 123;
    let int_decimal_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Decimal128(10, 0), true),
    ]));
    let int_column = Arc::new(Int64Array::from(vec![
        Some(int_base),
        Some(int_base - 1),
        Some(int_base),
        Some(int_base + 1),
    ])) as ArrayRef;
    let int_decimal_cases = [
        (Operator::Eq, vec![Some(true), None, Some(false), Some(true)]),
        (Operator::NotEq, vec![Some(false), None, Some(true), Some(false)]),
    ];
    for (op, expected) in int_decimal_cases {
        apply_logic_op(
            &int_decimal_schema,
            &int_column,
            &left_decimals,
            op,
            BooleanArray::from(expected),
        )
        .unwrap();
    }

    // --- Float64 column compared with a Decimal128(10, 2) column ---
    let float_decimal_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Float64, true),
        Field::new("b", DataType::Decimal128(10, 2), true),
    ]));
    let scaled_base: i128 = 123;
    let scaled_decimals = Arc::new(create_decimal_array(
        &[
            Some(scaled_base),
            None,
            Some(scaled_base - 1),
            Some(scaled_base + 1),
        ],
        10,
        2,
    )) as ArrayRef;
    let float_column = Arc::new(Float64Array::from(vec![
        Some(1.23),
        Some(1.22),
        Some(1.23),
        Some(1.24),
    ])) as ArrayRef;
    let float_decimal_cases = [
        (Operator::Lt, vec![Some(false), None, Some(false), Some(false)]),
        (Operator::LtEq, vec![Some(true), None, Some(false), Some(true)]),
        (Operator::Gt, vec![Some(false), None, Some(true), Some(false)]),
        (Operator::GtEq, vec![Some(true), None, Some(true), Some(true)]),
        // The distinct-from operators are expected to yield a non-null
        // answer even for the row whose decimal side is null.
        (
            Operator::IsDistinctFrom,
            vec![Some(false), Some(true), Some(true), Some(false)],
        ),
        (
            Operator::IsNotDistinctFrom,
            vec![Some(true), Some(false), Some(false), Some(true)],
        ),
    ];
    for (op, expected) in float_decimal_cases {
        apply_logic_op(
            &float_decimal_schema,
            &float_column,
            &scaled_decimals,
            op,
            BooleanArray::from(expected),
        )
        .unwrap();
    }

    Ok(())
}
/// Evaluates `a <op> b` over `left`/`right` and asserts the result equals
/// `expected`.
///
/// Each side is wrapped in a `try_cast` to the type returned by
/// `math_decimal_coercion` when that function returns one for it; otherwise
/// the column is used as-is. The expression is built against a schema that
/// carries the (possibly coerced) types, while the record batch itself is
/// created with the caller's original `schema`.
fn apply_decimal_arithmetic_op(
    schema: &SchemaRef,
    left: &ArrayRef,
    right: &ArrayRef,
    op: Operator,
    expected: ArrayRef,
) -> Result<()> {
    let (lhs_target, rhs_target) =
        math_decimal_coercion(left.data_type(), right.data_type());

    // Column "a": cast only when a coerced type was produced for it.
    let (left_expr, lhs_type) = match lhs_target {
        Some(target) => (
            try_cast(col("a", schema)?, schema, target.clone())?,
            target,
        ),
        None => (col("a", schema)?, left.data_type().clone()),
    };
    // Column "b": same treatment.
    let (right_expr, rhs_type) = match rhs_target {
        Some(target) => (
            try_cast(col("b", schema)?, schema, target.clone())?,
            target,
        ),
        None => (col("b", schema)?, right.data_type().clone()),
    };

    // Same field names and nullability as the input schema, coerced types.
    let first_field = schema.field(0);
    let second_field = schema.field(1);
    let coerced_schema = Schema::new(vec![
        Field::new(first_field.name(), lhs_type, first_field.is_nullable()),
        Field::new(second_field.name(), rhs_type, second_field.is_nullable()),
    ]);
    let arithmetic_op = binary_simple(left_expr, op, right_expr, &coerced_schema);

    let batch =
        RecordBatch::try_new(schema.clone(), vec![left.clone(), right.clone()])?;
    let row_count = batch.num_rows();
    let actual = arithmetic_op.evaluate(&batch)?.into_array(row_count);
    assert_eq!(actual.as_ref(), expected.as_ref());
    Ok(())
}
/// Checks `+`, `-`, `*`, `/` and `%` between an `Int32` column and a
/// `Decimal128(10, 2)` column, including the precision/scale of each
/// expected result array.
#[test]
fn arithmetic_decimal_expr_test() -> Result<()> {
    // Two column layouts are used: (Int32, Decimal) and (Decimal, Int32).
    let int_then_decimal = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Decimal128(10, 2), true),
    ]));
    let decimal_then_int = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Decimal128(10, 2), true),
        Field::new("b", DataType::Int32, true),
    ]));

    let base: i128 = 123;
    let decimal_array = Arc::new(create_decimal_array(
        &[Some(base), None, Some(base - 1), Some(base + 1)],
        10,
        2,
    )) as ArrayRef;
    let int32_array = Arc::new(Int32Array::from(vec![
        Some(123),
        Some(122),
        Some(123),
        Some(124),
    ])) as ArrayRef;

    // (schema, left operand, right operand, operator, expected result)
    let cases = [
        (
            &int_then_decimal,
            &int32_array,
            &decimal_array,
            Operator::Plus,
            Arc::new(create_decimal_array(
                &[Some(12423), None, Some(12422), Some(12524)],
                13,
                2,
            )) as ArrayRef,
        ),
        (
            &decimal_then_int,
            &decimal_array,
            &int32_array,
            Operator::Minus,
            Arc::new(create_decimal_array(
                &[Some(-12177), None, Some(-12178), Some(-12276)],
                13,
                2,
            )) as ArrayRef,
        ),
        (
            &decimal_then_int,
            &decimal_array,
            &int32_array,
            Operator::Multiply,
            Arc::new(create_decimal_array(
                &[Some(15129), None, Some(15006), Some(15376)],
                21,
                2,
            )) as ArrayRef,
        ),
        (
            &int_then_decimal,
            &int32_array,
            &decimal_array,
            Operator::Divide,
            Arc::new(create_decimal_array(
                &[
                    Some(10000000000000),
                    None,
                    Some(10081967213114),
                    Some(10000000000000),
                ],
                23,
                11,
            )) as ArrayRef,
        ),
        (
            &int_then_decimal,
            &int32_array,
            &decimal_array,
            Operator::Modulo,
            Arc::new(create_decimal_array(
                &[Some(0), None, Some(100), Some(0)],
                10,
                2,
            )) as ArrayRef,
        ),
    ];
    for (schema, left, right, op, expected) in cases {
        apply_decimal_arithmetic_op(schema, left, right, op, expected).unwrap();
    }

    Ok(())
}
/// Checks `+`, `-`, `*`, `/` and `%` between a `Float64` column (left) and a
/// `Decimal128(10, 2)` column (right); every expected result is a `Float64`
/// array.
#[test]
fn arithmetic_decimal_float_expr_test() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Float64, true),
        Field::new("b", DataType::Decimal128(10, 2), true),
    ]));
    let base: i128 = 123;
    let decimal_array = Arc::new(create_decimal_array(
        &[Some(base), None, Some(base - 1), Some(base + 1)],
        10,
        2,
    )) as ArrayRef;
    let float64_array = Arc::new(Float64Array::from(vec![
        Some(123.0),
        Some(122.0),
        Some(123.0),
        Some(124.0),
    ])) as ArrayRef;

    // Operator paired with the exact Float64 values the evaluation must yield.
    let cases = [
        (
            Operator::Plus,
            vec![Some(124.23), None, Some(124.22), Some(125.24)],
        ),
        (
            Operator::Minus,
            vec![Some(121.77), None, Some(121.78), Some(122.76)],
        ),
        (
            Operator::Multiply,
            vec![Some(151.29), None, Some(150.06), Some(153.76)],
        ),
        (
            Operator::Divide,
            vec![Some(100.0), None, Some(100.81967213114754), Some(100.0)],
        ),
        (
            Operator::Modulo,
            vec![
                Some(1.7763568394002505e-15),
                None,
                Some(1.0000000000000027),
                Some(8.881784197001252e-16),
            ],
        ),
    ];
    for (op, expected_values) in cases {
        let expected = Arc::new(Float64Array::from(expected_values)) as ArrayRef;
        apply_decimal_arithmetic_op(
            &schema,
            &float64_array,
            &decimal_array,
            op,
            expected,
        )
        .unwrap();
    }

    Ok(())
}
/// Division by zero is expected to produce a null slot rather than an error,
/// for both `Int32` and `Decimal128` operands.
#[test]
fn arithmetic_divide_zero() -> Result<()> {
    // Int32: the last divisor is zero.
    let int_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]));
    let dividends = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048, 100]));
    let divisors = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32, 0]));
    let int_quotients =
        Int32Array::from(vec![Some(4), Some(8), Some(16), Some(32), Some(64), None]);
    apply_arithmetic::<Int32Type>(
        int_schema,
        vec![dividends, divisors],
        Operator::Divide,
        int_quotients,
    )?;

    // Decimal128(25, 3): the second divisor is zero.
    let decimal_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Decimal128(25, 3), true),
        Field::new("b", DataType::Decimal128(25, 3), true),
    ]));
    let decimal_dividends =
        Arc::new(create_decimal_array(&[Some(1234567), Some(1234567)], 25, 3));
    let decimal_divisors = Arc::new(create_decimal_array(&[Some(10), Some(0)], 25, 3));
    let decimal_quotients = create_decimal_array(
        &[Some(12345670000000000000000000000000000), None],
        38,
        29,
    );
    apply_arithmetic::<Decimal128Type>(
        decimal_schema,
        vec![decimal_dividends, decimal_divisors],
        Operator::Divide,
        decimal_quotients,
    )?;

    Ok(())
}
/// Verifies the array-vs-array bitwise AND / OR / XOR kernels on `Int32` and
/// `UInt32` inputs; the null slot in the left operand is expected to stay
/// null in every result.
#[test]
fn bitwise_array_test() -> Result<()> {
    // Signed 32-bit operands.
    let lhs = Arc::new(Int32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
    let rhs = Arc::new(Int32Array::from(vec![Some(1), Some(3), Some(7)])) as ArrayRef;

    let and_result = bitwise_and_dyn(lhs.clone(), rhs.clone())?;
    let and_expected = Int32Array::from(vec![Some(0), None, Some(3)]);
    assert_eq!(and_result.as_ref(), &and_expected);

    let or_result = bitwise_or_dyn(lhs.clone(), rhs.clone())?;
    let or_expected = Int32Array::from(vec![Some(13), None, Some(15)]);
    assert_eq!(or_result.as_ref(), &or_expected);

    let xor_result = bitwise_xor_dyn(lhs.clone(), rhs.clone())?;
    let xor_expected = Int32Array::from(vec![Some(13), None, Some(12)]);
    assert_eq!(xor_result.as_ref(), &xor_expected);

    // Unsigned 32-bit operands, same values.
    let lhs = Arc::new(UInt32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
    let rhs = Arc::new(UInt32Array::from(vec![Some(1), Some(3), Some(7)])) as ArrayRef;

    let and_result = bitwise_and_dyn(lhs.clone(), rhs.clone())?;
    let and_expected = UInt32Array::from(vec![Some(0), None, Some(3)]);
    assert_eq!(and_result.as_ref(), &and_expected);

    let or_result = bitwise_or_dyn(lhs.clone(), rhs.clone())?;
    let or_expected = UInt32Array::from(vec![Some(13), None, Some(15)]);
    assert_eq!(or_result.as_ref(), &or_expected);

    let xor_result = bitwise_xor_dyn(lhs.clone(), rhs.clone())?;
    let xor_expected = UInt32Array::from(vec![Some(13), None, Some(12)]);
    assert_eq!(xor_result.as_ref(), &xor_expected);

    Ok(())
}
/// Shifts an array left by per-element amounts, checks the result, then
/// shifts it back right by the same amounts and expects the original input.
/// Run for both `Int32` and `UInt32`.
#[test]
fn bitwise_shift_array_test() -> Result<()> {
    // Signed 32-bit values.
    let original = Arc::new(Int32Array::from(vec![Some(2), None, Some(10)])) as ArrayRef;
    let shift_amounts =
        Arc::new(Int32Array::from(vec![Some(2), Some(4), Some(8)])) as ArrayRef;
    let shifted_left = bitwise_shift_left_dyn(original.clone(), shift_amounts.clone())?;
    let left_expected = Int32Array::from(vec![Some(8), None, Some(2560)]);
    assert_eq!(shifted_left.as_ref(), &left_expected);
    // Shifting back must restore the input, null slot included.
    let round_trip = bitwise_shift_right_dyn(shifted_left.clone(), shift_amounts.clone())?;
    assert_eq!(round_trip.as_ref(), &original);

    // Unsigned 32-bit values.
    let original =
        Arc::new(UInt32Array::from(vec![Some(2), None, Some(10)])) as ArrayRef;
    let shift_amounts =
        Arc::new(UInt32Array::from(vec![Some(2), Some(4), Some(8)])) as ArrayRef;
    let shifted_left = bitwise_shift_left_dyn(original.clone(), shift_amounts.clone())?;
    let left_expected = UInt32Array::from(vec![Some(8), None, Some(2560)]);
    assert_eq!(shifted_left.as_ref(), &left_expected);
    let round_trip = bitwise_shift_right_dyn(shifted_left.clone(), shift_amounts.clone())?;
    assert_eq!(round_trip.as_ref(), &original);

    Ok(())
}
/// A left shift by 100 on 32-bit values must not fail: the expected result
/// for input 2 is 32 (the same as `2 << 4`), for both `Int32` and `UInt32`.
#[test]
fn bitwise_shift_array_overflow_test() -> Result<()> {
    // Signed case.
    let value = Arc::new(Int32Array::from(vec![Some(2)])) as ArrayRef;
    let oversized_shift = Arc::new(Int32Array::from(vec![Some(100)])) as ArrayRef;
    let shifted = bitwise_shift_left_dyn(value.clone(), oversized_shift.clone())?;
    let signed_expected = Int32Array::from(vec![Some(32)]);
    assert_eq!(shifted.as_ref(), &signed_expected);

    // Unsigned case.
    let value = Arc::new(UInt32Array::from(vec![Some(2)])) as ArrayRef;
    let oversized_shift = Arc::new(UInt32Array::from(vec![Some(100)])) as ArrayRef;
    let shifted = bitwise_shift_left_dyn(value.clone(), oversized_shift.clone())?;
    let unsigned_expected = UInt32Array::from(vec![Some(32)]);
    assert_eq!(shifted.as_ref(), &unsigned_expected);

    Ok(())
}
/// Verifies the array-vs-scalar bitwise AND / OR / XOR kernels with the
/// scalar 3, on `Int32` and `UInt32` arrays that contain a null slot.
#[test]
fn bitwise_scalar_test() -> Result<()> {
    // Signed 32-bit array with an i32 scalar.
    let array = Arc::new(Int32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
    let mask = ScalarValue::from(3i32);

    let and_result = bitwise_and_dyn_scalar(&array, mask.clone()).unwrap()?;
    let and_expected = Int32Array::from(vec![Some(0), None, Some(3)]);
    assert_eq!(and_result.as_ref(), &and_expected);

    let or_result = bitwise_or_dyn_scalar(&array, mask.clone()).unwrap()?;
    let or_expected = Int32Array::from(vec![Some(15), None, Some(11)]);
    assert_eq!(or_result.as_ref(), &or_expected);

    let xor_result = bitwise_xor_dyn_scalar(&array, mask).unwrap()?;
    let xor_expected = Int32Array::from(vec![Some(15), None, Some(8)]);
    assert_eq!(xor_result.as_ref(), &xor_expected);

    // Unsigned 32-bit array with a u32 scalar.
    let array =
        Arc::new(UInt32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
    let mask = ScalarValue::from(3u32);

    let and_result = bitwise_and_dyn_scalar(&array, mask.clone()).unwrap()?;
    let and_expected = UInt32Array::from(vec![Some(0), None, Some(3)]);
    assert_eq!(and_result.as_ref(), &and_expected);

    let or_result = bitwise_or_dyn_scalar(&array, mask.clone()).unwrap()?;
    let or_expected = UInt32Array::from(vec![Some(15), None, Some(11)]);
    assert_eq!(or_result.as_ref(), &or_expected);

    let xor_result = bitwise_xor_dyn_scalar(&array, mask).unwrap()?;
    let xor_expected = UInt32Array::from(vec![Some(15), None, Some(8)]);
    assert_eq!(xor_result.as_ref(), &xor_expected);

    Ok(())
}
/// Shifts an array left by the scalar 10, checks the result, then shifts it
/// back right by the same scalar and expects the original input. Run for
/// both `Int32` and `UInt32`.
#[test]
fn bitwise_shift_scalar_test() -> Result<()> {
    // Signed 32-bit values.
    let original = Arc::new(Int32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef;
    let shift_by = ScalarValue::from(10i32);
    let shifted_left =
        bitwise_shift_left_dyn_scalar(&original, shift_by.clone()).unwrap()?;
    let left_expected = Int32Array::from(vec![Some(2048), None, Some(4096)]);
    assert_eq!(shifted_left.as_ref(), &left_expected);
    let round_trip = bitwise_shift_right_dyn_scalar(&shifted_left, shift_by).unwrap()?;
    assert_eq!(round_trip.as_ref(), &original);

    // Unsigned 32-bit values.
    let original = Arc::new(UInt32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef;
    let shift_by = ScalarValue::from(10u32);
    let shifted_left =
        bitwise_shift_left_dyn_scalar(&original, shift_by.clone()).unwrap()?;
    let left_expected = UInt32Array::from(vec![Some(2048), None, Some(4096)]);
    assert_eq!(shifted_left.as_ref(), &left_expected);
    let round_trip = bitwise_shift_right_dyn_scalar(&shifted_left, shift_by).unwrap()?;
    assert_eq!(round_trip.as_ref(), &original);

    Ok(())
}
/// Builds a one-column, non-nullable schema (column `a`) together with
/// table statistics whose only populated entries are that column's minimum
/// and maximum.
///
/// # Panics
///
/// Panics if `min_value` and `max_value` do not have the same data type.
fn get_test_table_stats(
    min_value: ScalarValue,
    max_value: ScalarValue,
) -> (Schema, Statistics) {
    let data_type = min_value.get_datatype();
    assert_eq!(data_type, max_value.get_datatype());

    let column_stats = ColumnStatistics {
        min_value: Some(min_value),
        max_value: Some(max_value),
        null_count: None,
        distinct_count: None,
    };

    (
        Schema::new(vec![Field::new("a", data_type, false)]),
        Statistics {
            column_statistics: Some(vec![column_stats]),
            ..Default::default()
        },
    )
}
/// Table-driven check of `analyze_expr_scalar_comparison` for an Int64
/// column whose statistics give the range [1, 100].
///
/// Each case is `((operator, rhs literal), (expected selectivity, expected
/// column min, expected column max))`.
#[test]
fn test_analyze_expr_scalar_comparison() -> Result<()> {
    let (schema, statistics) =
        get_test_table_stats(ScalarValue::from(1i64), ScalarValue::from(100i64));

    let cases = [
        // a = rhs
        ((Operator::Eq, 1), (1.0 / 100.0, 1, 1)),
        ((Operator::Eq, 5), (1.0 / 100.0, 5, 5)),
        ((Operator::Eq, 99), (1.0 / 100.0, 99, 99)),
        ((Operator::Eq, 100), (1.0 / 100.0, 100, 100)),
        ((Operator::Eq, 0), (0.0, 0, 0)),
        ((Operator::Eq, -101), (0.0, -101, -101)),
        ((Operator::Eq, 101), (0.0, 101, 101)),
        // a < rhs
        ((Operator::Lt, 5), (4.0 / 100.0, 1, 5)),
        ((Operator::Lt, 99), (98.0 / 100.0, 1, 99)),
        ((Operator::Lt, 101), (100.0 / 100.0, 1, 100)),
        ((Operator::Lt, 0), (0.0, 1, 100)),
        ((Operator::Lt, 1), (0.0, 1, 100)),
        ((Operator::Lt, -100), (0.0, 1, 100)),
        ((Operator::Lt, -200), (0.0, 1, 100)),
        ((Operator::Lt, 200), (1.0, 1, 100)),
        // a <= rhs
        ((Operator::LtEq, -100), (0.0, 1, 100)),
        ((Operator::LtEq, 0), (0.0, 1, 100)),
        ((Operator::LtEq, 1), (1.0 / 100.0, 1, 1)),
        ((Operator::LtEq, 5), (5.0 / 100.0, 1, 5)),
        ((Operator::LtEq, 99), (99.0 / 100.0, 1, 99)),
        ((Operator::LtEq, 100), (100.0 / 100.0, 1, 100)),
        ((Operator::LtEq, 101), (1.0, 1, 100)),
        ((Operator::LtEq, 200), (1.0, 1, 100)),
        // a > rhs
        ((Operator::Gt, -100), (1.0, 1, 100)),
        ((Operator::Gt, 0), (1.0, 1, 100)),
        ((Operator::Gt, 1), (99.0 / 100.0, 1, 100)),
        ((Operator::Gt, 5), (95.0 / 100.0, 5, 100)),
        ((Operator::Gt, 99), (1.0 / 100.0, 99, 100)),
        ((Operator::Gt, 100), (0.0, 1, 100)),
        ((Operator::Gt, 101), (0.0, 1, 100)),
        ((Operator::Gt, 200), (0.0, 1, 100)),
        // a >= rhs
        ((Operator::GtEq, -100), (1.0, 1, 100)),
        ((Operator::GtEq, 0), (1.0, 1, 100)),
        ((Operator::GtEq, 1), (1.0, 1, 100)),
        ((Operator::GtEq, 5), (96.0 / 100.0, 5, 100)),
        ((Operator::GtEq, 99), (2.0 / 100.0, 99, 100)),
        ((Operator::GtEq, 100), (1.0 / 100.0, 100, 100)),
        ((Operator::GtEq, 101), (0.0, 1, 100)),
        ((Operator::GtEq, 200), (0.0, 1, 100)),
    ];

    for ((operator, rhs), (exp_selectivity, exp_min, exp_max)) in cases {
        let context = AnalysisContext::from_statistics(&schema, &statistics);
        let column = col("a", &schema).unwrap();
        let analysis_ctx = analyze_expr_scalar_comparison(
            context,
            &operator,
            &column,
            ScalarValue::Int64(Some(rhs)),
        );

        // Boundaries of the predicate itself (a boolean expression).
        let predicate_bounds = analysis_ctx
            .boundaries
            .as_ref()
            .expect("Analysis must complete for this test!");
        let selectivity = predicate_bounds
            .selectivity
            .expect("compare_left_boundaries must produce a selectivity value");
        assert_eq!(selectivity, exp_selectivity);

        // Selectivity 1.0 / 0.0 means the predicate collapses to a constant;
        // anything in between must leave the boolean range open.
        let constant_outcome = if exp_selectivity == 1.0 {
            Some(true)
        } else if exp_selectivity == 0.0 {
            Some(false)
        } else {
            None
        };
        match constant_outcome {
            Some(outcome) => assert_eq!(
                predicate_bounds.reduce(),
                Some(ScalarValue::Boolean(Some(outcome)))
            ),
            None => {
                assert_eq!(
                    predicate_bounds.min_value,
                    ScalarValue::Boolean(Some(false))
                );
                assert_eq!(
                    predicate_bounds.max_value,
                    ScalarValue::Boolean(Some(true))
                );
            }
        }

        // Boundaries of the column after the comparison has been analyzed.
        let column_bounds = column
            .analyze(analysis_ctx)
            .boundaries
            .expect("this case should not return None");
        assert_eq!(column_bounds.min_value, ScalarValue::Int64(Some(exp_min)));
        assert_eq!(column_bounds.max_value, ScalarValue::Int64(Some(exp_max)));
    }

    Ok(())
}
/// Table-driven check of `analyze_expr_scalar_comparison` for a Float64
/// column whose statistics give the range [1.3, 50.7].
///
/// Each case is `((operator, rhs literal), (expected selectivity, expected
/// column min, expected column max))`.
#[test]
fn test_comparison_result_estimate_different_type() -> Result<()> {
    let (schema, statistics) =
        get_test_table_stats(ScalarValue::from(1.3), ScalarValue::from(50.7));

    // Denominator of the expected selectivities.
    // NOTE(review): 50.0 presumably corresponds to the rounded width of
    // [1.3, 50.7] plus one -- confirm against the estimator.
    let distance = 50.0;
    let cases = [
        // Predicates that can never match.
        ((Operator::Eq, 1.1), (0.0, 1.1, 1.1)),
        ((Operator::Eq, 50.75), (0.0, 50.75, 50.75)),
        ((Operator::Lt, 1.3), (0.0, 1.3, 50.7)),
        ((Operator::LtEq, 1.29), (0.0, 1.3, 50.7)),
        ((Operator::Gt, 50.7), (0.0, 1.3, 50.7)),
        ((Operator::GtEq, 50.75), (0.0, 1.3, 50.7)),
        // Predicates that always match.
        ((Operator::Lt, 50.75), (1.0, 1.3, 50.7)),
        ((Operator::LtEq, 50.75), (1.0, 1.3, 50.7)),
        ((Operator::Gt, 1.29), (1.0, 1.3, 50.7)),
        ((Operator::GtEq, 1.3), (1.0, 1.3, 50.7)),
        // Partial matches.
        ((Operator::Eq, 27.8), (1.0 / distance, 27.8, 27.8)),
        // 5.2 - 1.3 = 3.9; the expected numerator is 4.0
        ((Operator::Lt, 5.2), (4.0 / distance, 1.3, 5.2)),
        ((Operator::LtEq, 1.3), (1.0 / distance, 1.3, 1.3)),
        // 50.7 - 45.5 = 5.2; the expected numerator is 5.0
        ((Operator::Gt, 45.5), (5.0 / distance, 45.5, 50.7)),
        ((Operator::GtEq, 50.7), (1.0 / distance, 50.7, 50.7)),
    ];

    for ((operator, rhs), (exp_selectivity, exp_min, exp_max)) in cases {
        let context = AnalysisContext::from_statistics(&schema, &statistics);
        let left = col("a", &schema).unwrap();
        let right = ScalarValue::from(rhs);
        let analysis_ctx =
            analyze_expr_scalar_comparison(context, &operator, &left, right);

        // Borrow the predicate boundaries instead of cloning the whole
        // analysis context; the context is still needed (by value) below.
        let boundaries = analysis_ctx
            .boundaries
            .as_ref()
            .expect("Analysis must complete for this test!");
        assert_eq!(
            boundaries
                .selectivity
                .expect("compare_left_boundaries must produce a selectivity value"),
            exp_selectivity
        );

        // Selectivity 1.0 / 0.0 means the predicate collapses to a constant;
        // anything in between must leave the boolean range open.
        if exp_selectivity == 1.0 {
            assert_eq!(boundaries.reduce(), Some(ScalarValue::from(true)));
        } else if exp_selectivity == 0.0 {
            assert_eq!(boundaries.reduce(), Some(ScalarValue::from(false)));
        } else {
            assert_eq!(boundaries.min_value, ScalarValue::from(false));
            assert_eq!(boundaries.max_value, ScalarValue::from(true));
        }

        // Boundaries of the column after the comparison has been analyzed.
        let left_boundaries = left
            .analyze(analysis_ctx)
            .boundaries
            .expect("this case should not return None");
        assert_eq!(
            left_boundaries.min_value,
            ScalarValue::Float64(Some(exp_min))
        );
        assert_eq!(
            left_boundaries.max_value,
            ScalarValue::Float64(Some(exp_max))
        );
    }

    Ok(())
}
/// Analyzes `a >= 25` where column `a` has statistics min 1 / max 100 and
/// expects a selectivity of 0.76.
#[test]
fn test_binary_expression_boundaries() -> Result<()> {
    let (schema, statistics) =
        get_test_table_stats(ScalarValue::from(1), ScalarValue::from(100));

    // Column on the left, literal on the right.
    let column = col("a", &schema).unwrap();
    let predicate = binary_simple(
        column,
        Operator::GtEq,
        lit(ScalarValue::from(25)),
        &schema,
    );

    let selectivity = predicate
        .analyze(AnalysisContext::from_statistics(&schema, &statistics))
        .boundaries
        .expect("boundaries should not be None")
        .selectivity;
    assert_eq!(selectivity, Some(0.76));

    Ok(())
}
/// Analyzes `50 >= a` (literal on the left, column on the right) where
/// column `a` has statistics min 1 / max 100 and expects a selectivity of 0.5.
#[test]
fn test_binary_expression_boundaries_rhs() -> Result<()> {
    let (schema, statistics) =
        get_test_table_stats(ScalarValue::from(1), ScalarValue::from(100));

    // Literal on the left, column on the right.
    let column = col("a", &schema).unwrap();
    let predicate = binary_simple(
        lit(ScalarValue::from(50)),
        Operator::GtEq,
        column,
        &schema,
    );

    let selectivity = predicate
        .analyze(AnalysisContext::from_statistics(&schema, &statistics))
        .boundaries
        .expect("boundaries should not be None")
        .selectivity;
    assert_eq!(selectivity, Some(0.5));

    Ok(())
}
/// Checks how nested AND / OR expressions are rendered by `to_string()`:
/// parentheses are expected only around OR operands nested inside an AND.
#[test]
fn test_display_and_or_combo() {
    // Builds `<lhs> <op> <rhs>` over two integer literals.
    let literal_pair = |lhs: i32, op: Operator, rhs: i32| {
        Arc::new(BinaryExpr::new(
            lit(ScalarValue::from(lhs)),
            op,
            lit(ScalarValue::from(rhs)),
        ))
    };

    // (operator of `1 _ 2`, outer operator, operator of `3 _ 4`, rendering)
    let cases = [
        (
            Operator::And,
            Operator::And,
            Operator::And,
            "1 AND 2 AND 3 AND 4",
        ),
        (Operator::Or, Operator::Or, Operator::Or, "1 OR 2 OR 3 OR 4"),
        (
            Operator::And,
            Operator::Or,
            Operator::And,
            "1 AND 2 OR 3 AND 4",
        ),
        (
            Operator::Or,
            Operator::And,
            Operator::Or,
            "(1 OR 2) AND (3 OR 4)",
        ),
    ];

    for (left_op, outer_op, right_op, rendered) in cases {
        let expr = BinaryExpr::new(
            literal_pair(1, left_op, 2),
            outer_op,
            literal_pair(3, right_op, 4),
        );
        assert_eq!(expr.to_string(), rendered);
    }
}
}