datafusion_expr/logical_plan/dml.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::Ordering;
19use std::collections::HashMap;
20use std::fmt::{self, Debug, Display, Formatter};
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23
24use arrow::datatypes::{DataType, Field, Schema};
25use datafusion_common::file_options::file_type::FileType;
26use datafusion_common::{DFSchemaRef, TableReference};
27
28use crate::{LogicalPlan, TableSource};
29
/// Operator that copies the contents of a database to file(s)
#[derive(Clone)]
pub struct CopyTo {
    /// The relation that determines the tuples to write to the output file(s)
    pub input: Arc<LogicalPlan>,
    /// The location to write the file(s), e.g. a path or object store URL
    pub output_url: String,
    /// Determines which, if any, columns should be used for hive-style partitioned writes
    pub partition_by: Vec<String>,
    /// File type trait object describing the output format
    pub file_type: Arc<dyn FileType>,
    /// SQL Options that can affect the formats
    pub options: HashMap<String, String>,
}
44
45impl Debug for CopyTo {
46    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
47        f.debug_struct("CopyTo")
48            .field("input", &self.input)
49            .field("output_url", &self.output_url)
50            .field("partition_by", &self.partition_by)
51            .field("file_type", &"...")
52            .field("options", &self.options)
53            .finish_non_exhaustive()
54    }
55}
56
57// Implement PartialEq manually
58impl PartialEq for CopyTo {
59    fn eq(&self, other: &Self) -> bool {
60        self.input == other.input && self.output_url == other.output_url
61    }
62}
63
// Implement Eq (no additional logic needed beyond PartialEq)
impl Eq for CopyTo {}
66
67// Manual implementation needed because of `file_type` and `options` fields.
68// Comparison excludes these field.
69impl PartialOrd for CopyTo {
70    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
71        match self.input.partial_cmp(&other.input) {
72            Some(Ordering::Equal) => match self.output_url.partial_cmp(&other.output_url)
73            {
74                Some(Ordering::Equal) => {
75                    self.partition_by.partial_cmp(&other.partition_by)
76                }
77                cmp => cmp,
78            },
79            cmp => cmp,
80        }
81    }
82}
83
84// Implement Hash manually
85impl Hash for CopyTo {
86    fn hash<H: Hasher>(&self, state: &mut H) {
87        self.input.hash(state);
88        self.output_url.hash(state);
89    }
90}
91
/// The operator that modifies the content of a database (adapted from
/// substrait WriteRel)
#[derive(Clone)]
pub struct DmlStatement {
    /// The table name
    pub table_name: TableReference,
    /// The target table to write into
    pub target: Arc<dyn TableSource>,
    /// The type of operation to perform
    pub op: WriteOp,
    /// The relation that determines the tuples to add/remove/modify;
    /// its schema must match the target table's schema
    pub input: Arc<LogicalPlan>,
    /// The schema of the output relation: a single `count` column
    /// (see [`DmlStatement::new`])
    pub output_schema: DFSchemaRef,
}
107impl Eq for DmlStatement {}
108impl Hash for DmlStatement {
109    fn hash<H: Hasher>(&self, state: &mut H) {
110        self.table_name.hash(state);
111        self.target.schema().hash(state);
112        self.op.hash(state);
113        self.input.hash(state);
114        self.output_schema.hash(state);
115    }
116}
117
118impl PartialEq for DmlStatement {
119    fn eq(&self, other: &Self) -> bool {
120        self.table_name == other.table_name
121            && self.target.schema() == other.target.schema()
122            && self.op == other.op
123            && self.input == other.input
124            && self.output_schema == other.output_schema
125    }
126}
127
128impl Debug for DmlStatement {
129    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
130        f.debug_struct("DmlStatement")
131            .field("table_name", &self.table_name)
132            .field("target", &"...")
133            .field("target_schema", &self.target.schema())
134            .field("op", &self.op)
135            .field("input", &self.input)
136            .field("output_schema", &self.output_schema)
137            .finish()
138    }
139}
140
141impl DmlStatement {
142    /// Creates a new DML statement with the output schema set to a single `count` column.
143    pub fn new(
144        table_name: TableReference,
145        target: Arc<dyn TableSource>,
146        op: WriteOp,
147        input: Arc<LogicalPlan>,
148    ) -> Self {
149        Self {
150            table_name,
151            target,
152            op,
153            input,
154
155            // The output schema is always a single column with the number of rows affected
156            output_schema: make_count_schema(),
157        }
158    }
159
160    /// Return a descriptive name of this [`DmlStatement`]
161    pub fn name(&self) -> &str {
162        self.op.name()
163    }
164}
165
166// Manual implementation needed because of `table_schema` and `output_schema` fields.
167// Comparison excludes these fields.
168impl PartialOrd for DmlStatement {
169    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
170        match self.table_name.partial_cmp(&other.table_name) {
171            Some(Ordering::Equal) => match self.op.partial_cmp(&other.op) {
172                Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
173                cmp => cmp,
174            },
175            cmp => cmp,
176        }
177    }
178}
179
/// The kind of write operation performed by a [`DmlStatement`]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WriteOp {
    /// Insert rows; see [`InsertOp`] for the specific insert semantics
    Insert(InsertOp),
    /// Delete rows from the target table
    Delete,
    /// Update existing rows in the target table
    Update,
    /// `CREATE TABLE AS SELECT` (CTAS)
    Ctas,
}
187
188impl WriteOp {
189    /// Return a descriptive name of this [`WriteOp`]
190    pub fn name(&self) -> &str {
191        match self {
192            WriteOp::Insert(insert) => insert.name(),
193            WriteOp::Delete => "Delete",
194            WriteOp::Update => "Update",
195            WriteOp::Ctas => "Ctas",
196        }
197    }
198}
199
200impl Display for WriteOp {
201    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
202        write!(f, "{}", self.name())
203    }
204}
205
/// Determines how rows are written to a table by a [`WriteOp::Insert`]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum InsertOp {
    /// Appends new rows to the existing table without modifying any
    /// existing rows. This corresponds to the SQL `INSERT INTO` query.
    Append,
    /// Overwrites all existing rows in the table with the new rows.
    /// This corresponds to the SQL `INSERT OVERWRITE` query.
    Overwrite,
    /// If any existing rows collide with the inserted rows (typically based
    /// on a unique key or primary key), those existing rows are replaced.
    /// This corresponds to the SQL `REPLACE INTO` query and its equivalents.
    Replace,
}
219
220impl InsertOp {
221    /// Return a descriptive name of this [`InsertOp`]
222    pub fn name(&self) -> &str {
223        match self {
224            InsertOp::Append => "Insert Into",
225            InsertOp::Overwrite => "Insert Overwrite",
226            InsertOp::Replace => "Replace Into",
227        }
228    }
229}
230
231impl Display for InsertOp {
232    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
233        write!(f, "{}", self.name())
234    }
235}
236
237fn make_count_schema() -> DFSchemaRef {
238    Arc::new(
239        Schema::new(vec![Field::new("count", DataType::UInt64, false)])
240            .try_into()
241            .unwrap(),
242    )
243}