use crate::{Expr, LogicalPlan, SortExpr, Volatility};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;
use std::{
fmt::{self, Display},
hash::{Hash, Hasher},
};
use crate::expr::Sort;
use arrow::datatypes::DataType;
use datafusion_common::{Constraints, DFSchemaRef, SchemaReference, TableReference};
use sqlparser::ast::Ident;
/// The DDL statements modelled by this module, one variant per statement kind.
///
/// Every variant wraps the payload struct of the same name. `PartialOrd` is derived,
/// so values of different variants order by the declaration order below.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum DdlStatement {
/// Create-external-table statement; payload is [`CreateExternalTable`].
CreateExternalTable(CreateExternalTable),
/// Create-memory-table statement; payload is [`CreateMemoryTable`], which carries an input plan.
CreateMemoryTable(CreateMemoryTable),
/// Create-view statement; payload is [`CreateView`], which carries an input plan.
CreateView(CreateView),
/// Create-schema statement; payload is [`CreateCatalogSchema`].
CreateCatalogSchema(CreateCatalogSchema),
/// Create-catalog statement; payload is [`CreateCatalog`].
CreateCatalog(CreateCatalog),
/// Create-index statement; payload is [`CreateIndex`].
CreateIndex(CreateIndex),
/// Drop-table statement; payload is [`DropTable`].
DropTable(DropTable),
/// Drop-view statement; payload is [`DropView`].
DropView(DropView),
/// Drop-schema statement; payload is [`DropCatalogSchema`].
DropCatalogSchema(DropCatalogSchema),
/// Create-function statement; payload is [`CreateFunction`].
CreateFunction(CreateFunction),
/// Drop-function statement; payload is [`DropFunction`].
DropFunction(DropFunction),
}
/// Accessors that work uniformly across every [`DdlStatement`] variant.
impl DdlStatement {
    /// Returns the schema associated with this statement.
    ///
    /// `CreateMemoryTable` and `CreateView` report the schema of their input plan;
    /// every other variant returns its own `schema` field.
    pub fn schema(&self) -> &DFSchemaRef {
        match self {
            DdlStatement::CreateExternalTable(CreateExternalTable { schema, .. }) => {
                schema
            }
            DdlStatement::CreateMemoryTable(CreateMemoryTable { input, .. })
            | DdlStatement::CreateView(CreateView { input, .. }) => input.schema(),
            DdlStatement::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
                schema
            }
            DdlStatement::CreateCatalog(CreateCatalog { schema, .. }) => schema,
            DdlStatement::CreateIndex(CreateIndex { schema, .. }) => schema,
            DdlStatement::DropTable(DropTable { schema, .. }) => schema,
            DdlStatement::DropView(DropView { schema, .. }) => schema,
            DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
            DdlStatement::CreateFunction(CreateFunction { schema, .. }) => schema,
            DdlStatement::DropFunction(DropFunction { schema, .. }) => schema,
        }
    }

    /// Returns the variant name as a string (for example `"CreateView"`).
    pub fn name(&self) -> &str {
        match self {
            DdlStatement::CreateExternalTable(_) => "CreateExternalTable",
            DdlStatement::CreateMemoryTable(_) => "CreateMemoryTable",
            DdlStatement::CreateView(_) => "CreateView",
            DdlStatement::CreateCatalogSchema(_) => "CreateCatalogSchema",
            DdlStatement::CreateCatalog(_) => "CreateCatalog",
            DdlStatement::CreateIndex(_) => "CreateIndex",
            DdlStatement::DropTable(_) => "DropTable",
            DdlStatement::DropView(_) => "DropView",
            DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema",
            DdlStatement::CreateFunction(_) => "CreateFunction",
            DdlStatement::DropFunction(_) => "DropFunction",
        }
    }

    /// Returns the input plans of this statement.
    ///
    /// Only `CreateMemoryTable` and `CreateView` have an input (exactly one); all
    /// other variants yield an empty vector.
    pub fn inputs(&self) -> Vec<&LogicalPlan> {
        match self {
            DdlStatement::CreateMemoryTable(CreateMemoryTable { input, .. })
            | DdlStatement::CreateView(CreateView { input, .. }) => vec![input],
            // Listed explicitly (no `_` arm) so a new variant forces a decision here.
            DdlStatement::CreateExternalTable(_)
            | DdlStatement::CreateCatalogSchema(_)
            | DdlStatement::CreateCatalog(_)
            | DdlStatement::CreateIndex(_)
            | DdlStatement::DropTable(_)
            | DdlStatement::DropView(_)
            | DdlStatement::DropCatalogSchema(_)
            | DdlStatement::CreateFunction(_)
            | DdlStatement::DropFunction(_) => vec![],
        }
    }

    /// Returns a value whose `Display` output is a one-line summary of the statement:
    /// a label, the `Debug` form of the statement's name, and, depending on the
    /// variant, its constraints or its boolean flags.
    pub fn display(&self) -> impl Display + '_ {
        // Local adapter so the summary can borrow the statement instead of
        // allocating a `String` up front.
        struct Wrapper<'a>(&'a DdlStatement);

        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                match self.0 {
                    DdlStatement::CreateExternalTable(CreateExternalTable {
                        name,
                        constraints,
                        ..
                    }) => {
                        write!(f, "CreateExternalTable: {name:?}{constraints}")
                    }
                    DdlStatement::CreateMemoryTable(CreateMemoryTable {
                        name,
                        constraints,
                        ..
                    }) => {
                        write!(f, "CreateMemoryTable: {name:?}{constraints}")
                    }
                    DdlStatement::CreateView(CreateView { name, .. }) => {
                        write!(f, "CreateView: {name:?}")
                    }
                    DdlStatement::CreateCatalogSchema(CreateCatalogSchema {
                        schema_name,
                        ..
                    }) => {
                        write!(f, "CreateCatalogSchema: {schema_name:?}")
                    }
                    DdlStatement::CreateCatalog(CreateCatalog {
                        catalog_name, ..
                    }) => {
                        write!(f, "CreateCatalog: {catalog_name:?}")
                    }
                    DdlStatement::CreateIndex(CreateIndex { name, .. }) => {
                        write!(f, "CreateIndex: {name:?}")
                    }
                    // NOTE(review): the "if not exist" label below actually prints the
                    // `if_exists` flag. The text is left untouched because callers may
                    // match on the rendered plan.
                    DdlStatement::DropTable(DropTable {
                        name, if_exists, ..
                    }) => {
                        write!(f, "DropTable: {name:?} if not exist:={if_exists}")
                    }
                    DdlStatement::DropView(DropView {
                        name, if_exists, ..
                    }) => {
                        write!(f, "DropView: {name:?} if not exist:={if_exists}")
                    }
                    DdlStatement::DropCatalogSchema(DropCatalogSchema {
                        name,
                        if_exists,
                        cascade,
                        ..
                    }) => {
                        write!(
                            f,
                            "DropCatalogSchema: {name:?} if not exist:={if_exists} cascade:={cascade}"
                        )
                    }
                    DdlStatement::CreateFunction(CreateFunction { name, .. }) => {
                        write!(f, "CreateFunction: name {name:?}")
                    }
                    DdlStatement::DropFunction(DropFunction { name, .. }) => {
                        // Previously rendered with the "CreateFunction" label, which
                        // made drop and create statements look identical.
                        write!(f, "DropFunction: name {name:?}")
                    }
                }
            }
        }

        Wrapper(self)
    }
}
/// Payload of [`DdlStatement::CreateExternalTable`].
///
/// `PartialEq`/`Eq` are derived over every field; `Hash` and `PartialOrd` are
/// implemented by hand for this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateExternalTable {
/// Schema returned by `DdlStatement::schema` for this statement.
pub schema: DFSchemaRef,
/// Table reference; printed by `DdlStatement::display`.
pub name: TableReference,
/// Location string (presumably where the external data lives — confirm with producers).
pub location: String,
/// File type / format name (exact vocabulary not visible here).
pub file_type: String,
/// Partition column names (assumed from the field name — confirm).
pub table_partition_cols: Vec<String>,
/// Presumably the `IF NOT EXISTS` flag — confirm.
pub if_not_exists: bool,
/// Presumably marks a temporary table — confirm.
pub temporary: bool,
/// Optional definition text (presumably the originating SQL — confirm).
pub definition: Option<String>,
/// Sort expressions, stored as a list of lists.
pub order_exprs: Vec<Vec<Sort>>,
/// Presumably marks an unbounded (streaming) source — confirm.
pub unbounded: bool,
/// String key/value options.
pub options: HashMap<String, String>,
/// Table constraints; printed right after the name by `DdlStatement::display`.
pub constraints: Constraints,
/// Expressions keyed by a `String` (presumably column name -> default value — confirm).
pub column_defaults: HashMap<String, Expr>,
}
/// Hashes a subset of the fields that the derived `PartialEq` compares.
///
/// Equal values still hash equally, which is all `Hash` requires. `temporary`,
/// `constraints` and `column_defaults` do not contribute, and `options` contributes
/// only its length because `HashMap` does not implement `Hash`.
impl Hash for CreateExternalTable {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Exhaustive destructuring: adding a field to the struct will not compile
        // until it is either hashed or explicitly ignored here.
        let Self {
            schema,
            name,
            location,
            file_type,
            table_partition_cols,
            if_not_exists,
            temporary: _,
            definition,
            order_exprs,
            unbounded,
            options,
            constraints: _,
            column_defaults: _,
        } = self;

