lance_encoding/encodings/
physical.rs1use arrow_schema::DataType;
5use block_compress::CompressionConfig;
6use fsst::FsstPageScheduler;
7use lance_arrow::DataTypeExt;
8use packed_struct::PackedStructPageScheduler;
9
10use self::{
11 basic::BasicPageScheduler, binary::BinaryPageScheduler, bitmap::DenseBitmapScheduler,
12 dictionary::DictionaryPageScheduler, fixed_size_list::FixedListScheduler,
13 value::ValuePageScheduler,
14};
15use crate::buffer::LanceBuffer;
16use crate::encodings::physical::block_compress::CompressionScheme;
17use crate::{
18 decoder::PageScheduler,
19 format::pb::{self, PackedStruct},
20};
21
22pub mod basic;
23pub mod binary;
24pub mod bitmap;
25pub mod bitpack;
26pub mod bitpack_fastlanes;
27pub mod block_compress;
28pub mod dictionary;
29pub mod fixed_size_binary;
30pub mod fixed_size_list;
31pub mod fsst;
32pub mod packed_struct;
33pub mod struct_encoding;
34pub mod value;
35
/// Buffer table that is looked up for buffers of type `File`.
///
/// Each entry is a pair of `u64`s; consumers in this module read the first
/// element as the buffer offset and the second as the buffer size.
#[derive(Debug, Clone, Copy)]
pub struct FileBuffers<'a> {
    /// One `(offset, size)` entry per file-level buffer, indexed by `buffer_index`.
    pub positions_and_sizes: &'a [(u64, u64)],
}
41
42#[derive(Clone, Copy, Debug)]
44pub struct ColumnBuffers<'a, 'b> {
45 pub file_buffers: FileBuffers<'a>,
46 pub positions_and_sizes: &'b [(u64, u64)],
47}
48
49#[derive(Clone, Copy, Debug)]
51pub struct PageBuffers<'a, 'b, 'c> {
52 pub column_buffers: ColumnBuffers<'a, 'b>,
53 pub positions_and_sizes: &'c [(u64, u64)],
54}
55
56fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) {
59 let index = buffer_desc.buffer_index as usize;
60
61 match pb::buffer::BufferType::try_from(buffer_desc.buffer_type).unwrap() {
62 pb::buffer::BufferType::Page => buffers.positions_and_sizes[index],
63 pb::buffer::BufferType::Column => buffers.column_buffers.positions_and_sizes[index],
64 pb::buffer::BufferType::File => {
65 buffers.column_buffers.file_buffers.positions_and_sizes[index]
66 }
67 }
68}
69
70fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn PageScheduler> {
72 let (buffer_offset, buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
73 let compression_config: CompressionConfig = if encoding.compression.is_none() {
74 CompressionConfig::new(CompressionScheme::None, None)
75 } else {
76 let compression = encoding.compression.as_ref().unwrap();
77 CompressionConfig::new(
78 compression.scheme.as_str().parse().unwrap(),
79 compression.level,
80 )
81 };
82 match encoding.bits_per_value {
83 1 => Box::new(DenseBitmapScheduler::new(buffer_offset)),
84 bits_per_value => {
85 if bits_per_value % 8 != 0 {
86 todo!(
87 "bits_per_value ({}) that is not a multiple of 8",
88 bits_per_value
89 );
90 }
91 Box::new(ValuePageScheduler::new(
92 bits_per_value / 8,
93 buffer_offset,
94 buffer_size,
95 compression_config,
96 ))
97 }
98 }
99}
100
101fn get_bitpacked_buffer_decoder(
102 encoding: &pb::Bitpacked,
103 buffers: &PageBuffers,
104) -> Box<dyn PageScheduler> {
105 let (buffer_offset, _buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
106
107 Box::new(bitpack::BitpackedScheduler::new(
108 encoding.compressed_bits_per_value,
109 encoding.uncompressed_bits_per_value,
110 buffer_offset,
111 encoding.signed,
112 ))
113}
114
115fn get_bitpacked_for_non_neg_buffer_decoder(
116 encoding: &pb::BitpackedForNonNeg,
117 buffers: &PageBuffers,
118) -> Box<dyn PageScheduler> {
119 let (buffer_offset, _buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
120
121 Box::new(bitpack_fastlanes::BitpackedForNonNegScheduler::new(
122 encoding.compressed_bits_per_value,
123 encoding.uncompressed_bits_per_value,
124 buffer_offset,
125 ))
126}
127
128fn decoder_from_packed_struct(
129 packed_struct: &PackedStruct,
130 buffers: &PageBuffers,
131 data_type: &DataType,
132) -> Box<dyn PageScheduler> {
133 let inner_encodings = &packed_struct.inner;
134 let fields = match data_type {
135 DataType::Struct(fields) => Some(fields),
136 _ => None,
137 }
138 .unwrap();
139
140 let inner_datatypes = fields
141 .iter()
142 .map(|field| field.data_type())
143 .collect::<Vec<_>>();
144
145 let mut inner_schedulers = Vec::with_capacity(fields.len());
146 for i in 0..fields.len() {
147 let inner_encoding = &inner_encodings[i];
148 let inner_datatype = inner_datatypes[i];
149 let inner_scheduler = decoder_from_array_encoding(inner_encoding, buffers, inner_datatype);
150 inner_schedulers.push(inner_scheduler);
151 }
152
153 let packed_buffer = packed_struct.buffer.as_ref().unwrap();
154 let (buffer_offset, _) = get_buffer(packed_buffer, buffers);
155
156 Box::new(PackedStructPageScheduler::new(
157 inner_schedulers,
158 data_type.clone(),
159 buffer_offset,
160 ))
161}
162
163pub fn decoder_from_array_encoding(
165 encoding: &pb::ArrayEncoding,
166 buffers: &PageBuffers,
167 data_type: &DataType,
168) -> Box<dyn PageScheduler> {
169 match encoding.array_encoding.as_ref().unwrap() {
170 pb::array_encoding::ArrayEncoding::Nullable(basic) => {
171 match basic.nullability.as_ref().unwrap() {
172 pb::nullable::Nullability::NoNulls(no_nulls) => Box::new(
173 BasicPageScheduler::new_non_nullable(decoder_from_array_encoding(
174 no_nulls.values.as_ref().unwrap(),
175 buffers,
176 data_type,
177 )),
178 ),
179 pb::nullable::Nullability::SomeNulls(some_nulls) => {
180 Box::new(BasicPageScheduler::new_nullable(
181 decoder_from_array_encoding(
182 some_nulls.validity.as_ref().unwrap(),
183 buffers,
184 data_type,
185 ),
186 decoder_from_array_encoding(
187 some_nulls.values.as_ref().unwrap(),
188 buffers,
189 data_type,
190 ),
191 ))
192 }
193 pb::nullable::Nullability::AllNulls(_) => {
194 Box::new(BasicPageScheduler::new_all_null())
195 }
196 }
197 }
198 pb::array_encoding::ArrayEncoding::Bitpacked(bitpacked) => {
199 get_bitpacked_buffer_decoder(bitpacked, buffers)
200 }
201 pb::array_encoding::ArrayEncoding::Flat(flat) => get_buffer_decoder(flat, buffers),
202 pb::array_encoding::ArrayEncoding::FixedSizeList(fixed_size_list) => {
203 let item_encoding = fixed_size_list.items.as_ref().unwrap();
204 let item_scheduler = decoder_from_array_encoding(item_encoding, buffers, data_type);
205 Box::new(FixedListScheduler::new(
206 item_scheduler,
207 fixed_size_list.dimension,
208 ))
209 }
210 pb::array_encoding::ArrayEncoding::List(list) => {
214 decoder_from_array_encoding(list.offsets.as_ref().unwrap(), buffers, data_type)
215 }
216 pb::array_encoding::ArrayEncoding::Binary(binary) => {
217 let indices_encoding = binary.indices.as_ref().unwrap();
218 let bytes_encoding = binary.bytes.as_ref().unwrap();
219
220 let indices_scheduler =
221 decoder_from_array_encoding(indices_encoding, buffers, data_type);
222 let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
223
224 let offset_type = match data_type {
225 DataType::LargeBinary | DataType::LargeUtf8 => DataType::Int64,
226 _ => DataType::Int32,
227 };
228
229 Box::new(BinaryPageScheduler::new(
230 indices_scheduler.into(),
231 bytes_scheduler.into(),
232 offset_type,
233 binary.null_adjustment,
234 ))
235 }
236 pb::array_encoding::ArrayEncoding::Fsst(fsst) => {
237 let inner =
238 decoder_from_array_encoding(fsst.binary.as_ref().unwrap(), buffers, data_type);
239
240 Box::new(FsstPageScheduler::new(
241 inner,
242 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
243 ))
244 }
245 pb::array_encoding::ArrayEncoding::Dictionary(dictionary) => {
246 let indices_encoding = dictionary.indices.as_ref().unwrap();
247 let items_encoding = dictionary.items.as_ref().unwrap();
248 let num_dictionary_items = dictionary.num_dictionary_items;
249
250 let value_type = if let DataType::Dictionary(_, value_type) = data_type {
254 value_type
255 } else {
256 data_type
257 };
258
259 let indices_scheduler =
263 decoder_from_array_encoding(indices_encoding, buffers, data_type);
264
265 let items_scheduler = decoder_from_array_encoding(items_encoding, buffers, value_type);
266
267 let should_decode_dict = !data_type.is_dictionary();
268
269 Box::new(DictionaryPageScheduler::new(
270 indices_scheduler.into(),
271 items_scheduler.into(),
272 num_dictionary_items,
273 should_decode_dict,
274 ))
275 }
276 pb::array_encoding::ArrayEncoding::FixedSizeBinary(fixed_size_binary) => {
277 let bytes_encoding = fixed_size_binary.bytes.as_ref().unwrap();
278 let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
279 let bytes_per_offset = match data_type {
280 DataType::LargeBinary | DataType::LargeUtf8 => 8,
281 DataType::Binary | DataType::Utf8 => 4,
282 _ => panic!("FixedSizeBinary only supports binary and utf8 types"),
283 };
284
285 Box::new(fixed_size_binary::FixedSizeBinaryPageScheduler::new(
286 bytes_scheduler,
287 fixed_size_binary.byte_width,
288 bytes_per_offset,
289 ))
290 }
291 pb::array_encoding::ArrayEncoding::PackedStruct(packed_struct) => {
292 decoder_from_packed_struct(packed_struct, buffers, data_type)
293 }
294 pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(bitpacked) => {
295 get_bitpacked_for_non_neg_buffer_decoder(bitpacked, buffers)
296 }
297 pb::array_encoding::ArrayEncoding::Struct(_) => unreachable!(),
302 pb::array_encoding::ArrayEncoding::Constant(_) => unreachable!(),
304 pb::array_encoding::ArrayEncoding::Bitpack2(_) => unreachable!(),
305 pb::array_encoding::ArrayEncoding::Variable(_) => unreachable!(),
306 pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(_) => unreachable!(),
307 }
308}
309
#[cfg(test)]
mod tests {
    use crate::encodings::physical::{get_buffer_decoder, ColumnBuffers, FileBuffers, PageBuffers};
    use crate::format::pb;

    /// A zstd-compressed, 8-bit flat buffer stored at file scope must produce
    /// a `ValuePageScheduler` carrying the file buffer's offset and size and
    /// the parsed compression configuration.
    #[test]
    fn test_get_buffer_decoder_for_compressed_buffer() {
        let flat = pb::Flat {
            buffer: Some(pb::Buffer {
                buffer_index: 0,
                buffer_type: pb::buffer::BufferType::File as i32,
            }),
            bits_per_value: 8,
            compression: Some(pb::Compression {
                scheme: "zstd".to_string(),
                level: Some(0),
            }),
        };

        // Only the file-level table is populated; column and page are empty.
        let file_buffers = FileBuffers {
            positions_and_sizes: &[(0, 100)],
        };
        let column_buffers = ColumnBuffers {
            file_buffers,
            positions_and_sizes: &[],
        };
        let page_buffers = PageBuffers {
            column_buffers,
            positions_and_sizes: &[],
        };

        let page_scheduler = get_buffer_decoder(&flat, &page_buffers);

        let expected = "ValuePageScheduler { bytes_per_value: 1, buffer_offset: 0, buffer_size: 100, compression_config: CompressionConfig { scheme: Zstd, level: Some(0) } }";
        assert_eq!(format!("{:?}", page_scheduler).as_str(), expected);
    }
}