datafusion_expr/logical_plan/
ddl.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{Expr, LogicalPlan, SortExpr, Volatility};
19use std::cmp::Ordering;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::{
23    fmt::{self, Display},
24    hash::{Hash, Hasher},
25};
26
27use crate::expr::Sort;
28use arrow::datatypes::DataType;
29use datafusion_common::tree_node::{Transformed, TreeNodeContainer, TreeNodeRecursion};
30use datafusion_common::{
31    Constraints, DFSchemaRef, Result, SchemaReference, TableReference,
32};
33use sqlparser::ast::Ident;
34
35/// Various types of DDL  (CREATE / DROP) catalog manipulation
36#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
37pub enum DdlStatement {
38    /// Creates an external table.
39    CreateExternalTable(CreateExternalTable),
40    /// Creates an in memory table.
41    CreateMemoryTable(CreateMemoryTable),
42    /// Creates a new view.
43    CreateView(CreateView),
44    /// Creates a new catalog schema.
45    CreateCatalogSchema(CreateCatalogSchema),
46    /// Creates a new catalog (aka "Database").
47    CreateCatalog(CreateCatalog),
48    /// Creates a new index.
49    CreateIndex(CreateIndex),
50    /// Drops a table.
51    DropTable(DropTable),
52    /// Drops a view.
53    DropView(DropView),
54    /// Drops a catalog schema
55    DropCatalogSchema(DropCatalogSchema),
56    /// Create function statement
57    CreateFunction(CreateFunction),
58    /// Drop function statement
59    DropFunction(DropFunction),
60}
61
62impl DdlStatement {
63    /// Get a reference to the logical plan's schema
64    pub fn schema(&self) -> &DFSchemaRef {
65        match self {
66            DdlStatement::CreateExternalTable(CreateExternalTable { schema, .. }) => {
67                schema
68            }
69            DdlStatement::CreateMemoryTable(CreateMemoryTable { input, .. })
70            | DdlStatement::CreateView(CreateView { input, .. }) => input.schema(),
71            DdlStatement::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
72                schema
73            }
74            DdlStatement::CreateCatalog(CreateCatalog { schema, .. }) => schema,
75            DdlStatement::CreateIndex(CreateIndex { schema, .. }) => schema,
76            DdlStatement::DropTable(DropTable { schema, .. }) => schema,
77            DdlStatement::DropView(DropView { schema, .. }) => schema,
78            DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
79            DdlStatement::CreateFunction(CreateFunction { schema, .. }) => schema,
80            DdlStatement::DropFunction(DropFunction { schema, .. }) => schema,
81        }
82    }
83
84    /// Return a descriptive string describing the type of this
85    /// [`DdlStatement`]
86    pub fn name(&self) -> &str {
87        match self {
88            DdlStatement::CreateExternalTable(_) => "CreateExternalTable",
89            DdlStatement::CreateMemoryTable(_) => "CreateMemoryTable",
90            DdlStatement::CreateView(_) => "CreateView",
91            DdlStatement::CreateCatalogSchema(_) => "CreateCatalogSchema",
92            DdlStatement::CreateCatalog(_) => "CreateCatalog",
93            DdlStatement::CreateIndex(_) => "CreateIndex",
94            DdlStatement::DropTable(_) => "DropTable",
95            DdlStatement::DropView(_) => "DropView",
96            DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema",
97            DdlStatement::CreateFunction(_) => "CreateFunction",
98            DdlStatement::DropFunction(_) => "DropFunction",
99        }
100    }
101
102    /// Return all inputs for this plan
103    pub fn inputs(&self) -> Vec<&LogicalPlan> {
104        match self {
105            DdlStatement::CreateExternalTable(_) => vec![],
106            DdlStatement::CreateCatalogSchema(_) => vec![],
107            DdlStatement::CreateCatalog(_) => vec![],
108            DdlStatement::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
109                vec![input]
110            }
111            DdlStatement::CreateView(CreateView { input, .. }) => vec![input],
112            DdlStatement::CreateIndex(_) => vec![],
113            DdlStatement::DropTable(_) => vec![],
114            DdlStatement::DropView(_) => vec![],
115            DdlStatement::DropCatalogSchema(_) => vec![],
116            DdlStatement::CreateFunction(_) => vec![],
117            DdlStatement::DropFunction(_) => vec![],
118        }
119    }
120
121    /// Return a `format`able structure with the a human readable
122    /// description of this LogicalPlan node per node, not including
123    /// children.
124    ///
125    /// See [crate::LogicalPlan::display] for an example
126    pub fn display(&self) -> impl Display + '_ {
127        struct Wrapper<'a>(&'a DdlStatement);
128        impl Display for Wrapper<'_> {
129            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
130                match self.0 {
131                    DdlStatement::CreateExternalTable(CreateExternalTable {
132                        ref name,
133                        constraints,
134                        ..
135                    }) => {
136                        if constraints.is_empty() {
137                            write!(f, "CreateExternalTable: {name:?}")
138                        } else {
139                            write!(f, "CreateExternalTable: {name:?} {constraints}")
140                        }
141                    }
142                    DdlStatement::CreateMemoryTable(CreateMemoryTable {
143                        name,
144                        constraints,
145                        ..
146                    }) => {
147                        if constraints.is_empty() {
148                            write!(f, "CreateMemoryTable: {name:?}")
149                        } else {
150                            write!(f, "CreateMemoryTable: {name:?} {constraints}")
151                        }
152                    }
153                    DdlStatement::CreateView(CreateView { name, .. }) => {
154                        write!(f, "CreateView: {name:?}")
155                    }
156                    DdlStatement::CreateCatalogSchema(CreateCatalogSchema {
157                        schema_name,
158                        ..
159                    }) => {
160                        write!(f, "CreateCatalogSchema: {schema_name:?}")
161                    }
162                    DdlStatement::CreateCatalog(CreateCatalog {
163                        catalog_name, ..
164                    }) => {
165                        write!(f, "CreateCatalog: {catalog_name:?}")
166                    }
167                    DdlStatement::CreateIndex(CreateIndex { name, .. }) => {
168                        write!(f, "CreateIndex: {name:?}")
169                    }
170                    DdlStatement::DropTable(DropTable {
171                        name, if_exists, ..
172                    }) => {
173                        write!(f, "DropTable: {name:?} if not exist:={if_exists}")
174                    }
175                    DdlStatement::DropView(DropView {
176                        name, if_exists, ..
177                    }) => {
178                        write!(f, "DropView: {name:?} if not exist:={if_exists}")
179                    }
180                    DdlStatement::DropCatalogSchema(DropCatalogSchema {
181                        name,
182                        if_exists,
183                        cascade,
184                        ..
185                    }) => {
186                        write!(f, "DropCatalogSchema: {name:?} if not exist:={if_exists} cascade:={cascade}")
187                    }
188                    DdlStatement::CreateFunction(CreateFunction { name, .. }) => {
189                        write!(f, "CreateFunction: name {name:?}")
190                    }
191                    DdlStatement::DropFunction(DropFunction { name, .. }) => {
192                        write!(f, "CreateFunction: name {name:?}")
193                    }
194                }
195            }
196        }
197        Wrapper(self)
198    }
199}
200
201/// Creates an external table.
202#[derive(Debug, Clone, PartialEq, Eq)]
203pub struct CreateExternalTable {
204    /// The table schema
205    pub schema: DFSchemaRef,
206    /// The table name
207    pub name: TableReference,
208    /// The physical location
209    pub location: String,
210    /// The file type of physical file
211    pub file_type: String,
212    /// Partition Columns
213    pub table_partition_cols: Vec<String>,
214    /// Option to not error if table already exists
215    pub if_not_exists: bool,
216    /// Whether the table is a temporary table
217    pub temporary: bool,
218    /// SQL used to create the table, if available
219    pub definition: Option<String>,
220    /// Order expressions supplied by user
221    pub order_exprs: Vec<Vec<Sort>>,
222    /// Whether the table is an infinite streams
223    pub unbounded: bool,
224    /// Table(provider) specific options
225    pub options: HashMap<String, String>,
226    /// The list of constraints in the schema, such as primary key, unique, etc.
227    pub constraints: Constraints,
228    /// Default values for columns
229    pub column_defaults: HashMap<String, Expr>,
230}
231
232// Hashing refers to a subset of fields considered in PartialEq.
233impl Hash for CreateExternalTable {
234    fn hash<H: Hasher>(&self, state: &mut H) {
235        self.schema.hash(state);
236        self.name.hash(state);
237        self.location.hash(state);
238        self.file_type.hash(state);
239        self.table_partition_cols.hash(state);
240        self.if_not_exists.hash(state);
241        self.definition.hash(state);
242        self.order_exprs.hash(state);
243        self.unbounded.hash(state);
244        self.options.len().hash(state); // HashMap is not hashable
245    }
246}
247
248// Manual implementation needed because of `schema`, `options`, and `column_defaults` fields.
249// Comparison excludes these fields.
250impl PartialOrd for CreateExternalTable {
251    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
252        #[derive(PartialEq, PartialOrd)]
253        struct ComparableCreateExternalTable<'a> {
254            /// The table name
255            pub name: &'a TableReference,
256            /// The physical location
257            pub location: &'a String,
258            /// The file type of physical file
259            pub file_type: &'a String,
260            /// Partition Columns
261            pub table_partition_cols: &'a Vec<String>,
262            /// Option to not error if table already exists
263            pub if_not_exists: &'a bool,
264            /// SQL used to create the table, if available
265            pub definition: &'a Option<String>,
266            /// Order expressions supplied by user
267            pub order_exprs: &'a Vec<Vec<Sort>>,
268            /// Whether the table is an infinite streams
269            pub unbounded: &'a bool,
270            /// The list of constraints in the schema, such as primary key, unique, etc.
271            pub constraints: &'a Constraints,
272        }
273        let comparable_self = ComparableCreateExternalTable {
274            name: &self.name,
275            location: &self.location,
276            file_type: &self.file_type,
277            table_partition_cols: &self.table_partition_cols,
278            if_not_exists: &self.if_not_exists,
279            definition: &self.definition,
280            order_exprs: &self.order_exprs,
281            unbounded: &self.unbounded,
282            constraints: &self.constraints,
283        };
284        let comparable_other = ComparableCreateExternalTable {
285            name: &other.name,
286            location: &other.location,
287            file_type: &other.file_type,
288            table_partition_cols: &other.table_partition_cols,
289            if_not_exists: &other.if_not_exists,
290            definition: &other.definition,
291            order_exprs: &other.order_exprs,
292            unbounded: &other.unbounded,
293            constraints: &other.constraints,
294        };
295        comparable_self.partial_cmp(&comparable_other)
296    }
297}
298
299/// Creates an in memory table.
300#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
301pub struct CreateMemoryTable {
302    /// The table name
303    pub name: TableReference,
304    /// The list of constraints in the schema, such as primary key, unique, etc.
305    pub constraints: Constraints,
306    /// The logical plan
307    pub input: Arc<LogicalPlan>,
308    /// Option to not error if table already exists
309    pub if_not_exists: bool,
310    /// Option to replace table content if table already exists
311    pub or_replace: bool,
312    /// Default values for columns
313    pub column_defaults: Vec<(String, Expr)>,
314    /// Whether the table is `TableType::Temporary`
315    pub temporary: bool,
316}
317
318/// Creates a view.
319#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)]
320pub struct CreateView {
321    /// The table name
322    pub name: TableReference,
323    /// The logical plan
324    pub input: Arc<LogicalPlan>,
325    /// Option to not error if table already exists
326    pub or_replace: bool,
327    /// SQL used to create the view, if available
328    pub definition: Option<String>,
329    /// Whether the view is ephemeral
330    pub temporary: bool,
331}
332
333/// Creates a catalog (aka "Database").
334#[derive(Debug, Clone, PartialEq, Eq, Hash)]
335pub struct CreateCatalog {
336    /// The catalog name
337    pub catalog_name: String,
338    /// Do nothing (except issuing a notice) if a schema with the same name already exists
339    pub if_not_exists: bool,
340    /// Empty schema
341    pub schema: DFSchemaRef,
342}
343
344// Manual implementation needed because of `schema` field. Comparison excludes this field.
345impl PartialOrd for CreateCatalog {
346    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
347        match self.catalog_name.partial_cmp(&other.catalog_name) {
348            Some(Ordering::Equal) => self.if_not_exists.partial_cmp(&other.if_not_exists),
349            cmp => cmp,
350        }
351    }
352}
353
354/// Creates a schema.
355#[derive(Debug, Clone, PartialEq, Eq, Hash)]
356pub struct CreateCatalogSchema {
357    /// The table schema
358    pub schema_name: String,
359    /// Do nothing (except issuing a notice) if a schema with the same name already exists
360    pub if_not_exists: bool,
361    /// Empty schema
362    pub schema: DFSchemaRef,
363}
364
365// Manual implementation needed because of `schema` field. Comparison excludes this field.
366impl PartialOrd for CreateCatalogSchema {
367    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
368        match self.schema_name.partial_cmp(&other.schema_name) {
369            Some(Ordering::Equal) => self.if_not_exists.partial_cmp(&other.if_not_exists),
370            cmp => cmp,
371        }
372    }
373}
374
375/// Drops a table.
376#[derive(Debug, Clone, PartialEq, Eq, Hash)]
377pub struct DropTable {
378    /// The table name
379    pub name: TableReference,
380    /// If the table exists
381    pub if_exists: bool,
382    /// Dummy schema
383    pub schema: DFSchemaRef,
384}
385
386// Manual implementation needed because of `schema` field. Comparison excludes this field.
387impl PartialOrd for DropTable {
388    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
389        match self.name.partial_cmp(&other.name) {
390            Some(Ordering::Equal) => self.if_exists.partial_cmp(&other.if_exists),
391            cmp => cmp,
392        }
393    }
394}
395
396/// Drops a view.
397#[derive(Debug, Clone, PartialEq, Eq, Hash)]
398pub struct DropView {
399    /// The view name
400    pub name: TableReference,
401    /// If the view exists
402    pub if_exists: bool,
403    /// Dummy schema
404    pub schema: DFSchemaRef,
405}
406
407// Manual implementation needed because of `schema` field. Comparison excludes this field.
408impl PartialOrd for DropView {
409    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
410        match self.name.partial_cmp(&other.name) {
411            Some(Ordering::Equal) => self.if_exists.partial_cmp(&other.if_exists),
412            cmp => cmp,
413        }
414    }
415}
416
417/// Drops a schema
418#[derive(Debug, Clone, PartialEq, Eq, Hash)]
419pub struct DropCatalogSchema {
420    /// The schema name
421    pub name: SchemaReference,
422    /// If the schema exists
423    pub if_exists: bool,
424    /// Whether drop should cascade
425    pub cascade: bool,
426    /// Dummy schema
427    pub schema: DFSchemaRef,
428}
429
430// Manual implementation needed because of `schema` field. Comparison excludes this field.
431impl PartialOrd for DropCatalogSchema {
432    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
433        match self.name.partial_cmp(&other.name) {
434            Some(Ordering::Equal) => match self.if_exists.partial_cmp(&other.if_exists) {
435                Some(Ordering::Equal) => self.cascade.partial_cmp(&other.cascade),
436                cmp => cmp,
437            },
438            cmp => cmp,
439        }
440    }
441}
442
443/// Arguments passed to `CREATE FUNCTION`
444///
445/// Note this meant to be the same as from sqlparser's [`sqlparser::ast::Statement::CreateFunction`]
446#[derive(Clone, PartialEq, Eq, Hash, Debug)]
447pub struct CreateFunction {
448    // TODO: There is open question should we expose sqlparser types or redefine them here?
449    //       At the moment it make more sense to expose sqlparser types and leave
450    //       user to convert them as needed
451    pub or_replace: bool,
452    pub temporary: bool,
453    pub name: String,
454    pub args: Option<Vec<OperateFunctionArg>>,
455    pub return_type: Option<DataType>,
456    pub params: CreateFunctionBody,
457    /// Dummy schema
458    pub schema: DFSchemaRef,
459}
460
461// Manual implementation needed because of `schema` field. Comparison excludes this field.
462impl PartialOrd for CreateFunction {
463    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
464        #[derive(PartialEq, PartialOrd)]
465        struct ComparableCreateFunction<'a> {
466            pub or_replace: &'a bool,
467            pub temporary: &'a bool,
468            pub name: &'a String,
469            pub args: &'a Option<Vec<OperateFunctionArg>>,
470            pub return_type: &'a Option<DataType>,
471            pub params: &'a CreateFunctionBody,
472        }
473        let comparable_self = ComparableCreateFunction {
474            or_replace: &self.or_replace,
475            temporary: &self.temporary,
476            name: &self.name,
477            args: &self.args,
478            return_type: &self.return_type,
479            params: &self.params,
480        };
481        let comparable_other = ComparableCreateFunction {
482            or_replace: &other.or_replace,
483            temporary: &other.temporary,
484            name: &other.name,
485            args: &other.args,
486            return_type: &other.return_type,
487            params: &other.params,
488        };
489        comparable_self.partial_cmp(&comparable_other)
490    }
491}
492
493#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
494pub struct OperateFunctionArg {
495    // TODO: figure out how to support mode
496    // pub mode: Option<ArgMode>,
497    pub name: Option<Ident>,
498    pub data_type: DataType,
499    pub default_expr: Option<Expr>,
500}
501
502impl<'a> TreeNodeContainer<'a, Expr> for OperateFunctionArg {
503    fn apply_elements<F: FnMut(&'a Expr) -> Result<TreeNodeRecursion>>(
504        &'a self,
505        f: F,
506    ) -> Result<TreeNodeRecursion> {
507        self.default_expr.apply_elements(f)
508    }
509
510    fn map_elements<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
511        self,
512        f: F,
513    ) -> Result<Transformed<Self>> {
514        self.default_expr.map_elements(f)?.map_data(|default_expr| {
515            Ok(Self {
516                default_expr,
517                ..self
518            })
519        })
520    }
521}
522
523#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
524pub struct CreateFunctionBody {
525    /// LANGUAGE lang_name
526    pub language: Option<Ident>,
527    /// IMMUTABLE | STABLE | VOLATILE
528    pub behavior: Option<Volatility>,
529    /// RETURN or AS function body
530    pub function_body: Option<Expr>,
531}
532
533impl<'a> TreeNodeContainer<'a, Expr> for CreateFunctionBody {
534    fn apply_elements<F: FnMut(&'a Expr) -> Result<TreeNodeRecursion>>(
535        &'a self,
536        f: F,
537    ) -> Result<TreeNodeRecursion> {
538        self.function_body.apply_elements(f)
539    }
540
541    fn map_elements<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
542        self,
543        f: F,
544    ) -> Result<Transformed<Self>> {
545        self.function_body
546            .map_elements(f)?
547            .map_data(|function_body| {
548                Ok(Self {
549                    function_body,
550                    ..self
551                })
552            })
553    }
554}
555
556#[derive(Clone, PartialEq, Eq, Hash, Debug)]
557pub struct DropFunction {
558    pub name: String,
559    pub if_exists: bool,
560    pub schema: DFSchemaRef,
561}
562
563impl PartialOrd for DropFunction {
564    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
565        match self.name.partial_cmp(&other.name) {
566            Some(Ordering::Equal) => self.if_exists.partial_cmp(&other.if_exists),
567            cmp => cmp,
568        }
569    }
570}
571
572#[derive(Clone, PartialEq, Eq, Hash, Debug)]
573pub struct CreateIndex {
574    pub name: Option<String>,
575    pub table: TableReference,
576    pub using: Option<String>,
577    pub columns: Vec<SortExpr>,
578    pub unique: bool,
579    pub if_not_exists: bool,
580    pub schema: DFSchemaRef,
581}
582
583// Manual implementation needed because of `schema` field. Comparison excludes this field.
584impl PartialOrd for CreateIndex {
585    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
586        #[derive(PartialEq, PartialOrd)]
587        struct ComparableCreateIndex<'a> {
588            pub name: &'a Option<String>,
589            pub table: &'a TableReference,
590            pub using: &'a Option<String>,
591            pub columns: &'a Vec<SortExpr>,
592            pub unique: &'a bool,
593            pub if_not_exists: &'a bool,
594        }
595        let comparable_self = ComparableCreateIndex {
596            name: &self.name,
597            table: &self.table,
598            using: &self.using,
599            columns: &self.columns,
600            unique: &self.unique,
601            if_not_exists: &self.if_not_exists,
602        };
603        let comparable_other = ComparableCreateIndex {
604            name: &other.name,
605            table: &other.table,
606            using: &other.using,
607            columns: &other.columns,
608            unique: &other.unique,
609            if_not_exists: &other.if_not_exists,
610        };
611        comparable_self.partial_cmp(&comparable_other)
612    }
613}
614
615#[cfg(test)]
616mod test {
617    use crate::{CreateCatalog, DdlStatement, DropView};
618    use datafusion_common::{DFSchema, DFSchemaRef, TableReference};
619    use std::cmp::Ordering;
620
621    #[test]
622    fn test_partial_ord() {
623        let catalog = DdlStatement::CreateCatalog(CreateCatalog {
624            catalog_name: "name".to_string(),
625            if_not_exists: false,
626            schema: DFSchemaRef::new(DFSchema::empty()),
627        });
628        let catalog_2 = DdlStatement::CreateCatalog(CreateCatalog {
629            catalog_name: "name".to_string(),
630            if_not_exists: true,
631            schema: DFSchemaRef::new(DFSchema::empty()),
632        });
633
634        assert_eq!(catalog.partial_cmp(&catalog_2), Some(Ordering::Less));
635
636        let drop_view = DdlStatement::DropView(DropView {
637            name: TableReference::from("table"),
638            if_exists: false,
639            schema: DFSchemaRef::new(DFSchema::empty()),
640        });
641
642        assert_eq!(drop_view.partial_cmp(&catalog), Some(Ordering::Greater));
643    }
644}