datafusion_physical_expr/expressions/
like.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::hash::Hash;
19use std::{any::Any, sync::Arc};
20
21use crate::PhysicalExpr;
22use arrow::datatypes::{DataType, Schema};
23use arrow::record_batch::RecordBatch;
24use datafusion_common::{internal_err, Result};
25use datafusion_expr::ColumnarValue;
26use datafusion_physical_expr_common::datum::apply_cmp;
27
/// Physical expression for the SQL `LIKE` family of operators
/// (`LIKE`, `NOT LIKE`, `ILIKE` and `NOT ILIKE`).
#[derive(Debug, Eq)]
pub struct LikeExpr {
    /// `true` for the `NOT ...` variants of the operator.
    negated: bool,
    /// `true` for the `ILIKE` variants of the operator.
    case_insensitive: bool,
    /// Expression on the left-hand side of the operator (the value being matched).
    expr: Arc<dyn PhysicalExpr>,
    /// Expression on the right-hand side of the operator (the pattern).
    pattern: Arc<dyn PhysicalExpr>,
}
36
37// Manually derive PartialEq and Hash to work around https://github.com/rust-lang/rust/issues/78808
38impl PartialEq for LikeExpr {
39    fn eq(&self, other: &Self) -> bool {
40        self.negated == other.negated
41            && self.case_insensitive == other.case_insensitive
42            && self.expr.eq(&other.expr)
43            && self.pattern.eq(&other.pattern)
44    }
45}
46
47impl Hash for LikeExpr {
48    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
49        self.negated.hash(state);
50        self.case_insensitive.hash(state);
51        self.expr.hash(state);
52        self.pattern.hash(state);
53    }
54}
55
56impl LikeExpr {
57    pub fn new(
58        negated: bool,
59        case_insensitive: bool,
60        expr: Arc<dyn PhysicalExpr>,
61        pattern: Arc<dyn PhysicalExpr>,
62    ) -> Self {
63        Self {
64            negated,
65            case_insensitive,
66            expr,
67            pattern,
68        }
69    }
70
71    /// Is negated
72    pub fn negated(&self) -> bool {
73        self.negated
74    }
75
76    /// Is case insensitive
77    pub fn case_insensitive(&self) -> bool {
78        self.case_insensitive
79    }
80
81    /// Input expression
82    pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
83        &self.expr
84    }
85
86    /// Pattern expression
87    pub fn pattern(&self) -> &Arc<dyn PhysicalExpr> {
88        &self.pattern
89    }
90
91    /// Operator name
92    fn op_name(&self) -> &str {
93        match (self.negated, self.case_insensitive) {
94            (false, false) => "LIKE",
95            (true, false) => "NOT LIKE",
96            (false, true) => "ILIKE",
97            (true, true) => "NOT ILIKE",
98        }
99    }
100}
101
102impl std::fmt::Display for LikeExpr {
103    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
104        write!(f, "{} {} {}", self.expr, self.op_name(), self.pattern)
105    }
106}
107
108impl PhysicalExpr for LikeExpr {
109    fn as_any(&self) -> &dyn Any {
110        self
111    }
112
113    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
114        Ok(DataType::Boolean)
115    }
116
117    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
118        Ok(self.expr.nullable(input_schema)? || self.pattern.nullable(input_schema)?)
119    }
120
121    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
122        use arrow::compute::*;
123        let lhs = self.expr.evaluate(batch)?;
124        let rhs = self.pattern.evaluate(batch)?;
125        match (self.negated, self.case_insensitive) {
126            (false, false) => apply_cmp(&lhs, &rhs, like),
127            (false, true) => apply_cmp(&lhs, &rhs, ilike),
128            (true, false) => apply_cmp(&lhs, &rhs, nlike),
129            (true, true) => apply_cmp(&lhs, &rhs, nilike),
130        }
131    }
132
133    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
134        vec![&self.expr, &self.pattern]
135    }
136
137    fn with_new_children(
138        self: Arc<Self>,
139        children: Vec<Arc<dyn PhysicalExpr>>,
140    ) -> Result<Arc<dyn PhysicalExpr>> {
141        Ok(Arc::new(LikeExpr::new(
142            self.negated,
143            self.case_insensitive,
144            Arc::clone(&children[0]),
145            Arc::clone(&children[1]),
146        )))
147    }
148
149    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        self.expr.fmt_sql(f)?;
151        write!(f, " {} ", self.op_name())?;
152        self.pattern.fmt_sql(f)
153    }
154}
155
156/// used for optimize Dictionary like
157fn can_like_type(from_type: &DataType) -> bool {
158    match from_type {
159        DataType::Dictionary(_, inner_type_from) => **inner_type_from == DataType::Utf8,
160        _ => false,
161    }
162}
163
164/// Create a like expression, erroring if the argument types are not compatible.
165pub fn like(
166    negated: bool,
167    case_insensitive: bool,
168    expr: Arc<dyn PhysicalExpr>,
169    pattern: Arc<dyn PhysicalExpr>,
170    input_schema: &Schema,
171) -> Result<Arc<dyn PhysicalExpr>> {
172    let expr_type = &expr.data_type(input_schema)?;
173    let pattern_type = &pattern.data_type(input_schema)?;
174    if !expr_type.eq(pattern_type) && !can_like_type(expr_type) {
175        return internal_err!(
176            "The type of {expr_type} AND {pattern_type} of like physical should be same"
177        );
178    }
179    Ok(Arc::new(LikeExpr::new(
180        negated,
181        case_insensitive,
182        expr,
183        pattern,
184    )))
185}
186
#[cfg(test)]
mod test {
    use super::*;
    use crate::expressions::col;
    use arrow::array::*;
    use arrow::datatypes::Field;
    use datafusion_common::cast::as_boolean_array;
    use datafusion_physical_expr_common::physical_expr::fmt_sql;

