lance_encoding/encodings/physical/
fsst.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{ops::Range, sync::Arc};
5
6use arrow_buffer::ScalarBuffer;
7use arrow_schema::DataType;
8use futures::{future::BoxFuture, FutureExt};
9
10use lance_core::{Error, Result};
11use snafu::location;
12
13use crate::{
14    buffer::LanceBuffer,
15    data::{BlockInfo, DataBlock, NullableDataBlock, VariableWidthBlock},
16    decoder::{
17        MiniBlockDecompressor, PageScheduler, PrimitivePageDecoder, VariablePerValueDecompressor,
18    },
19    encoder::{
20        ArrayEncoder, EncodedArray, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
21        PerValueDataBlock,
22    },
23    format::{
24        pb::{self},
25        ProtobufUtils,
26    },
27    EncodingsIo,
28};
29
30use super::binary::{BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder};
31
32#[derive(Debug)]
33pub struct FsstPageScheduler {
34    inner_scheduler: Box<dyn PageScheduler>,
35    symbol_table: LanceBuffer,
36}
37
38impl FsstPageScheduler {
39    pub fn new(inner_scheduler: Box<dyn PageScheduler>, symbol_table: LanceBuffer) -> Self {
40        Self {
41            inner_scheduler,
42            symbol_table,
43        }
44    }
45}
46
47impl PageScheduler for FsstPageScheduler {
48    fn schedule_ranges(
49        &self,
50        ranges: &[Range<u64>],
51        scheduler: &Arc<dyn EncodingsIo>,
52        top_level_row: u64,
53    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
54        let inner_decoder = self
55            .inner_scheduler
56            .schedule_ranges(ranges, scheduler, top_level_row);
57        let symbol_table = self.symbol_table.try_clone().unwrap();
58
59        async move {
60            let inner_decoder = inner_decoder.await?;
61            Ok(Box::new(FsstPageDecoder {
62                inner_decoder,
63                symbol_table,
64            }) as Box<dyn PrimitivePageDecoder>)
65        }
66        .boxed()
67    }
68}
69
70struct FsstPageDecoder {
71    inner_decoder: Box<dyn PrimitivePageDecoder>,
72    symbol_table: LanceBuffer,
73}
74
75impl PrimitivePageDecoder for FsstPageDecoder {
76    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
77        let compressed_data = self.inner_decoder.decode(rows_to_skip, num_rows)?;
78        let (string_data, nulls) = match compressed_data {
79            DataBlock::Nullable(nullable) => {
80                let data = nullable.data.as_variable_width().unwrap();
81                Result::Ok((data, Some(nullable.nulls)))
82            }
83            DataBlock::VariableWidth(variable) => Ok((variable, None)),
84            _ => panic!("Received non-variable width data from inner decoder"),
85        }?;
86
87        let offsets = ScalarBuffer::<i32>::from(string_data.offsets.into_buffer());
88        let bytes = string_data.data.into_buffer();
89
90        let mut decompressed_offsets = vec![0_i32; offsets.len()];
91        let mut decompressed_bytes = vec![0_u8; bytes.len() * 8];
92        // Safety: Exposes uninitialized memory but we're about to clobber it
93        unsafe {
94            decompressed_bytes.set_len(decompressed_bytes.capacity());
95        }
96        fsst::fsst::decompress(
97            &self.symbol_table,
98            &bytes,
99            &offsets,
100            &mut decompressed_bytes,
101            &mut decompressed_offsets,
102        )?;
103
104        // TODO: Change PrimitivePageDecoder to use Vec instead of BytesMut
105        // since there is no way to get BytesMut from Vec but these copies should be avoidable
106        // This is not the first time this has happened
107        let mut offsets_as_bytes_mut = Vec::with_capacity(decompressed_offsets.len());
108        let decompressed_offsets = ScalarBuffer::<i32>::from(decompressed_offsets);
109        offsets_as_bytes_mut.extend_from_slice(decompressed_offsets.inner().as_slice());
110
111        let mut bytes_as_bytes_mut = Vec::with_capacity(decompressed_bytes.len());
112        bytes_as_bytes_mut.extend_from_slice(&decompressed_bytes);
113
114        let new_string_data = DataBlock::VariableWidth(VariableWidthBlock {
115            bits_per_offset: 32,
116            data: LanceBuffer::from(bytes_as_bytes_mut),
117            num_values: num_rows,
118            offsets: LanceBuffer::from(offsets_as_bytes_mut),
119            block_info: BlockInfo::new(),
120        });
121
122        if let Some(nulls) = nulls {
123            Ok(DataBlock::Nullable(NullableDataBlock {
124                data: Box::new(new_string_data),
125                nulls,
126                block_info: BlockInfo::new(),
127            }))
128        } else {
129            Ok(new_string_data)
130        }
131    }
132}
133
134#[derive(Debug)]
135pub struct FsstArrayEncoder {
136    inner_encoder: Box<dyn ArrayEncoder>,
137}
138
139impl FsstArrayEncoder {
140    pub fn new(inner_encoder: Box<dyn ArrayEncoder>) -> Self {
141        Self { inner_encoder }
142    }
143}
144
145impl ArrayEncoder for FsstArrayEncoder {
146    fn encode(
147        &self,
148        data: DataBlock,
149        data_type: &DataType,
150        buffer_index: &mut u32,
151    ) -> lance_core::Result<EncodedArray> {
152        let (mut data, nulls) = match data {
153            DataBlock::Nullable(nullable) => {
154                let data = nullable.data.as_variable_width().unwrap();
155                (data, Some(nullable.nulls))
156            }
157            DataBlock::VariableWidth(variable) => (variable, None),
158            _ => panic!("Expected variable width data block"),
159        };
160        assert_eq!(data.bits_per_offset, 32);
161        let num_values = data.num_values;
162        let offsets = data.offsets.borrow_to_typed_slice::<i32>();
163        let offsets_slice = offsets.as_ref();
164        let bytes_data = data.data.into_buffer();
165
166        let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
167        let mut dest_values = vec![0_u8; bytes_data.len() * 2];
168        let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
169
170        fsst::fsst::compress(
171            &mut symbol_table,
172            bytes_data.as_slice(),
173            offsets_slice,
174            &mut dest_values,
175            &mut dest_offsets,
176        )?;
177
178        let dest_offset = LanceBuffer::reinterpret_vec(dest_offsets);
179        let dest_values = LanceBuffer::Owned(dest_values);
180        let dest_data = DataBlock::VariableWidth(VariableWidthBlock {
181            bits_per_offset: 32,
182            data: dest_values,
183            num_values,
184            offsets: dest_offset,
185            block_info: BlockInfo::new(),
186        });
187
188        let data_block = if let Some(nulls) = nulls {
189            DataBlock::Nullable(NullableDataBlock {
190                data: Box::new(dest_data),
191                nulls,
192                block_info: BlockInfo::new(),
193            })
194        } else {
195            dest_data
196        };
197
198        let inner_encoded = self
199            .inner_encoder
200            .encode(data_block, data_type, buffer_index)?;
201
202        let encoding = ProtobufUtils::fsst(inner_encoded.encoding, symbol_table);
203
204        Ok(EncodedArray {
205            data: inner_encoded.data,
206            encoding,
207        })
208    }
209}
210
211struct FsstCompressed {
212    data: VariableWidthBlock,
213    symbol_table: Vec<u8>,
214}
215
216impl FsstCompressed {
217    fn fsst_compress(data: DataBlock) -> Result<Self> {
218        match data {
219            DataBlock::VariableWidth(mut variable_width) => {
220                let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
221                let offsets_slice = offsets.as_ref();
222                let bytes_data = variable_width.data.into_buffer();
223
224                // prepare compression output buffer
225                let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
226                let mut dest_values = vec![0_u8; bytes_data.len() * 2];
227                let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
228
229                // fsst compression
230                fsst::fsst::compress(
231                    &mut symbol_table,
232                    bytes_data.as_slice(),
233                    offsets_slice,
234                    &mut dest_values,
235                    &mut dest_offsets,
236                )?;
237
238                // construct `DataBlock` for BinaryMiniBlockEncoder, we may want some `DataBlock` construct methods later
239                let compressed = VariableWidthBlock {
240                    data: LanceBuffer::reinterpret_vec(dest_values),
241                    bits_per_offset: 32,
242                    offsets: LanceBuffer::reinterpret_vec(dest_offsets),
243                    num_values: variable_width.num_values,
244                    block_info: BlockInfo::new(),
245                };
246
247                Ok(Self {
248                    data: compressed,
249                    symbol_table,
250                })
251            }
252            _ => Err(Error::InvalidInput {
253                source: format!(
254                    "Cannot compress a data block of type {} with FsstEncoder",
255                    data.name()
256                )
257                .into(),
258                location: location!(),
259            }),
260        }
261    }
262}
263
264#[derive(Debug, Default)]
265pub struct FsstMiniBlockEncoder {}
266
267impl MiniBlockCompressor for FsstMiniBlockEncoder {
268    fn compress(
269        &self,
270        data: DataBlock,
271    ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
272        let compressed = FsstCompressed::fsst_compress(data)?;
273
274        let data_block = DataBlock::VariableWidth(compressed.data);
275
276        // compress the fsst compressed data using `BinaryMiniBlockEncoder`
277        let binary_compressor =
278            Box::new(BinaryMiniBlockEncoder::default()) as Box<dyn MiniBlockCompressor>;
279
280        let (binary_miniblock_compressed, binary_array_encoding) =
281            binary_compressor.compress(data_block)?;
282
283        Ok((
284            binary_miniblock_compressed,
285            ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
286        ))
287    }
288}
289
290#[derive(Debug)]
291pub struct FsstPerValueEncoder {
292    inner: Box<dyn PerValueCompressor>,
293}
294
295impl FsstPerValueEncoder {
296    pub fn new(inner: Box<dyn PerValueCompressor>) -> Self {
297        Self { inner }
298    }
299}
300
301impl PerValueCompressor for FsstPerValueEncoder {
302    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
303        let compressed = FsstCompressed::fsst_compress(data)?;
304
305        let data_block = DataBlock::VariableWidth(compressed.data);
306
307        let (binary_compressed, binary_array_encoding) = self.inner.compress(data_block)?;
308
309        Ok((
310            binary_compressed,
311            ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
312        ))
313    }
314}
315
316#[derive(Debug)]
317pub struct FsstPerValueDecompressor {
318    symbol_table: LanceBuffer,
319    inner_decompressor: Box<dyn VariablePerValueDecompressor>,
320}
321
322impl FsstPerValueDecompressor {
323    pub fn new(
324        symbol_table: LanceBuffer,
325        inner_decompressor: Box<dyn VariablePerValueDecompressor>,
326    ) -> Self {
327        Self {
328            symbol_table,
329            inner_decompressor,
330        }
331    }
332}
333
334impl VariablePerValueDecompressor for FsstPerValueDecompressor {
335    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
336        // Step 1. Run inner decompressor
337        let mut compressed_variable_data = self
338            .inner_decompressor
339            .decompress(data)?
340            .as_variable_width()
341            .unwrap();
342
343        // Step 2. FSST decompress
344        let bytes = compressed_variable_data.data.borrow_to_typed_slice::<u8>();
345        let bytes = bytes.as_ref();
346        let offsets = compressed_variable_data
347            .offsets
348            .borrow_to_typed_slice::<i32>();
349        let offsets = offsets.as_ref();
350        let num_values = compressed_variable_data.num_values;
351
352        // The data will expand at most 8 times
353        // The offsets will be the same size because we have the same # of strings
354        let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
355        let mut decompress_offset_buf = vec![0i32; offsets.len()];
356        fsst::fsst::decompress(
357            &self.symbol_table,
358            bytes,
359            offsets,
360            &mut decompress_bytes_buf,
361            &mut decompress_offset_buf,
362        )?;
363
364        Ok(DataBlock::VariableWidth(VariableWidthBlock {
365            data: LanceBuffer::Owned(decompress_bytes_buf),
366            offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
367            bits_per_offset: 32,
368            num_values,
369            block_info: BlockInfo::new(),
370        }))
371    }
372}
373
374#[derive(Debug)]
375pub struct FsstMiniBlockDecompressor {
376    symbol_table: LanceBuffer,
377}
378
379impl FsstMiniBlockDecompressor {
380    pub fn new(description: &pb::Fsst) -> Self {
381        Self {
382            symbol_table: LanceBuffer::from_bytes(description.symbol_table.clone(), 1),
383        }
384    }
385}
386
387impl MiniBlockDecompressor for FsstMiniBlockDecompressor {
388    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
389        // Step 1. decompress data use `BinaryMiniBlockDecompressor`
390        let binary_decompressor =
391            Box::new(BinaryMiniBlockDecompressor::default()) as Box<dyn MiniBlockDecompressor>;
392        let compressed_data_block = binary_decompressor.decompress(data, num_values)?;
393        let DataBlock::VariableWidth(mut compressed_data_block) = compressed_data_block else {
394            panic!("BinaryMiniBlockDecompressor should output VariableWidth DataBlock")
395        };
396
397        // Step 2. FSST decompress
398        let bytes = compressed_data_block.data.borrow_to_typed_slice::<u8>();
399        let bytes = bytes.as_ref();
400        let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i32>();
401        let offsets = offsets.as_ref();
402
403        // FSST decompression output buffer, the `MiniBlock` has a size limit of `4 KiB` and
404        // the FSST decompression algorithm output is at most `8 * input_size`
405        // Since `MiniBlock Size` <= 4 KiB and `offsets` are type `i32, it has number of `offsets` <= 1024.
406        let mut decompress_bytes_buf = vec![0u8; 4 * 1024 * 8];
407        let mut decompress_offset_buf = vec![0i32; 1024];
408        fsst::fsst::decompress(
409            &self.symbol_table,
410            bytes,
411            offsets,
412            &mut decompress_bytes_buf,
413            &mut decompress_offset_buf,
414        )?;
415
416        Ok(DataBlock::VariableWidth(VariableWidthBlock {
417            data: LanceBuffer::Owned(decompress_bytes_buf),
418            offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
419            bits_per_offset: 32,
420            num_values,
421            block_info: BlockInfo::new(),
422        }))
423    }
424}
425
#[cfg(test)]
mod tests {

    use std::collections::HashMap;

    use lance_datagen::{ByteCount, RowCount};

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Round-trips one million generated UTF-8 strings through the 2.1 file
    /// format, once with a byte count of 32 and once with a byte count of 64.
    #[test_log::test(tokio::test)]
    async fn test_fsst() {
        for bytes_per_value in [32, 64] {
            let batch = lance_datagen::gen()
                .anon_col(lance_datagen::array::rand_utf8(
                    ByteCount::from(bytes_per_value),
                    false,
                ))
                .into_batch_rows(RowCount::from(1_000_000))
                .unwrap();
            let arr = batch.column(0).clone();

            let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
            check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
        }
    }
}