1use std::sync::Arc;
5
6use arrow_schema::{DataType, Fields};
7use bytes::Bytes;
8use bytes::BytesMut;
9use futures::{future::BoxFuture, FutureExt};
10use lance_arrow::DataTypeExt;
11use lance_core::{Error, Result};
12use snafu::location;
13
14use crate::data::BlockInfo;
15use crate::data::FixedSizeListBlock;
16use crate::format::ProtobufUtils;
17use crate::{
18 buffer::LanceBuffer,
19 data::{DataBlock, FixedWidthDataBlock, StructDataBlock},
20 decoder::{PageScheduler, PrimitivePageDecoder},
21 encoder::{ArrayEncoder, EncodedArray},
22 EncodingsIo,
23};
24
25#[derive(Debug)]
26pub struct PackedStructPageScheduler {
27 _inner_schedulers: Vec<Box<dyn PageScheduler>>,
31 fields: Fields,
32 buffer_offset: u64,
33}
34
35impl PackedStructPageScheduler {
36 pub fn new(
37 _inner_schedulers: Vec<Box<dyn PageScheduler>>,
38 struct_datatype: DataType,
39 buffer_offset: u64,
40 ) -> Self {
41 let DataType::Struct(fields) = struct_datatype else {
42 panic!("Struct datatype expected");
43 };
44 Self {
45 _inner_schedulers,
46 fields,
47 buffer_offset,
48 }
49 }
50}
51
52impl PageScheduler for PackedStructPageScheduler {
53 fn schedule_ranges(
54 &self,
55 ranges: &[std::ops::Range<u64>],
56 scheduler: &Arc<dyn EncodingsIo>,
57 top_level_row: u64,
58 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
59 let mut total_bytes_per_row: u64 = 0;
60
61 for field in &self.fields {
62 let bytes_per_field = field.data_type().byte_width() as u64;
63 total_bytes_per_row += bytes_per_field;
64 }
65
66 let byte_ranges = ranges
72 .iter()
73 .map(|range| {
74 let start = self.buffer_offset + (range.start * total_bytes_per_row);
75 let end = self.buffer_offset + (range.end * total_bytes_per_row);
76 start..end
77 })
78 .collect::<Vec<_>>();
79
80 let bytes = scheduler.submit_request(byte_ranges, top_level_row);
82
83 let copy_struct_fields = self.fields.clone();
84
85 tokio::spawn(async move {
86 let bytes = bytes.await?;
87
88 let mut combined_bytes = BytesMut::default();
89 for byte_slice in bytes {
90 combined_bytes.extend_from_slice(&byte_slice);
91 }
92
93 Ok(Box::new(PackedStructPageDecoder {
94 data: combined_bytes.freeze(),
95 fields: copy_struct_fields,
96 total_bytes_per_row: total_bytes_per_row as usize,
97 }) as Box<dyn PrimitivePageDecoder>)
98 })
99 .map(|join_handle| join_handle.unwrap())
100 .boxed()
101 }
102}
103
104struct PackedStructPageDecoder {
105 data: Bytes,
106 fields: Fields,
107 total_bytes_per_row: usize,
108}
109
110impl PrimitivePageDecoder for PackedStructPageDecoder {
111 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
112 let bytes_to_skip = (rows_to_skip as usize) * self.total_bytes_per_row;
127
128 let mut children = Vec::with_capacity(self.fields.len());
129
130 let mut start_index = 0;
131
132 for field in &self.fields {
133 let bytes_per_field = field.data_type().byte_width();
134 let mut field_bytes = Vec::with_capacity(bytes_per_field * num_rows as usize);
135
136 let mut byte_index = start_index;
137
138 for _ in 0..num_rows {
139 let start = bytes_to_skip + byte_index;
140 field_bytes.extend_from_slice(&self.data[start..(start + bytes_per_field)]);
141 byte_index += self.total_bytes_per_row;
142 }
143
144 start_index += bytes_per_field;
145 let child_block = FixedWidthDataBlock {
146 data: LanceBuffer::from(field_bytes),
147 bits_per_value: bytes_per_field as u64 * 8,
148 num_values: num_rows,
149 block_info: BlockInfo::new(),
150 };
151 let child_block = FixedSizeListBlock::from_flat(child_block, field.data_type());
152 children.push(child_block);
153 }
154 Ok(DataBlock::Struct(StructDataBlock {
155 children,
156 block_info: BlockInfo::default(),
157 }))
158 }
159}
160
161#[derive(Debug)]
162pub struct PackedStructEncoder {
163 inner_encoders: Vec<Box<dyn ArrayEncoder>>,
164}
165
166impl PackedStructEncoder {
167 pub fn new(inner_encoders: Vec<Box<dyn ArrayEncoder>>) -> Self {
168 Self { inner_encoders }
169 }
170}
171
172impl ArrayEncoder for PackedStructEncoder {
173 fn encode(
174 &self,
175 data: DataBlock,
176 data_type: &DataType,
177 buffer_index: &mut u32,
178 ) -> Result<EncodedArray> {
179 let struct_data = data.as_struct().unwrap();
180
181 let DataType::Struct(child_types) = data_type else {
182 panic!("Struct datatype expected");
183 };
184
185 let mut encoded_fields = Vec::with_capacity(struct_data.children.len());
187 for ((child, encoder), child_type) in struct_data
188 .children
189 .into_iter()
190 .zip(&self.inner_encoders)
191 .zip(child_types)
192 {
193 encoded_fields.push(encoder.encode(child, child_type.data_type(), &mut 0)?);
194 }
195
196 let (encoded_data_vec, child_encodings): (Vec<_>, Vec<_>) = encoded_fields
197 .into_iter()
198 .map(|field| (field.data, field.encoding))
199 .unzip();
200
201 let fixed_fields = encoded_data_vec
207 .into_iter()
208 .map(|child| match child {
209 DataBlock::FixedWidth(fixed) => Ok(fixed),
210 DataBlock::FixedSizeList(fixed_size_list) => {
211 let flattened = fixed_size_list.try_into_flat().ok_or_else(|| {
212 Error::invalid_input(
213 "Packed struct encoder cannot pack nullable fixed-width data blocks",
214 location!(),
215 )
216 })?;
217 Ok(flattened)
218 }
219 _ => Err(Error::invalid_input(
220 "Packed struct encoder currently only implemented for fixed-width data blocks",
221 location!(),
222 )),
223 })
224 .collect::<Result<Vec<_>>>()?;
225 let total_bits_per_value = fixed_fields.iter().map(|f| f.bits_per_value).sum::<u64>();
226
227 let num_values = fixed_fields[0].num_values;
228 debug_assert!(fixed_fields
229 .iter()
230 .all(|field| field.num_values == num_values));
231
232 let zipped_input = fixed_fields
233 .into_iter()
234 .map(|field| (field.data, field.bits_per_value))
235 .collect::<Vec<_>>();
236 let zipped = LanceBuffer::zip_into_one(zipped_input, num_values)?;
237
238 let index = *buffer_index;
240 *buffer_index += 1;
241
242 let packed_data = DataBlock::FixedWidth(FixedWidthDataBlock {
243 data: zipped,
244 bits_per_value: total_bits_per_value,
245 num_values,
246 block_info: BlockInfo::new(),
247 });
248
249 let encoding = ProtobufUtils::packed_struct(child_encodings, index);
250
251 Ok(EncodedArray {
252 data: packed_data,
253 encoding,
254 })
255 }
256}
257
#[cfg(test)]
pub mod tests {

