use crate::analyzer::AnalyzerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Column, Result};
use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder, TableScan};
/// Analyzer rule that replaces a `TableScan` over a source exposing a logical
/// plan (via `TableSource::get_logical_plan`) with that logical plan itself.
///
/// The rule carries no state; `new()` and `Default::default()` are equivalent.
#[derive(Default)]
pub struct InlineTableScan;

impl InlineTableScan {
    /// Creates the (stateless) `InlineTableScan` rule.
    pub fn new() -> Self {
        Self
    }
}
impl AnalyzerRule for InlineTableScan {
    /// Identifier of this rule: `"inline_table_scan"`.
    fn name(&self) -> &str {
        "inline_table_scan"
    }

    /// Rewrites `plan` bottom-up with `analyze_internal`, inlining every
    /// eligible `TableScan`, and returns the resulting plan.
    ///
    /// The configuration options are not consulted by this rule.
    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
        let rewritten = plan.transform_up(analyze_internal);
        rewritten.data()
    }
}
/// Rewrites a single plan node for the `InlineTableScan` rule.
///
/// First, every subquery expression attached to `plan` is rewritten bottom-up
/// by recursively applying this same function. Then, if the node is a
/// `TableScan` with no filters whose source exposes a logical plan, the scan is
/// replaced by that logical plan, projected with `generate_projection_expr`
/// and aliased back to the scan's table name. Every other node is returned
/// unchanged (marked as not transformed).
fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
    plan.map_subqueries(|subquery| subquery.transform_up(analyze_internal))?
        .transform_data(|node| match node {
            LogicalPlan::TableScan(TableScan {
                table_name,
                source,
                projection,
                filters,
                ..
            }) if filters.is_empty() && source.get_logical_plan().is_some() => {
                // Cannot fail: the match guard just checked that the source
                // provides a logical plan.
                let inlined = source.get_logical_plan().unwrap();
                let exprs = generate_projection_expr(&projection, inlined)?;
                // Aliasing with the original table name lets parent nodes keep
                // referring to the scan's columns (e.g. `x.a`) unchanged.
                LogicalPlanBuilder::from(inlined.clone())
                    .project(exprs)?
                    .alias(table_name)?
                    .build()
                    .map(Transformed::yes)
            }
            other => Ok(Transformed::no(other)),
        })
}
fn generate_projection_expr(
projection: &Option<Vec<usize>>,
sub_plan: &LogicalPlan,
) -> Result<Vec<Expr>> {
let mut exprs = vec![];
if let Some(projection) = projection {
for i in projection {
exprs.push(Expr::Column(Column::from(
sub_plan.schema().qualified_field(*i),
)));
}
} else {
exprs.push(Expr::Wildcard { qualifier: None });
}
Ok(exprs)
}
/// Unit tests for the `InlineTableScan` analyzer rule.
#[cfg(test)]
mod tests {
use std::{sync::Arc, vec};
use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::test::assert_analyzed_plan_eq;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource};
/// A plain table source with two non-nullable `Int64` columns, `a` and `b`.
/// It does not override `get_logical_plan`, so it serves as the "leaf" table
/// wrapped by `CustomSource`.
pub struct RawTableSource {}
impl TableSource for RawTableSource {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> arrow::datatypes::SchemaRef {
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Int64, false),
]))
}
fn supports_filter_pushdown(
&self,
_filter: &datafusion_expr::Expr,
) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
{
Ok(datafusion_expr::TableProviderFilterPushDown::Inexact)
}
}
/// A table source backed by a logical plan (a scan of table `y` over
/// `RawTableSource`), i.e. the kind of source `InlineTableScan` inlines.
pub struct CustomSource {
plan: LogicalPlan,
}
impl CustomSource {
/// Builds the wrapped plan: an unprojected scan of `y`.
fn new() -> Self {
Self {
plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None)
.unwrap()
.build()
.unwrap(),
}
}
}
impl TableSource for CustomSource {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn supports_filter_pushdown(
&self,
_filter: &datafusion_expr::Expr,
) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
{
Ok(datafusion_expr::TableProviderFilterPushDown::Exact)
}
// NOTE(review): this declared schema has only `a`, while the wrapped plan
// produces `a` and `b`. The unprojected test below expects the inlined
// projection to follow the wrapped plan's columns (`y.a, y.b`).
fn schema(&self) -> arrow::datatypes::SchemaRef {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
}
/// Exposes the wrapped plan so the analyzer can inline it.
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
}
}
/// A scan of `x` without a projection, under a filter on `x.a`. The scan is
/// replaced by the wrapped plan projected onto all of its columns and aliased
/// as `x`, so the filter above it is left as-is.
#[test]
fn inline_table_scan() -> datafusion_common::Result<()> {
let scan = LogicalPlanBuilder::scan(
"x".to_string(),
Arc::new(CustomSource::new()),
None,
)?;
let plan = scan.filter(col("x.a").eq(lit(1)))?.build()?;
let expected = "Filter: x.a = Int32(1)\
\n SubqueryAlias: x\
\n Projection: y.a, y.b\
\n TableScan: y";
assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected)
}
/// A scan of `x` projecting column index 0 inlines to a projection of just
/// `y.a` on top of the wrapped plan.
#[test]
fn inline_table_scan_with_projection() -> datafusion_common::Result<()> {
let scan = LogicalPlanBuilder::scan(
"x".to_string(),
Arc::new(CustomSource::new()),
Some(vec![0]),
)?;
let plan = scan.build()?;
let expected = "SubqueryAlias: x\
\n Projection: y.a\
\n TableScan: y";
assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected)
}
}