use std::sync::Arc;
use crate::analyzer::AnalyzerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_expr::expr::Exists;
use datafusion_expr::expr::InSubquery;
use datafusion_expr::{
logical_plan::LogicalPlan, Expr, Filter, LogicalPlanBuilder, TableScan,
};
#[derive(Default)]
pub struct InlineTableScan;
impl InlineTableScan {
pub fn new() -> Self {
Self {}
}
}
impl AnalyzerRule for InlineTableScan {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
plan.transform_up(&analyze_internal)
}
fn name(&self) -> &str {
"inline_table_scan"
}
}
fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
Ok(match plan {
LogicalPlan::TableScan(TableScan {
table_name,
source,
projection,
filters,
..
}) if filters.is_empty() && source.get_logical_plan().is_some() => {
let sub_plan = source.get_logical_plan().unwrap();
let projection_exprs = generate_projection_expr(&projection, sub_plan)?;
let plan = LogicalPlanBuilder::from(sub_plan.clone())
.project(projection_exprs)?
.alias(table_name)?
.build()?;
Transformed::Yes(plan)
}
LogicalPlan::Filter(filter) => {
let new_expr = filter.predicate.transform(&rewrite_subquery)?;
Transformed::Yes(LogicalPlan::Filter(Filter::try_new(
new_expr,
filter.input,
)?))
}
_ => Transformed::No(plan),
})
}
fn rewrite_subquery(expr: Expr) -> Result<Transformed<Expr>> {
match expr {
Expr::Exists(Exists { subquery, negated }) => {
let plan = subquery.subquery.as_ref().clone();
let new_plan = plan.transform_up(&analyze_internal)?;
let subquery = subquery.with_plan(Arc::new(new_plan));
Ok(Transformed::Yes(Expr::Exists(Exists { subquery, negated })))
}
Expr::InSubquery(InSubquery {
expr,
subquery,
negated,
}) => {
let plan = subquery.subquery.as_ref().clone();
let new_plan = plan.transform_up(&analyze_internal)?;
let subquery = subquery.with_plan(Arc::new(new_plan));
Ok(Transformed::Yes(Expr::InSubquery(InSubquery::new(
expr, subquery, negated,
))))
}
Expr::ScalarSubquery(subquery) => {
let plan = subquery.subquery.as_ref().clone();
let new_plan = plan.transform_up(&analyze_internal)?;
let subquery = subquery.with_plan(Arc::new(new_plan));
Ok(Transformed::Yes(Expr::ScalarSubquery(subquery)))
}
_ => Ok(Transformed::No(expr)),
}
}
fn generate_projection_expr(
projection: &Option<Vec<usize>>,
sub_plan: &LogicalPlan,
) -> Result<Vec<Expr>> {
let mut exprs = vec![];
if let Some(projection) = projection {
for i in projection {
exprs.push(Expr::Column(
sub_plan.schema().fields()[*i].qualified_column(),
));
}
} else {
exprs.push(Expr::Wildcard);
}
Ok(exprs)
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, vec};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource};
use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::test::assert_analyzed_plan_eq;
pub struct RawTableSource {}
impl TableSource for RawTableSource {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> arrow::datatypes::SchemaRef {
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Int64, false),
]))
}
fn supports_filter_pushdown(
&self,
_filter: &datafusion_expr::Expr,
) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
{
Ok(datafusion_expr::TableProviderFilterPushDown::Inexact)
}
}
pub struct CustomSource {
plan: LogicalPlan,
}
impl CustomSource {
fn new() -> Self {
Self {
plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None)
.unwrap()
.build()
.unwrap(),
}
}
}
impl TableSource for CustomSource {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn supports_filter_pushdown(
&self,
_filter: &datafusion_expr::Expr,
) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
{
Ok(datafusion_expr::TableProviderFilterPushDown::Exact)
}
fn schema(&self) -> arrow::datatypes::SchemaRef {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
}
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
}
}
#[test]
fn inline_table_scan() -> datafusion_common::Result<()> {
let scan = LogicalPlanBuilder::scan(
"x".to_string(),
Arc::new(CustomSource::new()),
None,
)?;
let plan = scan.filter(col("x.a").eq(lit(1)))?.build()?;
let expected = "Filter: x.a = Int32(1)\
\n SubqueryAlias: x\
\n Projection: y.a, y.b\
\n TableScan: y";
assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), &plan, expected)
}
#[test]
fn inline_table_scan_with_projection() -> datafusion_common::Result<()> {
let scan = LogicalPlanBuilder::scan(
"x".to_string(),
Arc::new(CustomSource::new()),
Some(vec![0]),
)?;
let plan = scan.build()?;
let expected = "SubqueryAlias: x\
\n Projection: y.a\
\n TableScan: y";
assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), &plan, expected)
}
}