use std::hash::{Hash, Hasher};
use std::{any::Any, sync::Arc};
use crate::{physical_expr::down_cast_any_ref, PhysicalExpr};
use arrow::compute::kernels::comparison::{
ilike_utf8, like_utf8, nilike_utf8, nlike_utf8,
};
use arrow::compute::kernels::comparison::{
ilike_utf8_scalar, like_utf8_scalar, nilike_utf8_scalar, nlike_utf8_scalar,
};
use arrow::{
array::{new_null_array, Array, ArrayRef, LargeStringArray, StringArray},
record_batch::RecordBatch,
};
use arrow_schema::{DataType, Schema};
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::ColumnarValue;
/// Physical expression for the SQL pattern-matching operators `LIKE`,
/// `NOT LIKE`, `ILIKE` and `NOT ILIKE`.
///
/// The two flags together select which of the four operators is evaluated.
#[derive(Debug, Hash)]
pub struct LikeExpr {
/// `true` selects the `NOT` form of the operator (`NOT LIKE` / `NOT ILIKE`).
negated: bool,
/// `true` selects the case-insensitive form (`ILIKE` / `NOT ILIKE`).
case_insensitive: bool,
/// Left-hand operand: the expression whose values are matched against the pattern.
expr: Arc<dyn PhysicalExpr>,
/// Right-hand operand: the expression that produces the pattern.
pattern: Arc<dyn PhysicalExpr>,
}
impl LikeExpr {
pub fn new(
negated: bool,
case_insensitive: bool,
expr: Arc<dyn PhysicalExpr>,
pattern: Arc<dyn PhysicalExpr>,
) -> Self {
Self {
negated,
case_insensitive,
expr,
pattern,
}
}
pub fn negated(&self) -> bool {
self.negated
}
pub fn case_insensitive(&self) -> bool {
self.case_insensitive
}
pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
&self.expr
}
pub fn pattern(&self) -> &Arc<dyn PhysicalExpr> {
&self.pattern
}
fn op_name(&self) -> &str {
match (self.negated, self.case_insensitive) {
(false, false) => "LIKE",
(true, false) => "NOT LIKE",
(false, true) => "ILIKE",
(true, true) => "NOT ILIKE",
}
}
}
impl std::fmt::Display for LikeExpr {
    /// Renders the expression as `<expr> <OPERATOR> <pattern>`, separated by
    /// single spaces.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let operator = self.op_name();
        write!(f, "{} {} {}", self.expr, operator, self.pattern)
    }
}
impl PhysicalExpr for LikeExpr {
    /// Returns `self` as `&dyn Any` so that callers can downcast to `LikeExpr`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The result type is always `DataType::Boolean`; the schema is not consulted.
    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
        Ok(DataType::Boolean)
    }

    /// The result is nullable when either the expression or the pattern is
    /// nullable for `input_schema`.
    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
        Ok(self.expr.nullable(input_schema)? || self.pattern.nullable(input_schema)?)
    }

    /// Evaluates both operands against `batch` and applies the selected
    /// LIKE-family operator, returning the result as an array.
    ///
    /// # Errors
    ///
    /// * Propagates any error from evaluating `expr` or `pattern`.
    /// * Returns an internal error when the operand types are incompatible:
    ///   they must either be equal, or one side must be a dictionary array
    ///   whose value type equals the type of a scalar on the other side.
    /// * Propagates any error from the array/scalar or array/array helpers.
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        let expr_value = self.expr.evaluate(batch)?;
        let pattern_value = self.pattern.evaluate(batch)?;
        let expr_data_type = expr_value.data_type();
        let pattern_data_type = pattern_value.data_type();

        match (
            &expr_value,
            &expr_data_type,
            &pattern_value,
            &pattern_data_type,
        ) {
            // Identical types are always accepted.
            (_, l, _, r) if l == r => {}
            // A dictionary array may be paired with a scalar of the
            // dictionary's value type, in either operand position.
            (
                ColumnarValue::Array(_),
                DataType::Dictionary(_, dict_t),
                ColumnarValue::Scalar(_),
                scalar_t,
            )
            | (
                ColumnarValue::Scalar(_),
                scalar_t,
                ColumnarValue::Array(_),
                DataType::Dictionary(_, dict_t),
            ) if dict_t.as_ref() == scalar_t => {}
            _ => {
                return internal_err!(
                    "Cannot evaluate {} expression with types {:?} and {:?}",
                    self.op_name(),
                    expr_data_type,
                    pattern_data_type
                );
            }
        }

        // Prefer the array/scalar helper when the expression is an array and
        // the pattern is a scalar. The helper may decline by returning `None`,
        // in which case the generic array/array path below is used.
        if let (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) =
            (&expr_value, &pattern_value)
        {
            if let Some(result) = self.evaluate_array_scalar(array, scalar)? {
                return result.map(ColumnarValue::Array);
            }
        }

        // Every other combination: materialize both operands as arrays with
        // one entry per row of the batch and compare element-wise.
        let (expr, pattern) = (
            expr_value.into_array(batch.num_rows()),
            pattern_value.into_array(batch.num_rows()),
        );
        self.evaluate_array_array(expr, pattern)
            .map(ColumnarValue::Array)
    }

    /// Returns the two operands in the order `[expr, pattern]`.
    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        vec![Arc::clone(&self.expr), Arc::clone(&self.pattern)]
    }

    /// Builds a copy of this expression with the same flags, taking the new
    /// `expr` and `pattern` from the first two entries of `children`.
    ///
    /// # Errors
    ///
    /// Returns an internal error (instead of panicking on an out-of-bounds
    /// index) when fewer than two children are supplied.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        match children.as_slice() {
            // Entries beyond the first two are ignored, as before.
            [expr, pattern, ..] => Ok(Arc::new(LikeExpr::new(
                self.negated,
                self.case_insensitive,
                Arc::clone(expr),
                Arc::clone(pattern),
            ))),
            _ => internal_err!(
                "LikeExpr requires 2 children (expr, pattern) but got {}",
                children.len()
            ),
        }
    }

    /// Feeds this expression's derived `Hash` implementation into the
    /// type-erased hasher `state`.
    fn dyn_hash(&self, state: &mut dyn Hasher) {
        // Rebind so that `&mut s` (a `&mut &mut dyn Hasher`) can be handed to
        // the generic `Hash::hash`.
        let mut s = state;
        self.hash(&mut s);
    }
}
impl PartialEq<dyn Any> for LikeExpr {
    /// Compares against a type-erased value: equal only when `other`
    /// downcasts to a `LikeExpr` with the same flags and equal operands.
    fn eq(&self, other: &dyn Any) -> bool {
        match down_cast_any_ref(other).downcast_ref::<Self>() {
            Some(rhs) => {
                self.negated == rhs.negated
                    && self.case_insensitive == rhs.case_insensitive
                    && self.expr.eq(&rhs.expr)
                    && self.pattern.eq(&rhs.pattern)
            }
            // A different concrete type can never be equal.
            None => false,
        }
    }
}
/// Applies the string kernel `$KERNEL` to the array `$ARRAY` and the scalar
/// `$SCALAR`, dispatching on the array's data type.
///
/// Expands to `Some(Result<Arc<dyn Array>>)`: `Utf8` and `LargeUtf8` arrays
/// are forwarded to `compute_utf8_op_scalar!` (defined outside this section);
/// any other data type produces an internal error.
macro_rules! binary_string_array_op_scalar {
    ($ARRAY:expr, $SCALAR:expr, $KERNEL:ident, $RESULT_TYPE:expr) => {{
        let computed: Result<Arc<dyn Array>> = match $ARRAY.data_type() {
            DataType::Utf8 => compute_utf8_op_scalar!(
                $ARRAY,
                $SCALAR,
                $KERNEL,
                StringArray,
                $RESULT_TYPE
            ),
            DataType::LargeUtf8 => compute_utf8_op_scalar!(
                $ARRAY,
                $SCALAR,
                $KERNEL,
                LargeStringArray,
                $RESULT_TYPE
            ),
            unsupported => internal_err!(
                "Data type {:?} not supported for scalar operation '{}' on string array",
                unsupported, stringify!($KERNEL)
            ),
        };
        Some(computed)
    }};
}
impl LikeExpr {
    /// Evaluates the operator for an array of values against a single scalar
    /// pattern.
    ///
    /// The outer `Result` is always `Ok`. The inner `Option` is whatever
    /// `binary_string_array_op_scalar!` produces, wrapping the kernel's own
    /// `Result`.
    fn evaluate_array_scalar(
        &self,
        array: &dyn Array,
        scalar: &ScalarValue,
    ) -> Result<Option<Result<ArrayRef>>> {
        // Keyed on (case_insensitive, negated) to pick one of the four kernels.
        let kernel_output = match (self.case_insensitive, self.negated) {
            (false, false) => binary_string_array_op_scalar!(
                array,
                scalar.clone(),
                like,
                &DataType::Boolean
            ),
            (false, true) => binary_string_array_op_scalar!(
                array,
                scalar.clone(),
                nlike,
                &DataType::Boolean
            ),
            (true, false) => binary_string_array_op_scalar!(
                array,
                scalar.clone(),
                ilike,
                &DataType::Boolean
            ),
            (true, true) => binary_string_array_op_scalar!(
                array,
                scalar.clone(),
                nilike,
                &DataType::Boolean
            ),
        };
        Ok(kernel_output)
    }

