lance_encoding/encodings/physical/packed_struct.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use arrow_schema::{DataType, Fields};
use bytes::Bytes;
use bytes::BytesMut;
use futures::{future::BoxFuture, FutureExt};
use lance_arrow::DataTypeExt;
use lance_core::{Error, Result};
use snafu::location;

use crate::data::BlockInfo;
use crate::data::FixedSizeListBlock;
use crate::format::ProtobufUtils;
use crate::{
    buffer::LanceBuffer,
    data::{DataBlock, FixedWidthDataBlock, StructDataBlock},
    decoder::{PageScheduler, PrimitivePageDecoder},
    encoder::{ArrayEncoder, EncodedArray},
    EncodingsIo,
};

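// Packed-struct encoding, as implemented by the scheduler, decoder, and encoder below:
// all of a struct's fixed-width fields are stored row by row in a single buffer, i.e.
// each row consists of the bytes of field 0 followed by the bytes of field 1, and so on.
// Decoding a range of rows therefore only requires the row width (the sum of the field
// byte widths) and the buffer's offset within the page.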
#[derive(Debug)]
pub struct PackedStructPageScheduler {
    // We don't actually need these schedulers right now since we decode all the field bytes directly.
    // They may become useful if we later need the inner fields' decoders,
    // e.g. once bitpacking is added.
    _inner_schedulers: Vec<Box<dyn PageScheduler>>,
    fields: Fields,
    buffer_offset: u64,
}

impl PackedStructPageScheduler {
    pub fn new(
        _inner_schedulers: Vec<Box<dyn PageScheduler>>,
        struct_datatype: DataType,
        buffer_offset: u64,
    ) -> Self {
        let DataType::Struct(fields) = struct_datatype else {
            panic!("Struct datatype expected");
        };
        Self {
            _inner_schedulers,
            fields,
            buffer_offset,
        }
    }
}

impl PageScheduler for PackedStructPageScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        let mut total_bytes_per_row: u64 = 0;

        for field in &self.fields {
            let bytes_per_field = field.data_type().byte_width() as u64;
            total_bytes_per_row += bytes_per_field;
        }

        // Parts of the arrays in a page may be encoded by different encoding tasks.
        // In that case, decoding two different sets of rows can result in the same `ranges` parameter being passed in,
        // e.g. we may get the range 0..2 twice to decode 4 rows across 2 tasks.
        // So to compute the correct byte ranges we need to know the position of the buffer in the page
        // (i.e. the buffer offset). This is taken directly from the buffer stored in the protobuf.
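        // As a concrete illustration (hypothetical numbers): with buffer_offset = 64 and
        // total_bytes_per_row = 13, the row range 2..5 maps to the byte range
        // 64 + 2 * 13 .. 64 + 5 * 13 = 90..129.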
        let byte_ranges = ranges
            .iter()
            .map(|range| {
                let start = self.buffer_offset + (range.start * total_bytes_per_row);
                let end = self.buffer_offset + (range.end * total_bytes_per_row);
                start..end
            })
            .collect::<Vec<_>>();

        // Submitting the request gives us a future that resolves to the raw bytes
        let bytes = scheduler.submit_request(byte_ranges, top_level_row);

        let copy_struct_fields = self.fields.clone();

        tokio::spawn(async move {
            let bytes = bytes.await?;

            let mut combined_bytes = BytesMut::default();
            for byte_slice in bytes {
                combined_bytes.extend_from_slice(&byte_slice);
            }

            Ok(Box::new(PackedStructPageDecoder {
                data: combined_bytes.freeze(),
                fields: copy_struct_fields,
                total_bytes_per_row: total_bytes_per_row as usize,
            }) as Box<dyn PrimitivePageDecoder>)
        })
        .map(|join_handle| join_handle.unwrap())
        .boxed()
    }
}

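// Decodes rows from the packed buffer of a single page.  `data` holds the packed bytes,
// `fields` describes the struct's children, and `total_bytes_per_row` is the sum of the
// fields' byte widths.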
struct PackedStructPageDecoder {
    data: Bytes,
    fields: Fields,
    total_bytes_per_row: usize,
}

impl PrimitivePageDecoder for PackedStructPageDecoder {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
        // Decoding workflow:
        // rows 0-2: {x: [1, 2, 3], y: [4, 5, 6], z: [7, 8, 9]}
        // rows 3-5: {x: [10, 11, 12], y: [13, 14, 15], z: [16, 17, 18]}
        // packed encoding: [
        // [1, 4, 7, 2, 5, 8, 3, 6, 9],
        // [10, 13, 16, 11, 14, 17, 12, 15, 18]
        // ]
        // Suppose bytes_per_field = 1, 4, 8 for fields x, y, and z, respectively.
        // Then total_bytes_per_row = 13.
        // Suppose rows_to_skip = 1 and num_rows = 2. Then we will slice bytes 13 to 39.
        // Now we have [2, 5, 8, 3, 6, 9].
        // We rearrange this into one contiguous buffer per field: [[2, 3], [5, 6], [8, 9]].
        // These per-field buffers are used to reconstruct the struct array below.

        let bytes_to_skip = (rows_to_skip as usize) * self.total_bytes_per_row;

        let mut children = Vec::with_capacity(self.fields.len());

        let mut start_index = 0;

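        // `start_index` is the byte offset of the current field within each packed row;
        // `byte_index` then walks that field's bytes row by row through the packed buffer.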
        for field in &self.fields {
            let bytes_per_field = field.data_type().byte_width();
            let mut field_bytes = Vec::with_capacity(bytes_per_field * num_rows as usize);

            let mut byte_index = start_index;

            for _ in 0..num_rows {
                let start = bytes_to_skip + byte_index;
                field_bytes.extend_from_slice(&self.data[start..(start + bytes_per_field)]);
                byte_index += self.total_bytes_per_row;
            }

            start_index += bytes_per_field;
            let child_block = FixedWidthDataBlock {
                data: LanceBuffer::from(field_bytes),
                bits_per_value: bytes_per_field as u64 * 8,
                num_values: num_rows,
                block_info: BlockInfo::new(),
            };
            let child_block = FixedSizeListBlock::from_flat(child_block, field.data_type());
            children.push(child_block);
        }
        Ok(DataBlock::Struct(StructDataBlock {
            children,
            block_info: BlockInfo::default(),
        }))
    }
}

#[derive(Debug)]
pub struct PackedStructEncoder {
    inner_encoders: Vec<Box<dyn ArrayEncoder>>,
}

impl PackedStructEncoder {
    pub fn new(inner_encoders: Vec<Box<dyn ArrayEncoder>>) -> Self {
        Self { inner_encoders }
    }
}

impl ArrayEncoder for PackedStructEncoder {
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        let struct_data = data.as_struct().unwrap();

        let DataType::Struct(child_types) = data_type else {
            panic!("Struct datatype expected");
        };

        // Encode individual fields
        let mut encoded_fields = Vec::with_capacity(struct_data.children.len());
        for ((child, encoder), child_type) in struct_data
            .children
            .into_iter()
            .zip(&self.inner_encoders)
            .zip(child_types)
        {
            encoded_fields.push(encoder.encode(child, child_type.data_type(), &mut 0)?);
        }

        let (encoded_data_vec, child_encodings): (Vec<_>, Vec<_>) = encoded_fields
            .into_iter()
            .map(|field| (field.data, field.encoding))
            .unzip();

        // Zip together the encoded data
        //
        // We can currently encode both FixedWidth and FixedSizeList.  In order
        // to encode the latter we "flatten" it, converting a FixedSizeList into
        // a FixedWidth block with very wide items.
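        //
        // For example, a FixedSizeList<Int32, 3> child flattens to a FixedWidth block
        // whose items are 3 * 32 = 96 bits wide.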
        let fixed_fields = encoded_data_vec
            .into_iter()
            .map(|child| match child {
                DataBlock::FixedWidth(fixed) => Ok(fixed),
                DataBlock::FixedSizeList(fixed_size_list) => {
                    let flattened = fixed_size_list.try_into_flat().ok_or_else(|| {
                        Error::invalid_input(
                            "Packed struct encoder cannot pack nullable fixed-width data blocks",
                            location!(),
                        )
                    })?;
                    Ok(flattened)
                }
                _ => Err(Error::invalid_input(
                    "Packed struct encoder currently only implemented for fixed-width data blocks",
                    location!(),
                )),
            })
            .collect::<Result<Vec<_>>>()?;
        let total_bits_per_value = fixed_fields.iter().map(|f| f.bits_per_value).sum::<u64>();

        let num_values = fixed_fields[0].num_values;
        debug_assert!(fixed_fields
            .iter()
            .all(|field| field.num_values == num_values));

        let zipped_input = fixed_fields
            .into_iter()
            .map(|field| (field.data, field.bits_per_value))
            .collect::<Vec<_>>();
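        // `zip_into_one` interleaves the per-field buffers value by value, producing a single
        // row-major buffer in which each row occupies `total_bits_per_value` bits.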
        let zipped = LanceBuffer::zip_into_one(zipped_input, num_values)?;

        // Create encoding protobuf
        let index = *buffer_index;
        *buffer_index += 1;

        let packed_data = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: zipped,
            bits_per_value: total_bits_per_value,
            num_values,
            block_info: BlockInfo::new(),
        });

        let encoding = ProtobufUtils::packed_struct(child_encodings, index);

        Ok(EncodedArray {
            data: packed_data,
            encoding,
        })
    }
}

#[cfg(test)]
pub mod tests {

    use arrow::array::ArrayData;
    use arrow_array::{
        Array, ArrayRef, FixedSizeListArray, Int32Array, StructArray, UInt64Array, UInt8Array,
    };
    use arrow_schema::{DataType, Field, Fields};
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };
    use rstest::rstest;

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_random_packed_struct(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::UInt64, false),
            Field::new("b", DataType::UInt32, false),
        ]));
        let mut metadata = HashMap::new();
        metadata.insert("packed".to_string(), "true".to_string());

        let field = Field::new("", data_type, false).with_metadata(metadata);

        check_round_trip_encoding_random(field, version).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_specific_packed_struct(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let array1 = Arc::new(UInt64Array::from(vec![1, 2, 3, 4]));
        let array2 = Arc::new(Int32Array::from(vec![5, 6, 7, 8]));
        let array3 = Arc::new(UInt8Array::from(vec![9, 10, 11, 12]));

        let struct_array1 = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", DataType::UInt64, false)),
                array1.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("y", DataType::Int32, false)),
                array2.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("z", DataType::UInt8, false)),
                array3.clone() as ArrayRef,
            ),
        ]));

        let array4 = Arc::new(UInt64Array::from(vec![13, 14, 15, 16]));
        let array5 = Arc::new(Int32Array::from(vec![17, 18, 19, 20]));
        let array6 = Arc::new(UInt8Array::from(vec![21, 22, 23, 24]));

        let struct_array2 = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", DataType::UInt64, false)),
                array4.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("y", DataType::Int32, false)),
                array5.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("z", DataType::UInt8, false)),
                array6.clone() as ArrayRef,
            ),
        ]));

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..6)
            .with_range(1..4)
            .with_indices(vec![1, 3, 7])
            .with_file_version(version);

        let mut metadata = HashMap::new();
        metadata.insert("packed".to_string(), "true".to_string());

        check_round_trip_encoding_of_data(
            vec![struct_array1, struct_array2],
            &test_cases,
            metadata,
        )
        .await;
    }

    // The current Lance V2.1 `packed-struct` encoding doesn't support `FixedSizeList`, and the
    // Lance V2.0 case is disabled for now because we don't have statistics for `FixedSizeList`,
    // so both versions are commented out below and this test is effectively disabled.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_fsl_packed_struct(
        #[values(/*LanceFileVersion::V2_0, LanceFileVersion::V2_1*/)]
        version: LanceFileVersion,
    ) {
        let int_array = Arc::new(Int32Array::from(vec![12, 13, 14, 15]));

        let list_data_type =
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3);
        let inner_array = Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
        let list_data = ArrayData::builder(list_data_type.clone())
            .len(4)
            .add_child_data(inner_array.into_data())
            .build()
            .unwrap();
        let list_array = FixedSizeListArray::from(list_data);

        let struct_array = Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("x", list_data_type.clone(), false)),
                Arc::new(list_array) as ArrayRef,
            ),
            (
                Arc::new(Field::new("y", DataType::Int32, false)),
                int_array as ArrayRef,
            ),
        ]));

        let test_cases = TestCases::default()
            .with_range(1..3)
            .with_range(0..1)
            .with_range(2..4)
            .with_indices(vec![0, 2, 3])
            .with_file_version(version);

        let mut metadata = HashMap::new();
        metadata.insert("packed".to_string(), "true".to_string());

        check_round_trip_encoding_of_data(vec![struct_array], &test_cases, metadata).await;
    }
}