    use arrow::array::ArrayData;
    use arrow_array::{
        Array, ArrayRef, FixedSizeListArray, Int32Array, StructArray, UInt64Array, UInt8Array,
    };
    use arrow_schema::{DataType, Field, Fields};
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };
    use rstest::rstest;

    /// Builds the `packed = true` field metadata shared by every test in this module.
    fn packed_metadata() -> HashMap<String, String> {
        HashMap::from([("packed".to_string(), "true".to_string())])
    }

    /// Round-trips randomly generated data through a packed struct of two
    /// non-nullable integer fields.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_random_packed_struct(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::UInt64, false),
            Field::new("b", DataType::UInt32, false),
        ]));

        let field = Field::new("", data_type, false).with_metadata(packed_metadata());

        check_round_trip_encoding_random(field, version).await;
    }

    /// Round-trips two hand-built struct arrays with fields of three different
    /// widths, reading back several ranges and an index selection.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_specific_packed_struct(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let array1 = Arc::new(UInt64Array::from(vec![1, 2, 3, 4]));
        let array2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8]));
        let array3 = Arc::new(UInt8Array::from(vec![9, 10, 11, 12]));

        let struct_array1 = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", DataType::UInt64, false)),
                array1.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("y", DataType::Int32, false)),
                array2.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("z", DataType::UInt8, false)),
                array3.clone() as ArrayRef,
            ),
        ]));

        let array4 = Arc::new(UInt64Array::from(vec![13, 14, 15, 16]));
        let array5 = Arc::new(Int32Array::from(vec![17, 18, 19, 20]));
        let array6 = Arc::new(UInt8Array::from(vec![21, 22, 23, 24]));

        let struct_array2 = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", DataType::UInt64, false)),
                array4.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("y", DataType::Int32, false)),
                array5.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("z", DataType::UInt8, false)),
                array6.clone() as ArrayRef,
            ),
        ]));

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..6)
            .with_range(1..4)
            .with_indices(vec![1, 3, 7])
            .with_file_version(version);

        check_round_trip_encoding_of_data(
            vec![struct_array1, struct_array2],
            &test_cases,
            packed_metadata(),
        )
        .await;
    }

    /// Round-trips a packed struct that holds a fixed-size-list field next to a
    /// plain integer field.
    ///
    /// The value list used to be empty, which gave rstest no file version to run
    /// the test with.
    /// NOTE(review): only `V2_0` is exercised here; confirm whether `V2_1` supports
    /// fixed-size-list children in packed structs before adding it.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_fsl_packed_struct(
        #[values(LanceFileVersion::V2_0)] version: LanceFileVersion,
    ) {
        let int_array = Arc::new(Int32Array::from(vec![12, 13, 14, 15]));

        // Four lists of three Int32 values each, backed by twelve child values.
        let list_data_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3);
        let inner_array = Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
        let list_data = ArrayData::builder(list_data_type.clone())
            .len(4)
            .add_child_data(inner_array.into_data())
            .build()
            .unwrap();
        let list_array = FixedSizeListArray::from(list_data);

        let struct_array = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", list_data_type.clone(), false)),
                Arc::new(list_array) as ArrayRef,
            ),
            (
                Arc::new(Field::new("x", DataType::Int32, false)),
                int_array as ArrayRef,
            ),
        ]));

        let test_cases = TestCases::default()
            .with_range(1..3)
            .with_range(0..1)
            .with_range(2..4)
            .with_indices(vec![0, 2, 3])
            .with_file_version(version);

        check_round_trip_encoding_of_data(vec![struct_array], &test_cases, packed_metadata())
            .await;
    }
}