lance_encoding/encodings/
physical.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::DataType;
5use block_compress::CompressionConfig;
6use fsst::FsstPageScheduler;
7use lance_arrow::DataTypeExt;
8use packed_struct::PackedStructPageScheduler;
9
10use self::{
11    basic::BasicPageScheduler, binary::BinaryPageScheduler, bitmap::DenseBitmapScheduler,
12    dictionary::DictionaryPageScheduler, fixed_size_list::FixedListScheduler,
13    value::ValuePageScheduler,
14};
15use crate::buffer::LanceBuffer;
16use crate::encodings::physical::block_compress::CompressionScheme;
17use crate::{
18    decoder::PageScheduler,
19    format::pb::{self, PackedStruct},
20};
21
22pub mod basic;
23pub mod binary;
24pub mod bitmap;
25pub mod bitpack;
26pub mod bitpack_fastlanes;
27pub mod block_compress;
28pub mod dictionary;
29pub mod fixed_size_binary;
30pub mod fixed_size_list;
31pub mod fsst;
32pub mod packed_struct;
33pub mod struct_encoding;
34pub mod value;
35
/// Buffer table shared by the entire file.
///
/// Each entry is the `(position, size)` of one file-level buffer.
#[derive(Debug, Clone, Copy)]
pub struct FileBuffers<'a> {
    /// One `(position, size)` pair per file-level buffer.
    pub positions_and_sizes: &'a [(u64, u64)],
}
41
42/// These contain the file buffers and also buffers specific to a column
43#[derive(Clone, Copy, Debug)]
44pub struct ColumnBuffers<'a, 'b> {
45    pub file_buffers: FileBuffers<'a>,
46    pub positions_and_sizes: &'b [(u64, u64)],
47}
48
49/// These contain the file & column buffers and also buffers specific to a page
50#[derive(Clone, Copy, Debug)]
51pub struct PageBuffers<'a, 'b, 'c> {
52    pub column_buffers: ColumnBuffers<'a, 'b>,
53    pub positions_and_sizes: &'c [(u64, u64)],
54}
55
56// Translate a protobuf buffer description into a position in the file.  This could be a page
57// buffer, a column buffer, or a file buffer.
58fn get_buffer(buffer_desc: &pb::Buffer, buffers: &PageBuffers) -> (u64, u64) {
59    let index = buffer_desc.buffer_index as usize;
60
61    match pb::buffer::BufferType::try_from(buffer_desc.buffer_type).unwrap() {
62        pb::buffer::BufferType::Page => buffers.positions_and_sizes[index],
63        pb::buffer::BufferType::Column => buffers.column_buffers.positions_and_sizes[index],
64        pb::buffer::BufferType::File => {
65            buffers.column_buffers.file_buffers.positions_and_sizes[index]
66        }
67    }
68}
69
70/// Convert a protobuf buffer encoding into a physical page scheduler
71fn get_buffer_decoder(encoding: &pb::Flat, buffers: &PageBuffers) -> Box<dyn PageScheduler> {
72    let (buffer_offset, buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
73    let compression_config: CompressionConfig = if encoding.compression.is_none() {
74        CompressionConfig::new(CompressionScheme::None, None)
75    } else {
76        let compression = encoding.compression.as_ref().unwrap();
77        CompressionConfig::new(
78            compression.scheme.as_str().parse().unwrap(),
79            compression.level,
80        )
81    };
82    match encoding.bits_per_value {
83        1 => Box::new(DenseBitmapScheduler::new(buffer_offset)),
84        bits_per_value => {
85            if bits_per_value % 8 != 0 {
86                todo!(
87                    "bits_per_value ({}) that is not a multiple of 8",
88                    bits_per_value
89                );
90            }
91            Box::new(ValuePageScheduler::new(
92                bits_per_value / 8,
93                buffer_offset,
94                buffer_size,
95                compression_config,
96            ))
97        }
98    }
99}
100
101fn get_bitpacked_buffer_decoder(
102    encoding: &pb::Bitpacked,
103    buffers: &PageBuffers,
104) -> Box<dyn PageScheduler> {
105    let (buffer_offset, _buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
106
107    Box::new(bitpack::BitpackedScheduler::new(
108        encoding.compressed_bits_per_value,
109        encoding.uncompressed_bits_per_value,
110        buffer_offset,
111        encoding.signed,
112    ))
113}
114
115fn get_bitpacked_for_non_neg_buffer_decoder(
116    encoding: &pb::BitpackedForNonNeg,
117    buffers: &PageBuffers,
118) -> Box<dyn PageScheduler> {
119    let (buffer_offset, _buffer_size) = get_buffer(encoding.buffer.as_ref().unwrap(), buffers);
120
121    Box::new(bitpack_fastlanes::BitpackedForNonNegScheduler::new(
122        encoding.compressed_bits_per_value,
123        encoding.uncompressed_bits_per_value,
124        buffer_offset,
125    ))
126}
127
128fn decoder_from_packed_struct(
129    packed_struct: &PackedStruct,
130    buffers: &PageBuffers,
131    data_type: &DataType,
132) -> Box<dyn PageScheduler> {
133    let inner_encodings = &packed_struct.inner;
134    let fields = match data_type {
135        DataType::Struct(fields) => Some(fields),
136        _ => None,
137    }
138    .unwrap();
139
140    let inner_datatypes = fields
141        .iter()
142        .map(|field| field.data_type())
143        .collect::<Vec<_>>();
144
145    let mut inner_schedulers = Vec::with_capacity(fields.len());
146    for i in 0..fields.len() {
147        let inner_encoding = &inner_encodings[i];
148        let inner_datatype = inner_datatypes[i];
149        let inner_scheduler = decoder_from_array_encoding(inner_encoding, buffers, inner_datatype);
150        inner_schedulers.push(inner_scheduler);
151    }
152
153    let packed_buffer = packed_struct.buffer.as_ref().unwrap();
154    let (buffer_offset, _) = get_buffer(packed_buffer, buffers);
155
156    Box::new(PackedStructPageScheduler::new(
157        inner_schedulers,
158        data_type.clone(),
159        buffer_offset,
160    ))
161}
162
163/// Convert a protobuf array encoding into a physical page scheduler
164pub fn decoder_from_array_encoding(
165    encoding: &pb::ArrayEncoding,
166    buffers: &PageBuffers,
167    data_type: &DataType,
168) -> Box<dyn PageScheduler> {
169    match encoding.array_encoding.as_ref().unwrap() {
170        pb::array_encoding::ArrayEncoding::Nullable(basic) => {
171            match basic.nullability.as_ref().unwrap() {
172                pb::nullable::Nullability::NoNulls(no_nulls) => Box::new(
173                    BasicPageScheduler::new_non_nullable(decoder_from_array_encoding(
174                        no_nulls.values.as_ref().unwrap(),
175                        buffers,
176                        data_type,
177                    )),
178                ),
179                pb::nullable::Nullability::SomeNulls(some_nulls) => {
180                    Box::new(BasicPageScheduler::new_nullable(
181                        decoder_from_array_encoding(
182                            some_nulls.validity.as_ref().unwrap(),
183                            buffers,
184                            data_type,
185                        ),
186                        decoder_from_array_encoding(
187                            some_nulls.values.as_ref().unwrap(),
188                            buffers,
189                            data_type,
190                        ),
191                    ))
192                }
193                pb::nullable::Nullability::AllNulls(_) => {
194                    Box::new(BasicPageScheduler::new_all_null())
195                }
196            }
197        }
198        pb::array_encoding::ArrayEncoding::Bitpacked(bitpacked) => {
199            get_bitpacked_buffer_decoder(bitpacked, buffers)
200        }
201        pb::array_encoding::ArrayEncoding::Flat(flat) => get_buffer_decoder(flat, buffers),
202        pb::array_encoding::ArrayEncoding::FixedSizeList(fixed_size_list) => {
203            let item_encoding = fixed_size_list.items.as_ref().unwrap();
204            let item_scheduler = decoder_from_array_encoding(item_encoding, buffers, data_type);
205            Box::new(FixedListScheduler::new(
206                item_scheduler,
207                fixed_size_list.dimension,
208            ))
209        }
210        // This is a column containing the list offsets.  This wrapper is superfluous at the moment
211        // since we know it is a list based on the schema.  In the future there may be different ways
212        // of storing the list offsets.
213        pb::array_encoding::ArrayEncoding::List(list) => {
214            decoder_from_array_encoding(list.offsets.as_ref().unwrap(), buffers, data_type)
215        }
216        pb::array_encoding::ArrayEncoding::Binary(binary) => {
217            let indices_encoding = binary.indices.as_ref().unwrap();
218            let bytes_encoding = binary.bytes.as_ref().unwrap();
219
220            let indices_scheduler =
221                decoder_from_array_encoding(indices_encoding, buffers, data_type);
222            let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
223
224            let offset_type = match data_type {
225                DataType::LargeBinary | DataType::LargeUtf8 => DataType::Int64,
226                _ => DataType::Int32,
227            };
228
229            Box::new(BinaryPageScheduler::new(
230                indices_scheduler.into(),
231                bytes_scheduler.into(),
232                offset_type,
233                binary.null_adjustment,
234            ))
235        }
236        pb::array_encoding::ArrayEncoding::Fsst(fsst) => {
237            let inner =
238                decoder_from_array_encoding(fsst.binary.as_ref().unwrap(), buffers, data_type);
239
240            Box::new(FsstPageScheduler::new(
241                inner,
242                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
243            ))
244        }
245        pb::array_encoding::ArrayEncoding::Dictionary(dictionary) => {
246            let indices_encoding = dictionary.indices.as_ref().unwrap();
247            let items_encoding = dictionary.items.as_ref().unwrap();
248            let num_dictionary_items = dictionary.num_dictionary_items;
249
250            // We can get here in 2 ways.  The data is dictionary encoded and the user wants a dictionary or
251            // the data is dictionary encoded, as an optimization, and the user wants the value type.  Figure
252            // out the value type.
253            let value_type = if let DataType::Dictionary(_, value_type) = data_type {
254                value_type
255            } else {
256                data_type
257            };
258
259            // Note: we don't actually know the indices type here, passing down `data_type` works ok because
260            // the dictionary indices are always integers and we don't need the data_type to figure out how
261            // to decode integers.
262            let indices_scheduler =
263                decoder_from_array_encoding(indices_encoding, buffers, data_type);
264
265            let items_scheduler = decoder_from_array_encoding(items_encoding, buffers, value_type);
266
267            let should_decode_dict = !data_type.is_dictionary();
268
269            Box::new(DictionaryPageScheduler::new(
270                indices_scheduler.into(),
271                items_scheduler.into(),
272                num_dictionary_items,
273                should_decode_dict,
274            ))
275        }
276        pb::array_encoding::ArrayEncoding::FixedSizeBinary(fixed_size_binary) => {
277            let bytes_encoding = fixed_size_binary.bytes.as_ref().unwrap();
278            let bytes_scheduler = decoder_from_array_encoding(bytes_encoding, buffers, data_type);
279            let bytes_per_offset = match data_type {
280                DataType::LargeBinary | DataType::LargeUtf8 => 8,
281                DataType::Binary | DataType::Utf8 => 4,
282                _ => panic!("FixedSizeBinary only supports binary and utf8 types"),
283            };
284
285            Box::new(fixed_size_binary::FixedSizeBinaryPageScheduler::new(
286                bytes_scheduler,
287                fixed_size_binary.byte_width,
288                bytes_per_offset,
289            ))
290        }
291        pb::array_encoding::ArrayEncoding::PackedStruct(packed_struct) => {
292            decoder_from_packed_struct(packed_struct, buffers, data_type)
293        }
294        pb::array_encoding::ArrayEncoding::BitpackedForNonNeg(bitpacked) => {
295            get_bitpacked_for_non_neg_buffer_decoder(bitpacked, buffers)
296        }
297        // Currently there is no way to encode struct nullability and structs are encoded with a "header" column
298        // (that has no data).  We never actually decode that column and so this branch is never actually encountered.
299        //
300        // This will change in the future when we add support for struct nullability.
301        pb::array_encoding::ArrayEncoding::Struct(_) => unreachable!(),
302        // 2.1 only
303        pb::array_encoding::ArrayEncoding::Constant(_) => unreachable!(),
304        pb::array_encoding::ArrayEncoding::Bitpack2(_) => unreachable!(),
305        pb::array_encoding::ArrayEncoding::Variable(_) => unreachable!(),
306        pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(_) => unreachable!(),
307    }
308}
309
#[cfg(test)]
mod tests {
    use crate::encodings::physical::{get_buffer_decoder, ColumnBuffers, FileBuffers, PageBuffers};
    use crate::format::pb;

    /// A zstd-compressed, 8-bit flat encoding that points at file buffer 0 should produce a
    /// `ValuePageScheduler`. The scheduler should carry that buffer's position and size, a
    /// one-byte value width, and the requested compression settings.
    #[test]
    fn test_get_buffer_decoder_for_compressed_buffer() {
        let file_buffer_table = [(0, 100)];
        let page_buffers = PageBuffers {
            column_buffers: ColumnBuffers {
                file_buffers: FileBuffers {
                    positions_and_sizes: &file_buffer_table,
                },
                positions_and_sizes: &[],
            },
            positions_and_sizes: &[],
        };

        let zstd = pb::Compression {
            scheme: "zstd".to_string(),
            level: Some(0),
        };
        let file_buffer_ref = pb::Buffer {
            buffer_index: 0,
            buffer_type: pb::buffer::BufferType::File as i32,
        };
        let flat = pb::Flat {
            buffer: Some(file_buffer_ref),
            bits_per_value: 8,
            compression: Some(zstd),
        };

        let page_scheduler = get_buffer_decoder(&flat, &page_buffers);

        let expected = "ValuePageScheduler { bytes_per_value: 1, buffer_offset: 0, buffer_size: 100, compression_config: CompressionConfig { scheme: Zstd, level: Some(0) } }";
        assert_eq!(format!("{:?}", page_scheduler), expected);
    }
}