lance_encoding/encodings/physical/
struct_encoding.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow::datatypes::UInt64Type;
5
6use lance_core::{Error, Result};
7use snafu::location;
8
9use crate::{
10    buffer::LanceBuffer,
11    data::{BlockInfo, DataBlock, FixedWidthDataBlock, StructDataBlock},
12    decoder::MiniBlockDecompressor,
13    encoder::{MiniBlockCompressed, MiniBlockCompressor},
14    format::{
15        pb::{self},
16        ProtobufUtils,
17    },
18    statistics::{GetStat, Stat},
19};
20
21use super::value::{ValueDecompressor, ValueEncoder};
22
23// Transforms a `StructDataBlock` into a row major `FixedWidthDataBlock`.
24// Only fields with fixed-width fields are supported for now, and the
25// assumption that all fields has `bits_per_value % 8 == 0` is made.
26fn struct_data_block_to_fixed_width_data_block(
27    struct_data_block: StructDataBlock,
28    bits_per_values: &[u32],
29) -> DataBlock {
30    let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
31    let mut output = Vec::with_capacity(data_size as usize);
32    let num_values = struct_data_block.children[0].num_values();
33
34    for i in 0..num_values as usize {
35        for (j, child) in struct_data_block.children.iter().enumerate() {
36            let bytes_per_value = (bits_per_values[j] / 8) as usize;
37            let this_data = child
38                .as_fixed_width_ref()
39                .unwrap()
40                .data
41                .slice_with_length(bytes_per_value * i, bytes_per_value);
42            output.extend_from_slice(&this_data);
43        }
44    }
45
46    DataBlock::FixedWidth(FixedWidthDataBlock {
47        bits_per_value: bits_per_values
48            .iter()
49            .map(|bits_per_value| *bits_per_value as u64)
50            .sum(),
51        data: LanceBuffer::Owned(output),
52        num_values,
53        block_info: BlockInfo::default(),
54    })
55}
56
/// Stateless mini-block compressor for structs whose fields are all
/// fixed-width.
///
/// The type carries no configuration; all of its behavior lives in its
/// `MiniBlockCompressor` implementation.
#[derive(Debug, Default)]
pub struct PackedStructFixedWidthMiniBlockEncoder {}
59
60impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
61    fn compress(
62        &self,
63        data: DataBlock,
64    ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
65        match data {
66            DataBlock::Struct(struct_data_block) => {
67                let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value as u32).collect::<Vec<_>>();
68
69                // transform struct datablock to fixed-width data block.
70                let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
71
72                // store and transformed fixed-width data block.
73                let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
74                let (value_miniblock_compressed, value_array_encoding) =
75                value_miniblock_compressor.compress(data_block)?;
76
77                Ok((
78                    value_miniblock_compressed,
79                    ProtobufUtils::packed_struct_fixed_width_mini_block(value_array_encoding, bits_per_values),
80                ))
81            }
82            _ => Err(Error::InvalidInput {
83                source: format!(
84                    "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
85                    data.name()
86                )
87                .into(),
88                location: location!(),
89            }),
90        }
91    }
92}
93
94#[derive(Debug)]
95pub struct PackedStructFixedWidthMiniBlockDecompressor {
96    bits_per_values: Vec<u32>,
97    array_encoding: Box<dyn MiniBlockDecompressor>,
98}
99
100impl PackedStructFixedWidthMiniBlockDecompressor {
101    pub fn new(description: &pb::PackedStructFixedWidthMiniBlock) -> Self {
102        let array_encoding: Box<dyn MiniBlockDecompressor> = match description
103            .flat
104            .as_ref()
105            .unwrap()
106            .array_encoding
107            .as_ref()
108            .unwrap()
109        {
110            pb::array_encoding::ArrayEncoding::Flat(flat) => Box::new(ValueDecompressor::new(flat)),
111            _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."),
112        };
113        Self {
114            bits_per_values: description.bits_per_values.clone(),
115            array_encoding,
116        }
117    }
118}
119
120impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
121    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
122        let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
123        let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
124            panic!("ValueDecompressor should output FixedWidth DataBlock")
125        };
126
127        let bytes_per_values = self
128            .bits_per_values
129            .iter()
130            .map(|bits_per_value| *bits_per_value as usize / 8)
131            .collect::<Vec<_>>();
132
133        assert!(encoded_data_block.bits_per_value % 8 == 0);
134        let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
135
136        // use a prefix_sum vector as a helper to reconstruct to `StructDataBlock`.
137        let mut prefix_sum = vec![0; self.bits_per_values.len()];
138        for i in 0..(self.bits_per_values.len() - 1) {
139            prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
140        }
141
142        let mut children_data_block = vec![];
143        for i in 0..self.bits_per_values.len() {
144            let child_buf_size = bytes_per_values[i] * num_values as usize;
145            let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
146
147            for j in 0..num_values as usize {
148                // the start of the data at this row is `j * encoded_bytes_per_row`, and the offset for this field is `prefix_sum[i]`, this field has length `bytes_per_values[i]`.
149                let this_value = encoded_data_block.data.slice_with_length(
150                    prefix_sum[i] + (j * encoded_bytes_per_row),
151                    bytes_per_values[i],
152                );
153
154                child_buf.extend_from_slice(&this_value);
155            }
156
157            let child = DataBlock::FixedWidth(FixedWidthDataBlock {
158                data: LanceBuffer::Owned(child_buf),
159                bits_per_value: self.bits_per_values[i] as u64,
160                num_values,
161                block_info: BlockInfo::default(),
162            });
163            children_data_block.push(child);
164        }
165        Ok(DataBlock::Struct(StructDataBlock {
166            children: children_data_block,
167            block_info: BlockInfo::default(),
168        }))
169    }
170}