1use std::sync::Arc;
19
20use crate::ScalarFunctionExpr;
21use crate::{
22 expressions::{self, binary, like, similar_to, Column, Literal},
23 PhysicalExpr,
24};
25
26use arrow::datatypes::Schema;
27use datafusion_common::{
28 exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
29};
30use datafusion_expr::execution_props::ExecutionProps;
31use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
32use datafusion_expr::var_provider::is_system_variables;
33use datafusion_expr::var_provider::VarType;
34use datafusion_expr::{
35 binary_expr, lit, Between, BinaryExpr, Expr, Like, Operator, TryCast,
36};
37
38pub fn create_physical_expr(
107 e: &Expr,
108 input_dfschema: &DFSchema,
109 execution_props: &ExecutionProps,
110) -> Result<Arc<dyn PhysicalExpr>> {
111 let input_schema: &Schema = &input_dfschema.into();
112
113 match e {
114 Expr::Alias(Alias { expr, .. }) => {
115 Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
116 }
117 Expr::Column(c) => {
118 let idx = input_dfschema.index_of_column(c)?;
119 Ok(Arc::new(Column::new(&c.name, idx)))
120 }
121 Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
122 Expr::ScalarVariable(_, variable_names) => {
123 if is_system_variables(variable_names) {
124 match execution_props.get_var_provider(VarType::System) {
125 Some(provider) => {
126 let scalar_value = provider.get_value(variable_names.clone())?;
127 Ok(Arc::new(Literal::new(scalar_value)))
128 }
129 _ => plan_err!("No system variable provider found"),
130 }
131 } else {
132 match execution_props.get_var_provider(VarType::UserDefined) {
133 Some(provider) => {
134 let scalar_value = provider.get_value(variable_names.clone())?;
135 Ok(Arc::new(Literal::new(scalar_value)))
136 }
137 _ => plan_err!("No user defined variable provider found"),
138 }
139 }
140 }
141 Expr::IsTrue(expr) => {
142 let binary_op = binary_expr(
143 expr.as_ref().clone(),
144 Operator::IsNotDistinctFrom,
145 lit(true),
146 );
147 create_physical_expr(&binary_op, input_dfschema, execution_props)
148 }
149 Expr::IsNotTrue(expr) => {
150 let binary_op =
151 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
152 create_physical_expr(&binary_op, input_dfschema, execution_props)
153 }
154 Expr::IsFalse(expr) => {
155 let binary_op = binary_expr(
156 expr.as_ref().clone(),
157 Operator::IsNotDistinctFrom,
158 lit(false),
159 );
160 create_physical_expr(&binary_op, input_dfschema, execution_props)
161 }
162 Expr::IsNotFalse(expr) => {
163 let binary_op =
164 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
165 create_physical_expr(&binary_op, input_dfschema, execution_props)
166 }
167 Expr::IsUnknown(expr) => {
168 let binary_op = binary_expr(
169 expr.as_ref().clone(),
170 Operator::IsNotDistinctFrom,
171 Expr::Literal(ScalarValue::Boolean(None)),
172 );
173 create_physical_expr(&binary_op, input_dfschema, execution_props)
174 }
175 Expr::IsNotUnknown(expr) => {
176 let binary_op = binary_expr(
177 expr.as_ref().clone(),
178 Operator::IsDistinctFrom,
179 Expr::Literal(ScalarValue::Boolean(None)),
180 );
181 create_physical_expr(&binary_op, input_dfschema, execution_props)
182 }
183 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
184 let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
186 let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
187 binary(lhs, *op, rhs, input_schema)
195 }
196 Expr::Like(Like {
197 negated,
198 expr,
199 pattern,
200 escape_char,
201 case_insensitive,
202 }) => {
203 if escape_char.unwrap_or('\\') != '\\' {
205 return exec_err!(
206 "LIKE does not support escape_char other than the backslash (\\)"
207 );
208 }
209 let physical_expr =
210 create_physical_expr(expr, input_dfschema, execution_props)?;
211 let physical_pattern =
212 create_physical_expr(pattern, input_dfschema, execution_props)?;
213 like(
214 *negated,
215 *case_insensitive,
216 physical_expr,
217 physical_pattern,
218 input_schema,
219 )
220 }
221 Expr::SimilarTo(Like {
222 negated,
223 expr,
224 pattern,
225 escape_char,
226 case_insensitive,
227 }) => {
228 if escape_char.is_some() {
229 return exec_err!("SIMILAR TO does not support escape_char yet");
230 }
231 let physical_expr =
232 create_physical_expr(expr, input_dfschema, execution_props)?;
233 let physical_pattern =
234 create_physical_expr(pattern, input_dfschema, execution_props)?;
235 similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
236 }
237 Expr::Case(case) => {
238 let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
239 Some(create_physical_expr(
240 e.as_ref(),
241 input_dfschema,
242 execution_props,
243 )?)
244 } else {
245 None
246 };
247 let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
248 .when_then_expr
249 .iter()
250 .map(|(w, t)| (w.as_ref(), t.as_ref()))
251 .unzip();
252 let when_expr =
253 create_physical_exprs(when_expr, input_dfschema, execution_props)?;
254 let then_expr =
255 create_physical_exprs(then_expr, input_dfschema, execution_props)?;
256 let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
257 when_expr
258 .iter()
259 .zip(then_expr.iter())
260 .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
261 .collect();
262 let else_expr: Option<Arc<dyn PhysicalExpr>> =
263 if let Some(e) = &case.else_expr {
264 Some(create_physical_expr(
265 e.as_ref(),
266 input_dfschema,
267 execution_props,
268 )?)
269 } else {
270 None
271 };
272 Ok(expressions::case(expr, when_then_expr, else_expr)?)
273 }
274 Expr::Cast(Cast { expr, data_type }) => expressions::cast(
275 create_physical_expr(expr, input_dfschema, execution_props)?,
276 input_schema,
277 data_type.clone(),
278 ),
279 Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
280 create_physical_expr(expr, input_dfschema, execution_props)?,
281 input_schema,
282 data_type.clone(),
283 ),
284 Expr::Not(expr) => {
285 expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
286 }
287 Expr::Negative(expr) => expressions::negative(
288 create_physical_expr(expr, input_dfschema, execution_props)?,
289 input_schema,
290 ),
291 Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
292 expr,
293 input_dfschema,
294 execution_props,
295 )?),
296 Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
297 expr,
298 input_dfschema,
299 execution_props,
300 )?),
301 Expr::ScalarFunction(ScalarFunction { func, args }) => {
302 let physical_args =
303 create_physical_exprs(args, input_dfschema, execution_props)?;
304
305 Ok(Arc::new(ScalarFunctionExpr::try_new(
306 Arc::clone(func),
307 physical_args,
308 input_schema,
309 )?))
310 }
311 Expr::Between(Between {
312 expr,
313 negated,
314 low,
315 high,
316 }) => {
317 let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
318 let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
319 let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
320
321 let binary_expr = binary(
323 binary(
324 Arc::clone(&value_expr),
325 Operator::GtEq,
326 low_expr,
327 input_schema,
328 )?,
329 Operator::And,
330 binary(
331 Arc::clone(&value_expr),
332 Operator::LtEq,
333 high_expr,
334 input_schema,
335 )?,
336 input_schema,
337 );
338
339 if *negated {
340 expressions::not(binary_expr?)
341 } else {
342 binary_expr
343 }
344 }
345 Expr::InList(InList {
346 expr,
347 list,
348 negated,
349 }) => match expr.as_ref() {
350 Expr::Literal(ScalarValue::Utf8(None)) => {
351 Ok(expressions::lit(ScalarValue::Boolean(None)))
352 }
353 _ => {
354 let value_expr =
355 create_physical_expr(expr, input_dfschema, execution_props)?;
356
357 let list_exprs =
358 create_physical_exprs(list, input_dfschema, execution_props)?;
359 expressions::in_list(value_expr, list_exprs, negated, input_schema)
360 }
361 },
362 Expr::Placeholder(Placeholder { id, .. }) => {
363 exec_err!("Placeholder '{id}' was not provided a value for execution.")
364 }
365 other => {
366 not_impl_err!("Physical plan does not support logical expression {other:?}")
367 }
368 }
369}
370
371pub fn create_physical_exprs<'a, I>(
373 exprs: I,
374 input_dfschema: &DFSchema,
375 execution_props: &ExecutionProps,
376) -> Result<Vec<Arc<dyn PhysicalExpr>>>
377where
378 I: IntoIterator<Item = &'a Expr>,
379{
380 exprs
381 .into_iter()
382 .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
383 .collect::<Result<Vec<_>>>()
384}
385
386pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
388 let df_schema = schema.clone().to_dfschema().unwrap();
389 let execution_props = ExecutionProps::new();
390 create_physical_expr(expr, &df_schema, &execution_props).unwrap()
391}
392
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};

    use datafusion_expr::{col, lit};

    use super::*;

    /// Plans `letter = 'A'` against a qualified schema and checks that it
    /// evaluates row by row over a string column to the expected booleans.
    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?;

        let predicate = col("letter").eq(lit("A"));
        let physical =
            create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;

        let letters: ArrayRef =
            Arc::new(StringArray::from_iter_values(["A", "B", "C", "D"]));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![letters])?;

        let actual = physical
            .evaluate(&batch)?
            .into_array(batch.num_rows())
            .expect("Failed to convert to array");
        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));
        assert_eq!(&actual, &expected);

        Ok(())
    }
}