use crate::expressions::GetFieldAccessExpr;
use crate::var_provider::is_system_variables;
use crate::{
execution_props::ExecutionProps,
expressions::{self, binary, like, Column, GetIndexedFieldExpr, Literal},
functions, udf,
var_provider::VarType,
PhysicalExpr,
};
use arrow::datatypes::Schema;
use datafusion_common::{
exec_err, internal_err, not_impl_err, plan_err, DFSchema, DataFusionError, Result,
ScalarValue,
};
use datafusion_expr::expr::{Alias, Cast, InList, ScalarFunction};
use datafusion_expr::{
binary_expr, Between, BinaryExpr, Expr, GetFieldAccess, GetIndexedField, Like,
Operator, ScalarFunctionDefinition, TryCast,
};
use std::sync::Arc;
pub fn create_physical_expr(
e: &Expr,
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<Arc<dyn PhysicalExpr>> {
let input_schema: &Schema = &input_dfschema.into();
match e {
Expr::Alias(Alias { expr, .. }) => {
Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
}
Expr::Column(c) => {
let idx = input_dfschema.index_of_column(c)?;
Ok(Arc::new(Column::new(&c.name, idx)))
}
Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
Expr::ScalarVariable(_, variable_names) => {
if is_system_variables(variable_names) {
match execution_props.get_var_provider(VarType::System) {
Some(provider) => {
let scalar_value = provider.get_value(variable_names.clone())?;
Ok(Arc::new(Literal::new(scalar_value)))
}
_ => plan_err!("No system variable provider found"),
}
} else {
match execution_props.get_var_provider(VarType::UserDefined) {
Some(provider) => {
let scalar_value = provider.get_value(variable_names.clone())?;
Ok(Arc::new(Literal::new(scalar_value)))
}
_ => plan_err!("No user defined variable provider found"),
}
}
}
Expr::IsTrue(expr) => {
let binary_op = binary_expr(
expr.as_ref().clone(),
Operator::IsNotDistinctFrom,
Expr::Literal(ScalarValue::Boolean(Some(true))),
);
create_physical_expr(&binary_op, input_dfschema, execution_props)
}
Expr::IsNotTrue(expr) => {
let binary_op = binary_expr(
expr.as_ref().clone(),
Operator::IsDistinctFrom,
Expr::Literal(ScalarValue::Boolean(Some(true))),
);
create_physical_expr(&binary_op, input_dfschema, execution_props)
}
Expr::IsFalse(expr) => {
let binary_op = binary_expr(
expr.as_ref().clone(),
Operator::IsNotDistinctFrom,
Expr::Literal(ScalarValue::Boolean(Some(false))),
);
create_physical_expr(&binary_op, input_dfschema, execution_props)
}
Expr::IsNotFalse(expr) => {
let binary_op = binary_expr(
expr.as_ref().clone(),
Operator::IsDistinctFrom,
Expr::Literal(ScalarValue::Boolean(Some(false))),
);
create_physical_expr(&binary_op, input_dfschema, execution_props)
}
Expr::IsUnknown(expr) => {
let binary_op = binary_expr(
expr.as_ref().clone(),
Operator::IsNotDistinctFrom,
Expr::Literal(ScalarValue::Boolean(None)),
);
create_physical_expr(&binary_op, input_dfschema, execution_props)
}
Expr::IsNotUnknown(expr) => {
let binary_op = binary_expr(
expr.as_ref().clone(),
Operator::IsDistinctFrom,
Expr::Literal(ScalarValue::Boolean(None)),
);
create_physical_expr(&binary_op, input_dfschema, execution_props)
}
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
binary(lhs, *op, rhs, input_schema)
}
Expr::Like(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive,
}) => {
if escape_char.is_some() {
return exec_err!("LIKE does not support escape_char");
}
let physical_expr =
create_physical_expr(expr, input_dfschema, execution_props)?;
let physical_pattern =
create_physical_expr(pattern, input_dfschema, execution_props)?;
like(
*negated,
*case_insensitive,
physical_expr,
physical_pattern,
input_schema,
)
}
Expr::Case(case) => {
let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
Some(create_physical_expr(
e.as_ref(),
input_dfschema,
execution_props,
)?)
} else {
None
};
let when_expr = case
.when_then_expr
.iter()
.map(|(w, _)| {
create_physical_expr(w.as_ref(), input_dfschema, execution_props)
})
.collect::<Result<Vec<_>>>()?;
let then_expr = case
.when_then_expr
.iter()
.map(|(_, t)| {
create_physical_expr(t.as_ref(), input_dfschema, execution_props)
})
.collect::<Result<Vec<_>>>()?;
let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
when_expr
.iter()
.zip(then_expr.iter())
.map(|(w, t)| (w.clone(), t.clone()))
.collect();
let else_expr: Option<Arc<dyn PhysicalExpr>> =
if let Some(e) = &case.else_expr {
Some(create_physical_expr(
e.as_ref(),
input_dfschema,
execution_props,
)?)
} else {
None
};
Ok(expressions::case(expr, when_then_expr, else_expr)?)
}
Expr::Cast(Cast { expr, data_type }) => expressions::cast(
create_physical_expr(expr, input_dfschema, execution_props)?,
input_schema,
data_type.clone(),
),
Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
create_physical_expr(expr, input_dfschema, execution_props)?,
input_schema,
data_type.clone(),
),
Expr::Not(expr) => {
expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
}
Expr::Negative(expr) => expressions::negative(
create_physical_expr(expr, input_dfschema, execution_props)?,
input_schema,
),
Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
expr,
input_dfschema,
execution_props,
)?),
Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
expr,
input_dfschema,
execution_props,
)?),
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
let field = match field {
GetFieldAccess::NamedStructField { name } => {
GetFieldAccessExpr::NamedStructField { name: name.clone() }
}
GetFieldAccess::ListIndex { key } => GetFieldAccessExpr::ListIndex {
key: create_physical_expr(key, input_dfschema, execution_props)?,
},
GetFieldAccess::ListRange {
start,
stop,
stride,
} => GetFieldAccessExpr::ListRange {
start: create_physical_expr(start, input_dfschema, execution_props)?,
stop: create_physical_expr(stop, input_dfschema, execution_props)?,
stride: create_physical_expr(
stride,
input_dfschema,
execution_props,
)?,
},
};
Ok(Arc::new(GetIndexedFieldExpr::new(
create_physical_expr(expr, input_dfschema, execution_props)?,
field,
)))
}
Expr::ScalarFunction(ScalarFunction { func_def, args }) => {
let physical_args = args
.iter()
.map(|e| create_physical_expr(e, input_dfschema, execution_props))
.collect::<Result<Vec<_>>>()?;
match func_def {
ScalarFunctionDefinition::BuiltIn(fun) => {
functions::create_physical_expr(
fun,
&physical_args,
input_schema,
execution_props,
)
}
ScalarFunctionDefinition::UDF(fun) => {
let return_type = fun.return_type_from_exprs(args, input_dfschema)?;
udf::create_physical_expr(
fun.clone().as_ref(),
&physical_args,
return_type,
)
}
ScalarFunctionDefinition::Name(_) => {
internal_err!("Function `Expr` with name should be resolved.")
}
}
}
Expr::Between(Between {
expr,
negated,
low,
high,
}) => {
let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
let binary_expr = binary(
binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?,
Operator::And,
binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?,
input_schema,
);
if *negated {
expressions::not(binary_expr?)
} else {
binary_expr
}
}
Expr::InList(InList {
expr,
list,
negated,
}) => match expr.as_ref() {
Expr::Literal(ScalarValue::Utf8(None)) => {
Ok(expressions::lit(ScalarValue::Boolean(None)))
}
_ => {
let value_expr =
create_physical_expr(expr, input_dfschema, execution_props)?;
let list_exprs = list
.iter()
.map(|expr| {
create_physical_expr(expr, input_dfschema, execution_props)
})
.collect::<Result<Vec<_>>>()?;
expressions::in_list(value_expr, list_exprs, negated, input_schema)
}
},
other => {
not_impl_err!("Physical plan does not support logical expression {other:?}")
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{col, left, Literal};
#[test]
fn test_create_physical_expr_scalar_input_output() -> Result<()> {
let expr = col("letter").eq(left("APACHE".lit(), 1i64.lit()));
let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?;
let p = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?;
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(StringArray::from_iter_values(vec![
"A", "B", "C", "D",
]))],
)?;
let result = p.evaluate(&batch)?;
let result = result.into_array(4).expect("Failed to convert to array");
assert_eq!(
&result,
&(Arc::new(BooleanArray::from(vec![true, false, false, false,])) as ArrayRef)
);
Ok(())
}
}