lance_file/
datatypes.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::collections::HashMap;
5
6use arrow_schema::DataType;
7use async_recursion::async_recursion;
8use lance_arrow::bfloat16::ARROW_EXT_NAME_KEY;
9use lance_arrow::DataTypeExt;
10use lance_core::datatypes::{Dictionary, Encoding, Field, LogicalType, Schema};
11use lance_core::{Error, Result};
12use lance_io::traits::Reader;
13use lance_io::utils::{read_binary_array, read_fixed_stride_array};
14use snafu::location;
15
16use crate::format::pb;
17
18#[allow(clippy::fallible_impl_from)]
19impl From<&pb::Field> for Field {
20    fn from(field: &pb::Field) -> Self {
21        let mut lance_metadata: HashMap<String, String> = field
22            .metadata
23            .iter()
24            .map(|(key, value)| {
25                let string_value = String::from_utf8_lossy(value).to_string();
26                (key.clone(), string_value)
27            })
28            .collect();
29        if !field.extension_name.is_empty() {
30            lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone());
31        }
32        Self {
33            name: field.name.clone(),
34            id: field.id,
35            parent_id: field.parent_id,
36            logical_type: LogicalType::from(field.logical_type.as_str()),
37            metadata: lance_metadata,
38            encoding: match field.encoding {
39                1 => Some(Encoding::Plain),
40                2 => Some(Encoding::VarBinary),
41                3 => Some(Encoding::Dictionary),
42                4 => Some(Encoding::RLE),
43                _ => None,
44            },
45            nullable: field.nullable,
46            children: vec![],
47            dictionary: field.dictionary.as_ref().map(Dictionary::from),
48            storage_class: field.storage_class.parse().unwrap(),
49        }
50    }
51}
52
53impl From<&Field> for pb::Field {
54    fn from(field: &Field) -> Self {
55        let pb_metadata = field
56            .metadata
57            .clone()
58            .into_iter()
59            .map(|(key, value)| (key, value.into_bytes()))
60            .collect();
61        Self {
62            id: field.id,
63            parent_id: field.parent_id,
64            name: field.name.clone(),
65            logical_type: field.logical_type.to_string(),
66            encoding: match field.encoding {
67                Some(Encoding::Plain) => 1,
68                Some(Encoding::VarBinary) => 2,
69                Some(Encoding::Dictionary) => 3,
70                Some(Encoding::RLE) => 4,
71                _ => 0,
72            },
73            nullable: field.nullable,
74            dictionary: field.dictionary.as_ref().map(pb::Dictionary::from),
75            metadata: pb_metadata,
76            extension_name: field
77                .extension_name()
78                .map(|name| name.to_owned())
79                .unwrap_or_default(),
80            r#type: 0,
81            storage_class: field.storage_class.to_string(),
82        }
83    }
84}
85
86pub struct Fields(pub Vec<pb::Field>);
87
88impl From<&Field> for Fields {
89    fn from(field: &Field) -> Self {
90        let mut protos = vec![pb::Field::from(field)];
91        protos.extend(field.children.iter().flat_map(|val| Self::from(val).0));
92        Self(protos)
93    }
94}
95
96/// Convert list of protobuf `Field` to a Schema.
97impl From<&Fields> for Schema {
98    fn from(fields: &Fields) -> Self {
99        let mut schema = Self {
100            fields: vec![],
101            metadata: HashMap::default(),
102        };
103
104        fields.0.iter().for_each(|f| {
105            if f.parent_id == -1 {
106                schema.fields.push(Field::from(f));
107            } else {
108                let parent = schema.mut_field_by_id(f.parent_id).unwrap();
109                parent.children.push(Field::from(f));
110            }
111        });
112
113        schema
114    }
115}
116
117pub struct FieldsWithMeta {
118    pub fields: Fields,
119    pub metadata: HashMap<String, Vec<u8>>,
120}
121
122/// Convert list of protobuf `Field` and Metadata to a Schema.
123impl From<FieldsWithMeta> for Schema {
124    fn from(fields_with_meta: FieldsWithMeta) -> Self {
125        let lance_metadata = fields_with_meta
126            .metadata
127            .into_iter()
128            .map(|(key, value)| {
129                let string_value = String::from_utf8_lossy(&value).to_string();
130                (key, string_value)
131            })
132            .collect();
133
134        let schema_with_fields = Self::from(&fields_with_meta.fields);
135        Self {
136            fields: schema_with_fields.fields,
137            metadata: lance_metadata,
138        }
139    }
140}
141
142/// Convert a Schema to a list of protobuf Field.
143impl From<&Schema> for Fields {
144    fn from(schema: &Schema) -> Self {
145        let mut protos = vec![];
146        schema.fields.iter().for_each(|f| {
147            protos.extend(Self::from(f).0);
148        });
149        Self(protos)
150    }
151}
152
153/// Convert a Schema to a list of protobuf Field and Metadata
154impl From<&Schema> for FieldsWithMeta {
155    fn from(schema: &Schema) -> Self {
156        let fields = schema.into();
157        let metadata = schema
158            .metadata
159            .clone()
160            .into_iter()
161            .map(|(key, value)| (key, value.into_bytes()))
162            .collect();
163        Self { fields, metadata }
164    }
165}
166
167impl From<&pb::Dictionary> for Dictionary {
168    fn from(proto: &pb::Dictionary) -> Self {
169        Self {
170            offset: proto.offset as usize,
171            length: proto.length as usize,
172            values: None,
173        }
174    }
175}
176
177impl From<&Dictionary> for pb::Dictionary {
178    fn from(d: &Dictionary) -> Self {
179        Self {
180            offset: d.offset as i64,
181            length: d.length as i64,
182        }
183    }
184}
185
186impl From<Encoding> for pb::Encoding {
187    fn from(e: Encoding) -> Self {
188        match e {
189            Encoding::Plain => Self::Plain,
190            Encoding::VarBinary => Self::VarBinary,
191            Encoding::Dictionary => Self::Dictionary,
192            Encoding::RLE => Self::Rle,
193        }
194    }
195}
196
197#[async_recursion]
198async fn load_field_dictionary<'a>(field: &mut Field, reader: &dyn Reader) -> Result<()> {
199    if let DataType::Dictionary(_, value_type) = field.data_type() {
200        assert!(field.dictionary.is_some());
201        if let Some(dict_info) = field.dictionary.as_mut() {
202            use DataType::*;
203            match value_type.as_ref() {
204                _ if value_type.is_binary_like() => {
205                    dict_info.values = Some(
206                        read_binary_array(
207                            reader,
208                            value_type.as_ref(),
209                            true, // Empty values are null
210                            dict_info.offset,
211                            dict_info.length,
212                            ..,
213                        )
214                        .await?,
215                    );
216                }
217                Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => {
218                    dict_info.values = Some(
219                        read_fixed_stride_array(
220                            reader,
221                            value_type.as_ref(),
222                            dict_info.offset,
223                            dict_info.length,
224                            ..,
225                        )
226                        .await?,
227                    );
228                }
229                _ => {
230                    return Err(Error::Schema {
231                        message: format!(
232                            "Does not support {} as dictionary value type",
233                            value_type
234                        ),
235                        location: location!(),
236                    });
237                }
238            }
239        } else {
240            panic!("Should not reach here: dictionary field does not load dictionary info")
241        }
242        Ok(())
243    } else {
244        for child in field.children.as_mut_slice() {
245            load_field_dictionary(child, reader).await?;
246        }
247        Ok(())
248    }
249}
250
251/// Load dictionary value array from manifest files.
252// TODO: pub(crate)
253pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader) -> Result<()> {
254    for field in schema.fields.as_mut_slice() {
255        load_field_dictionary(field, reader).await?;
256    }
257    Ok(())
258}
259
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use lance_core::datatypes::Schema;

    use crate::datatypes::{Fields, FieldsWithMeta};

    /// Flattening a nested schema yields protobuf fields whose ids run 0..6
    /// in flattening order.
    #[test]
    fn test_schema_set_ids() {
        let struct_children = ArrowFields::from(vec![
            ArrowField::new("f1", DataType::Utf8, true),
            ArrowField::new("f2", DataType::Boolean, false),
            ArrowField::new("f3", DataType::Float32, false),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Struct(struct_children), true),
            ArrowField::new("c", DataType::Float64, false),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let Fields(protos) = Fields::from(&schema);
        let ids: Vec<_> = protos.iter().map(|proto| proto.id).collect();
        let expected: Vec<_> = (0..6).collect();
        assert_eq!(ids, expected);
    }

    /// Schema-level metadata survives a round trip through `FieldsWithMeta`.
    #[test]
    fn test_schema_metadata() {
        let metadata: HashMap<String, String> = [("k1", "v1"), ("k2", "v2")]
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();

        let arrow_schema = ArrowSchema::new_with_metadata(
            vec![ArrowField::new("a", DataType::Int32, false)],
            metadata,
        );
        let expected_schema = Schema::try_from(&arrow_schema).unwrap();

        let round_tripped = Schema::from(FieldsWithMeta::from(&expected_schema));
        assert_eq!(expected_schema, round_tripped);
    }
}