datafusion_common/dfschema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DFSchema is an extended schema struct that DataFusion uses to provide support for
19//! fields with optional relation names.
20
21use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
27use crate::{
28    field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
29    SchemaError, TableReference,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34    DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
37/// A reference-counted reference to a [DFSchema].
38pub type DFSchemaRef = Arc<DFSchema>;
39
40/// DFSchema wraps an Arrow schema and adds relation names.
41///
42/// The schema may hold fields from multiple tables. Some fields may be
43/// qualified and some unqualified. A qualified field is a field that has a
44/// relation name associated with it.
45///
46/// Unqualified field names must be unique not only amongst themselves, but must
47/// also be distinct from every qualified field name. This makes it possible to
48/// look up a qualified field by its bare name, so long as there aren't multiple
49/// qualified fields with the same name.
50///
51/// There is an alias to `Arc<DFSchema>` named [DFSchemaRef].
52///
53/// # Creating qualified schemas
54///
55/// Use [DFSchema::try_from_qualified_schema] to create a qualified schema from
56/// an Arrow schema.
57///
58/// ```rust
59/// use datafusion_common::{DFSchema, Column};
60/// use arrow::datatypes::{DataType, Field, Schema};
61///
62/// let arrow_schema = Schema::new(vec![
63///    Field::new("c1", DataType::Int32, false),
64/// ]);
65///
66/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
67/// let column = Column::from_qualified_name("t1.c1");
68/// assert!(df_schema.has_column(&column));
69///
70/// // Can also access qualified fields with unqualified name, if it's unambiguous
71/// let column = Column::from_qualified_name("c1");
72/// assert!(df_schema.has_column(&column));
73/// ```
74///
75/// # Creating unqualified schemas
76///
77/// Create an unqualified schema using TryFrom:
78///
79/// ```rust
80/// use datafusion_common::{DFSchema, Column};
81/// use arrow::datatypes::{DataType, Field, Schema};
82///
83/// let arrow_schema = Schema::new(vec![
84///    Field::new("c1", DataType::Int32, false),
85/// ]);
86///
87/// let df_schema = DFSchema::try_from(arrow_schema).unwrap();
88/// let column = Column::new_unqualified("c1");
89/// assert!(df_schema.has_column(&column));
90/// ```
91///
92/// # Converting back to Arrow schema
93///
94/// Use the `Into` trait to convert `DFSchema` into an Arrow schema:
95///
96/// ```rust
97/// use datafusion_common::DFSchema;
98/// use arrow::datatypes::{Schema, Field};
99/// use std::collections::HashMap;
100///
101/// let df_schema = DFSchema::from_unqualified_fields(vec![
102///    Field::new("c1", arrow::datatypes::DataType::Int32, false),
103/// ].into(), HashMap::new()).unwrap();
104/// let schema = Schema::from(df_schema);
105/// assert_eq!(schema.fields().len(), 1);
106/// ```
107#[derive(Debug, Clone, PartialEq, Eq)]
108pub struct DFSchema {
109    /// Inner Arrow schema reference.
110    inner: SchemaRef,
111    /// Optional qualifiers for each column in this schema. In the same order as
112    /// the `self.inner.fields()`
113    field_qualifiers: Vec<Option<TableReference>>,
114    /// Stores functional dependencies in the schema.
115    functional_dependencies: FunctionalDependencies,
116}
117
118impl DFSchema {
119    /// Creates an empty `DFSchema`
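    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    ///
    /// let schema = DFSchema::empty();
    /// assert_eq!(schema.fields().len(), 0);
    /// assert!(schema.field_names().is_empty());
    /// ```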
120    pub fn empty() -> Self {
121        Self {
122            inner: Arc::new(Schema::new([])),
123            field_qualifiers: vec![],
124            functional_dependencies: FunctionalDependencies::empty(),
125        }
126    }
127
128    /// Return a reference to the inner Arrow [`Schema`]
129    ///
130    /// Note this does not have the qualifier information
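    ///
    /// # Example
    ///
    /// A small sketch showing that qualifiers are not part of the returned Arrow schema:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let arrow_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    /// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
    ///
    /// // The Arrow schema only knows the bare field name, not the "t1" qualifier
    /// assert_eq!(df_schema.as_arrow().field(0).name(), "c1");
    /// ```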
131    pub fn as_arrow(&self) -> &Schema {
132        self.inner.as_ref()
133    }
134
135    /// Return a reference to the inner Arrow [`SchemaRef`]
136    ///
137    /// Note this does not have the qualifier information
138    pub fn inner(&self) -> &SchemaRef {
139        &self.inner
140    }
141
142    /// Create a `DFSchema` from a list of Arrow [Field]s, each paired with an optional qualifier, plus schema metadata
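    ///
    /// # Example
    ///
    /// A minimal sketch mirroring the unit tests below, mixing a qualified and an
    /// unqualified field:
    ///
    /// ```rust
    /// use std::collections::HashMap;
    /// use std::sync::Arc;
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field};
    ///
    /// let schema = DFSchema::new_with_metadata(
    ///     vec![
    ///         (Some("t0".into()), Arc::new(Field::new("c0", DataType::Boolean, true))),
    ///         (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
    ///     ],
    ///     HashMap::new(),
    /// )
    /// .unwrap();
    /// assert_eq!(schema.to_string(), "fields:[t0.c0, c1], metadata:{}");
    /// ```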
143    pub fn new_with_metadata(
144        qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
145        metadata: HashMap<String, String>,
146    ) -> Result<Self> {
147        let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
148            qualified_fields.into_iter().unzip();
149
150        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
151
152        let dfschema = Self {
153            inner: schema,
154            field_qualifiers: qualifiers,
155            functional_dependencies: FunctionalDependencies::empty(),
156        };
157        dfschema.check_names()?;
158        Ok(dfschema)
159    }
160
161    /// Create a new `DFSchema` from a list of Arrow [Field]s
162    #[allow(deprecated)]
163    pub fn from_unqualified_fields(
164        fields: Fields,
165        metadata: HashMap<String, String>,
166    ) -> Result<Self> {
167        Self::from_unqualifed_fields(fields, metadata)
168    }
169
170    /// Create a new `DFSchema` from a list of Arrow [Field]s
171    #[deprecated(
172        since = "40.0.0",
173        note = "Please use `from_unqualified_fields` instead (this one's name is a typo). This method is subject to be removed soon"
174    )]
175    pub fn from_unqualifed_fields(
176        fields: Fields,
177        metadata: HashMap<String, String>,
178    ) -> Result<Self> {
179        let field_count = fields.len();
180        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
181        let dfschema = Self {
182            inner: schema,
183            field_qualifiers: vec![None; field_count],
184            functional_dependencies: FunctionalDependencies::empty(),
185        };
186        dfschema.check_names()?;
187        Ok(dfschema)
188    }
189
190    /// Create a `DFSchema` from an Arrow schema and a given qualifier
191    ///
192    /// To create a schema from an Arrow schema without a qualifier, use
193    /// `DFSchema::try_from`.
194    pub fn try_from_qualified_schema(
195        qualifier: impl Into<TableReference>,
196        schema: &Schema,
197    ) -> Result<Self> {
198        let qualifier = qualifier.into();
199        let schema = DFSchema {
200            inner: schema.clone().into(),
201            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
202            functional_dependencies: FunctionalDependencies::empty(),
203        };
204        schema.check_names()?;
205        Ok(schema)
206    }
207
208    /// Create a `DFSchema` from an Arrow schema, where each field may have its own optional qualifier
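    ///
    /// # Example
    ///
    /// A minimal sketch (mirroring the unit tests below), qualifying only the first field:
    ///
    /// ```rust
    /// use std::sync::Arc;
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let schema = DFSchema::from_field_specific_qualified_schema(
    ///     vec![Some("t1".into()), None],
    ///     &Arc::new(Schema::new(vec![
    ///         Field::new("c0", DataType::Boolean, true),
    ///         Field::new("c1", DataType::Boolean, true),
    ///     ])),
    /// )
    /// .unwrap();
    /// assert_eq!(schema.to_string(), "fields:[t1.c0, c1], metadata:{}");
    /// ```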
209    pub fn from_field_specific_qualified_schema(
210        qualifiers: Vec<Option<TableReference>>,
211        schema: &SchemaRef,
212    ) -> Result<Self> {
213        let dfschema = Self {
214            inner: Arc::clone(schema),
215            field_qualifiers: qualifiers,
216            functional_dependencies: FunctionalDependencies::empty(),
217        };
218        dfschema.check_names()?;
219        Ok(dfschema)
220    }
221
222    /// Return an error if the schema contains duplicate qualified or unqualified field names
223    pub fn check_names(&self) -> Result<()> {
224        let mut qualified_names = BTreeSet::new();
225        let mut unqualified_names = BTreeSet::new();
226
227        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
228            if let Some(qualifier) = qualifier {
229                if !qualified_names.insert((qualifier, field.name())) {
230                    return _schema_err!(SchemaError::DuplicateQualifiedField {
231                        qualifier: Box::new(qualifier.clone()),
232                        name: field.name().to_string(),
233                    });
234                }
235            } else if !unqualified_names.insert(field.name()) {
236                return _schema_err!(SchemaError::DuplicateUnqualifiedField {
237                    name: field.name().to_string()
238                });
239            }
240        }
241
242        for (qualifier, name) in qualified_names {
243            if unqualified_names.contains(name) {
244                return _schema_err!(SchemaError::AmbiguousReference {
245                    field: Column::new(Some(qualifier.clone()), name)
246                });
247            }
248        }
249        Ok(())
250    }
251
252    /// Assigns functional dependencies.
253    pub fn with_functional_dependencies(
254        mut self,
255        functional_dependencies: FunctionalDependencies,
256    ) -> Result<Self> {
257        if functional_dependencies.is_valid(self.inner.fields.len()) {
258            self.functional_dependencies = functional_dependencies;
259            Ok(self)
260        } else {
261            _plan_err!(
262                "Invalid functional dependency: {:?}",
263                functional_dependencies
264            )
265        }
266    }
267
268    /// Create a new schema that contains the fields from this schema followed by the fields
269    /// from the supplied schema. An error will be returned if there are duplicate field names.
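    ///
    /// # Example
    ///
    /// A minimal sketch joining two schemas with different qualifiers:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Boolean, true)]);
    /// let left = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
    /// let right = DFSchema::try_from_qualified_schema("t2", &arrow_schema).unwrap();
    ///
    /// let joined = left.join(&right).unwrap();
    /// assert_eq!(joined.to_string(), "fields:[t1.c0, t2.c0], metadata:{}");
    ///
    /// // Joining two schemas with the same qualified field name is an error
    /// assert!(left.join(&left).is_err());
    /// ```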
270    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
271        let mut schema_builder = SchemaBuilder::new();
272        schema_builder.extend(self.inner.fields().iter().cloned());
273        schema_builder.extend(schema.fields().iter().cloned());
274        let new_schema = schema_builder.finish();
275
276        let mut new_metadata = self.inner.metadata.clone();
277        new_metadata.extend(schema.inner.metadata.clone());
278        let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
279
280        let mut new_qualifiers = self.field_qualifiers.clone();
281        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
282
283        let new_self = Self {
284            inner: Arc::new(new_schema_with_metadata),
285            field_qualifiers: new_qualifiers,
286            functional_dependencies: FunctionalDependencies::empty(),
287        };
288        new_self.check_names()?;
289        Ok(new_self)
290    }
291
292    /// Modify this schema by appending the fields from the supplied schema, ignoring any
293    /// duplicate fields.
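    ///
    /// # Example
    ///
    /// A minimal sketch; the duplicate `t1.c0` field from `other` is skipped:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Boolean, true)]);
    /// let mut schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
    /// let other = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
    ///
    /// schema.merge(&other);
    /// assert_eq!(schema.fields().len(), 1);
    /// ```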
294    pub fn merge(&mut self, other_schema: &DFSchema) {
295        if other_schema.inner.fields.is_empty() {
296            return;
297        }
298
299        let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
300            self.iter().collect();
301        let self_unqualified_names: HashSet<&str> = self
302            .inner
303            .fields
304            .iter()
305            .map(|field| field.name().as_str())
306            .collect();
307
308        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
309        let mut qualifiers = Vec::new();
310        for (qualifier, field) in other_schema.iter() {
311            // skip duplicate columns
312            let duplicated_field = match qualifier {
313                Some(q) => self_fields.contains(&(Some(q), field)),
314                // for unqualified columns, check as unqualified name
315                None => self_unqualified_names.contains(field.name().as_str()),
316            };
317            if !duplicated_field {
318                schema_builder.push(Arc::clone(field));
319                qualifiers.push(qualifier.cloned());
320            }
321        }
322        let mut metadata = self.inner.metadata.clone();
323        metadata.extend(other_schema.inner.metadata.clone());
324
325        let finished = schema_builder.finish();
326        let finished_with_metadata = finished.with_metadata(metadata);
327        self.inner = finished_with_metadata.into();
328        self.field_qualifiers.extend(qualifiers);
329    }
330
331    /// Get a list of fields
332    pub fn fields(&self) -> &Fields {
333        &self.inner.fields
334    }
335
336    /// Returns an immutable reference to a specific `Field` instance, selected by
337    /// offset within the internal `fields` vector
338    pub fn field(&self, i: usize) -> &Field {
339        &self.inner.fields[i]
340    }
341
342    /// Returns an immutable reference to a specific `Field` instance, selected by
343    /// offset within the internal `fields` vector, together with its qualifier
344    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
345        (self.field_qualifiers[i].as_ref(), self.field(i))
346    }
347
348    pub fn index_of_column_by_name(
349        &self,
350        qualifier: Option<&TableReference>,
351        name: &str,
352    ) -> Option<usize> {
353        let mut matches = self
354            .iter()
355            .enumerate()
356            .filter(|(_, (q, f))| match (qualifier, q) {
357                // field to lookup is qualified.
358                // current field is qualified and not shared between relations, compare both
359                // qualifier and name.
360                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
361                // field to lookup is qualified but current field is unqualified.
362                (Some(_), None) => false,
363                // field to lookup is unqualified, no need to compare qualifier
364                (None, Some(_)) | (None, None) => f.name() == name,
365            })
366            .map(|(idx, _)| idx);
367        matches.next()
368    }
369
370    /// Find the index of the column with the given qualifier and name,
371    /// returning `None` if not found
372    ///
373    /// See [Self::index_of_column] for a version that returns an error if the
374    /// column is not found
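    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```rust
    /// use datafusion_common::{Column, DFSchema};
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Boolean, true)]);
    /// let schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
    ///
    /// assert_eq!(schema.maybe_index_of_column(&Column::from_qualified_name("t1.c0")), Some(0));
    /// assert_eq!(schema.maybe_index_of_column(&Column::from_name("c1")), None);
    /// ```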
375    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
376        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
377    }
378
379    /// Find the index of the column with the given qualifier and name,
380    /// returning `Err` if not found
381    ///
382    /// See [Self::maybe_index_of_column] for a version that returns `None` if
383    /// the column is not found
384    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
385        self.maybe_index_of_column(col)
386            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
387    }
388
389    /// Check if the column is in the current schema
390    pub fn is_column_from_schema(&self, col: &Column) -> bool {
391        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
392            .is_some()
393    }
394
395    /// Find the field with the given name
396    pub fn field_with_name(
397        &self,
398        qualifier: Option<&TableReference>,
399        name: &str,
400    ) -> Result<&Field> {
401        if let Some(qualifier) = qualifier {
402            self.field_with_qualified_name(qualifier, name)
403        } else {
404            self.field_with_unqualified_name(name)
405        }
406    }
407
408    /// Find the qualified field with the given name
409    pub fn qualified_field_with_name(
410        &self,
411        qualifier: Option<&TableReference>,
412        name: &str,
413    ) -> Result<(Option<&TableReference>, &Field)> {
414        if let Some(qualifier) = qualifier {
415            let idx = self
416                .index_of_column_by_name(Some(qualifier), name)
417                .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
418            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
419        } else {
420            self.qualified_field_with_unqualified_name(name)
421        }
422    }
423
424    /// Find all fields having the given qualifier
425    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
426        self.iter()
427            .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
428            .map(|(_, f)| f.as_ref())
429            .collect()
430    }
431
432    /// Find the indices of all fields having the given qualifier
433    pub fn fields_indices_with_qualified(
434        &self,
435        qualifier: &TableReference,
436    ) -> Vec<usize> {
437        self.iter()
438            .enumerate()
439            .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
440            .collect()
441    }
442
443    /// Find all fields that match the given name
444    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
445        self.fields()
446            .iter()
447            .filter(|field| field.name() == name)
448            .map(|f| f.as_ref())
449            .collect()
450    }
451
452    /// Find all fields that match the given name and return them with their qualifier
453    pub fn qualified_fields_with_unqualified_name(
454        &self,
455        name: &str,
456    ) -> Vec<(Option<&TableReference>, &Field)> {
457        self.iter()
458            .filter(|(_, field)| field.name() == name)
459            .map(|(qualifier, field)| (qualifier, field.as_ref()))
460            .collect()
461    }
462
463    /// Find all fields that match the given name and return them as [`Column`]s
464    pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
465        self.iter()
466            .filter(|(_, field)| field.name() == name)
467            .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
468            .collect()
469    }
470
471    /// Return all `Column`s for the schema
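    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Boolean, true)]);
    /// let schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
    ///
    /// let columns = schema.columns();
    /// assert_eq!(columns.len(), 1);
    /// assert_eq!(columns[0].flat_name(), "t1.c0");
    /// ```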
472    pub fn columns(&self) -> Vec<Column> {
473        self.iter()
474            .map(|(qualifier, field)| {
475                Column::new(qualifier.cloned(), field.name().clone())
476            })
477            .collect()
478    }
479
480    /// Find the qualified field with the given unqualified name
481    pub fn qualified_field_with_unqualified_name(
482        &self,
483        name: &str,
484    ) -> Result<(Option<&TableReference>, &Field)> {
485        let matches = self.qualified_fields_with_unqualified_name(name);
486        match matches.len() {
487            0 => Err(unqualified_field_not_found(name, self)),
488            1 => Ok((matches[0].0, (matches[0].1))),
489            _ => {
491                // When `matches` has more than one entry, the name is not necessarily ambiguous:
492                // the name may originate from an alias and therefore carry no qualifier.
493                // For example:
494                //             Join on id = b.id
495                // Project a.id as id   TableScan b id
496                // In this case there is no ambiguity. When `matches` contains exactly one
497                // field without a qualifier, we should return it.
497                let fields_without_qualifier = matches
498                    .iter()
499                    .filter(|(q, _)| q.is_none())
500                    .collect::<Vec<_>>();
501                if fields_without_qualifier.len() == 1 {
502                    Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
503                } else {
504                    _schema_err!(SchemaError::AmbiguousReference {
505                        field: Column::new_unqualified(name.to_string()),
506                    })
507                }
508            }
509        }
510    }
511
512    /// Find the field with the given name
513    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
514        self.qualified_field_with_unqualified_name(name)
515            .map(|(_, field)| field)
516    }
517
518    /// Find the field with the given qualified name
519    pub fn field_with_qualified_name(
520        &self,
521        qualifier: &TableReference,
522        name: &str,
523    ) -> Result<&Field> {
524        let idx = self
525            .index_of_column_by_name(Some(qualifier), name)
526            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
527
528        Ok(self.field(idx))
529    }
530
531    /// Find the field with the given qualified column
532    pub fn field_from_column(&self, column: &Column) -> Result<&Field> {
533        match &column.relation {
534            Some(r) => self.field_with_qualified_name(r, &column.name),
535            None => self.field_with_unqualified_name(&column.name),
536        }
537    }
538
539    /// Find the field with the given qualified column
540    pub fn qualified_field_from_column(
541        &self,
542        column: &Column,
543    ) -> Result<(Option<&TableReference>, &Field)> {
544        self.qualified_field_with_name(column.relation.as_ref(), &column.name)
545    }
546
547    /// Return true if a field with the given unqualified name exists
548    pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
549        self.fields().iter().any(|field| field.name() == name)
550    }
551
552    /// Return true if a field with the given qualified name exists
553    pub fn has_column_with_qualified_name(
554        &self,
555        qualifier: &TableReference,
556        name: &str,
557    ) -> bool {
558        self.iter()
559            .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
560    }
561
562    /// Return true if the given [`Column`] (qualified or not) exists in the schema
563    pub fn has_column(&self, column: &Column) -> bool {
564        match &column.relation {
565            Some(r) => self.has_column_with_qualified_name(r, &column.name),
566            None => self.has_column_with_unqualified_name(&column.name),
567        }
568    }
569
570    /// Check whether the unqualified field names match the field names of the given Arrow schema
571    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
572        self.inner
573            .fields
574            .iter()
575            .zip(arrow_schema.fields().iter())
576            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
577    }
578
579    /// Check whether each field of the given Arrow schema can be cast to the corresponding field of this schema
580    pub fn check_arrow_schema_type_compatible(
581        &self,
582        arrow_schema: &Schema,
583    ) -> Result<()> {
584        let self_arrow_schema: Schema = self.into();
585        self_arrow_schema
586            .fields()
587            .iter()
588            .zip(arrow_schema.fields().iter())
589            .try_for_each(|(l_field, r_field)| {
590                if !can_cast_types(r_field.data_type(), l_field.data_type()) {
591                    _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
592                                r_field.name(),
593                                r_field.data_type(),
594                                l_field.name(),
595                                l_field.data_type())
596                } else {
597                    Ok(())
598                }
599            })
600    }
601
602    /// Returns true if the two schemas have the same qualified field names with
603    /// logically equivalent data types. Returns false otherwise.
604    ///
605    /// Use [`DFSchema::equivalent_names_and_types`] for stricter semantic type
606    /// equivalence checking.
607    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
608        if self.fields().len() != other.fields().len() {
609            return false;
610        }
611        let self_fields = self.iter();
612        let other_fields = other.iter();
613        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
614            q1 == q2
615                && f1.name() == f2.name()
616                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
617        })
618    }
619
620    /// Returns true if the two schemas have the same qualified field names with
621    /// the same data types. Returns false otherwise.
622    ///
623    /// This is a specialized version of Eq that ignores differences
624    /// in nullability and metadata.
625    ///
626    /// Use [`DFSchema::logically_equivalent_names_and_types`] for a weaker,
627    /// logical type check, which for example would consider a dictionary-encoded
628    /// UTF8 array to be equivalent to a plain UTF8 array.
629    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
630        if self.fields().len() != other.fields().len() {
631            return false;
632        }
633        let self_fields = self.iter();
634        let other_fields = other.iter();
635        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
636            q1 == q2
637                && f1.name() == f2.name()
638                && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
639        })
640    }
641
642    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
643    /// than datatype_is_semantically_equal in that a Dictionary<K,V> type is logically
644    /// equal to a plain V type, but not semantically equal. Dictionary<K1, V1> is also
645    /// logically equal to Dictionary<K2, V1>.
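    ///
    /// # Example
    ///
    /// A minimal sketch: a dictionary-encoded string column is logically, but not
    /// semantically, equal to a plain string column:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::DataType;
    ///
    /// let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    /// assert!(DFSchema::datatype_is_logically_equal(&dict, &DataType::Utf8));
    /// assert!(!DFSchema::datatype_is_semantically_equal(&dict, &DataType::Utf8));
    /// ```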
646    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
647        // check nested fields
648        match (dt1, dt2) {
649            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
650                v1.as_ref() == v2.as_ref()
651            }
652            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
653            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
654            (DataType::List(f1), DataType::List(f2))
655            | (DataType::LargeList(f1), DataType::LargeList(f2))
656            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
657                // Don't compare the names of the technical inner field
658                // Usually "item" but that's not mandated
659                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
660            }
661            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
662                // Don't compare the names of the technical inner fields
663                // Usually "entries", "key", "value" but that's not mandated
664                match (f1.data_type(), f2.data_type()) {
665                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
666                        f1_inner.len() == f2_inner.len()
667                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
668                                Self::datatype_is_logically_equal(
669                                    f1.data_type(),
670                                    f2.data_type(),
671                                )
672                            })
673                    }
674                    _ => panic!("Map type should have an inner struct field"),
675                }
676            }
677            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
678                let iter1 = fields1.iter();
679                let iter2 = fields2.iter();
680                fields1.len() == fields2.len() &&
681                        // all fields have to be the same
682                    iter1
683                    .zip(iter2)
684                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
685            }
686            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
687                let iter1 = fields1.iter();
688                let iter2 = fields2.iter();
689                fields1.len() == fields2.len() &&
690                    // all fields have to be the same
691                    iter1
692                        .zip(iter2)
693                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
694            }
695            _ => dt1 == dt2,
696        }
697    }
698
699    /// Returns true if two [`DataType`]s are semantically equal (same
700    /// nested field names and types), ignoring both metadata and nullability.
701    ///
702    /// request to upstream: <https://github.com/apache/arrow-rs/issues/3199>
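    ///
    /// # Example
    ///
    /// A minimal sketch: per the rules below, decimals compare equal regardless of
    /// precision and scale:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::DataType;
    ///
    /// assert!(DFSchema::datatype_is_semantically_equal(
    ///     &DataType::Decimal128(10, 2),
    ///     &DataType::Decimal128(38, 10),
    /// ));
    /// ```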
703    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
704        // check nested fields
705        match (dt1, dt2) {
706            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
707                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
708                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
709            }
710            (DataType::List(f1), DataType::List(f2))
711            | (DataType::LargeList(f1), DataType::LargeList(f2))
712            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
713                // Don't compare the names of the technical inner field
714                // Usually "item" but that's not mandated
715                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
716            }
717            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
718                // Don't compare the names of the technical inner fields
719                // Usually "entries", "key", "value" but that's not mandated
720                match (f1.data_type(), f2.data_type()) {
721                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
722                        f1_inner.len() == f2_inner.len()
723                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
724                                Self::datatype_is_semantically_equal(
725                                    f1.data_type(),
726                                    f2.data_type(),
727                                )
728                            })
729                    }
730                    _ => panic!("Map type should have an inner struct field"),
731                }
732            }
733            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
734                let iter1 = fields1.iter();
735                let iter2 = fields2.iter();
736                fields1.len() == fields2.len() &&
737                        // all fields have to be the same
738                    iter1
739                    .zip(iter2)
740                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
741            }
742            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
743                let iter1 = fields1.iter();
744                let iter2 = fields2.iter();
745                fields1.len() == fields2.len() &&
746                    // all fields have to be the same
747                    iter1
748                        .zip(iter2)
749                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
750            }
751            (
752                DataType::Decimal128(_l_precision, _l_scale),
753                DataType::Decimal128(_r_precision, _r_scale),
754            ) => true,
755            (
756                DataType::Decimal256(_l_precision, _l_scale),
757                DataType::Decimal256(_r_precision, _r_scale),
758            ) => true,
759            _ => dt1 == dt2,
760        }
761    }
762
763    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
764        f1.name() == f2.name()
765            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
766    }
767
768    fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
769        f1.name() == f2.name()
770            && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
771    }
772
773    /// Strip all field qualifiers from the schema
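    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Boolean, true)]);
    /// let schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema)
    ///     .unwrap()
    ///     .strip_qualifiers();
    /// assert_eq!(schema.to_string(), "fields:[c0], metadata:{}");
    /// ```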
774    pub fn strip_qualifiers(self) -> Self {
775        DFSchema {
776            field_qualifiers: vec![None; self.inner.fields.len()],
777            inner: self.inner,
778            functional_dependencies: self.functional_dependencies,
779        }
780    }
781
782    /// Replace all field qualifiers in the schema with the given value
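    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Boolean, true)]);
    /// let schema = DFSchema::try_from(arrow_schema)
    ///     .unwrap()
    ///     .replace_qualifier("t2");
    /// assert_eq!(schema.to_string(), "fields:[t2.c0], metadata:{}");
    /// ```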
783    pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
784        let qualifier = qualifier.into();
785        DFSchema {
786            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
787            inner: self.inner,
788            functional_dependencies: self.functional_dependencies,
789        }
790    }
791
792    /// Get list of fully-qualified field names in this schema
793    pub fn field_names(&self) -> Vec<String> {
794        self.iter()
795            .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
796            .collect::<Vec<_>>()
797    }
798
799    /// Get metadata of this schema
800    pub fn metadata(&self) -> &HashMap<String, String> {
801        &self.inner.metadata
802    }
803
804    /// Get functional dependencies
805    pub fn functional_dependencies(&self) -> &FunctionalDependencies {
806        &self.functional_dependencies
807    }
808
809    /// Iterate over the qualifiers and fields in the DFSchema
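    ///
    /// # Example
    ///
    /// A minimal sketch that rebuilds the qualified field names by hand:
    ///
    /// ```rust
    /// use datafusion_common::DFSchema;
    /// use arrow::datatypes::{DataType, Field, Schema};
    ///
    /// let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Boolean, true)]);
    /// let schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
    ///
    /// let names: Vec<String> = schema
    ///     .iter()
    ///     .map(|(qualifier, field)| match qualifier {
    ///         Some(q) => format!("{}.{}", q, field.name()),
    ///         None => field.name().to_string(),
    ///     })
    ///     .collect();
    /// assert_eq!(names, vec!["t1.c0"]);
    /// ```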
810    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
811        self.field_qualifiers
812            .iter()
813            .zip(self.inner.fields().iter())
814            .map(|(qualifier, field)| (qualifier.as_ref(), field))
815    }
816}
817
818impl From<DFSchema> for Schema {
819    /// Convert DFSchema into a Schema
820    fn from(df_schema: DFSchema) -> Self {
821        let fields: Fields = df_schema.inner.fields.clone();
822        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
823    }
824}
825
826impl From<&DFSchema> for Schema {
827    /// Convert DFSchema reference into a Schema
828    fn from(df_schema: &DFSchema) -> Self {
829        let fields: Fields = df_schema.inner.fields.clone();
830        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
831    }
832}
833
834/// Allow DFSchema to be converted into an Arrow `&Schema`
835impl AsRef<Schema> for DFSchema {
836    fn as_ref(&self) -> &Schema {
837        self.as_arrow()
838    }
839}
840
841/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
842/// example)
843impl AsRef<SchemaRef> for DFSchema {
844    fn as_ref(&self) -> &SchemaRef {
845        self.inner()
846    }
847}
848
849/// Create a `DFSchema` from an Arrow schema
850impl TryFrom<Schema> for DFSchema {
851    type Error = DataFusionError;
852    fn try_from(schema: Schema) -> Result<Self, Self::Error> {
853        Self::try_from(Arc::new(schema))
854    }
855}
856
857impl TryFrom<SchemaRef> for DFSchema {
858    type Error = DataFusionError;
859    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
860        let field_count = schema.fields.len();
861        let dfschema = Self {
862            inner: schema,
863            field_qualifiers: vec![None; field_count],
864            functional_dependencies: FunctionalDependencies::empty(),
865        };
866        Ok(dfschema)
867    }
868}
869
870impl From<DFSchema> for SchemaRef {
871    fn from(df_schema: DFSchema) -> Self {
872        SchemaRef::new(df_schema.into())
873    }
874}
875
876// Hashing only uses a subset of the fields that `PartialEq` considers.
877impl Hash for DFSchema {
878    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
879        self.inner.fields.hash(state);
880        self.inner.metadata.len().hash(state); // HashMap is not hashable
881    }
882}
883
884/// Convenience trait to convert Schema-like things to DFSchema and DFSchemaRef with fewer keystrokes
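///
/// # Example
///
/// A minimal sketch, assuming `ToDFSchema` is re-exported from the crate root like
/// [`DFSchema`]:
///
/// ```rust
/// use datafusion_common::ToDFSchema;
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Int32, false)]);
/// let df_schema_ref = arrow_schema.to_dfschema_ref().unwrap();
/// assert_eq!(df_schema_ref.fields().len(), 1);
/// ```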
885pub trait ToDFSchema
886where
887    Self: Sized,
888{
889    /// Attempt to create a [`DFSchema`]
890    fn to_dfschema(self) -> Result<DFSchema>;
891
892    /// Attempt to create a [`DFSchemaRef`]
893    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
894        Ok(Arc::new(self.to_dfschema()?))
895    }
896}
897
898impl ToDFSchema for Schema {
899    fn to_dfschema(self) -> Result<DFSchema> {
900        DFSchema::try_from(self)
901    }
902}
903
904impl ToDFSchema for SchemaRef {
905    fn to_dfschema(self) -> Result<DFSchema> {
906        DFSchema::try_from(self)
907    }
908}
909
910impl ToDFSchema for Vec<Field> {
911    fn to_dfschema(self) -> Result<DFSchema> {
912        let field_count = self.len();
913        let schema = Schema {
914            fields: self.into(),
915            metadata: HashMap::new(),
916        };
917        let dfschema = DFSchema {
918            inner: schema.into(),
919            field_qualifiers: vec![None; field_count],
920            functional_dependencies: FunctionalDependencies::empty(),
921        };
922        Ok(dfschema)
923    }
924}
925
926impl Display for DFSchema {
927    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
928        write!(
929            f,
930            "fields:[{}], metadata:{:?}",
931            self.iter()
932                .map(|(q, f)| qualified_name(q, f.name()))
933                .collect::<Vec<String>>()
934                .join(", "),
935            self.inner.metadata
936        )
937    }
938}
939
940/// Provides schema information needed by certain methods of `Expr`
941/// (defined in the datafusion-common crate).
942///
943/// Note that this trait is implemented for &[DFSchema] which is
944/// widely used in the DataFusion codebase.
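///
/// # Example
///
/// A minimal sketch, assuming `ExprSchema` is re-exported from the crate root like
/// [`DFSchema`]:
///
/// ```rust
/// use datafusion_common::{Column, DFSchema, ExprSchema};
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Int32, true)]);
/// let schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
///
/// let col = Column::from_qualified_name("t1.c0");
/// assert_eq!(schema.data_type(&col).unwrap(), &DataType::Int32);
/// assert!(schema.nullable(&col).unwrap());
/// ```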
945pub trait ExprSchema: std::fmt::Debug {
946    /// Is this column reference nullable?
947    fn nullable(&self, col: &Column) -> Result<bool>;
948
949    /// What is the datatype of this column?
950    fn data_type(&self, col: &Column) -> Result<&DataType>;
951
952    /// Returns the column's optional metadata.
953    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>>;
954
955    /// Return the column's datatype and nullability
956    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)>;
957}
958
959// Implement `ExprSchema` for any `AsRef<DFSchema>`, such as `Arc<DFSchema>`
960impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
961    fn nullable(&self, col: &Column) -> Result<bool> {
962        self.as_ref().nullable(col)
963    }
964
965    fn data_type(&self, col: &Column) -> Result<&DataType> {
966        self.as_ref().data_type(col)
967    }
968
969    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
970        ExprSchema::metadata(self.as_ref(), col)
971    }
972
973    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
974        self.as_ref().data_type_and_nullable(col)
975    }
976}
977
978impl ExprSchema for DFSchema {
979    fn nullable(&self, col: &Column) -> Result<bool> {
980        Ok(self.field_from_column(col)?.is_nullable())
981    }
982
983    fn data_type(&self, col: &Column) -> Result<&DataType> {
984        Ok(self.field_from_column(col)?.data_type())
985    }
986
987    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
988        Ok(self.field_from_column(col)?.metadata())
989    }
990
991    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
992        let field = self.field_from_column(col)?;
993        Ok((field.data_type(), field.is_nullable()))
994    }
995}
996
997/// DataFusion-specific extensions to [`Schema`].
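///
/// # Example
///
/// A minimal sketch, assuming `SchemaExt` is re-exported from the crate root like
/// [`DFSchema`]:
///
/// ```rust
/// use datafusion_common::SchemaExt;
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// // Same names and types, different nullability: still considered equivalent
/// let s1 = Schema::new(vec![Field::new("c0", DataType::Int32, true)]);
/// let s2 = Schema::new(vec![Field::new("c0", DataType::Int32, false)]);
/// assert!(s1.equivalent_names_and_types(&s2));
/// ```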
998pub trait SchemaExt {
999    /// This is a specialized version of Eq that ignores differences
1000    /// in nullability and metadata.
1001    ///
1002    /// It works the same as [`DFSchema::equivalent_names_and_types`].
1003    fn equivalent_names_and_types(&self, other: &Self) -> bool;
1004
1005    /// Returns `Ok(())` if the two schemas have the same field names with logically
1006    /// equivalent data types, and an error otherwise.
1007    ///
1008    /// Use [`Self::equivalent_names_and_types`] for stricter semantic type
1009    /// equivalence checking.
1010    ///
1011    /// It is currently only used for `INSERT INTO` cases.
1012    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
1013}
1014
1015impl SchemaExt for Schema {
1016    fn equivalent_names_and_types(&self, other: &Self) -> bool {
1017        if self.fields().len() != other.fields().len() {
1018            return false;
1019        }
1020
1021        self.fields()
1022            .iter()
1023            .zip(other.fields().iter())
1024            .all(|(f1, f2)| {
1025                f1.name() == f2.name()
1026                    && DFSchema::datatype_is_semantically_equal(
1027                        f1.data_type(),
1028                        f2.data_type(),
1029                    )
1030            })
1031    }
1032
1033    // It is only used by insert into cases.
1034    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1035        // case 1 : schema length mismatch
1036        if self.fields().len() != other.fields().len() {
1037            _plan_err!(
1038                "Inserting query must have the same schema length as the table. \
1039            Expected table schema length: {}, got: {}",
1040                self.fields().len(),
1041                other.fields().len()
1042            )
1043        } else {
1044            // case 2 : schema length match, but fields mismatch
1045            // check if the fields name are the same and have the same data types
1046            self.fields()
1047                .iter()
1048                .zip(other.fields().iter())
1049                .try_for_each(|(f1, f2)| {
1050                    if f1.name() != f2.name() || !DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) {
1051                        _plan_err!(
1052                            "Inserting query schema mismatch: Expected table field '{}' with type {:?}, \
1053                            but got '{}' with type {:?}.",
1054                            f1.name(),
1055                            f1.data_type(),
1056                            f2.name(),
1057                            f2.data_type())
1058                    } else {
1059                        Ok(())
1060                    }
1061                })
1062        }
1063    }
1064}
1065
1066pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1067    match qualifier {
1068        Some(q) => format!("{}.{}", q, name),
1069        None => name.to_string(),
1070    }
1071}
1072
1073#[cfg(test)]
1074mod tests {
1075    use crate::assert_contains;
1076
1077    use super::*;
1078
1079    #[test]
1080    fn qualifier_in_name() -> Result<()> {
1081        let col = Column::from_name("t1.c0");
1082        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1083        // lookup with unqualified name "t1.c0"
1084        let err = schema.index_of_column(&col).unwrap_err();
1085        let expected = "Schema error: No field named \"t1.c0\". \
1086            Column names are case sensitive. \
1087            You can use double quotes to refer to the \"\"t1.c0\"\" column \
1088            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
1089            Did you mean 't1.c0'?.";
1090        assert_eq!(err.strip_backtrace(), expected);
1091        Ok(())
1092    }
1093
1094    #[test]
1095    fn quoted_qualifiers_in_name() -> Result<()> {
1096        let col = Column::from_name("t1.c0");
1097        let schema = DFSchema::try_from_qualified_schema(
1098            "t1",
1099            &Schema::new(vec![
1100                Field::new("CapitalColumn", DataType::Boolean, true),
1101                Field::new("field.with.period", DataType::Boolean, true),
1102            ]),
1103        )?;
1104
1105        // lookup with unqualified name "t1.c0"
1106        let err = schema.index_of_column(&col).unwrap_err();
1107        let expected = "Schema error: No field named \"t1.c0\". \
1108            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
1109        assert_eq!(err.strip_backtrace(), expected);
1110        Ok(())
1111    }
1112
1113    #[test]
1114    fn from_unqualified_schema() -> Result<()> {
1115        let schema = DFSchema::try_from(test_schema_1())?;
1116        assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
1117        Ok(())
1118    }
1119
1120    #[test]
1121    fn from_qualified_schema() -> Result<()> {
1122        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1123        assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
1124        Ok(())
1125    }
1126
1127    #[test]
1128    fn test_from_field_specific_qualified_schema() -> Result<()> {
1129        let schema = DFSchema::from_field_specific_qualified_schema(
1130            vec![Some("t1".into()), None],
1131            &Arc::new(Schema::new(vec![
1132                Field::new("c0", DataType::Boolean, true),
1133                Field::new("c1", DataType::Boolean, true),
1134            ])),
1135        )?;
1136        assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
1137        Ok(())
1138    }
1139
1140    #[test]
1141    fn test_from_qualified_fields() -> Result<()> {
1142        let schema = DFSchema::new_with_metadata(
1143            vec![
1144                (
1145                    Some("t0".into()),
1146                    Arc::new(Field::new("c0", DataType::Boolean, true)),
1147                ),
1148                (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
1149            ],
1150            HashMap::new(),
1151        )?;
1152        assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
1153        Ok(())
1154    }
1155
1156    #[test]
1157    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
1158        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1159        let arrow_schema: Schema = schema.into();
1160        let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
1161        Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
1162        assert_eq!(expected, arrow_schema.to_string());
1163        Ok(())
1164    }
1165
1166    #[test]
1167    fn join_qualified() -> Result<()> {
1168        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1169        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
1170        let join = left.join(&right)?;
1171        assert_eq!(
1172            "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
1173            join.to_string()
1174        );
1175        // test valid access
1176        assert!(join
1177            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1178            .is_ok());
1179        assert!(join
1180            .field_with_qualified_name(&TableReference::bare("t2"), "c0")
1181            .is_ok());
1182        // test invalid access
1183        assert!(join.field_with_unqualified_name("c0").is_err());
1184        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1185        assert!(join.field_with_unqualified_name("t2.c0").is_err());
1186        Ok(())
1187    }
1188
1189    #[test]
1190    fn join_qualified_duplicate() -> Result<()> {
1191        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1192        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1193        let join = left.join(&right);
1194        assert_eq!(
1195            join.unwrap_err().strip_backtrace(),
1196            "Schema error: Schema contains duplicate qualified field name t1.c0",
1197        );
1198        Ok(())
1199    }
1200
1201    #[test]
1202    fn join_unqualified_duplicate() -> Result<()> {
1203        let left = DFSchema::try_from(test_schema_1())?;
1204        let right = DFSchema::try_from(test_schema_1())?;
1205        let join = left.join(&right);
1206        assert_eq!(
1207            join.unwrap_err().strip_backtrace(),
1208            "Schema error: Schema contains duplicate unqualified field name c0"
1209        );
1210        Ok(())
1211    }
1212
1213    #[test]
1214    fn join_mixed() -> Result<()> {
1215        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1216        let right = DFSchema::try_from(test_schema_2())?;
1217        let join = left.join(&right)?;
1218        assert_eq!(
1219            "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
1220            join.to_string()
1221        );
1222        // test valid access
1223        assert!(join
1224            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1225            .is_ok());
1226        assert!(join.field_with_unqualified_name("c0").is_ok());
1227        assert!(join.field_with_unqualified_name("c100").is_ok());
1228        assert!(join.field_with_name(None, "c100").is_ok());
1229        // test invalid access
1230        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1231        assert!(join.field_with_unqualified_name("t1.c100").is_err());
1232        assert!(join
1233            .field_with_qualified_name(&TableReference::bare(""), "c100")
1234            .is_err());
1235        Ok(())
1236    }
1237
1238    #[test]
1239    fn join_mixed_duplicate() -> Result<()> {
1240        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1241        let right = DFSchema::try_from(test_schema_1())?;
1242        let join = left.join(&right);
1243        assert_contains!(join.unwrap_err().to_string(),
1244                         "Schema error: Schema contains qualified \
1245                          field name t1.c0 and unqualified field name c0 which would be ambiguous");
1246        Ok(())
1247    }
1248
1249    #[test]
1250    fn helpful_error_messages() -> Result<()> {
1251        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1252        let expected_help = "Valid fields are t1.c0, t1.c1.";
1253        assert_contains!(
1254            schema
1255                .field_with_qualified_name(&TableReference::bare("x"), "y")
1256                .unwrap_err()
1257                .to_string(),
1258            expected_help
1259        );
1260        assert_contains!(
1261            schema
1262                .field_with_unqualified_name("y")
1263                .unwrap_err()
1264                .to_string(),
1265            expected_help
1266        );
1267        assert!(schema.index_of_column_by_name(None, "y").is_none());
1268        assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
1269
1270        Ok(())
1271    }
1272
1273    #[test]
1274    fn select_without_valid_fields() {
1275        let schema = DFSchema::empty();
1276
1277        let col = Column::from_qualified_name("t1.c0");
1278        let err = schema.index_of_column(&col).unwrap_err();
1279        let expected = "Schema error: No field named t1.c0.";
1280        assert_eq!(err.strip_backtrace(), expected);
1281
1282        // the same check without qualifier
1283        let col = Column::from_name("c0");
1284        let err = schema.index_of_column(&col).err().unwrap();
1285        let expected = "Schema error: No field named c0.";
1286        assert_eq!(err.strip_backtrace(), expected);
1287    }
1288
1289    #[test]
1290    fn into() {
1291        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
1292        let arrow_schema = Schema::new_with_metadata(
1293            vec![Field::new("c0", DataType::Int64, true)],
1294            test_metadata(),
1295        );
1296        let arrow_schema_ref = Arc::new(arrow_schema.clone());
1297
1298        let df_schema = DFSchema {
1299            inner: Arc::clone(&arrow_schema_ref),
1300            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1301            functional_dependencies: FunctionalDependencies::empty(),
1302        };
1303        let df_schema_ref = Arc::new(df_schema.clone());
1304
1305        {
1306            let arrow_schema = arrow_schema.clone();
1307            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1308
1309            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1310            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1311        }
1312
1313        {
1314            let arrow_schema = arrow_schema.clone();
1315            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1316
1317            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1318            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1319        }
1320
1321        // Now, consume the refs
1322        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1323        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1324    }
1325
1326    fn test_schema_1() -> Schema {
1327        Schema::new(vec![
1328            Field::new("c0", DataType::Boolean, true),
1329            Field::new("c1", DataType::Boolean, true),
1330        ])
1331    }
1332    #[test]
1333    fn test_dfschema_to_schema_conversion() {
1334        let mut a_metadata = HashMap::new();
1335        a_metadata.insert("key".to_string(), "value".to_string());
1336        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1337
1338        let mut b_metadata = HashMap::new();
1339        b_metadata.insert("key".to_string(), "value".to_string());
1340        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1341
1342        let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1343
1344        let df_schema = DFSchema {
1345            inner: Arc::clone(&schema),
1346            field_qualifiers: vec![None; schema.fields.len()],
1347            functional_dependencies: FunctionalDependencies::empty(),
1348        };
1349
1350        assert_eq!(df_schema.inner.metadata(), schema.metadata())
1351    }
1352
1353    #[test]
1354    fn test_contain_column() -> Result<()> {
1355        // qualified exists
1356        {
1357            let col = Column::from_qualified_name("t1.c0");
1358            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1359            assert!(schema.is_column_from_schema(&col));
1360        }
1361
1362        // qualified not exists
1363        {
1364            let col = Column::from_qualified_name("t1.c2");
1365            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1366            assert!(!schema.is_column_from_schema(&col));
1367        }
1368
1369        // unqualified exists
1370        {
1371            let col = Column::from_name("c0");
1372            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1373            assert!(schema.is_column_from_schema(&col));
1374        }
1375
1376        // unqualified not exists
1377        {
1378            let col = Column::from_name("c2");
1379            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1380            assert!(!schema.is_column_from_schema(&col));
1381        }
1382
1383        Ok(())
1384    }

    #[test]
    fn test_datatype_is_logically_equal() {
        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::Int8,
            &DataType::Int8
        ));

        assert!(!DFSchema::datatype_is_logically_equal(
            &DataType::Int8,
            &DataType::Int16
        ));

        // Test lists

        // Succeeds if both have the same element type, disregards names and nullability
        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
            &DataType::List(Field::new("element", DataType::Int8, false).into())
        ));

        // Fails if element type is different
        assert!(!DFSchema::datatype_is_logically_equal(
            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
        ));

        // Test maps
        let map_field = DataType::Map(
            Field::new(
                "entries",
                DataType::Struct(Fields::from(vec![
                    Field::new("key", DataType::Int8, false),
                    Field::new("value", DataType::Int8, true),
                ])),
                true,
            )
            .into(),
            true,
        );

        // Succeeds if both maps have the same key and value types, disregards names and nullability
        assert!(DFSchema::datatype_is_logically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "pairs",
                    DataType::Struct(Fields::from(vec![
                        Field::new("one", DataType::Int8, false),
                        Field::new("two", DataType::Int8, false)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        // Fails if value type is different
        assert!(!DFSchema::datatype_is_logically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Int8, false),
                        Field::new("value", DataType::Int16, true)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        // Fails if key type is different
        assert!(!DFSchema::datatype_is_logically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Int16, false),
                        Field::new("value", DataType::Int8, true)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        // Test structs

        let struct_field = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int8, true),
            Field::new("b", DataType::Int8, true),
        ]));

        // Succeeds if both have same names and datatypes, ignores nullability
        assert!(DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int8, false),
                Field::new("b", DataType::Int8, true),
            ]))
        ));

        // Fails if field names are different
        assert!(!DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("x", DataType::Int8, true),
                Field::new("y", DataType::Int8, true),
            ]))
        ));

        // Fails if types are different
        assert!(!DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int16, true),
                Field::new("b", DataType::Int8, true),
            ]))
        ));

        // Fails if the number of fields differs
        assert!(!DFSchema::datatype_is_logically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true)]))
        ));
    }

    #[test]
    fn test_datatype_is_logically_equivalent_to_dictionary() {
        // Dictionary is logically equal to its value type
        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::Utf8,
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
        ));
    }
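
    // Illustrative sketch (not part of the original suite): assuming logical equality
    // looks only at the dictionary value type, the comparison should be symmetric and
    // ignore the dictionary key type. The test name is the editor's own.
    #[test]
    fn test_datatype_is_logically_equal_dictionary_variants() {
        // Same check as above with the argument order reversed
        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            &DataType::Utf8
        ));

        // Two dictionaries with different key types but the same value type
        assert!(DFSchema::datatype_is_logically_equal(
            &DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
        ));
    }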

    #[test]
    fn test_datatype_is_semantically_equal() {
        assert!(DFSchema::datatype_is_semantically_equal(
            &DataType::Int8,
            &DataType::Int8
        ));

        assert!(!DFSchema::datatype_is_semantically_equal(
            &DataType::Int8,
            &DataType::Int16
        ));

        // Test lists

        // Succeeds if both have the same element type, disregards names and nullability
        assert!(DFSchema::datatype_is_semantically_equal(
            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
            &DataType::List(Field::new("element", DataType::Int8, false).into())
        ));

        // Fails if element type is different
        assert!(!DFSchema::datatype_is_semantically_equal(
            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
        ));

        // Test maps
        let map_field = DataType::Map(
            Field::new(
                "entries",
                DataType::Struct(Fields::from(vec![
                    Field::new("key", DataType::Int8, false),
                    Field::new("value", DataType::Int8, true),
                ])),
                true,
            )
            .into(),
            true,
        );

        // Succeeds if both maps have the same key and value types, disregards names and nullability
        assert!(DFSchema::datatype_is_semantically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "pairs",
                    DataType::Struct(Fields::from(vec![
                        Field::new("one", DataType::Int8, false),
                        Field::new("two", DataType::Int8, false)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        // Fails if value type is different
        assert!(!DFSchema::datatype_is_semantically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Int8, false),
                        Field::new("value", DataType::Int16, true)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        // Fails if key type is different
        assert!(!DFSchema::datatype_is_semantically_equal(
            &map_field,
            &DataType::Map(
                Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Int16, false),
                        Field::new("value", DataType::Int8, true)
                    ])),
                    true
                )
                .into(),
                true
            )
        ));

        // Test structs

        let struct_field = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int8, true),
            Field::new("b", DataType::Int8, true),
        ]));

        // Succeeds if both have same names and datatypes, ignores nullability
        assert!(DFSchema::datatype_is_semantically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int8, false),
                Field::new("b", DataType::Int8, true),
            ]))
        ));

        // Fails if field names are different
        assert!(!DFSchema::datatype_is_semantically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("x", DataType::Int8, true),
                Field::new("y", DataType::Int8, true),
            ]))
        ));

        // Fails if types are different
        assert!(!DFSchema::datatype_is_semantically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![
                Field::new("a", DataType::Int16, true),
                Field::new("b", DataType::Int8, true),
            ]))
        ));

        // Fails if the number of fields differs
        assert!(!DFSchema::datatype_is_semantically_equal(
            &struct_field,
            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true)]))
        ));
    }

    #[test]
    fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
        // Dictionary is not semantically equal to its value type
        assert!(!DFSchema::datatype_is_semantically_equal(
            &DataType::Utf8,
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
        ));
    }
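
    // Illustrative sketch (not part of the original suite): assuming semantic equality
    // of dictionaries compares both the key and value types, two dictionaries are only
    // semantically equal when both of those types match. The test name is the
    // editor's own.
    #[test]
    fn test_dictionary_semantic_equality_requires_matching_key_and_value() {
        // Identical key and value types
        assert!(DFSchema::datatype_is_semantically_equal(
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
        ));

        // Same value type but different key types
        assert!(!DFSchema::datatype_is_semantically_equal(
            &DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
        ));
    }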

    fn test_schema_2() -> Schema {
        Schema::new(vec![
            Field::new("c100", DataType::Boolean, true),
            Field::new("c101", DataType::Boolean, true),
        ])
    }

    fn test_metadata() -> HashMap<String, String> {
        test_metadata_n(2)
    }

    fn test_metadata_n(n: usize) -> HashMap<String, String> {
        (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
    }
}