datafusion_physical_expr/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::ScalarFunctionExpr;
21use crate::{
22    expressions::{self, binary, like, similar_to, Column, Literal},
23    PhysicalExpr,
24};
25
26use arrow::datatypes::Schema;
27use datafusion_common::{
28    exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
29};
30use datafusion_expr::execution_props::ExecutionProps;
31use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
32use datafusion_expr::var_provider::is_system_variables;
33use datafusion_expr::var_provider::VarType;
34use datafusion_expr::{
35    binary_expr, lit, Between, BinaryExpr, Expr, Like, Operator, TryCast,
36};
37
38/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
39/// AS int)`.
40///
41/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical
42/// planning, and can be evaluated directly on a [RecordBatch]. They are
43/// normally created from [Expr] by a [PhysicalPlanner] and can be created
44/// directly using [create_physical_expr].
45///
46/// A Physical expression knows its type, nullability and how to evaluate itself.
47///
48/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
49/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
50///
51/// # Example: Create `PhysicalExpr` from `Expr`
52/// ```
53/// # use arrow::datatypes::{DataType, Field, Schema};
54/// # use datafusion_common::DFSchema;
55/// # use datafusion_expr::{Expr, col, lit};
56/// # use datafusion_physical_expr::create_physical_expr;
57/// # use datafusion_expr::execution_props::ExecutionProps;
58/// // For a logical expression `a = 1`, we can create a physical expression
59/// let expr = col("a").eq(lit(1));
60/// // To create a PhysicalExpr we need 1. a schema
61/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
62/// let df_schema = DFSchema::try_from(schema).unwrap();
63/// // 2. ExecutionProps
64/// let props = ExecutionProps::new();
65/// // We can now create a PhysicalExpr:
66/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
67/// ```
68///
69/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue]
70/// ```
71/// # use std::sync::Arc;
72/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch};
73/// # use arrow::datatypes::{DataType, Field, Schema};
74/// # use datafusion_common::{assert_batches_eq, DFSchema};
75/// # use datafusion_expr::{Expr, col, lit, ColumnarValue};
76/// # use datafusion_physical_expr::create_physical_expr;
77/// # use datafusion_expr::execution_props::ExecutionProps;
78/// # let expr = col("a").eq(lit(1));
79/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
80/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap();
81/// # let props = ExecutionProps::new();
82/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this:
83/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
84/// // Input of [1,2,3]
85/// let input_batch = RecordBatch::try_from_iter(vec![
86///   ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _)
87/// ]).unwrap();
88/// // The result is a ColumnarValue (either an Array or a Scalar)
89/// let result = physical_expr.evaluate(&input_batch).unwrap();
90/// // In this case, a BooleanArray with the result of the comparison
91/// let ColumnarValue::Array(arr) = result else {
92///  panic!("Expected an array")
93/// };
94/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false]));
95/// ```
96///
97/// [ColumnarValue]: datafusion_expr::ColumnarValue
98///
99/// Create a physical expression from a logical expression ([Expr]).
100///
101/// # Arguments
102///
103/// * `e` - The logical expression
104/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references
105///                      to qualified or unqualified fields by name.
106pub fn create_physical_expr(
107    e: &Expr,
108    input_dfschema: &DFSchema,
109    execution_props: &ExecutionProps,
110) -> Result<Arc<dyn PhysicalExpr>> {
111    let input_schema: &Schema = &input_dfschema.into();
112
113    match e {
114        Expr::Alias(Alias { expr, .. }) => {
115            Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
116        }
117        Expr::Column(c) => {
118            let idx = input_dfschema.index_of_column(c)?;
119            Ok(Arc::new(Column::new(&c.name, idx)))
120        }
121        Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
122        Expr::ScalarVariable(_, variable_names) => {
123            if is_system_variables(variable_names) {
124                match execution_props.get_var_provider(VarType::System) {
125                    Some(provider) => {
126                        let scalar_value = provider.get_value(variable_names.clone())?;
127                        Ok(Arc::new(Literal::new(scalar_value)))
128                    }
129                    _ => plan_err!("No system variable provider found"),
130                }
131            } else {
132                match execution_props.get_var_provider(VarType::UserDefined) {
133                    Some(provider) => {
134                        let scalar_value = provider.get_value(variable_names.clone())?;
135                        Ok(Arc::new(Literal::new(scalar_value)))
136                    }
137                    _ => plan_err!("No user defined variable provider found"),
138                }
139            }
140        }
141        Expr::IsTrue(expr) => {
142            let binary_op = binary_expr(
143                expr.as_ref().clone(),
144                Operator::IsNotDistinctFrom,
145                lit(true),
146            );
147            create_physical_expr(&binary_op, input_dfschema, execution_props)
148        }
149        Expr::IsNotTrue(expr) => {
150            let binary_op =
151                binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
152            create_physical_expr(&binary_op, input_dfschema, execution_props)
153        }
154        Expr::IsFalse(expr) => {
155            let binary_op = binary_expr(
156                expr.as_ref().clone(),
157                Operator::IsNotDistinctFrom,
158                lit(false),
159            );
160            create_physical_expr(&binary_op, input_dfschema, execution_props)
161        }
162        Expr::IsNotFalse(expr) => {
163            let binary_op =
164                binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
165            create_physical_expr(&binary_op, input_dfschema, execution_props)
166        }
167        Expr::IsUnknown(expr) => {
168            let binary_op = binary_expr(
169                expr.as_ref().clone(),
170                Operator::IsNotDistinctFrom,
171                Expr::Literal(ScalarValue::Boolean(None)),
172            );
173            create_physical_expr(&binary_op, input_dfschema, execution_props)
174        }
175        Expr::IsNotUnknown(expr) => {
176            let binary_op = binary_expr(
177                expr.as_ref().clone(),
178                Operator::IsDistinctFrom,
179                Expr::Literal(ScalarValue::Boolean(None)),
180            );
181            create_physical_expr(&binary_op, input_dfschema, execution_props)
182        }
183        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
184            // Create physical expressions for left and right operands
185            let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
186            let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
187            // Note that the logical planner is responsible
188            // for type coercion on the arguments (e.g. if one
189            // argument was originally Int32 and one was
190            // Int64 they will both be coerced to Int64).
191            //
192            // There should be no coercion during physical
193            // planning.
194            binary(lhs, *op, rhs, input_schema)
195        }
196        Expr::Like(Like {
197            negated,
198            expr,
199            pattern,
200            escape_char,
201            case_insensitive,
202        }) => {
203            // `\` is the implicit escape, see https://github.com/apache/datafusion/issues/13291
204            if escape_char.unwrap_or('\\') != '\\' {
205                return exec_err!(
206                    "LIKE does not support escape_char other than the backslash (\\)"
207                );
208            }
209            let physical_expr =
210                create_physical_expr(expr, input_dfschema, execution_props)?;
211            let physical_pattern =
212                create_physical_expr(pattern, input_dfschema, execution_props)?;
213            like(
214                *negated,
215                *case_insensitive,
216                physical_expr,
217                physical_pattern,
218                input_schema,
219            )
220        }
221        Expr::SimilarTo(Like {
222            negated,
223            expr,
224            pattern,
225            escape_char,
226            case_insensitive,
227        }) => {
228            if escape_char.is_some() {
229                return exec_err!("SIMILAR TO does not support escape_char yet");
230            }
231            let physical_expr =
232                create_physical_expr(expr, input_dfschema, execution_props)?;
233            let physical_pattern =
234                create_physical_expr(pattern, input_dfschema, execution_props)?;
235            similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
236        }
237        Expr::Case(case) => {
238            let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
239                Some(create_physical_expr(
240                    e.as_ref(),
241                    input_dfschema,
242                    execution_props,
243                )?)
244            } else {
245                None
246            };
247            let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
248                .when_then_expr
249                .iter()
250                .map(|(w, t)| (w.as_ref(), t.as_ref()))
251                .unzip();
252            let when_expr =
253                create_physical_exprs(when_expr, input_dfschema, execution_props)?;
254            let then_expr =
255                create_physical_exprs(then_expr, input_dfschema, execution_props)?;
256            let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
257                when_expr
258                    .iter()
259                    .zip(then_expr.iter())
260                    .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
261                    .collect();
262            let else_expr: Option<Arc<dyn PhysicalExpr>> =
263                if let Some(e) = &case.else_expr {
264                    Some(create_physical_expr(
265                        e.as_ref(),
266                        input_dfschema,
267                        execution_props,
268                    )?)
269                } else {
270                    None
271                };
272            Ok(expressions::case(expr, when_then_expr, else_expr)?)
273        }
274        Expr::Cast(Cast { expr, data_type }) => expressions::cast(
275            create_physical_expr(expr, input_dfschema, execution_props)?,
276            input_schema,
277            data_type.clone(),
278        ),
279        Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
280            create_physical_expr(expr, input_dfschema, execution_props)?,
281            input_schema,
282            data_type.clone(),
283        ),
284        Expr::Not(expr) => {
285            expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
286        }
287        Expr::Negative(expr) => expressions::negative(
288            create_physical_expr(expr, input_dfschema, execution_props)?,
289            input_schema,
290        ),
291        Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
292            expr,
293            input_dfschema,
294            execution_props,
295        )?),
296        Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
297            expr,
298            input_dfschema,
299            execution_props,
300        )?),
301        Expr::ScalarFunction(ScalarFunction { func, args }) => {
302            let physical_args =
303                create_physical_exprs(args, input_dfschema, execution_props)?;
304
305            Ok(Arc::new(ScalarFunctionExpr::try_new(
306                Arc::clone(func),
307                physical_args,
308                input_schema,
309            )?))
310        }
311        Expr::Between(Between {
312            expr,
313            negated,
314            low,
315            high,
316        }) => {
317            let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
318            let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
319            let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
320
321            // rewrite the between into the two binary operators
322            let binary_expr = binary(
323                binary(
324                    Arc::clone(&value_expr),
325                    Operator::GtEq,
326                    low_expr,
327                    input_schema,
328                )?,
329                Operator::And,
330                binary(
331                    Arc::clone(&value_expr),
332                    Operator::LtEq,
333                    high_expr,
334                    input_schema,
335                )?,
336                input_schema,
337            );
338
339            if *negated {
340                expressions::not(binary_expr?)
341            } else {
342                binary_expr
343            }
344        }
345        Expr::InList(InList {
346            expr,
347            list,
348            negated,
349        }) => match expr.as_ref() {
350            Expr::Literal(ScalarValue::Utf8(None)) => {
351                Ok(expressions::lit(ScalarValue::Boolean(None)))
352            }
353            _ => {
354                let value_expr =
355                    create_physical_expr(expr, input_dfschema, execution_props)?;
356
357                let list_exprs =
358                    create_physical_exprs(list, input_dfschema, execution_props)?;
359                expressions::in_list(value_expr, list_exprs, negated, input_schema)
360            }
361        },
362        Expr::Placeholder(Placeholder { id, .. }) => {
363            exec_err!("Placeholder '{id}' was not provided a value for execution.")
364        }
365        other => {
366            not_impl_err!("Physical plan does not support logical expression {other:?}")
367        }
368    }
369}
370
371/// Create vector of Physical Expression from a vector of logical expression
372pub fn create_physical_exprs<'a, I>(
373    exprs: I,
374    input_dfschema: &DFSchema,
375    execution_props: &ExecutionProps,
376) -> Result<Vec<Arc<dyn PhysicalExpr>>>
377where
378    I: IntoIterator<Item = &'a Expr>,
379{
380    exprs
381        .into_iter()
382        .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
383        .collect::<Result<Vec<_>>>()
384}
385
386/// Convert a logical expression to a physical expression (without any simplification, etc)
387pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
388    let df_schema = schema.clone().to_dfschema().unwrap();
389    let execution_props = ExecutionProps::new();
390    create_physical_expr(expr, &df_schema, &execution_props).unwrap()
391}
392
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};

    use datafusion_expr::{col, lit};

    use super::*;

    /// Plans `letter = 'A'` against a qualified schema and evaluates it over a
    /// four-row string batch. Only the first row should match.
    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?;

        let predicate = col("letter").eq(lit("A"));
        let physical = create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;

        let letters: ArrayRef =
            Arc::new(StringArray::from_iter_values(["A", "B", "C", "D"]));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![letters])?;

        let actual = physical
            .evaluate(&batch)?
            .into_array(batch.num_rows())
            .expect("Failed to convert to array");
        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));
        assert_eq!(&actual, &expected);

        Ok(())
    }
}