    /// Evaluates the operator element-wise over two arrays: `left` holds the
    /// values and `right` holds the patterns.
    fn evaluate_array_array(
        &self,
        left: Arc<dyn Array>,
        right: Arc<dyn Array>,
    ) -> Result<ArrayRef> {
        // Keyed on (case_insensitive, negated) to pick one of the four kernels.
        match (self.case_insensitive, self.negated) {
            (false, false) => binary_string_array_op!(left, right, like),
            (false, true) => binary_string_array_op!(left, right, nlike),
            (true, false) => binary_string_array_op!(left, right, ilike),
            (true, true) => binary_string_array_op!(left, right, nilike),
        }
    }
}
/// Creates a [`LikeExpr`] wrapped as a `PhysicalExpr` trait object.
///
/// # Errors
///
/// * Propagates any error from resolving the data type of `expr` or
///   `pattern` against `input_schema`.
/// * Returns an internal error when the two data types differ.
pub fn like(
    negated: bool,
    case_insensitive: bool,
    expr: Arc<dyn PhysicalExpr>,
    pattern: Arc<dyn PhysicalExpr>,
    input_schema: &Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
    // These names are captured inline by the error message below.
    let expr_type = &expr.data_type(input_schema)?;
    let pattern_type = &pattern.data_type(input_schema)?;

    if expr_type == pattern_type {
        let like_expr = LikeExpr::new(negated, case_insensitive, expr, pattern);
        Ok(Arc::new(like_expr))
    } else {
        internal_err!(
            "The type of {expr_type} AND {pattern_type} of like physical should be same"
        )
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::expressions::col;
    use arrow::array::BooleanArray;
    use arrow_schema::Field;
    use datafusion_common::cast::as_boolean_array;

    // Builds a two-column Utf8 batch (`a` = values, `b` = patterns), evaluates
    // `a <op> b` through `like(...)`, and asserts that the boolean result
    // equals `$EXPECTED`.
    macro_rules! test_like {
        ($VALUES:expr, $PATTERNS:expr, $EXPECTED:expr, $NULLABLE:expr, $NEGATED:expr, $CASE_INSENSITIVE:expr,) => {{
            let schema = Schema::new(vec![
                Field::new("a", DataType::Utf8, $NULLABLE),
                Field::new("b", DataType::Utf8, $NULLABLE),
            ]);
            let values = StringArray::from($VALUES);
            let patterns = StringArray::from($PATTERNS);

            let expression = like(
                $NEGATED,
                $CASE_INSENSITIVE,
                col("a", &schema)?,
                col("b", &schema)?,
                &schema,
            )?;

            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(values), Arc::new(patterns)],
            )?;

            let evaluated = expression.evaluate(&batch)?.into_array(batch.num_rows());
            let actual =
                as_boolean_array(&evaluated).expect("failed to downcast to BooleanArray");
            let expected = &BooleanArray::from($EXPECTED);
            assert_eq!(expected, actual);
        }};
    }

    #[test]
    fn like_op() -> Result<()> {
        // LIKE on non-nullable columns.
        test_like!(
            vec!["hello world", "world"],
            vec!["%hello%", "%hello%"],
            vec![true, false],
            false,
            false,
            false,
        );

        // NOT LIKE on nullable columns; the null row is expected to stay null.
        test_like!(
            vec![Some("hello world"), None, Some("world")],
            vec![Some("%hello%"), None, Some("%hello%")],
            vec![Some(false), None, Some(true)],
            true,
            true,
            false,
        );

        // ILIKE on non-nullable columns; the pattern differs from the data in case.
        test_like!(
            vec!["hello world", "world"],
            vec!["%helLo%", "%helLo%"],
            vec![true, false],
            false,
            false,
            true,
        );

        // NOT ILIKE on nullable columns; the null row is expected to stay null.
        test_like!(
            vec![Some("hello world"), None, Some("world")],
            vec![Some("%helLo%"), None, Some("%helLo%")],
            vec![Some(false), None, Some(true)],
            true,
            true,
            true,
        );

        Ok(())
    }
}