    // Builds a batch with two Utf8 columns `a` and `b` from `$A_VEC` and
    // `$B_VEC`, evaluates `a <op> b` where the operator is chosen by
    // `$NEGATED` / `$CASE_INSENSITIVE`, and asserts that the boolean result
    // equals `$VEC`. The trailing comma in the invocation is required.
    macro_rules! test_like {
        ($A_VEC:expr, $B_VEC:expr, $VEC:expr, $NULLABLE: expr, $NEGATED:expr, $CASE_INSENSITIVE:expr,) => {{
            let schema = Schema::new(vec![
                Field::new("a", DataType::Utf8, $NULLABLE),
                Field::new("b", DataType::Utf8, $NULLABLE),
            ]);

            let expression = like(
                $NEGATED,
                $CASE_INSENSITIVE,
                col("a", &schema)?,
                col("b", &schema)?,
                &schema,
            )?;

            let columns: Vec<ArrayRef> = vec![
                Arc::new(StringArray::from($A_VEC)),
                Arc::new(StringArray::from($B_VEC)),
            ];
            let batch = RecordBatch::try_new(Arc::new(schema), columns)?;

            // Evaluate and compare against the expected boolean column.
            let num_rows = batch.num_rows();
            let evaluated = expression
                .evaluate(&batch)?
                .into_array(num_rows)
                .expect("Failed to convert to array");
            let actual =
                as_boolean_array(&evaluated).expect("failed to downcast to BooleanArray");
            let expected = &BooleanArray::from($VEC);
            assert_eq!(expected, actual);
        }};
    }

    #[test]
    fn like_op() -> Result<()> {
        // LIKE
        test_like!(
            vec!["hello world", "world"],
            vec!["%hello%", "%hello%"],
            vec![true, false],
            false,
            false,
            false,
        );
        // NOT LIKE (nullable inputs)
        test_like!(
            vec![Some("hello world"), None, Some("world")],
            vec![Some("%hello%"), None, Some("%hello%")],
            vec![Some(false), None, Some(true)],
            true,
            true,
            false,
        );
        // ILIKE
        test_like!(
            vec!["hello world", "world"],
            vec!["%helLo%", "%helLo%"],
            vec![true, false],
            false,
            false,
            true,
        );
        // NOT ILIKE (nullable inputs)
        test_like!(
            vec![Some("hello world"), None, Some("world")],
            vec![Some("%helLo%"), None, Some("%helLo%")],
            vec![Some(false), None, Some(true)],
            true,
            true,
            true,
        );

        Ok(())
    }

    #[test]
    fn test_fmt_sql() -> Result<()> {
        let fields = vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Utf8, false),
        ];
        let schema = Schema::new(fields);

        let lhs = col("a", &schema)?;
        let rhs = col("b", &schema)?;
        let expr = like(false, false, lhs, rhs, &schema)?;

        // `Display` includes the column indices.
        assert_eq!(expr.to_string(), "a@0 LIKE b@1");

        // `fmt_sql` prints only the column names.
        assert_eq!(fmt_sql(expr.as_ref()).to_string(), "a LIKE b");

        Ok(())
    }
}