use std::borrow::Cow;
use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};
use crate::expressions::try_cast;
use crate::expressions::NoOp;
use crate::physical_expr::down_cast_any_ref;
use crate::PhysicalExpr;
use arrow::array::*;
use arrow::compute::kernels::cmp::eq;
use arrow::compute::kernels::zip::zip;
use arrow::compute::{and, is_null, not, or, prep_null_mask_filter};
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::exec_err;
use datafusion_common::{cast::as_boolean_array, internal_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use itertools::Itertools;
type WhenThen = (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>);
#[derive(Debug, Hash)]
pub struct CaseExpr {
expr: Option<Arc<dyn PhysicalExpr>>,
when_then_expr: Vec<WhenThen>,
else_expr: Option<Arc<dyn PhysicalExpr>>,
}
impl std::fmt::Display for CaseExpr {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "CASE ")?;
if let Some(e) = &self.expr {
write!(f, "{e} ")?;
}
for (w, t) in &self.when_then_expr {
write!(f, "WHEN {w} THEN {t} ")?;
}
if let Some(e) = &self.else_expr {
write!(f, "ELSE {e} ")?;
}
write!(f, "END")
}
}
impl CaseExpr {
pub fn try_new(
expr: Option<Arc<dyn PhysicalExpr>>,
when_then_expr: Vec<WhenThen>,
else_expr: Option<Arc<dyn PhysicalExpr>>,
) -> Result<Self> {
if when_then_expr.is_empty() {
exec_err!("There must be at least one WHEN clause")
} else {
Ok(Self {
expr,
when_then_expr,
else_expr,
})
}
}
pub fn expr(&self) -> Option<&Arc<dyn PhysicalExpr>> {
self.expr.as_ref()
}
pub fn when_then_expr(&self) -> &[WhenThen] {
&self.when_then_expr
}
pub fn else_expr(&self) -> Option<&Arc<dyn PhysicalExpr>> {
self.else_expr.as_ref()
}
}
impl CaseExpr {
fn case_when_with_expr(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let return_type = self.data_type(&batch.schema())?;
let expr = self.expr.as_ref().unwrap();
let base_value = expr.evaluate(batch)?;
let base_value = base_value.into_array(batch.num_rows());
let base_nulls = is_null(base_value.as_ref())?;
let mut current_value = new_null_array(&return_type, batch.num_rows());
let mut remainder = not(&base_nulls)?;
for i in 0..self.when_then_expr.len() {
let when_value = self.when_then_expr[i]
.0
.evaluate_selection(batch, &remainder)?;
let when_value = when_value.into_array(batch.num_rows());
let when_match = eq(&when_value, &base_value)?;
let when_match = match when_match.null_count() {
0 => Cow::Borrowed(&when_match),
_ => Cow::Owned(prep_null_mask_filter(&when_match)),
};
let then_value = self.when_then_expr[i]
.1
.evaluate_selection(batch, &when_match)?;
let then_value = match then_value {
ColumnarValue::Scalar(value) if value.is_null() => {
new_null_array(&return_type, batch.num_rows())
}
_ => then_value.into_array(batch.num_rows()),
};
current_value =
zip(&when_match, then_value.as_ref(), current_value.as_ref())?;
remainder = and(&remainder, ¬(&when_match)?)?;
}
if let Some(e) = &self.else_expr {
let expr = try_cast(e.clone(), &batch.schema(), return_type.clone())
.unwrap_or_else(|_| e.clone());
remainder = or(&base_nulls, &remainder)?;
let else_ = expr
.evaluate_selection(batch, &remainder)?
.into_array(batch.num_rows());
current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?;
}
Ok(ColumnarValue::Array(current_value))
}
fn case_when_no_expr(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let return_type = self.data_type(&batch.schema())?;
let mut current_value = new_null_array(&return_type, batch.num_rows());
let mut remainder = BooleanArray::from(vec![true; batch.num_rows()]);
for i in 0..self.when_then_expr.len() {
let when_value = self.when_then_expr[i]
.0
.evaluate_selection(batch, &remainder)?;
let when_value = when_value.into_array(batch.num_rows());
let when_value = as_boolean_array(&when_value).map_err(|e| {
DataFusionError::Context(
"WHEN expression did not return a BooleanArray".to_string(),
Box::new(e),
)
})?;
let when_value = match when_value.null_count() {
0 => Cow::Borrowed(when_value),
_ => Cow::Owned(prep_null_mask_filter(when_value)),
};
let then_value = self.when_then_expr[i]
.1
.evaluate_selection(batch, &when_value)?;
let then_value = match then_value {
ColumnarValue::Scalar(value) if value.is_null() => {
new_null_array(&return_type, batch.num_rows())
}
_ => then_value.into_array(batch.num_rows()),
};
current_value =
zip(&when_value, then_value.as_ref(), current_value.as_ref())?;
remainder = and(&remainder, ¬(&when_value)?)?;
}
if let Some(e) = &self.else_expr {
let expr = try_cast(e.clone(), &batch.schema(), return_type.clone())
.unwrap_or_else(|_| e.clone());
let else_ = expr
.evaluate_selection(batch, &remainder)?
.into_array(batch.num_rows());
current_value = zip(&remainder, else_.as_ref(), current_value.as_ref())?;
}
Ok(ColumnarValue::Array(current_value))
}
}
impl PhysicalExpr for CaseExpr {
fn as_any(&self) -> &dyn Any {
self
}
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
let mut data_type = DataType::Null;
for i in 0..self.when_then_expr.len() {
data_type = self.when_then_expr[i].1.data_type(input_schema)?;
if !data_type.equals_datatype(&DataType::Null) {
break;
}
}
if data_type.equals_datatype(&DataType::Null) {
if let Some(e) = &self.else_expr {
data_type = e.data_type(input_schema)?;
}
}
Ok(data_type)
}
fn nullable(&self, input_schema: &Schema) -> Result<bool> {
let then_nullable = self
.when_then_expr
.iter()
.map(|(_, t)| t.nullable(input_schema))
.collect::<Result<Vec<_>>>()?;
if then_nullable.contains(&true) {
Ok(true)
} else if let Some(e) = &self.else_expr {
e.nullable(input_schema)
} else {
Ok(true)
}
}
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
if self.expr.is_some() {
self.case_when_with_expr(batch)
} else {
self.case_when_no_expr(batch)
}
}
fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
let mut children = vec![];
match &self.expr {
Some(expr) => children.push(expr.clone()),
None => children.push(Arc::new(NoOp::new())),
}
self.when_then_expr.iter().for_each(|(cond, value)| {
children.push(cond.clone());
children.push(value.clone());
});
match &self.else_expr {
Some(expr) => children.push(expr.clone()),
None => children.push(Arc::new(NoOp::new())),
}
children
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
if children.len() != self.children().len() {
internal_err!("CaseExpr: Wrong number of children")
} else {
assert_eq!(children.len() % 2, 0);
let expr = match children[0].clone().as_any().downcast_ref::<NoOp>() {
Some(_) => None,
_ => Some(children[0].clone()),
};
let else_expr = match children[children.len() - 1]
.clone()
.as_any()
.downcast_ref::<NoOp>()
{
Some(_) => None,
_ => Some(children[children.len() - 1].clone()),
};
let branches = children[1..children.len() - 1].to_vec();
let mut when_then_expr: Vec<WhenThen> = vec![];
for (prev, next) in branches.into_iter().tuples() {
when_then_expr.push((prev, next));
}
Ok(Arc::new(CaseExpr::try_new(
expr,
when_then_expr,
else_expr,
)?))
}
}
fn dyn_hash(&self, state: &mut dyn Hasher) {
let mut s = state;
self.hash(&mut s);
}
}
impl PartialEq<dyn Any> for CaseExpr {
fn eq(&self, other: &dyn Any) -> bool {
down_cast_any_ref(other)
.downcast_ref::<Self>()
.map(|x| {
let expr_eq = match (&self.expr, &x.expr) {
(Some(expr1), Some(expr2)) => expr1.eq(expr2),
(None, None) => true,
_ => false,
};
let else_expr_eq = match (&self.else_expr, &x.else_expr) {
(Some(expr1), Some(expr2)) => expr1.eq(expr2),
(None, None) => true,
_ => false,
};
expr_eq
&& else_expr_eq
&& self.when_then_expr.len() == x.when_then_expr.len()
&& self.when_then_expr.iter().zip(x.when_then_expr.iter()).all(
|((when1, then1), (when2, then2))| {
when1.eq(when2) && then1.eq(then2)
},
)
})
.unwrap_or(false)
}
}
pub fn case(
expr: Option<Arc<dyn PhysicalExpr>>,
when_thens: Vec<WhenThen>,
else_expr: Option<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(CaseExpr::try_new(expr, when_thens, else_expr)?))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::col;
use crate::expressions::lit;
use crate::expressions::{binary, cast};
use arrow::array::StringArray;
use arrow::buffer::Buffer;
use arrow::datatypes::DataType::Float64;
use arrow::datatypes::*;
use datafusion_common::cast::{as_float64_array, as_int32_array};
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_expr::type_coercion::binary::comparison_coercion;
use datafusion_expr::Operator;
#[test]
fn case_with_expr() -> Result<()> {
let batch = case_test_batch()?;
let schema = batch.schema();
let when1 = lit("foo");
let then1 = lit(123i32);
let when2 = lit("bar");
let then2 = lit(456i32);
let expr = generate_case_when_with_type_coercion(
Some(col("a", &schema)?),
vec![(when1, then1), (when2, then2)],
None,
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result = as_int32_array(&result)?;
let expected = &Int32Array::from(vec![Some(123), None, None, Some(456)]);
assert_eq!(expected, result);
Ok(())
}
#[test]
fn case_with_expr_else() -> Result<()> {
let batch = case_test_batch()?;
let schema = batch.schema();
let when1 = lit("foo");
let then1 = lit(123i32);
let when2 = lit("bar");
let then2 = lit(456i32);
let else_value = lit(999i32);
let expr = generate_case_when_with_type_coercion(
Some(col("a", &schema)?),
vec![(when1, then1), (when2, then2)],
Some(else_value),
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result = as_int32_array(&result)?;
let expected =
&Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]);
assert_eq!(expected, result);
Ok(())
}
#[test]
fn case_with_expr_divide_by_zero() -> Result<()> {
let batch = case_test_batch1()?;
let schema = batch.schema();
let when1 = lit(0i32);
let then1 = lit(ScalarValue::Float64(None));
let else_value = binary(
lit(25.0f64),
Operator::Divide,
cast(col("a", &schema)?, &batch.schema(), Float64)?,
&batch.schema(),
)?;
let expr = generate_case_when_with_type_coercion(
Some(col("a", &schema)?),
vec![(when1, then1)],
Some(else_value),
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result =
as_float64_array(&result).expect("failed to downcast to Float64Array");
let expected = &Float64Array::from(vec![Some(25.0), None, None, Some(5.0)]);
assert_eq!(expected, result);
Ok(())
}
#[test]
fn case_without_expr() -> Result<()> {
let batch = case_test_batch()?;
let schema = batch.schema();
let when1 = binary(
col("a", &schema)?,
Operator::Eq,
lit("foo"),
&batch.schema(),
)?;
let then1 = lit(123i32);
let when2 = binary(
col("a", &schema)?,
Operator::Eq,
lit("bar"),
&batch.schema(),
)?;
let then2 = lit(456i32);
let expr = generate_case_when_with_type_coercion(
None,
vec![(when1, then1), (when2, then2)],
None,
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result = as_int32_array(&result)?;
let expected = &Int32Array::from(vec![Some(123), None, None, Some(456)]);
assert_eq!(expected, result);
Ok(())
}
#[test]
fn case_with_expr_when_null() -> Result<()> {
let batch = case_test_batch()?;
let schema = batch.schema();
let when1 = lit(ScalarValue::Utf8(None));
let then1 = lit(0i32);
let when2 = col("a", &schema)?;
let then2 = lit(123i32);
let else_value = lit(999i32);
let expr = generate_case_when_with_type_coercion(
Some(col("a", &schema)?),
vec![(when1, then1), (when2, then2)],
Some(else_value),
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result = as_int32_array(&result)?;
let expected =
&Int32Array::from(vec![Some(123), Some(123), Some(999), Some(123)]);
assert_eq!(expected, result);
Ok(())
}
#[test]
fn case_without_expr_divide_by_zero() -> Result<()> {
let batch = case_test_batch1()?;
let schema = batch.schema();
let when1 = binary(col("a", &schema)?, Operator::Gt, lit(0i32), &batch.schema())?;
let then1 = binary(
lit(25.0f64),
Operator::Divide,
cast(col("a", &schema)?, &batch.schema(), Float64)?,
&batch.schema(),
)?;
let x = lit(ScalarValue::Float64(None));
let expr = generate_case_when_with_type_coercion(
None,
vec![(when1, then1)],
Some(x),
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result =
as_float64_array(&result).expect("failed to downcast to Float64Array");
let expected = &Float64Array::from(vec![Some(25.0), None, None, Some(5.0)]);
assert_eq!(expected, result);
Ok(())
}
fn case_test_batch1() -> Result<RecordBatch> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let a = Int32Array::from(vec![Some(1), Some(0), None, Some(5)]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?;
Ok(batch)
}
#[test]
fn case_without_expr_else() -> Result<()> {
let batch = case_test_batch()?;
let schema = batch.schema();
let when1 = binary(
col("a", &schema)?,
Operator::Eq,
lit("foo"),
&batch.schema(),
)?;
let then1 = lit(123i32);
let when2 = binary(
col("a", &schema)?,
Operator::Eq,
lit("bar"),
&batch.schema(),
)?;
let then2 = lit(456i32);
let else_value = lit(999i32);
let expr = generate_case_when_with_type_coercion(
None,
vec![(when1, then1), (when2, then2)],
Some(else_value),
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result = as_int32_array(&result)?;
let expected =
&Int32Array::from(vec![Some(123), Some(999), Some(999), Some(456)]);
assert_eq!(expected, result);
Ok(())
}
#[test]
fn case_with_type_cast() -> Result<()> {
let batch = case_test_batch()?;
let schema = batch.schema();
let when = binary(
col("a", &schema)?,
Operator::Eq,
lit("foo"),
&batch.schema(),
)?;
let then = lit(123.3f64);
let else_value = lit(999i32);
let expr = generate_case_when_with_type_coercion(
None,
vec![(when, then)],
Some(else_value),
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result =
as_float64_array(&result).expect("failed to downcast to Float64Array");
let expected =
&Float64Array::from(vec![Some(123.3), Some(999.0), Some(999.0), Some(999.0)]);
assert_eq!(expected, result);
Ok(())
}
#[test]
fn case_with_matches_and_nulls() -> Result<()> {
let batch = case_test_batch_nulls()?;
let schema = batch.schema();
let when = binary(
col("load4", &schema)?,
Operator::Eq,
lit(1.77f64),
&batch.schema(),
)?;
let then = col("load4", &schema)?;
let expr = generate_case_when_with_type_coercion(
None,
vec![(when, then)],
None,
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result =
as_float64_array(&result).expect("failed to downcast to Float64Array");
let expected =
&Float64Array::from(vec![Some(1.77), None, None, None, None, Some(1.77)]);
assert_eq!(expected, result);
Ok(())
}
#[test]
fn case_expr_matches_and_nulls() -> Result<()> {
let batch = case_test_batch_nulls()?;
let schema = batch.schema();
let expr = col("load4", &schema)?;
let when = lit(1.77f64);
let then = col("load4", &schema)?;
let expr = generate_case_when_with_type_coercion(
Some(expr),
vec![(when, then)],
None,
schema.as_ref(),
)?;
let result = expr.evaluate(&batch)?.into_array(batch.num_rows());
let result =
as_float64_array(&result).expect("failed to downcast to Float64Array");
let expected =
&Float64Array::from(vec![Some(1.77), None, None, None, None, Some(1.77)]);
assert_eq!(expected, result);
Ok(())
}
fn case_test_batch() -> Result<RecordBatch> {
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
let a = StringArray::from(vec![Some("foo"), Some("baz"), None, Some("bar")]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)])?;
Ok(batch)
}
fn case_test_batch_nulls() -> Result<RecordBatch> {
let load4: Float64Array = vec![
Some(1.77), Some(1.77), Some(1.77), Some(1.78), None, Some(1.77), ]
.into_iter()
.collect();
let null_buffer = Buffer::from([0b00101001u8]);
let load4 = load4
.into_data()
.into_builder()
.null_bit_buffer(Some(null_buffer))
.build()
.unwrap();
let load4: Float64Array = load4.into();
let batch =
RecordBatch::try_from_iter(vec![("load4", Arc::new(load4) as ArrayRef)])?;
Ok(batch)
}
#[test]
fn case_test_incompatible() -> Result<()> {
let batch = case_test_batch()?;
let schema = batch.schema();
let when1 = binary(
col("a", &schema)?,
Operator::Eq,
lit("foo"),
&batch.schema(),
)?;
let then1 = lit(123i32);
let when2 = binary(
col("a", &schema)?,
Operator::Eq,
lit("bar"),
&batch.schema(),
)?;
let then2 = lit(true);
let expr = generate_case_when_with_type_coercion(
None,
vec![(when1, then1), (when2, then2)],
None,
schema.as_ref(),
);
assert!(expr.is_err());
let when1 = binary(
col("a", &schema)?,
Operator::Eq,
lit("foo"),
&batch.schema(),
)?;
let then1 = lit(123i32);
let when2 = binary(
col("a", &schema)?,
Operator::Eq,
lit("bar"),
&batch.schema(),
)?;
let then2 = lit(456i64);
let else_expr = lit(1.23f64);
let expr = generate_case_when_with_type_coercion(
None,
vec![(when1, then1), (when2, then2)],
Some(else_expr),
schema.as_ref(),
);
assert!(expr.is_ok());
let result_type = expr.unwrap().data_type(schema.as_ref())?;
assert_eq!(DataType::Float64, result_type);
Ok(())
}
#[test]
fn case_eq() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let when1 = lit("foo");
let then1 = lit(123i32);
let when2 = lit("bar");
let then2 = lit(456i32);
let else_value = lit(999i32);
let expr1 = generate_case_when_with_type_coercion(
Some(col("a", &schema)?),
vec![
(when1.clone(), then1.clone()),
(when2.clone(), then2.clone()),
],
Some(else_value.clone()),
&schema,
)?;
let expr2 = generate_case_when_with_type_coercion(
Some(col("a", &schema)?),
vec![
(when1.clone(), then1.clone()),
(when2.clone(), then2.clone()),
],
Some(else_value.clone()),
&schema,
)?;
let expr3 = generate_case_when_with_type_coercion(
Some(col("a", &schema)?),
vec![(when1.clone(), then1.clone()), (when2, then2)],
None,
&schema,
)?;
let expr4 = generate_case_when_with_type_coercion(
Some(col("a", &schema)?),
vec![(when1, then1)],
Some(else_value),
&schema,
)?;
assert!(expr1.eq(&expr2));
assert!(expr2.eq(&expr1));
assert!(expr2.ne(&expr3));
assert!(expr3.ne(&expr2));
assert!(expr1.ne(&expr4));
assert!(expr4.ne(&expr1));
Ok(())
}
#[test]
fn case_tranform() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let when1 = lit("foo");
let then1 = lit(123i32);
let when2 = lit("bar");
let then2 = lit(456i32);
let else_value = lit(999i32);
let expr = generate_case_when_with_type_coercion(
Some(col("a", &schema)?),
vec![
(when1.clone(), then1.clone()),
(when2.clone(), then2.clone()),
],
Some(else_value.clone()),
&schema,
)?;
let expr2 = expr
.clone()
.transform(&|e| {
let transformed =
match e.as_any().downcast_ref::<crate::expressions::Literal>() {
Some(lit_value) => match lit_value.value() {
ScalarValue::Utf8(Some(str_value)) => {
Some(lit(str_value.to_uppercase()))
}
_ => None,
},
_ => None,
};
Ok(if let Some(transformed) = transformed {
Transformed::Yes(transformed)
} else {
Transformed::No(e)
})
})
.unwrap();
let expr3 = expr
.clone()
.transform_down(&|e| {
let transformed =
match e.as_any().downcast_ref::<crate::expressions::Literal>() {
Some(lit_value) => match lit_value.value() {
ScalarValue::Utf8(Some(str_value)) => {
Some(lit(str_value.to_uppercase()))
}
_ => None,
},
_ => None,
};
Ok(if let Some(transformed) = transformed {
Transformed::Yes(transformed)
} else {
Transformed::No(e)
})
})
.unwrap();
assert!(expr.ne(&expr2));
assert!(expr2.eq(&expr3));
Ok(())
}
fn generate_case_when_with_type_coercion(
expr: Option<Arc<dyn PhysicalExpr>>,
when_thens: Vec<WhenThen>,
else_expr: Option<Arc<dyn PhysicalExpr>>,
input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
let coerce_type =
get_case_common_type(&when_thens, else_expr.clone(), input_schema);
let (when_thens, else_expr) = match coerce_type {
None => plan_err!(
"Can't get a common type for then {when_thens:?} and else {else_expr:?} expression"
),
Some(data_type) => {
let left = when_thens
.into_iter()
.map(|(when, then)| {
let then = try_cast(then, input_schema, data_type.clone())?;
Ok((when, then))
})
.collect::<Result<Vec<_>>>()?;
let right = match else_expr {
None => None,
Some(expr) => Some(try_cast(expr, input_schema, data_type.clone())?),
};
Ok((left, right))
}
}?;
case(expr, when_thens, else_expr)
}
fn get_case_common_type(
when_thens: &[WhenThen],
else_expr: Option<Arc<dyn PhysicalExpr>>,
input_schema: &Schema,
) -> Option<DataType> {
let thens_type = when_thens
.iter()
.map(|when_then| {
let data_type = &when_then.1.data_type(input_schema).unwrap();
data_type.clone()
})
.collect::<Vec<_>>();
let else_type = match else_expr {
None => {
thens_type[0].clone()
}
Some(else_phy_expr) => else_phy_expr.data_type(input_schema).unwrap(),
};
thens_type
.iter()
.try_fold(else_type, |left_type, right_type| {
comparison_coercion(&left_type, right_type)
})
}
}