        schema.hash(state);
        name.hash(state);
        location.hash(state);
        file_type.hash(state);
        table_partition_cols.hash(state);
        if_not_exists.hash(state);
        definition.hash(state);
        order_exprs.hash(state);
        unbounded.hash(state);
        options.len().hash(state);
    }
}
impl PartialOrd for CreateExternalTable {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
#[derive(PartialEq, PartialOrd)]
struct ComparableCreateExternalTable<'a> {
pub name: &'a TableReference,
pub location: &'a String,
pub file_type: &'a String,
pub table_partition_cols: &'a Vec<String>,
pub if_not_exists: &'a bool,
pub definition: &'a Option<String>,
pub order_exprs: &'a Vec<Vec<Sort>>,
pub unbounded: &'a bool,
pub constraints: &'a Constraints,
}
let comparable_self = ComparableCreateExternalTable {
name: &self.name,
location: &self.location,
file_type: &self.file_type,
table_partition_cols: &self.table_partition_cols,
if_not_exists: &self.if_not_exists,
definition: &self.definition,
order_exprs: &self.order_exprs,
unbounded: &self.unbounded,
constraints: &self.constraints,
};
let comparable_other = ComparableCreateExternalTable {
name: &other.name,
location: &other.location,
file_type: &other.file_type,
table_partition_cols: &other.table_partition_cols,
if_not_exists: &other.if_not_exists,
definition: &other.definition,
order_exprs: &other.order_exprs,
unbounded: &other.unbounded,
constraints: &other.constraints,
};
comparable_self.partial_cmp(&comparable_other)
}
}
/// Payload of [`DdlStatement::CreateMemoryTable`].
///
/// All comparison and hashing traits are derived, so every field takes part in
/// them, in declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct CreateMemoryTable {
/// Table reference; printed by `DdlStatement::display`.
pub name: TableReference,
/// Table constraints; printed right after the name by `DdlStatement::display`.
pub constraints: Constraints,
/// Input plan; returned by `DdlStatement::inputs`, and its schema is what
/// `DdlStatement::schema` reports for this statement.
pub input: Arc<LogicalPlan>,
/// Presumably the `IF NOT EXISTS` flag — confirm.
pub if_not_exists: bool,
/// Presumably the `OR REPLACE` flag — confirm.
pub or_replace: bool,
/// `(String, Expr)` pairs (presumably column name and default value — confirm).
pub column_defaults: Vec<(String, Expr)>,
/// Presumably marks a temporary table — confirm.
pub temporary: bool,
}
/// Payload of [`DdlStatement::CreateView`].
///
/// All comparison and hashing traits are derived, so every field takes part in
/// them, in declaration order.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)]
pub struct CreateView {
/// View reference; printed by `DdlStatement::display`.
pub name: TableReference,
/// Input plan; returned by `DdlStatement::inputs`, and its schema is what
/// `DdlStatement::schema` reports for this statement.
pub input: Arc<LogicalPlan>,
/// Presumably the `OR REPLACE` flag — confirm.
pub or_replace: bool,
/// Optional definition text (presumably the originating SQL — confirm).
pub definition: Option<String>,
/// Presumably marks a temporary view — confirm.
pub temporary: bool,
}
/// Payload of [`DdlStatement::CreateCatalog`].
///
/// `PartialOrd` is implemented by hand for this type rather than derived.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateCatalog {
/// Catalog name; printed by `DdlStatement::display`.
pub catalog_name: String,
/// Presumably the `IF NOT EXISTS` flag — confirm.
pub if_not_exists: bool,
/// Schema returned by `DdlStatement::schema` for this statement.
pub schema: DFSchemaRef,
}
/// Orders `CreateCatalog` values by `catalog_name`, then `if_not_exists`.
///
/// `schema` is not part of the ordering key. Returns `None` when the key fields
/// tie but the values are not equal under `PartialEq`.
impl PartialOrd for CreateCatalog {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let lhs = (&self.catalog_name, self.if_not_exists);
        let rhs = (&other.catalog_name, other.if_not_exists);
        lhs.partial_cmp(&rhs)
            // `PartialOrd` requires `partial_cmp == Some(Equal)` iff `self == other`;
            // `schema` is compared by `PartialEq` but not by the key above.
            .filter(|ord| *ord != Ordering::Equal || self == other)
    }
}
/// Payload of [`DdlStatement::CreateCatalogSchema`].
///
/// `PartialOrd` is implemented by hand for this type rather than derived.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateCatalogSchema {
/// Schema name; printed by `DdlStatement::display`.
pub schema_name: String,
/// Presumably the `IF NOT EXISTS` flag — confirm.
pub if_not_exists: bool,
/// Schema returned by `DdlStatement::schema` for this statement.
pub schema: DFSchemaRef,
}
/// Orders `CreateCatalogSchema` values by `schema_name`, then `if_not_exists`.
///
/// `schema` is not part of the ordering key. Returns `None` when the key fields
/// tie but the values are not equal under `PartialEq`.
impl PartialOrd for CreateCatalogSchema {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let lhs = (&self.schema_name, self.if_not_exists);
        let rhs = (&other.schema_name, other.if_not_exists);
        lhs.partial_cmp(&rhs)
            // `PartialOrd` requires `partial_cmp == Some(Equal)` iff `self == other`;
            // `schema` is compared by `PartialEq` but not by the key above.
            .filter(|ord| *ord != Ordering::Equal || self == other)
    }
}
/// Payload of [`DdlStatement::DropTable`].
///
/// `PartialOrd` is implemented by hand for this type rather than derived.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DropTable {
/// Table reference; printed by `DdlStatement::display`.
pub name: TableReference,
/// Presumably the `IF EXISTS` flag — confirm. Printed by `DdlStatement::display`.
pub if_exists: bool,
/// Schema returned by `DdlStatement::schema` for this statement.
pub schema: DFSchemaRef,
}
/// Orders `DropTable` values by `name`, then `if_exists`.
///
/// `schema` is not part of the ordering key. Returns `None` when the key fields
/// tie but the values are not equal under `PartialEq`.
impl PartialOrd for DropTable {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let lhs = (&self.name, self.if_exists);
        let rhs = (&other.name, other.if_exists);
        lhs.partial_cmp(&rhs)
            // `PartialOrd` requires `partial_cmp == Some(Equal)` iff `self == other`;
            // `schema` is compared by `PartialEq` but not by the key above.
            .filter(|ord| *ord != Ordering::Equal || self == other)
    }
}
/// Payload of [`DdlStatement::DropView`].
///
/// `PartialOrd` is implemented by hand for this type rather than derived.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DropView {
/// View reference; printed by `DdlStatement::display`.
pub name: TableReference,
/// Presumably the `IF EXISTS` flag — confirm. Printed by `DdlStatement::display`.
pub if_exists: bool,
/// Schema returned by `DdlStatement::schema` for this statement.
pub schema: DFSchemaRef,
}
/// Orders `DropView` values by `name`, then `if_exists`.
///
/// `schema` is not part of the ordering key. Returns `None` when the key fields
/// tie but the values are not equal under `PartialEq`.
impl PartialOrd for DropView {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let lhs = (&self.name, self.if_exists);
        let rhs = (&other.name, other.if_exists);
        lhs.partial_cmp(&rhs)
            // `PartialOrd` requires `partial_cmp == Some(Equal)` iff `self == other`;
            // `schema` is compared by `PartialEq` but not by the key above.
            .filter(|ord| *ord != Ordering::Equal || self == other)
    }
}
/// Payload of [`DdlStatement::DropCatalogSchema`].
///
/// `PartialOrd` is implemented by hand for this type rather than derived.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DropCatalogSchema {
/// Schema reference; printed by `DdlStatement::display`.
pub name: SchemaReference,
/// Presumably the `IF EXISTS` flag — confirm. Printed by `DdlStatement::display`.
pub if_exists: bool,
/// Presumably the `CASCADE` flag — confirm. Printed by `DdlStatement::display`.
pub cascade: bool,
/// Schema returned by `DdlStatement::schema` for this statement.
pub schema: DFSchemaRef,
}
/// Orders `DropCatalogSchema` values by `name`, then `if_exists`, then `cascade`.
///
/// `schema` is not part of the ordering key. Returns `None` when the key fields
/// tie but the values are not equal under `PartialEq`.
impl PartialOrd for DropCatalogSchema {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let lhs = (&self.name, self.if_exists, self.cascade);
        let rhs = (&other.name, other.if_exists, other.cascade);
        lhs.partial_cmp(&rhs)
            // `PartialOrd` requires `partial_cmp == Some(Equal)` iff `self == other`;
            // `schema` is compared by `PartialEq` but not by the key above.
            .filter(|ord| *ord != Ordering::Equal || self == other)
    }
}
/// Payload of [`DdlStatement::CreateFunction`].
///
/// `PartialOrd` is implemented by hand for this type rather than derived.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct CreateFunction {
/// Presumably the `OR REPLACE` flag — confirm.
pub or_replace: bool,
/// Presumably marks a temporary function — confirm.
pub temporary: bool,
/// Function name; printed by `DdlStatement::display`.
pub name: String,
/// Optional argument list; see [`OperateFunctionArg`].
pub args: Option<Vec<OperateFunctionArg>>,
/// Optional return data type.
pub return_type: Option<DataType>,
/// Remaining function parameters; see [`CreateFunctionBody`].
pub params: CreateFunctionBody,
/// Schema returned by `DdlStatement::schema` for this statement.
pub schema: DFSchemaRef,
}
impl PartialOrd for CreateFunction {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
#[derive(PartialEq, PartialOrd)]
struct ComparableCreateFunction<'a> {
pub or_replace: &'a bool,
pub temporary: &'a bool,
pub name: &'a String,
pub args: &'a Option<Vec<OperateFunctionArg>>,
pub return_type: &'a Option<DataType>,
pub params: &'a CreateFunctionBody,
}
let comparable_self = ComparableCreateFunction {
or_replace: &self.or_replace,
temporary: &self.temporary,
name: &self.name,
args: &self.args,
return_type: &self.return_type,
params: &self.params,
};
let comparable_other = ComparableCreateFunction {
or_replace: &other.or_replace,
temporary: &other.temporary,
name: &other.name,
args: &other.args,
return_type: &other.return_type,
params: &other.params,
};
comparable_self.partial_cmp(&comparable_other)
}
}
/// One function argument, as stored in `CreateFunction::args`.
///
/// All comparison and hashing traits are derived over the fields in declaration order.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct OperateFunctionArg {
/// Optional argument name.
pub name: Option<Ident>,
/// Data type of the argument.
pub data_type: DataType,
/// Optional default-value expression.
pub default_expr: Option<Expr>,
}
/// Function parameters, as stored in `CreateFunction::params`.
///
/// All comparison and hashing traits are derived over the fields in declaration order.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct CreateFunctionBody {
/// Optional language identifier.
pub language: Option<Ident>,
/// Optional volatility of the function.
pub behavior: Option<Volatility>,
/// Optional expression forming the function body.
pub function_body: Option<Expr>,
}
/// Payload of [`DdlStatement::DropFunction`].
///
/// `PartialOrd` is implemented by hand for this type rather than derived.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct DropFunction {
/// Function name; printed by `DdlStatement::display`.
pub name: String,
/// Presumably the `IF EXISTS` flag — confirm.
pub if_exists: bool,
/// Schema returned by `DdlStatement::schema` for this statement.
pub schema: DFSchemaRef,
}
/// Orders `DropFunction` values by `name`, then `if_exists`.
///
/// `schema` is not part of the ordering key. Returns `None` when the key fields
/// tie but the values are not equal under `PartialEq`.
impl PartialOrd for DropFunction {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let lhs = (&self.name, self.if_exists);
        let rhs = (&other.name, other.if_exists);
        lhs.partial_cmp(&rhs)
            // `PartialOrd` requires `partial_cmp == Some(Equal)` iff `self == other`;
            // `schema` is compared by `PartialEq` but not by the key above.
            .filter(|ord| *ord != Ordering::Equal || self == other)
    }
}
/// Payload of [`DdlStatement::CreateIndex`].
///
/// `PartialOrd` is implemented by hand for this type rather than derived.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct CreateIndex {
/// Optional index name; printed by `DdlStatement::display`.
pub name: Option<String>,
/// Reference to the table (presumably the table being indexed — confirm).
pub table: TableReference,
/// Optional `using` string (presumably the index method — confirm).
pub using: Option<String>,
/// Sort expressions (presumably the indexed columns — confirm).
pub columns: Vec<SortExpr>,
/// Presumably marks a unique index — confirm.
pub unique: bool,
/// Presumably the `IF NOT EXISTS` flag — confirm.
pub if_not_exists: bool,
/// Schema returned by `DdlStatement::schema` for this statement.
pub schema: DFSchemaRef,
}
impl PartialOrd for CreateIndex {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
#[derive(PartialEq, PartialOrd)]
struct ComparableCreateIndex<'a> {
pub name: &'a Option<String>,
pub table: &'a TableReference,
pub using: &'a Option<String>,
pub columns: &'a Vec<SortExpr>,
pub unique: &'a bool,
pub if_not_exists: &'a bool,
}
let comparable_self = ComparableCreateIndex {
name: &self.name,
table: &self.table,
using: &self.using,
columns: &self.columns,
unique: &self.unique,
if_not_exists: &self.if_not_exists,
};
let comparable_other = ComparableCreateIndex {
name: &other.name,
table: &other.table,
using: &other.using,
columns: &other.columns,
unique: &other.unique,
if_not_exists: &other.if_not_exists,
};
comparable_self.partial_cmp(&comparable_other)
}
}
#[cfg(test)]
mod test {
    use crate::{CreateCatalog, DdlStatement, DropView};
    use datafusion_common::{DFSchema, DFSchemaRef, TableReference};
    use std::cmp::Ordering;

    /// Fresh empty schema for statements whose schema is irrelevant to the test.
    fn empty_schema() -> DFSchemaRef {
        DFSchemaRef::new(DFSchema::empty())
    }

    /// `CreateCatalog` statement named "name" with the given `if_not_exists` flag.
    fn create_catalog(if_not_exists: bool) -> DdlStatement {
        DdlStatement::CreateCatalog(CreateCatalog {
            catalog_name: "name".to_string(),
            if_not_exists,
            schema: empty_schema(),
        })
    }

    #[test]
    fn test_partial_ord() {
        // Same variant, same name: the boolean flag decides (false < true).
        let catalog = create_catalog(false);
        let catalog_2 = create_catalog(true);
        assert_eq!(catalog.partial_cmp(&catalog_2), Some(Ordering::Less));

        // Different variants: the enum's declaration order decides.
        let drop_view = DdlStatement::DropView(DropView {
            name: TableReference::from("table"),
            if_exists: false,
            schema: empty_schema(),
        });
        assert_eq!(drop_view.partial_cmp(&catalog), Some(Ordering::Greater));
    }
}