datafusion_common/column.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Column
19
20use crate::error::{_schema_err, add_possible_columns_to_diag};
21use crate::utils::{parse_identifiers_normalized, quote_identifier};
22use crate::{DFSchema, Diagnostic, Result, SchemaError, Spans, TableReference};
23use arrow::datatypes::{Field, FieldRef};
24use std::collections::HashSet;
25use std::convert::Infallible;
26use std::fmt;
27use std::str::FromStr;
28
/// A named reference to a qualified field in a schema.
///
/// A column is identified by its `name` and, optionally, by the `relation`
/// (table reference) that qualifies it. `spans` additionally records where
/// the column was written in the original query text, when that is known.
///
/// The comparison and hashing traits are derived, so they visit the fields in
/// declaration order: `relation`, then `name`, then `spans`.
/// `Debug` is implemented by hand for this type and prints only `relation`
/// and `name`.
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Column {
    /// relation/table reference; `None` when the column is unqualified.
    pub relation: Option<TableReference>,
    /// field/column name.
    pub name: String,
    /// Original source code location, if known
    pub spans: Spans,
}
39
40impl fmt::Debug for Column {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 f.debug_struct("Column")
43 .field("relation", &self.relation)
44 .field("name", &self.name)
45 .finish()
46 }
47}
48
49impl Column {
50 /// Create Column from optional qualifier and name. The optional qualifier, if present,
51 /// will be parsed and normalized by default.
52 ///
53 /// See full details on [`TableReference::parse_str`]
54 ///
55 /// [`TableReference::parse_str`]: crate::TableReference::parse_str
56 pub fn new(
57 relation: Option<impl Into<TableReference>>,
58 name: impl Into<String>,
59 ) -> Self {
60 Self {
61 relation: relation.map(|r| r.into()),
62 name: name.into(),
63 spans: Spans::new(),
64 }
65 }
66
67 /// Convenience method for when there is no qualifier
68 pub fn new_unqualified(name: impl Into<String>) -> Self {
69 Self {
70 relation: None,
71 name: name.into(),
72 spans: Spans::new(),
73 }
74 }
75
76 /// Create Column from unqualified name.
77 ///
78 /// Alias for `Column::new_unqualified`
79 pub fn from_name(name: impl Into<String>) -> Self {
80 Self {
81 relation: None,
82 name: name.into(),
83 spans: Spans::new(),
84 }
85 }
86
87 /// Create a Column from multiple normalized identifiers
88 ///
89 /// For example, `foo.bar` would be represented as a two element vector
90 /// `["foo", "bar"]`
91 fn from_idents(mut idents: Vec<String>) -> Option<Self> {
92 let (relation, name) = match idents.len() {
93 1 => (None, idents.remove(0)),
94 2 => (
95 Some(TableReference::Bare {
96 table: idents.remove(0).into(),
97 }),
98 idents.remove(0),
99 ),
100 3 => (
101 Some(TableReference::Partial {
102 schema: idents.remove(0).into(),
103 table: idents.remove(0).into(),
104 }),
105 idents.remove(0),
106 ),
107 4 => (
108 Some(TableReference::Full {
109 catalog: idents.remove(0).into(),
110 schema: idents.remove(0).into(),
111 table: idents.remove(0).into(),
112 }),
113 idents.remove(0),
114 ),
115 // any expression that failed to parse or has more than 4 period delimited
116 // identifiers will be treated as an unqualified column name
117 _ => return None,
118 };
119 Some(Self {
120 relation,
121 name,
122 spans: Spans::new(),
123 })
124 }
125
126 /// Deserialize a fully qualified name string into a column
127 ///
128 /// Treats the name as a SQL identifier. For example
129 /// `foo.BAR` would be parsed to a reference to relation `foo`, column name `bar` (lower case)
130 /// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR`
131 pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
132 let flat_name = flat_name.into();
133 Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or(
134 Self {
135 relation: None,
136 name: flat_name,
137 spans: Spans::new(),
138 },
139 )
140 }
141
142 /// Deserialize a fully qualified name string into a column preserving column text case
143 pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
144 let flat_name = flat_name.into();
145 Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or(
146 Self {
147 relation: None,
148 name: flat_name,
149 spans: Spans::new(),
150 },
151 )
152 }
153
154 /// return the column's name.
155 ///
156 /// Note: This ignores the relation and returns the column name only.
157 pub fn name(&self) -> &str {
158 &self.name
159 }
160
161 /// Serialize column into a flat name string
162 pub fn flat_name(&self) -> String {
163 match &self.relation {
164 Some(r) => format!("{}.{}", r, self.name),
165 None => self.name.clone(),
166 }
167 }
168
169 /// Serialize column into a quoted flat name string
170 pub fn quoted_flat_name(&self) -> String {
171 match &self.relation {
172 Some(r) => {
173 format!(
174 "{}.{}",
175 r.to_quoted_string(),
176 quote_identifier(self.name.as_str())
177 )
178 }
179 None => quote_identifier(&self.name).to_string(),
180 }
181 }
182
183 /// Qualify column if not done yet.
184 ///
185 /// If this column already has a [relation](Self::relation), it will be returned as is and the given parameters are
186 /// ignored. Otherwise this will search through the given schemas to find the column.
187 ///
188 /// Will check for ambiguity at each level of `schemas`.
189 ///
190 /// A schema matches if there is a single column that -- when unqualified -- matches this column. There is an
191 /// exception for `USING` statements, see below.
192 ///
193 /// # Using columns
194 /// Take the following SQL statement:
195 ///
196 /// ```sql
197 /// SELECT id FROM t1 JOIN t2 USING(id)
198 /// ```
199 ///
200 /// In this case, both `t1.id` and `t2.id` will match unqualified column `id`. To express this possibility, use
201 /// `using_columns`. Each entry in this array is a set of columns that are bound together via a `USING` clause. So
202 /// in this example this would be `[{t1.id, t2.id}]`.
203 ///
204 /// Regarding ambiguity check, `schemas` is structured to allow levels of schemas to be passed in.
205 /// For example:
206 ///
207 /// ```text
208 /// schemas = &[
209 /// &[schema1, schema2], // first level
210 /// &[schema3, schema4], // second level
211 /// ]
212 /// ```
213 ///
214 /// Will search for a matching field in all schemas in the first level. If a matching field according to above
215 /// mentioned conditions is not found, then will check the next level. If found more than one matching column across
216 /// all schemas in a level, that isn't a USING column, will return an error due to ambiguous column.
217 ///
218 /// If checked all levels and couldn't find field, will return field not found error.
219 pub fn normalize_with_schemas_and_ambiguity_check(
220 self,
221 schemas: &[&[&DFSchema]],
222 using_columns: &[HashSet<Column>],
223 ) -> Result<Self> {
224 if self.relation.is_some() {
225 return Ok(self);
226 }
227
228 for schema_level in schemas {
229 let qualified_fields = schema_level
230 .iter()
231 .flat_map(|s| s.qualified_fields_with_unqualified_name(&self.name))
232 .collect::<Vec<_>>();
233 match qualified_fields.len() {
234 0 => continue,
235 1 => return Ok(Column::from(qualified_fields[0])),
236 _ => {
237 // More than 1 fields in this schema have their names set to self.name.
238 //
239 // This should only happen when a JOIN query with USING constraint references
240 // join columns using unqualified column name. For example:
241 //
242 // ```sql
243 // SELECT id FROM t1 JOIN t2 USING(id)
244 // ```
245 //
246 // In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
247 // We will use the relation from the first matched field to normalize self.
248
249 // Compare matched fields with one USING JOIN clause at a time
250 let columns = schema_level
251 .iter()
252 .flat_map(|s| s.columns_with_unqualified_name(&self.name))
253 .collect::<Vec<_>>();
254 for using_col in using_columns {
255 let all_matched = columns.iter().all(|c| using_col.contains(c));
256 // All matched fields belong to the same using column set, in other words
257 // the same join clause. We simply pick the qualifier from the first match.
258 if all_matched {
259 return Ok(columns[0].clone());
260 }
261 }
262
263 // If not due to USING columns then due to ambiguous column name
264 return _schema_err!(SchemaError::AmbiguousReference {
265 field: Column::new_unqualified(&self.name),
266 })
267 .map_err(|err| {
268 let mut diagnostic = Diagnostic::new_error(
269 format!("column '{}' is ambiguous", &self.name),
270 self.spans().first(),
271 );
272 // TODO If [`DFSchema`] had spans, we could show the
273 // user which columns are candidates, or which table
274 // they come from. For now, let's list the table names
275 // only.
276 add_possible_columns_to_diag(
277 &mut diagnostic,
278 &Column::new_unqualified(&self.name),
279 &columns,
280 );
281 err.with_diagnostic(diagnostic)
282 });
283 }
284 }
285 }
286
287 _schema_err!(SchemaError::FieldNotFound {
288 field: Box::new(self),
289 valid_fields: schemas
290 .iter()
291 .flat_map(|s| s.iter())
292 .flat_map(|s| s.columns())
293 .collect(),
294 })
295 }
296
297 /// Returns a reference to the set of locations in the SQL query where this
298 /// column appears, if known.
299 pub fn spans(&self) -> &Spans {
300 &self.spans
301 }
302
303 /// Returns a mutable reference to the set of locations in the SQL query
304 /// where this column appears, if known.
305 pub fn spans_mut(&mut self) -> &mut Spans {
306 &mut self.spans
307 }
308
309 /// Replaces the set of locations in the SQL query where this column
310 /// appears, if known.
311 pub fn with_spans(mut self, spans: Spans) -> Self {
312 self.spans = spans;
313 self
314 }
315
316 /// Qualifies the column with the given table reference.
317 pub fn with_relation(&self, relation: TableReference) -> Self {
318 Self {
319 relation: Some(relation),
320 ..self.clone()
321 }
322 }
323}
324
325impl From<&str> for Column {
326 fn from(c: &str) -> Self {
327 Self::from_qualified_name(c)
328 }
329}
330
331/// Create a column, cloning the string
332impl From<&String> for Column {
333 fn from(c: &String) -> Self {
334 Self::from_qualified_name(c)
335 }
336}
337
338/// Create a column, reusing the existing string
339impl From<String> for Column {
340 fn from(c: String) -> Self {
341 Self::from_qualified_name(c)
342 }
343}
344
345/// Create a column, use qualifier and field name
346impl From<(Option<&TableReference>, &Field)> for Column {
347 fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
348 Self::new(relation.cloned(), field.name())
349 }
350}
351
352/// Create a column, use qualifier and field name
353impl From<(Option<&TableReference>, &FieldRef)> for Column {
354 fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
355 Self::new(relation.cloned(), field.name())
356 }
357}
358
359impl FromStr for Column {
360 type Err = Infallible;
361
362 fn from_str(s: &str) -> Result<Self, Self::Err> {
363 Ok(s.into())
364 }
365}
366
367impl fmt::Display for Column {
368 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
369 write!(f, "{}", self.flat_name())
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, SchemaBuilder};
    use std::sync::Arc;

    /// Builds a schema qualified by `qualifier` holding one nullable Boolean
    /// field for every entry of `names`.
    fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
        let fields = names
            .into_iter()
            .map(|name| Field::new(name, DataType::Boolean, true));

        let mut builder = SchemaBuilder::new();
        builder.extend(fields);
        let arrow_schema = Arc::new(builder.finish());

        DFSchema::try_from_qualified_schema(qualifier, &arrow_schema)
    }

    #[test]
    fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> {
        let schema1 = create_qualified_schema("t1", vec!["a", "b"])?;
        let schema2 = create_qualified_schema("t2", vec!["c", "d"])?;
        let schema3 = create_qualified_schema("t3", vec!["a", "b", "c", "d", "e"])?;

        // The two level layouts shared by the scenarios below.
        let t1_t2_then_t3: &[&[&DFSchema]] = &[&[&schema1, &schema2], &[&schema3]];
        let t1_t3_then_t2: &[&[&DFSchema]] = &[&[&schema1, &schema3], &[&schema2]];

        // already normalized
        let normalized = Column::new(Some("t1"), "a")
            .normalize_with_schemas_and_ambiguity_check(&[], &[])?;
        assert_eq!(normalized, Column::new(Some("t1"), "a"));

        // should find in first level (schema1)
        let normalized = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(t1_t2_then_t3, &[])?;
        assert_eq!(normalized, Column::new(Some("t1"), "a"));

        // should find in second level (schema3)
        let normalized = Column::from_name("e")
            .normalize_with_schemas_and_ambiguity_check(t1_t2_then_t3, &[])?;
        assert_eq!(normalized, Column::new(Some("t3"), "e"));

        // using column in first level (pick schema1)
        let using_columns = HashSet::from([
            Column::new(Some("t1"), "a"),
            Column::new(Some("t3"), "a"),
        ]);
        let normalized = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(t1_t3_then_t2, &[using_columns])?;
        assert_eq!(normalized, Column::new(Some("t1"), "a"));

        // not found in any level
        let err = Column::from_name("z")
            .normalize_with_schemas_and_ambiguity_check(t1_t2_then_t3, &[])
            .expect_err("should've failed to find field");
        let expected = "Schema error: No field named z. \
            Valid fields are t1.a, t1.b, t2.c, t2.d, t3.a, t3.b, t3.c, t3.d, t3.e.";
        assert_eq!(err.strip_backtrace(), expected);

        // ambiguous column reference
        let err = Column::from_name("a")
            .normalize_with_schemas_and_ambiguity_check(t1_t3_then_t2, &[])
            .expect_err("should've found ambiguous field");
        let expected = "Schema error: Ambiguous reference to unqualified field a";
        assert_eq!(err.strip_backtrace(), expected);

        Ok(())
    }
}