datafusion_expr/logical_plan/
dml.rs1use std::cmp::Ordering;
19use std::collections::HashMap;
20use std::fmt::{self, Debug, Display, Formatter};
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23
24use arrow::datatypes::{DataType, Field, Schema};
25use datafusion_common::file_options::file_type::FileType;
26use datafusion_common::{DFSchemaRef, TableReference};
27
28use crate::{LogicalPlan, TableSource};
29
30#[derive(Clone)]
32pub struct CopyTo {
33 pub input: Arc<LogicalPlan>,
35 pub output_url: String,
37 pub partition_by: Vec<String>,
39 pub file_type: Arc<dyn FileType>,
41 pub options: HashMap<String, String>,
43}
44
45impl Debug for CopyTo {
46 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
47 f.debug_struct("CopyTo")
48 .field("input", &self.input)
49 .field("output_url", &self.output_url)
50 .field("partition_by", &self.partition_by)
51 .field("file_type", &"...")
52 .field("options", &self.options)
53 .finish_non_exhaustive()
54 }
55}
56
57impl PartialEq for CopyTo {
59 fn eq(&self, other: &Self) -> bool {
60 self.input == other.input && self.output_url == other.output_url
61 }
62}
63
64impl Eq for CopyTo {}
66
67impl PartialOrd for CopyTo {
70 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
71 match self.input.partial_cmp(&other.input) {
72 Some(Ordering::Equal) => match self.output_url.partial_cmp(&other.output_url)
73 {
74 Some(Ordering::Equal) => {
75 self.partition_by.partial_cmp(&other.partition_by)
76 }
77 cmp => cmp,
78 },
79 cmp => cmp,
80 }
81 }
82}
83
84impl Hash for CopyTo {
86 fn hash<H: Hasher>(&self, state: &mut H) {
87 self.input.hash(state);
88 self.output_url.hash(state);
89 }
90}
91
92#[derive(Clone)]
95pub struct DmlStatement {
96 pub table_name: TableReference,
98 pub target: Arc<dyn TableSource>,
100 pub op: WriteOp,
102 pub input: Arc<LogicalPlan>,
104 pub output_schema: DFSchemaRef,
106}
107impl Eq for DmlStatement {}
108impl Hash for DmlStatement {
109 fn hash<H: Hasher>(&self, state: &mut H) {
110 self.table_name.hash(state);
111 self.target.schema().hash(state);
112 self.op.hash(state);
113 self.input.hash(state);
114 self.output_schema.hash(state);
115 }
116}
117
118impl PartialEq for DmlStatement {
119 fn eq(&self, other: &Self) -> bool {
120 self.table_name == other.table_name
121 && self.target.schema() == other.target.schema()
122 && self.op == other.op
123 && self.input == other.input
124 && self.output_schema == other.output_schema
125 }
126}
127
128impl Debug for DmlStatement {
129 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
130 f.debug_struct("DmlStatement")
131 .field("table_name", &self.table_name)
132 .field("target", &"...")
133 .field("target_schema", &self.target.schema())
134 .field("op", &self.op)
135 .field("input", &self.input)
136 .field("output_schema", &self.output_schema)
137 .finish()
138 }
139}
140
141impl DmlStatement {
142 pub fn new(
144 table_name: TableReference,
145 target: Arc<dyn TableSource>,
146 op: WriteOp,
147 input: Arc<LogicalPlan>,
148 ) -> Self {
149 Self {
150 table_name,
151 target,
152 op,
153 input,
154
155 output_schema: make_count_schema(),
157 }
158 }
159
160 pub fn name(&self) -> &str {
162 self.op.name()
163 }
164}
165
166impl PartialOrd for DmlStatement {
169 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
170 match self.table_name.partial_cmp(&other.table_name) {
171 Some(Ordering::Equal) => match self.op.partial_cmp(&other.op) {
172 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
173 cmp => cmp,
174 },
175 cmp => cmp,
176 }
177 }
178}
179
/// The kind of write a DML statement performs.
///
/// The derived `PartialOrd` follows declaration order, so the variant order
/// below is part of the type's behaviour.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WriteOp {
    /// An insert; the payload selects the insert flavour.
    Insert(InsertOp),
    /// Named `"Delete"`.
    Delete,
    /// Named `"Update"`.
    Update,
    /// Named `"Ctas"` (presumably CREATE TABLE AS SELECT; confirm).
    Ctas,
}

impl WriteOp {
    /// Returns the human-readable name of the operation.
    ///
    /// For `Insert` the name comes from the wrapped [`InsertOp`]. Every name
    /// is a string literal, so the result is `'static` and may outlive the
    /// borrow of `self`.
    pub fn name(&self) -> &'static str {
        match self {
            WriteOp::Insert(insert) => insert.name(),
            WriteOp::Delete => "Delete",
            WriteOp::Update => "Update",
            WriteOp::Ctas => "Ctas",
        }
    }
}

/// Displays exactly the string returned by [`WriteOp::name`].
impl Display for WriteOp {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}

/// The flavour of an insert operation.
///
/// The derived `PartialOrd` follows declaration order, so the variant order
/// below is part of the type's behaviour.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum InsertOp {
    /// Named `"Insert Into"`.
    Append,
    /// Named `"Insert Overwrite"`.
    Overwrite,
    /// Named `"Replace Into"`.
    Replace,
}

impl InsertOp {
    /// Returns the human-readable name of the insert flavour.
    ///
    /// Every name is a string literal, so the result is `'static` and may
    /// outlive the borrow of `self`.
    pub fn name(&self) -> &'static str {
        match self {
            InsertOp::Append => "Insert Into",
            InsertOp::Overwrite => "Insert Overwrite",
            InsertOp::Replace => "Replace Into",
        }
    }
}

/// Displays exactly the string returned by [`InsertOp::name`].
impl Display for InsertOp {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}
236
237fn make_count_schema() -> DFSchemaRef {
238 Arc::new(
239 Schema::new(vec![Field::new("count", DataType::UInt64, false)])
240 .try_into()
241 .unwrap(),
242 )
243}