lance_encoding/encodings/physical/
struct_encoding.rs1use arrow::datatypes::UInt64Type;
5
6use lance_core::{Error, Result};
7use snafu::location;
8
9use crate::{
10 buffer::LanceBuffer,
11 data::{BlockInfo, DataBlock, FixedWidthDataBlock, StructDataBlock},
12 decoder::MiniBlockDecompressor,
13 encoder::{MiniBlockCompressed, MiniBlockCompressor},
14 format::{
15 pb::{self},
16 ProtobufUtils,
17 },
18 statistics::{GetStat, Stat},
19};
20
21use super::value::{ValueDecompressor, ValueEncoder};
22
23fn struct_data_block_to_fixed_width_data_block(
27 struct_data_block: StructDataBlock,
28 bits_per_values: &[u32],
29) -> DataBlock {
30 let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
31 let mut output = Vec::with_capacity(data_size as usize);
32 let num_values = struct_data_block.children[0].num_values();
33
34 for i in 0..num_values as usize {
35 for (j, child) in struct_data_block.children.iter().enumerate() {
36 let bytes_per_value = (bits_per_values[j] / 8) as usize;
37 let this_data = child
38 .as_fixed_width_ref()
39 .unwrap()
40 .data
41 .slice_with_length(bytes_per_value * i, bytes_per_value);
42 output.extend_from_slice(&this_data);
43 }
44 }
45
46 DataBlock::FixedWidth(FixedWidthDataBlock {
47 bits_per_value: bits_per_values
48 .iter()
49 .map(|bits_per_value| *bits_per_value as u64)
50 .sum(),
51 data: LanceBuffer::Owned(output),
52 num_values,
53 block_info: BlockInfo::default(),
54 })
55}
56
57#[derive(Debug, Default)]
58pub struct PackedStructFixedWidthMiniBlockEncoder {}
59
60impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
61 fn compress(
62 &self,
63 data: DataBlock,
64 ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
65 match data {
66 DataBlock::Struct(struct_data_block) => {
67 let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value as u32).collect::<Vec<_>>();
68
69 let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
71
72 let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
74 let (value_miniblock_compressed, value_array_encoding) =
75 value_miniblock_compressor.compress(data_block)?;
76
77 Ok((
78 value_miniblock_compressed,
79 ProtobufUtils::packed_struct_fixed_width_mini_block(value_array_encoding, bits_per_values),
80 ))
81 }
82 _ => Err(Error::InvalidInput {
83 source: format!(
84 "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
85 data.name()
86 )
87 .into(),
88 location: location!(),
89 }),
90 }
91 }
92}
93
94#[derive(Debug)]
95pub struct PackedStructFixedWidthMiniBlockDecompressor {
96 bits_per_values: Vec<u32>,
97 array_encoding: Box<dyn MiniBlockDecompressor>,
98}
99
100impl PackedStructFixedWidthMiniBlockDecompressor {
101 pub fn new(description: &pb::PackedStructFixedWidthMiniBlock) -> Self {
102 let array_encoding: Box<dyn MiniBlockDecompressor> = match description
103 .flat
104 .as_ref()
105 .unwrap()
106 .array_encoding
107 .as_ref()
108 .unwrap()
109 {
110 pb::array_encoding::ArrayEncoding::Flat(flat) => Box::new(ValueDecompressor::new(flat)),
111 _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."),
112 };
113 Self {
114 bits_per_values: description.bits_per_values.clone(),
115 array_encoding,
116 }
117 }
118}
119
120impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
121 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
122 let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
123 let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
124 panic!("ValueDecompressor should output FixedWidth DataBlock")
125 };
126
127 let bytes_per_values = self
128 .bits_per_values
129 .iter()
130 .map(|bits_per_value| *bits_per_value as usize / 8)
131 .collect::<Vec<_>>();
132
133 assert!(encoded_data_block.bits_per_value % 8 == 0);
134 let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
135
136 let mut prefix_sum = vec![0; self.bits_per_values.len()];
138 for i in 0..(self.bits_per_values.len() - 1) {
139 prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
140 }
141
142 let mut children_data_block = vec![];
143 for i in 0..self.bits_per_values.len() {
144 let child_buf_size = bytes_per_values[i] * num_values as usize;
145 let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
146
147 for j in 0..num_values as usize {
148 let this_value = encoded_data_block.data.slice_with_length(
150 prefix_sum[i] + (j * encoded_bytes_per_row),
151 bytes_per_values[i],
152 );
153
154 child_buf.extend_from_slice(&this_value);
155 }
156
157 let child = DataBlock::FixedWidth(FixedWidthDataBlock {
158 data: LanceBuffer::Owned(child_buf),
159 bits_per_value: self.bits_per_values[i] as u64,
160 num_values,
161 block_info: BlockInfo::default(),
162 });
163 children_data_block.push(child);
164 }
165 Ok(DataBlock::Struct(StructDataBlock {
166 children: children_data_block,
167 block_info: BlockInfo::default(),
168 }))
169 }
170}