use arrow_schema::{DataType, TimeUnit};
use datafusion_expr::planner::{
PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr,
};
use sqlparser::ast::{
BinaryOperator, CastKind, DictionaryField, Expr as SQLExpr, MapEntry, StructField,
Subscript, TrimWhereField, Value,
};
use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result,
ScalarValue,
};
use datafusion_expr::expr::{InList, ScalarFunction};
use datafusion_expr::{
lit, Between, BinaryExpr, Cast, Expr, ExprSchemable, GetFieldAccess, Like, Literal,
Operator, TryCast,
};
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
mod binary_op;
mod function;
mod grouping_set;
mod identifier;
mod order_by;
mod subquery;
mod substring;
mod unary_op;
mod value;
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
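/// Generate a logical expression from the given SQL expression.
///
/// Chains of binary operators are planned iteratively with an explicit work
/// stack rather than by recursion, so deeply nested expressions (e.g. long
/// `a OR b OR c ...` chains) do not overflow the call stack; see the
/// `test_stack_overflow` tests at the bottom of this file.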
pub(crate) fn sql_expr_to_logical_expr(
&self,
sql: SQLExpr,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
enum StackEntry {
SQLExpr(Box<SQLExpr>),
Operator(BinaryOperator),
}
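// Iterative "stack machine": `stack` holds AST nodes still to visit plus
// pending operators, while `eval_stack` accumulates already-planned
// operands, which are then combined in postfix order.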
let mut stack = vec![StackEntry::SQLExpr(Box::new(sql))];
let mut eval_stack = vec![];
while let Some(entry) = stack.pop() {
match entry {
StackEntry::SQLExpr(sql_expr) => {
match *sql_expr {
SQLExpr::BinaryOp { left, op, right } => {
stack.push(StackEntry::Operator(op));
stack.push(StackEntry::SQLExpr(right));
stack.push(StackEntry::SQLExpr(left));
}
_ => {
let expr = self.sql_expr_to_logical_expr_internal(
*sql_expr,
schema,
planner_context,
)?;
eval_stack.push(expr);
}
}
}
StackEntry::Operator(op) => {
let right = eval_stack.pop().unwrap();
let left = eval_stack.pop().unwrap();
let expr = self.build_logical_expr(op, left, right, schema)?;
eval_stack.push(expr);
}
}
}
assert_eq!(1, eval_stack.len());
let expr = eval_stack.pop().unwrap();
Ok(expr)
}
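/// Plan a binary operator. Each registered `ExprPlanner` is offered the
/// expression first; if none plans it, the operator is mapped via
/// `parse_sql_binary_op` and a plain `BinaryExpr` is produced.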
fn build_logical_expr(
&self,
op: BinaryOperator,
left: Expr,
right: Expr,
schema: &DFSchema,
) -> Result<Expr> {
let mut binary_expr = RawBinaryExpr { op, left, right };
for planner in self.context_provider.get_expr_planners() {
match planner.plan_binary_op(binary_expr, schema)? {
PlannerResult::Planned(expr) => {
return Ok(expr);
}
PlannerResult::Original(expr) => {
binary_expr = expr;
}
}
}
let RawBinaryExpr { op, left, right } = binary_expr;
Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(left),
self.parse_sql_binary_op(op)?,
Box::new(right),
)))
}
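/// Generate a logical expression from a SQL expression, then rewrite
/// partially qualified column references, validate that every referenced
/// column exists in `schema`, and infer the data types of any placeholders.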
pub fn sql_to_expr(
&self,
sql: SQLExpr,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let mut expr = self.sql_expr_to_logical_expr(sql, schema, planner_context)?;
expr = self.rewrite_partial_qualifier(expr, schema);
self.validate_schema_satisfies_exprs(schema, &[expr.clone()])?;
let expr = expr.infer_placeholder_types(schema)?;
Ok(expr)
}
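/// Rewrite a column reference whose relation only partially qualifies a
/// field (e.g. `t.a` when the schema qualifies the field as `schema.t`) to
/// the first fully qualified field whose qualifier ends with `.t` and whose
/// name matches.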
fn rewrite_partial_qualifier(&self, expr: Expr, schema: &DFSchema) -> Expr {
match expr {
Expr::Column(col) => match &col.relation {
Some(q) => {
match schema.iter().find(|(qualifier, field)| match qualifier {
Some(field_q) => {
field.name() == &col.name
&& field_q.to_string().ends_with(&format!(".{q}"))
}
_ => false,
}) {
Some((qualifier, df_field)) => Expr::from((qualifier, df_field)),
None => Expr::Column(col),
}
}
None => Expr::Column(col),
},
_ => expr,
}
}
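/// Plan a single SQL AST node. `SQLExpr::BinaryOp` must never reach this
/// function: it is expanded iteratively by `sql_expr_to_logical_expr`, and
/// hitting it here is an internal error.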
fn sql_expr_to_logical_expr_internal(
&self,
sql: SQLExpr,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
match sql {
SQLExpr::Value(value) => {
self.parse_value(value, planner_context.prepare_param_data_types())
}
SQLExpr::Extract { field, expr } => {
let mut extract_args = vec![
Expr::Literal(ScalarValue::from(format!("{field}"))),
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
];
for planner in self.context_provider.get_expr_planners() {
match planner.plan_extract(extract_args)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => {
extract_args = args;
}
}
}
not_impl_err!("Extract not supported by ExprPlanner: {extract_args:?}")
}
SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
SQLExpr::Interval(interval) => {
self.sql_interval_to_expr(false, interval, schema, planner_context)
}
SQLExpr::Identifier(id) => {
self.sql_identifier_to_expr(id, schema, planner_context)
}
SQLExpr::MapAccess { .. } => {
not_impl_err!("Map Access")
}
SQLExpr::Subscript { expr, subscript } => {
let expr =
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
let field_access = match *subscript {
Subscript::Index { index } => {
match index {
SQLExpr::Value(
Value::SingleQuotedString(s)
| Value::DoubleQuotedString(s),
) => GetFieldAccess::NamedStructField {
name: ScalarValue::from(s),
},
SQLExpr::JsonAccess { .. } => {
return not_impl_err!("JsonAccess");
}
_ => GetFieldAccess::ListIndex {
key: Box::new(self.sql_expr_to_logical_expr(
index,
schema,
planner_context,
)?),
},
}
}
Subscript::Slice {
lower_bound,
upper_bound,
stride,
} => {
let lower_bound = if let Some(lower_bound) = lower_bound {
self.sql_expr_to_logical_expr(
lower_bound,
schema,
planner_context,
)
} else {
not_impl_err!("Slice subscript requires a lower bound")
}?;
let upper_bound = if let Some(upper_bound) = upper_bound {
self.sql_expr_to_logical_expr(
upper_bound,
schema,
planner_context,
)
} else {
not_impl_err!("Slice subscript requires an upper bound")
}?;
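// A missing stride defaults to 1, i.e. every element in the range.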
let stride = if let Some(stride) = stride {
self.sql_expr_to_logical_expr(
stride,
schema,
planner_context,
)?
} else {
lit(1i64)
};
GetFieldAccess::ListRange {
start: Box::new(lower_bound),
stop: Box::new(upper_bound),
stride: Box::new(stride),
}
}
};
let mut field_access_expr = RawFieldAccessExpr { expr, field_access };
for planner in self.context_provider.get_expr_planners() {
match planner.plan_field_access(field_access_expr, schema)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(expr) => {
field_access_expr = expr;
}
}
}
not_impl_err!(
"GetFieldAccess not supported by ExprPlanner: {field_access_expr:?}"
)
}
SQLExpr::CompoundIdentifier(ids) => {
self.sql_compound_identifier_to_expr(ids, schema, planner_context)
}
SQLExpr::Case {
operand,
conditions,
results,
else_result,
} => self.sql_case_identifier_to_expr(
operand,
conditions,
results,
else_result,
schema,
planner_context,
),
SQLExpr::Cast {
kind: CastKind::Cast | CastKind::DoubleColon,
expr,
data_type,
format,
} => {
if let Some(format) = format {
return not_impl_err!("CAST with format is not supported: {format}");
}
let dt = self.convert_data_type(&data_type)?;
let expr =
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?;
let expr = match &dt {
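// Integer inputs are first cast to a second-resolution timestamp, so the
// value is interpreted as seconds rather than nanoseconds (matching the
// seconds-based semantics of other SQL engines).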
DataType::Timestamp(TimeUnit::Nanosecond, tz)
if expr.get_type(schema)? == DataType::Int64 =>
{
Expr::Cast(Cast::new(
Box::new(expr),
DataType::Timestamp(TimeUnit::Second, tz.clone()),
))
}
_ => expr,
};
Ok(Expr::Cast(Cast::new(Box::new(expr), dt)))
}
SQLExpr::Cast {
kind: CastKind::TryCast | CastKind::SafeCast,
expr,
data_type,
format,
} => {
if let Some(format) = format {
return not_impl_err!("CAST with format is not supported: {format}");
}
Ok(Expr::TryCast(TryCast::new(
Box::new(self.sql_expr_to_logical_expr(
*expr,
schema,
planner_context,
)?),
self.convert_data_type(&data_type)?,
)))
}
SQLExpr::TypedString { data_type, value } => Ok(Expr::Cast(Cast::new(
Box::new(lit(value)),
self.convert_data_type(&data_type)?,
))),
SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
))),
SQLExpr::IsNotNull(expr) => Ok(Expr::IsNotNull(Box::new(
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
))),
SQLExpr::IsDistinctFrom(left, right) => {
Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(self.sql_expr_to_logical_expr(
*left,
schema,
planner_context,
)?),
Operator::IsDistinctFrom,
Box::new(self.sql_expr_to_logical_expr(
*right,
schema,
planner_context,
)?),
)))
}
SQLExpr::IsNotDistinctFrom(left, right) => {
Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(self.sql_expr_to_logical_expr(
*left,
schema,
planner_context,
)?),
Operator::IsNotDistinctFrom,
Box::new(self.sql_expr_to_logical_expr(
*right,
schema,
planner_context,
)?),
)))
}
SQLExpr::IsTrue(expr) => Ok(Expr::IsTrue(Box::new(
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
))),
SQLExpr::IsFalse(expr) => Ok(Expr::IsFalse(Box::new(
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
))),
SQLExpr::IsNotTrue(expr) => Ok(Expr::IsNotTrue(Box::new(
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
))),
SQLExpr::IsNotFalse(expr) => Ok(Expr::IsNotFalse(Box::new(
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
))),
SQLExpr::IsUnknown(expr) => Ok(Expr::IsUnknown(Box::new(
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
))),
SQLExpr::IsNotUnknown(expr) => Ok(Expr::IsNotUnknown(Box::new(
self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
))),
SQLExpr::UnaryOp { op, expr } => {
self.parse_sql_unary_op(op, *expr, schema, planner_context)
}
SQLExpr::Between {
expr,
negated,
low,
high,
} => Ok(Expr::Between(Between::new(
Box::new(self.sql_expr_to_logical_expr(
*expr,
schema,
planner_context,
)?),
negated,
Box::new(self.sql_expr_to_logical_expr(*low, schema, planner_context)?),
Box::new(self.sql_expr_to_logical_expr(
*high,
schema,
planner_context,
)?),
))),
SQLExpr::InList {
expr,
list,
negated,
} => self.sql_in_list_to_expr(*expr, list, negated, schema, planner_context),
SQLExpr::Like {
negated,
expr,
pattern,
escape_char,
} => self.sql_like_to_expr(
negated,
*expr,
*pattern,
escape_char,
schema,
planner_context,
false,
),
SQLExpr::ILike {
negated,
expr,
pattern,
escape_char,
} => self.sql_like_to_expr(
negated,
*expr,
*pattern,
escape_char,
schema,
planner_context,
true,
),
SQLExpr::SimilarTo {
negated,
expr,
pattern,
escape_char,
} => self.sql_similarto_to_expr(
negated,
*expr,
*pattern,
escape_char,
schema,
planner_context,
),
SQLExpr::BinaryOp { .. } => {
internal_err!("binary_op should be handled by sql_expr_to_logical_expr.")
}
#[cfg(feature = "unicode_expressions")]
SQLExpr::Substring {
expr,
substring_from,
substring_for,
special: _,
} => self.sql_substring_to_expr(
expr,
substring_from,
substring_for,
schema,
planner_context,
),
#[cfg(not(feature = "unicode_expressions"))]
SQLExpr::Substring { .. } => {
internal_err!(
"statement substring requires compilation with feature flag: unicode_expressions."
)
}
SQLExpr::Trim {
expr,
trim_where,
trim_what,
trim_characters,
} => self.sql_trim_to_expr(
*expr,
trim_where,
trim_what,
trim_characters,
schema,
planner_context,
),
SQLExpr::Function(function) => {
self.sql_function_to_expr(function, schema, planner_context)
}
SQLExpr::Rollup(exprs) => {
self.sql_rollup_to_expr(exprs, schema, planner_context)
}
SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs, schema, planner_context),
SQLExpr::GroupingSets(exprs) => {
self.sql_grouping_sets_to_expr(exprs, schema, planner_context)
}
SQLExpr::Floor {
expr,
field: _field,
} => self.sql_fn_name_to_expr(*expr, "floor", schema, planner_context),
SQLExpr::Ceil {
expr,
field: _field,
} => self.sql_fn_name_to_expr(*expr, "ceil", schema, planner_context),
SQLExpr::Overlay {
expr,
overlay_what,
overlay_from,
overlay_for,
} => self.sql_overlay_to_expr(
*expr,
*overlay_what,
*overlay_from,
overlay_for,
schema,
planner_context,
),
SQLExpr::Nested(e) => {
self.sql_expr_to_logical_expr(*e, schema, planner_context)
}
SQLExpr::Exists { subquery, negated } => {
self.parse_exists_subquery(*subquery, negated, schema, planner_context)
}
SQLExpr::InSubquery {
expr,
subquery,
negated,
} => {
self.parse_in_subquery(*expr, *subquery, negated, schema, planner_context)
}
SQLExpr::Subquery(subquery) => {
self.parse_scalar_subquery(*subquery, schema, planner_context)
}
SQLExpr::Struct { values, fields } => {
self.parse_struct(schema, planner_context, values, fields)
}
SQLExpr::Position { expr, r#in } => {
self.sql_position_to_expr(*expr, *r#in, schema, planner_context)
}
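// `<expr> AT TIME ZONE '<tz>'` is planned as a cast to a nanosecond
// timestamp carrying the given time zone; only string-literal time zones
// are supported.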
SQLExpr::AtTimeZone {
timestamp,
time_zone,
} => Ok(Expr::Cast(Cast::new(
Box::new(self.sql_expr_to_logical_expr_internal(
*timestamp,
schema,
planner_context,
)?),
match *time_zone {
SQLExpr::Value(Value::SingleQuotedString(s)) => {
DataType::Timestamp(TimeUnit::Nanosecond, Some(s.into()))
}
_ => {
return not_impl_err!(
"Unsupported ast node in sqltorel: {time_zone:?}"
)
}
},
))),
SQLExpr::Dictionary(fields) => {
self.try_plan_dictionary_literal(fields, schema, planner_context)
}
SQLExpr::Map(map) => {
self.try_plan_map_literal(map.entries, schema, planner_context)
}
SQLExpr::AnyOp {
left,
compare_op,
right,
} => {
let mut binary_expr = RawBinaryExpr {
op: compare_op,
left: self.sql_expr_to_logical_expr(
*left,
schema,
planner_context,
)?,
right: self.sql_expr_to_logical_expr(
*right,
schema,
planner_context,
)?,
};
for planner in self.context_provider.get_expr_planners() {
match planner.plan_any(binary_expr)? {
PlannerResult::Planned(expr) => {
return Ok(expr);
}
PlannerResult::Original(expr) => {
binary_expr = expr;
}
}
}
not_impl_err!("AnyOp not supported by ExprPlanner: {binary_expr:?}")
}
_ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"),
}
}
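/// Plan a struct literal. Explicit field definitions are not supported;
/// values with `SQLExpr::Named` entries take the named-struct path, and the
/// resulting argument list is handed to `ExprPlanner::plan_struct_literal`.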
fn parse_struct(
&self,
schema: &DFSchema,
planner_context: &mut PlannerContext,
values: Vec<sqlparser::ast::Expr>,
fields: Vec<StructField>,
) -> Result<Expr> {
if !fields.is_empty() {
return not_impl_err!("Struct fields are not supported yet");
}
let is_named_struct = values
.iter()
.any(|value| matches!(value, SQLExpr::Named { .. }));
let mut create_struct_args = if is_named_struct {
self.create_named_struct_expr(values, schema, planner_context)?
} else {
self.create_struct_expr(values, schema, planner_context)?
};
for planner in self.context_provider.get_expr_planners() {
match planner.plan_struct_literal(create_struct_args, is_named_struct)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => create_struct_args = args,
}
}
not_impl_err!("Struct not supported by ExprPlanner: {create_struct_args:?}")
}
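/// Plan `POSITION(<substr> IN <str>)`: the planned arguments are passed to
/// `ExprPlanner::plan_position` as `[str, substr]`.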
fn sql_position_to_expr(
&self,
substr_expr: SQLExpr,
str_expr: SQLExpr,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let substr =
self.sql_expr_to_logical_expr(substr_expr, schema, planner_context)?;
let fullstr = self.sql_expr_to_logical_expr(str_expr, schema, planner_context)?;
let mut position_args = vec![fullstr, substr];
for planner in self.context_provider.get_expr_planners() {
match planner.plan_position(position_args)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => {
position_args = args;
}
}
}
not_impl_err!("Position not supported by ExprPlanner: {position_args:?}")
}
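/// Plan a dictionary literal: keys become string literals, values are
/// planned, and the result is delegated to
/// `ExprPlanner::plan_dictionary_literal`.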
fn try_plan_dictionary_literal(
&self,
fields: Vec<DictionaryField>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let mut keys = vec![];
let mut values = vec![];
for field in fields {
let key = lit(field.key.value);
let value =
self.sql_expr_to_logical_expr(*field.value, schema, planner_context)?;
keys.push(key);
values.push(value);
}
let mut raw_expr = RawDictionaryExpr { keys, values };
for planner in self.context_provider.get_expr_planners() {
match planner.plan_dictionary_literal(raw_expr, schema)? {
PlannerResult::Planned(expr) => {
return Ok(expr);
}
PlannerResult::Original(expr) => raw_expr = expr,
}
}
not_impl_err!("Dictionary not supported by ExprPlanner: {raw_expr:?}")
}
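/// Plan a `MAP {k: v, ...}` literal by flattening the entries into
/// `[k1, v1, k2, v2, ...]` and delegating to `ExprPlanner::plan_make_map`.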
fn try_plan_map_literal(
&self,
entries: Vec<MapEntry>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let mut exprs: Vec<_> = entries
.into_iter()
.flat_map(|entry| vec![entry.key, entry.value].into_iter())
.map(|expr| self.sql_expr_to_logical_expr(*expr, schema, planner_context))
.collect::<Result<Vec<_>>>()?;
for planner in self.context_provider.get_expr_planners() {
match planner.plan_make_map(exprs)? {
PlannerResult::Planned(expr) => {
return Ok(expr);
}
PlannerResult::Original(expr) => exprs = expr,
}
}
not_impl_err!("MAP not supported by ExprPlanner: {exprs:?}")
}
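/// Build the flattened `[name, value, name, value, ...]` argument list for a
/// named struct; values without an explicit name get positional names
/// `c0`, `c1`, ...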
fn create_named_struct_expr(
&self,
values: Vec<SQLExpr>,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Vec<Expr>> {
Ok(values
.into_iter()
.enumerate()
.map(|(i, value)| {
let args = if let SQLExpr::Named { expr, name } = value {
[
name.value.lit(),
self.sql_expr_to_logical_expr(
*expr,
input_schema,
planner_context,
)?,
]
} else {
[
format!("c{i}").lit(),
self.sql_expr_to_logical_expr(
value,
input_schema,
planner_context,
)?,
]
};
Ok(args)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect())
}
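/// Plan the values of a positional (unnamed) struct literal in order.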
fn create_struct_expr(
&self,
values: Vec<SQLExpr>,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Vec<Expr>> {
values
.into_iter()
.map(|value| {
self.sql_expr_to_logical_expr(value, input_schema, planner_context)
})
.collect::<Result<Vec<_>>>()
}
fn sql_in_list_to_expr(
&self,
expr: SQLExpr,
list: Vec<SQLExpr>,
negated: bool,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let list_expr = list
.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
.collect::<Result<Vec<_>>>()?;
Ok(Expr::InList(InList::new(
Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
list_expr,
negated,
)))
}
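/// Plan `LIKE` / `ILIKE`: the pattern must plan to a `Utf8` (or null)
/// expression, and the escape character, if given, must be exactly one
/// character.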
#[allow(clippy::too_many_arguments)]
fn sql_like_to_expr(
&self,
negated: bool,
expr: SQLExpr,
pattern: SQLExpr,
escape_char: Option<String>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
case_insensitive: bool,
) -> Result<Expr> {
let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
let pattern_type = pattern.get_type(schema)?;
if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
return plan_err!("Invalid pattern in LIKE expression");
}
let escape_char = if let Some(char) = escape_char {
if char.len() != 1 {
return plan_err!("Invalid escape character in LIKE expression");
}
Some(char.chars().next().unwrap())
} else {
None
};
Ok(Expr::Like(Like::new(
negated,
Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
Box::new(pattern),
escape_char,
case_insensitive,
)))
}
fn sql_similarto_to_expr(
&self,
negated: bool,
expr: SQLExpr,
pattern: SQLExpr,
escape_char: Option<String>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
let pattern_type = pattern.get_type(schema)?;
if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
return plan_err!("Invalid pattern in SIMILAR TO expression");
}
let escape_char = if let Some(char) = escape_char {
if char.len() != 1 {
return plan_err!("Invalid escape character in SIMILAR TO expression");
}
Some(char.chars().next().unwrap())
} else {
None
};
Ok(Expr::SimilarTo(Like::new(
negated,
Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
Box::new(pattern),
escape_char,
false,
)))
}
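/// Plan `TRIM`, mapping `LEADING` / `TRAILING` / `BOTH` / unspecified to the
/// `ltrim` / `rtrim` / `btrim` / `trim` UDFs from the context provider. Only
/// the first entry of a trim-characters list is used.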
fn sql_trim_to_expr(
&self,
expr: SQLExpr,
trim_where: Option<TrimWhereField>,
trim_what: Option<Box<SQLExpr>>,
trim_characters: Option<Vec<SQLExpr>>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
let args = match (trim_what, trim_characters) {
(Some(to_trim), None) => {
let to_trim =
self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?;
Ok(vec![arg, to_trim])
}
(None, Some(trim_characters)) => {
if let Some(first) = trim_characters.first() {
let to_trim = self.sql_expr_to_logical_expr(
first.clone(),
schema,
planner_context,
)?;
Ok(vec![arg, to_trim])
} else {
plan_err!("TRIM CHARACTERS cannot be empty")
}
}
(Some(_), Some(_)) => {
plan_err!("Both TRIM and TRIM CHARACTERS cannot be specified")
}
(None, None) => Ok(vec![arg]),
}?;
let fun_name = match trim_where {
Some(TrimWhereField::Leading) => "ltrim",
Some(TrimWhereField::Trailing) => "rtrim",
Some(TrimWhereField::Both) => "btrim",
None => "trim",
};
let fun = self
.context_provider
.get_function_meta(fun_name)
.ok_or_else(|| {
internal_datafusion_err!("Unable to find expected '{fun_name}' function")
})?;
Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
}
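/// Plan `OVERLAY(<str> PLACING <what> FROM <from> [FOR <for>])` by collecting
/// the planned arguments and delegating to `ExprPlanner::plan_overlay`.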
fn sql_overlay_to_expr(
&self,
expr: SQLExpr,
overlay_what: SQLExpr,
overlay_from: SQLExpr,
overlay_for: Option<Box<SQLExpr>>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
let what_arg =
self.sql_expr_to_logical_expr(overlay_what, schema, planner_context)?;
let from_arg =
self.sql_expr_to_logical_expr(overlay_from, schema, planner_context)?;
let mut overlay_args = match overlay_for {
Some(for_expr) => {
let for_expr =
self.sql_expr_to_logical_expr(*for_expr, schema, planner_context)?;
vec![arg, what_arg, from_arg, for_expr]
}
None => vec![arg, what_arg, from_arg],
};
for planner in self.context_provider.get_expr_planners() {
match planner.plan_overlay(overlay_args)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => overlay_args = args,
}
}
not_impl_err!("Overlay not supported by ExprPlanner: {overlay_args:?}")
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use arrow::datatypes::{Field, Schema};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use datafusion_common::config::ConfigOptions;
use datafusion_expr::logical_plan::builder::LogicalTableSource;
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use crate::TableReference;
use super::*;
struct TestContextProvider {
options: ConfigOptions,
tables: HashMap<String, Arc<dyn TableSource>>,
}
impl TestContextProvider {
pub fn new() -> Self {
let mut tables = HashMap::new();
tables.insert(
"table1".to_string(),
create_table_source(vec![Field::new(
"column1".to_string(),
DataType::Utf8,
false,
)]),
);
Self {
options: Default::default(),
tables,
}
}
}
impl ContextProvider for TestContextProvider {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
match self.tables.get(name.table()) {
Some(table) => Ok(Arc::clone(table)),
_ => plan_err!("Table not found: {}", name.table()),
}
}
fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
None
}
fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
None
}
fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
None
}
fn options(&self) -> &ConfigOptions {
&self.options
}
fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
None
}
fn udf_names(&self) -> Vec<String> {
Vec::new()
}
fn udaf_names(&self) -> Vec<String> {
Vec::new()
}
fn udwf_names(&self) -> Vec<String> {
Vec::new()
}
}
fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
Arc::new(LogicalTableSource::new(Arc::new(
Schema::new_with_metadata(fields, HashMap::new()),
)))
}
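// Each generated test parses `$num_expr` OR-ed equality predicates and plans
// them, verifying that the iterative conversion in `sql_expr_to_logical_expr`
// handles deeply nested binary expressions without overflowing the stack.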
macro_rules! test_stack_overflow {
($num_expr:expr) => {
paste::item! {
#[test]
fn [<test_stack_overflow_ $num_expr>]() {
let schema = DFSchema::empty();
let mut planner_context = PlannerContext::default();
let expr_str = (0..$num_expr)
.map(|i| format!("column1 = 'value{:?}'", i))
.collect::<Vec<String>>()
.join(" OR ");
let dialect = GenericDialect{};
let mut parser = Parser::new(&dialect)
.try_with_sql(expr_str.as_str())
.unwrap();
let sql_expr = parser.parse_expr().unwrap();
let context_provider = TestContextProvider::new();
let sql_to_rel = SqlToRel::new(&context_provider);
sql_to_rel.sql_expr_to_logical_expr(
sql_expr,
&schema,
&mut planner_context,
).unwrap();
}
}
};
}
test_stack_overflow!(64);
test_stack_overflow!(128);
test_stack_overflow!(256);
test_stack_overflow!(512);
test_stack_overflow!(1024);
test_stack_overflow!(2048);
test_stack_overflow!(4096);
test_stack_overflow!(8192);
}