1use std::collections::HashMap;
5
6use arrow_schema::DataType;
7use async_recursion::async_recursion;
8use lance_arrow::bfloat16::ARROW_EXT_NAME_KEY;
9use lance_arrow::DataTypeExt;
10use lance_core::datatypes::{Dictionary, Encoding, Field, LogicalType, Schema};
11use lance_core::{Error, Result};
12use lance_io::traits::Reader;
13use lance_io::utils::{read_binary_array, read_fixed_stride_array};
14use snafu::location;
15
16use crate::format::pb;
17
18#[allow(clippy::fallible_impl_from)]
19impl From<&pb::Field> for Field {
20 fn from(field: &pb::Field) -> Self {
21 let mut lance_metadata: HashMap<String, String> = field
22 .metadata
23 .iter()
24 .map(|(key, value)| {
25 let string_value = String::from_utf8_lossy(value).to_string();
26 (key.clone(), string_value)
27 })
28 .collect();
29 if !field.extension_name.is_empty() {
30 lance_metadata.insert(ARROW_EXT_NAME_KEY.to_string(), field.extension_name.clone());
31 }
32 Self {
33 name: field.name.clone(),
34 id: field.id,
35 parent_id: field.parent_id,
36 logical_type: LogicalType::from(field.logical_type.as_str()),
37 metadata: lance_metadata,
38 encoding: match field.encoding {
39 1 => Some(Encoding::Plain),
40 2 => Some(Encoding::VarBinary),
41 3 => Some(Encoding::Dictionary),
42 4 => Some(Encoding::RLE),
43 _ => None,
44 },
45 nullable: field.nullable,
46 children: vec![],
47 dictionary: field.dictionary.as_ref().map(Dictionary::from),
48 storage_class: field.storage_class.parse().unwrap(),
49 }
50 }
51}
52
53impl From<&Field> for pb::Field {
54 fn from(field: &Field) -> Self {
55 let pb_metadata = field
56 .metadata
57 .clone()
58 .into_iter()
59 .map(|(key, value)| (key, value.into_bytes()))
60 .collect();
61 Self {
62 id: field.id,
63 parent_id: field.parent_id,
64 name: field.name.clone(),
65 logical_type: field.logical_type.to_string(),
66 encoding: match field.encoding {
67 Some(Encoding::Plain) => 1,
68 Some(Encoding::VarBinary) => 2,
69 Some(Encoding::Dictionary) => 3,
70 Some(Encoding::RLE) => 4,
71 _ => 0,
72 },
73 nullable: field.nullable,
74 dictionary: field.dictionary.as_ref().map(pb::Dictionary::from),
75 metadata: pb_metadata,
76 extension_name: field
77 .extension_name()
78 .map(|name| name.to_owned())
79 .unwrap_or_default(),
80 r#type: 0,
81 storage_class: field.storage_class.to_string(),
82 }
83 }
84}
85
86pub struct Fields(pub Vec<pb::Field>);
87
88impl From<&Field> for Fields {
89 fn from(field: &Field) -> Self {
90 let mut protos = vec![pb::Field::from(field)];
91 protos.extend(field.children.iter().flat_map(|val| Self::from(val).0));
92 Self(protos)
93 }
94}
95
96impl From<&Fields> for Schema {
98 fn from(fields: &Fields) -> Self {
99 let mut schema = Self {
100 fields: vec![],
101 metadata: HashMap::default(),
102 };
103
104 fields.0.iter().for_each(|f| {
105 if f.parent_id == -1 {
106 schema.fields.push(Field::from(f));
107 } else {
108 let parent = schema.mut_field_by_id(f.parent_id).unwrap();
109 parent.children.push(Field::from(f));
110 }
111 });
112
113 schema
114 }
115}
116
117pub struct FieldsWithMeta {
118 pub fields: Fields,
119 pub metadata: HashMap<String, Vec<u8>>,
120}
121
122impl From<FieldsWithMeta> for Schema {
124 fn from(fields_with_meta: FieldsWithMeta) -> Self {
125 let lance_metadata = fields_with_meta
126 .metadata
127 .into_iter()
128 .map(|(key, value)| {
129 let string_value = String::from_utf8_lossy(&value).to_string();
130 (key, string_value)
131 })
132 .collect();
133
134 let schema_with_fields = Self::from(&fields_with_meta.fields);
135 Self {
136 fields: schema_with_fields.fields,
137 metadata: lance_metadata,
138 }
139 }
140}
141
142impl From<&Schema> for Fields {
144 fn from(schema: &Schema) -> Self {
145 let mut protos = vec![];
146 schema.fields.iter().for_each(|f| {
147 protos.extend(Self::from(f).0);
148 });
149 Self(protos)
150 }
151}
152
153impl From<&Schema> for FieldsWithMeta {
155 fn from(schema: &Schema) -> Self {
156 let fields = schema.into();
157 let metadata = schema
158 .metadata
159 .clone()
160 .into_iter()
161 .map(|(key, value)| (key, value.into_bytes()))
162 .collect();
163 Self { fields, metadata }
164 }
165}
166
167impl From<&pb::Dictionary> for Dictionary {
168 fn from(proto: &pb::Dictionary) -> Self {
169 Self {
170 offset: proto.offset as usize,
171 length: proto.length as usize,
172 values: None,
173 }
174 }
175}
176
177impl From<&Dictionary> for pb::Dictionary {
178 fn from(d: &Dictionary) -> Self {
179 Self {
180 offset: d.offset as i64,
181 length: d.length as i64,
182 }
183 }
184}
185
186impl From<Encoding> for pb::Encoding {
187 fn from(e: Encoding) -> Self {
188 match e {
189 Encoding::Plain => Self::Plain,
190 Encoding::VarBinary => Self::VarBinary,
191 Encoding::Dictionary => Self::Dictionary,
192 Encoding::RLE => Self::Rle,
193 }
194 }
195}
196
197#[async_recursion]
198async fn load_field_dictionary<'a>(field: &mut Field, reader: &dyn Reader) -> Result<()> {
199 if let DataType::Dictionary(_, value_type) = field.data_type() {
200 assert!(field.dictionary.is_some());
201 if let Some(dict_info) = field.dictionary.as_mut() {
202 use DataType::*;
203 match value_type.as_ref() {
204 _ if value_type.is_binary_like() => {
205 dict_info.values = Some(
206 read_binary_array(
207 reader,
208 value_type.as_ref(),
209 true, dict_info.offset,
211 dict_info.length,
212 ..,
213 )
214 .await?,
215 );
216 }
217 Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => {
218 dict_info.values = Some(
219 read_fixed_stride_array(
220 reader,
221 value_type.as_ref(),
222 dict_info.offset,
223 dict_info.length,
224 ..,
225 )
226 .await?,
227 );
228 }
229 _ => {
230 return Err(Error::Schema {
231 message: format!(
232 "Does not support {} as dictionary value type",
233 value_type
234 ),
235 location: location!(),
236 });
237 }
238 }
239 } else {
240 panic!("Should not reach here: dictionary field does not load dictionary info")
241 }
242 Ok(())
243 } else {
244 for child in field.children.as_mut_slice() {
245 load_field_dictionary(child, reader).await?;
246 }
247 Ok(())
248 }
249}
250
251pub async fn populate_schema_dictionary(schema: &mut Schema, reader: &dyn Reader) -> Result<()> {
254 for field in schema.fields.as_mut_slice() {
255 load_field_dictionary(field, reader).await?;
256 }
257 Ok(())
258}
259
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use arrow_schema::DataType;
    use arrow_schema::Field as ArrowField;
    use arrow_schema::Fields as ArrowFields;
    use arrow_schema::Schema as ArrowSchema;
    use lance_core::datatypes::Schema;

    use crate::datatypes::Fields;
    use crate::datatypes::FieldsWithMeta;

    /// Flattening a schema with a nested struct yields protos whose ids run
    /// 0..6 in order: a, b, b.f1, b.f2, b.f3, c.
    #[test]
    fn test_schema_set_ids() {
        let struct_children = ArrowFields::from(vec![
            ArrowField::new("f1", DataType::Utf8, true),
            ArrowField::new("f2", DataType::Boolean, false),
            ArrowField::new("f3", DataType::Float32, false),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int32, false),
            ArrowField::new("b", DataType::Struct(struct_children), true),
            ArrowField::new("c", DataType::Float64, false),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let Fields(protos) = Fields::from(&schema);
        let ids: Vec<_> = protos.iter().map(|proto| proto.id).collect();
        assert_eq!(ids, (0..6).collect::<Vec<_>>());
    }

    /// Schema-level metadata survives a round trip through `FieldsWithMeta`.
    #[test]
    fn test_schema_metadata() {
        let metadata: HashMap<String, String> = HashMap::from([
            (String::from("k1"), String::from("v1")),
            (String::from("k2"), String::from("v2")),
        ]);
        let arrow_schema = ArrowSchema::new_with_metadata(
            vec![ArrowField::new("a", DataType::Int32, false)],
            metadata,
        );

        let expected_schema = Schema::try_from(&arrow_schema).unwrap();
        let round_tripped = Schema::from(FieldsWithMeta::from(&expected_schema));
        assert_eq!(expected_schema, round_tripped);
    }
}