datafusion_common/
column.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Column
19
20use crate::error::{_schema_err, add_possible_columns_to_diag};
21use crate::utils::{parse_identifiers_normalized, quote_identifier};
22use crate::{DFSchema, Diagnostic, Result, SchemaError, Spans, TableReference};
23use arrow::datatypes::{Field, FieldRef};
24use std::collections::HashSet;
25use std::convert::Infallible;
26use std::fmt;
27use std::str::FromStr;
28
29/// A named reference to a qualified field in a schema.
30#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
31pub struct Column {
32    /// relation/table reference.
33    pub relation: Option<TableReference>,
34    /// field/column name.
35    pub name: String,
36    /// Original source code location, if known
37    pub spans: Spans,
38}
39
40impl fmt::Debug for Column {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        f.debug_struct("Column")
43            .field("relation", &self.relation)
44            .field("name", &self.name)
45            .finish()
46    }
47}
48
49impl Column {
50    /// Create Column from optional qualifier and name. The optional qualifier, if present,
51    /// will be parsed and normalized by default.
52    ///
53    /// See full details on [`TableReference::parse_str`]
54    ///
55    /// [`TableReference::parse_str`]: crate::TableReference::parse_str
56    pub fn new(
57        relation: Option<impl Into<TableReference>>,
58        name: impl Into<String>,
59    ) -> Self {
60        Self {
61            relation: relation.map(|r| r.into()),
62            name: name.into(),
63            spans: Spans::new(),
64        }
65    }
66
67    /// Convenience method for when there is no qualifier
68    pub fn new_unqualified(name: impl Into<String>) -> Self {
69        Self {
70            relation: None,
71            name: name.into(),
72            spans: Spans::new(),
73        }
74    }
75
76    /// Create Column from unqualified name.
77    ///
78    /// Alias for `Column::new_unqualified`
79    pub fn from_name(name: impl Into<String>) -> Self {
80        Self {
81            relation: None,
82            name: name.into(),
83            spans: Spans::new(),
84        }
85    }
86
87    /// Create a Column from multiple normalized identifiers
88    ///
89    /// For example, `foo.bar` would be represented as a two element vector
90    /// `["foo", "bar"]`
91    fn from_idents(mut idents: Vec<String>) -> Option<Self> {
92        let (relation, name) = match idents.len() {
93            1 => (None, idents.remove(0)),
94            2 => (
95                Some(TableReference::Bare {
96                    table: idents.remove(0).into(),
97                }),
98                idents.remove(0),
99            ),
100            3 => (
101                Some(TableReference::Partial {
102                    schema: idents.remove(0).into(),
103                    table: idents.remove(0).into(),
104                }),
105                idents.remove(0),
106            ),
107            4 => (
108                Some(TableReference::Full {
109                    catalog: idents.remove(0).into(),
110                    schema: idents.remove(0).into(),
111                    table: idents.remove(0).into(),
112                }),
113                idents.remove(0),
114            ),
115            // any expression that failed to parse or has more than 4 period delimited
116            // identifiers will be treated as an unqualified column name
117            _ => return None,
118        };
119        Some(Self {
120            relation,
121            name,
122            spans: Spans::new(),
123        })
124    }
125
126    /// Deserialize a fully qualified name string into a column
127    ///
128    /// Treats the name as a SQL identifier. For example
129    /// `foo.BAR` would be parsed to a reference to relation `foo`, column name `bar` (lower case)
130    /// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR`
131    pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
132        let flat_name = flat_name.into();
133        Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or(
134            Self {
135                relation: None,
136                name: flat_name,
137                spans: Spans::new(),
138            },
139        )
140    }
141
142    /// Deserialize a fully qualified name string into a column preserving column text case
143    pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
144        let flat_name = flat_name.into();
145        Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or(
146            Self {
147                relation: None,
148                name: flat_name,
149                spans: Spans::new(),
150            },
151        )
152    }
153
154    /// return the column's name.
155    ///
156    /// Note: This ignores the relation and returns the column name only.
157    pub fn name(&self) -> &str {
158        &self.name
159    }
160
161    /// Serialize column into a flat name string
162    pub fn flat_name(&self) -> String {
163        match &self.relation {
164            Some(r) => format!("{}.{}", r, self.name),
165            None => self.name.clone(),
166        }
167    }
168
169    /// Serialize column into a quoted flat name string
170    pub fn quoted_flat_name(&self) -> String {
171        match &self.relation {
172            Some(r) => {
173                format!(
174                    "{}.{}",
175                    r.to_quoted_string(),
176                    quote_identifier(self.name.as_str())
177                )
178            }
179            None => quote_identifier(&self.name).to_string(),
180        }
181    }
182
183    /// Qualify column if not done yet.
184    ///
185    /// If this column already has a [relation](Self::relation), it will be returned as is and the given parameters are
186    /// ignored. Otherwise this will search through the given schemas to find the column.
187    ///
188    /// Will check for ambiguity at each level of `schemas`.
189    ///
190    /// A schema matches if there is a single column that -- when unqualified -- matches this column. There is an
191    /// exception for `USING` statements, see below.
192    ///
193    /// # Using columns
194    /// Take the following SQL statement:
195    ///
196    /// ```sql
197    /// SELECT id FROM t1 JOIN t2 USING(id)
198    /// ```
199    ///
200    /// In this case, both `t1.id` and `t2.id` will match unqualified column `id`. To express this possibility, use
201    /// `using_columns`. Each entry in this array is a set of columns that are bound together via a `USING` clause. So
202    /// in this example this would be `[{t1.id, t2.id}]`.
203    ///
204    /// Regarding ambiguity check, `schemas` is structured to allow levels of schemas to be passed in.
205    /// For example:
206    ///
207    /// ```text
208    /// schemas = &[
209    ///    &[schema1, schema2], // first level
210    ///    &[schema3, schema4], // second level
211    /// ]
212    /// ```
213    ///
214    /// Will search for a matching field in all schemas in the first level. If a matching field according to above
215    /// mentioned conditions is not found, then will check the next level. If found more than one matching column across
216    /// all schemas in a level, that isn't a USING column, will return an error due to ambiguous column.
217    ///
218    /// If checked all levels and couldn't find field, will return field not found error.
219    pub fn normalize_with_schemas_and_ambiguity_check(
220        self,
221        schemas: &[&[&DFSchema]],
222        using_columns: &[HashSet<Column>],
223    ) -> Result<Self> {
224        if self.relation.is_some() {
225            return Ok(self);
226        }
227
228        for schema_level in schemas {
229            let qualified_fields = schema_level
230                .iter()
231                .flat_map(|s| s.qualified_fields_with_unqualified_name(&self.name))
232                .collect::<Vec<_>>();
233            match qualified_fields.len() {
234                0 => continue,
235                1 => return Ok(Column::from(qualified_fields[0])),
236                _ => {
237                    // More than 1 fields in this schema have their names set to self.name.
238                    //
239                    // This should only happen when a JOIN query with USING constraint references
240                    // join columns using unqualified column name. For example:
241                    //
242                    // ```sql
243                    // SELECT id FROM t1 JOIN t2 USING(id)
244                    // ```
245                    //
246                    // In this case, both `t1.id` and `t2.id` will match unqualified column `id`.
247                    // We will use the relation from the first matched field to normalize self.
248
249                    // Compare matched fields with one USING JOIN clause at a time
250                    let columns = schema_level
251                        .iter()
252                        .flat_map(|s| s.columns_with_unqualified_name(&self.name))
253                        .collect::<Vec<_>>();
254                    for using_col in using_columns {
255                        let all_matched = columns.iter().all(|c| using_col.contains(c));
256                        // All matched fields belong to the same using column set, in other words
257                        // the same join clause. We simply pick the qualifier from the first match.
258                        if all_matched {
259                            return Ok(columns[0].clone());
260                        }
261                    }
262
263                    // If not due to USING columns then due to ambiguous column name
264                    return _schema_err!(SchemaError::AmbiguousReference {
265                        field: Column::new_unqualified(&self.name),
266                    })
267                    .map_err(|err| {
268                        let mut diagnostic = Diagnostic::new_error(
269                            format!("column '{}' is ambiguous", &self.name),
270                            self.spans().first(),
271                        );
272                        // TODO If [`DFSchema`] had spans, we could show the
273                        // user which columns are candidates, or which table
274                        // they come from. For now, let's list the table names
275                        // only.
276                        add_possible_columns_to_diag(
277                            &mut diagnostic,
278                            &Column::new_unqualified(&self.name),
279                            &columns,
280                        );
281                        err.with_diagnostic(diagnostic)
282                    });
283                }
284            }
285        }
286
287        _schema_err!(SchemaError::FieldNotFound {
288            field: Box::new(self),
289            valid_fields: schemas
290                .iter()
291                .flat_map(|s| s.iter())
292                .flat_map(|s| s.columns())
293                .collect(),
294        })
295    }
296
297    /// Returns a reference to the set of locations in the SQL query where this
298    /// column appears, if known.
299    pub fn spans(&self) -> &Spans {
300        &self.spans
301    }
302
303    /// Returns a mutable reference to the set of locations in the SQL query
304    /// where this column appears, if known.
305    pub fn spans_mut(&mut self) -> &mut Spans {
306        &mut self.spans
307    }
308
309    /// Replaces the set of locations in the SQL query where this column
310    /// appears, if known.
311    pub fn with_spans(mut self, spans: Spans) -> Self {
312        self.spans = spans;
313        self
314    }
315
316    /// Qualifies the column with the given table reference.
317    pub fn with_relation(&self, relation: TableReference) -> Self {
318        Self {
319            relation: Some(relation),
320            ..self.clone()
321        }
322    }
323}
324
325impl From<&str> for Column {
326    fn from(c: &str) -> Self {
327        Self::from_qualified_name(c)
328    }
329}
330
331/// Create a column, cloning the string
332impl From<&String> for Column {
333    fn from(c: &String) -> Self {
334        Self::from_qualified_name(c)
335    }
336}
337
338/// Create a column, reusing the existing string
339impl From<String> for Column {
340    fn from(c: String) -> Self {
341        Self::from_qualified_name(c)
342    }
343}
344
345/// Create a column, use qualifier and field name
346impl From<(Option<&TableReference>, &Field)> for Column {
347    fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
348        Self::new(relation.cloned(), field.name())
349    }
350}
351
352/// Create a column, use qualifier and field name
353impl From<(Option<&TableReference>, &FieldRef)> for Column {
354    fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
355        Self::new(relation.cloned(), field.name())
356    }
357}
358
359impl FromStr for Column {
360    type Err = Infallible;
361
362    fn from_str(s: &str) -> Result<Self, Self::Err> {
363        Ok(s.into())
364    }
365}
366
367impl fmt::Display for Column {
368    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
369        write!(f, "{}", self.flat_name())
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use super::*;
376    use arrow::datatypes::{DataType, SchemaBuilder};
377    use std::sync::Arc;
378
379    fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
380        let mut schema_builder = SchemaBuilder::new();
381        schema_builder.extend(
382            names
383                .iter()
384                .map(|f| Field::new(*f, DataType::Boolean, true)),
385        );
386        let schema = Arc::new(schema_builder.finish());
387        DFSchema::try_from_qualified_schema(qualifier, &schema)
388    }
389
390    #[test]
391    fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> {
392        let schema1 = create_qualified_schema("t1", vec!["a", "b"])?;
393        let schema2 = create_qualified_schema("t2", vec!["c", "d"])?;
394        let schema3 = create_qualified_schema("t3", vec!["a", "b", "c", "d", "e"])?;
395
396        // already normalized
397        let col = Column::new(Some("t1"), "a");
398        let col = col.normalize_with_schemas_and_ambiguity_check(&[], &[])?;
399        assert_eq!(col, Column::new(Some("t1"), "a"));
400
401        // should find in first level (schema1)
402        let col = Column::from_name("a");
403        let col = col.normalize_with_schemas_and_ambiguity_check(
404            &[&[&schema1, &schema2], &[&schema3]],
405            &[],
406        )?;
407        assert_eq!(col, Column::new(Some("t1"), "a"));
408
409        // should find in second level (schema3)
410        let col = Column::from_name("e");
411        let col = col.normalize_with_schemas_and_ambiguity_check(
412            &[&[&schema1, &schema2], &[&schema3]],
413            &[],
414        )?;
415        assert_eq!(col, Column::new(Some("t3"), "e"));
416
417        // using column in first level (pick schema1)
418        let mut using_columns = HashSet::new();
419        using_columns.insert(Column::new(Some("t1"), "a"));
420        using_columns.insert(Column::new(Some("t3"), "a"));
421        let col = Column::from_name("a");
422        let col = col.normalize_with_schemas_and_ambiguity_check(
423            &[&[&schema1, &schema3], &[&schema2]],
424            &[using_columns],
425        )?;
426        assert_eq!(col, Column::new(Some("t1"), "a"));
427
428        // not found in any level
429        let col = Column::from_name("z");
430        let err = col
431            .normalize_with_schemas_and_ambiguity_check(
432                &[&[&schema1, &schema2], &[&schema3]],
433                &[],
434            )
435            .expect_err("should've failed to find field");
436        let expected = "Schema error: No field named z. \
437            Valid fields are t1.a, t1.b, t2.c, t2.d, t3.a, t3.b, t3.c, t3.d, t3.e.";
438        assert_eq!(err.strip_backtrace(), expected);
439
440        // ambiguous column reference
441        let col = Column::from_name("a");
442        let err = col
443            .normalize_with_schemas_and_ambiguity_check(
444                &[&[&schema1, &schema3], &[&schema2]],
445                &[],
446            )
447            .expect_err("should've found ambiguous field");
448        let expected = "Schema error: Ambiguous reference to unqualified field a";
449        assert_eq!(err.strip_backtrace(), expected);
450
451        Ok(())
452    }
453}