1use std::{ops::Range, sync::Arc};
5
6use arrow_buffer::ScalarBuffer;
7use arrow_schema::DataType;
8use futures::{future::BoxFuture, FutureExt};
9
10use lance_core::{Error, Result};
11use snafu::location;
12
13use crate::{
14 buffer::LanceBuffer,
15 data::{BlockInfo, DataBlock, NullableDataBlock, VariableWidthBlock},
16 decoder::{
17 MiniBlockDecompressor, PageScheduler, PrimitivePageDecoder, VariablePerValueDecompressor,
18 },
19 encoder::{
20 ArrayEncoder, EncodedArray, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
21 PerValueDataBlock,
22 },
23 format::{
24 pb::{self},
25 ProtobufUtils,
26 },
27 EncodingsIo,
28};
29
30use super::binary::{BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder};
31
32#[derive(Debug)]
33pub struct FsstPageScheduler {
34 inner_scheduler: Box<dyn PageScheduler>,
35 symbol_table: LanceBuffer,
36}
37
38impl FsstPageScheduler {
39 pub fn new(inner_scheduler: Box<dyn PageScheduler>, symbol_table: LanceBuffer) -> Self {
40 Self {
41 inner_scheduler,
42 symbol_table,
43 }
44 }
45}
46
47impl PageScheduler for FsstPageScheduler {
48 fn schedule_ranges(
49 &self,
50 ranges: &[Range<u64>],
51 scheduler: &Arc<dyn EncodingsIo>,
52 top_level_row: u64,
53 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
54 let inner_decoder = self
55 .inner_scheduler
56 .schedule_ranges(ranges, scheduler, top_level_row);
57 let symbol_table = self.symbol_table.try_clone().unwrap();
58
59 async move {
60 let inner_decoder = inner_decoder.await?;
61 Ok(Box::new(FsstPageDecoder {
62 inner_decoder,
63 symbol_table,
64 }) as Box<dyn PrimitivePageDecoder>)
65 }
66 .boxed()
67 }
68}
69
70struct FsstPageDecoder {
71 inner_decoder: Box<dyn PrimitivePageDecoder>,
72 symbol_table: LanceBuffer,
73}
74
75impl PrimitivePageDecoder for FsstPageDecoder {
76 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
77 let compressed_data = self.inner_decoder.decode(rows_to_skip, num_rows)?;
78 let (string_data, nulls) = match compressed_data {
79 DataBlock::Nullable(nullable) => {
80 let data = nullable.data.as_variable_width().unwrap();
81 Result::Ok((data, Some(nullable.nulls)))
82 }
83 DataBlock::VariableWidth(variable) => Ok((variable, None)),
84 _ => panic!("Received non-variable width data from inner decoder"),
85 }?;
86
87 let offsets = ScalarBuffer::<i32>::from(string_data.offsets.into_buffer());
88 let bytes = string_data.data.into_buffer();
89
90 let mut decompressed_offsets = vec![0_i32; offsets.len()];
91 let mut decompressed_bytes = vec![0_u8; bytes.len() * 8];
92 unsafe {
94 decompressed_bytes.set_len(decompressed_bytes.capacity());
95 }
96 fsst::fsst::decompress(
97 &self.symbol_table,
98 &bytes,
99 &offsets,
100 &mut decompressed_bytes,
101 &mut decompressed_offsets,
102 )?;
103
104 let mut offsets_as_bytes_mut = Vec::with_capacity(decompressed_offsets.len());
108 let decompressed_offsets = ScalarBuffer::<i32>::from(decompressed_offsets);
109 offsets_as_bytes_mut.extend_from_slice(decompressed_offsets.inner().as_slice());
110
111 let mut bytes_as_bytes_mut = Vec::with_capacity(decompressed_bytes.len());
112 bytes_as_bytes_mut.extend_from_slice(&decompressed_bytes);
113
114 let new_string_data = DataBlock::VariableWidth(VariableWidthBlock {
115 bits_per_offset: 32,
116 data: LanceBuffer::from(bytes_as_bytes_mut),
117 num_values: num_rows,
118 offsets: LanceBuffer::from(offsets_as_bytes_mut),
119 block_info: BlockInfo::new(),
120 });
121
122 if let Some(nulls) = nulls {
123 Ok(DataBlock::Nullable(NullableDataBlock {
124 data: Box::new(new_string_data),
125 nulls,
126 block_info: BlockInfo::new(),
127 }))
128 } else {
129 Ok(new_string_data)
130 }
131 }
132}
133
134#[derive(Debug)]
135pub struct FsstArrayEncoder {
136 inner_encoder: Box<dyn ArrayEncoder>,
137}
138
139impl FsstArrayEncoder {
140 pub fn new(inner_encoder: Box<dyn ArrayEncoder>) -> Self {
141 Self { inner_encoder }
142 }
143}
144
145impl ArrayEncoder for FsstArrayEncoder {
146 fn encode(
147 &self,
148 data: DataBlock,
149 data_type: &DataType,
150 buffer_index: &mut u32,
151 ) -> lance_core::Result<EncodedArray> {
152 let (mut data, nulls) = match data {
153 DataBlock::Nullable(nullable) => {
154 let data = nullable.data.as_variable_width().unwrap();
155 (data, Some(nullable.nulls))
156 }
157 DataBlock::VariableWidth(variable) => (variable, None),
158 _ => panic!("Expected variable width data block"),
159 };
160 assert_eq!(data.bits_per_offset, 32);
161 let num_values = data.num_values;
162 let offsets = data.offsets.borrow_to_typed_slice::<i32>();
163 let offsets_slice = offsets.as_ref();
164 let bytes_data = data.data.into_buffer();
165
166 let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
167 let mut dest_values = vec![0_u8; bytes_data.len() * 2];
168 let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
169
170 fsst::fsst::compress(
171 &mut symbol_table,
172 bytes_data.as_slice(),
173 offsets_slice,
174 &mut dest_values,
175 &mut dest_offsets,
176 )?;
177
178 let dest_offset = LanceBuffer::reinterpret_vec(dest_offsets);
179 let dest_values = LanceBuffer::Owned(dest_values);
180 let dest_data = DataBlock::VariableWidth(VariableWidthBlock {
181 bits_per_offset: 32,
182 data: dest_values,
183 num_values,
184 offsets: dest_offset,
185 block_info: BlockInfo::new(),
186 });
187
188 let data_block = if let Some(nulls) = nulls {
189 DataBlock::Nullable(NullableDataBlock {
190 data: Box::new(dest_data),
191 nulls,
192 block_info: BlockInfo::new(),
193 })
194 } else {
195 dest_data
196 };
197
198 let inner_encoded = self
199 .inner_encoder
200 .encode(data_block, data_type, buffer_index)?;
201
202 let encoding = ProtobufUtils::fsst(inner_encoded.encoding, symbol_table);
203
204 Ok(EncodedArray {
205 data: inner_encoded.data,
206 encoding,
207 })
208 }
209}
210
211struct FsstCompressed {
212 data: VariableWidthBlock,
213 symbol_table: Vec<u8>,
214}
215
216impl FsstCompressed {
217 fn fsst_compress(data: DataBlock) -> Result<Self> {
218 match data {
219 DataBlock::VariableWidth(mut variable_width) => {
220 let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
221 let offsets_slice = offsets.as_ref();
222 let bytes_data = variable_width.data.into_buffer();
223
224 let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
226 let mut dest_values = vec![0_u8; bytes_data.len() * 2];
227 let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
228
229 fsst::fsst::compress(
231 &mut symbol_table,
232 bytes_data.as_slice(),
233 offsets_slice,
234 &mut dest_values,
235 &mut dest_offsets,
236 )?;
237
238 let compressed = VariableWidthBlock {
240 data: LanceBuffer::reinterpret_vec(dest_values),
241 bits_per_offset: 32,
242 offsets: LanceBuffer::reinterpret_vec(dest_offsets),
243 num_values: variable_width.num_values,
244 block_info: BlockInfo::new(),
245 };
246
247 Ok(Self {
248 data: compressed,
249 symbol_table,
250 })
251 }
252 _ => Err(Error::InvalidInput {
253 source: format!(
254 "Cannot compress a data block of type {} with FsstEncoder",
255 data.name()
256 )
257 .into(),
258 location: location!(),
259 }),
260 }
261 }
262}
263
264#[derive(Debug, Default)]
265pub struct FsstMiniBlockEncoder {}
266
267impl MiniBlockCompressor for FsstMiniBlockEncoder {
268 fn compress(
269 &self,
270 data: DataBlock,
271 ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
272 let compressed = FsstCompressed::fsst_compress(data)?;
273
274 let data_block = DataBlock::VariableWidth(compressed.data);
275
276 let binary_compressor =
278 Box::new(BinaryMiniBlockEncoder::default()) as Box<dyn MiniBlockCompressor>;
279
280 let (binary_miniblock_compressed, binary_array_encoding) =
281 binary_compressor.compress(data_block)?;
282
283 Ok((
284 binary_miniblock_compressed,
285 ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
286 ))
287 }
288}
289
290#[derive(Debug)]
291pub struct FsstPerValueEncoder {
292 inner: Box<dyn PerValueCompressor>,
293}
294
295impl FsstPerValueEncoder {
296 pub fn new(inner: Box<dyn PerValueCompressor>) -> Self {
297 Self { inner }
298 }
299}
300
301impl PerValueCompressor for FsstPerValueEncoder {
302 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
303 let compressed = FsstCompressed::fsst_compress(data)?;
304
305 let data_block = DataBlock::VariableWidth(compressed.data);
306
307 let (binary_compressed, binary_array_encoding) = self.inner.compress(data_block)?;
308
309 Ok((
310 binary_compressed,
311 ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
312 ))
313 }
314}
315
316#[derive(Debug)]
317pub struct FsstPerValueDecompressor {
318 symbol_table: LanceBuffer,
319 inner_decompressor: Box<dyn VariablePerValueDecompressor>,
320}
321
322impl FsstPerValueDecompressor {
323 pub fn new(
324 symbol_table: LanceBuffer,
325 inner_decompressor: Box<dyn VariablePerValueDecompressor>,
326 ) -> Self {
327 Self {
328 symbol_table,
329 inner_decompressor,
330 }
331 }
332}
333
334impl VariablePerValueDecompressor for FsstPerValueDecompressor {
335 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
336 let mut compressed_variable_data = self
338 .inner_decompressor
339 .decompress(data)?
340 .as_variable_width()
341 .unwrap();
342
343 let bytes = compressed_variable_data.data.borrow_to_typed_slice::<u8>();
345 let bytes = bytes.as_ref();
346 let offsets = compressed_variable_data
347 .offsets
348 .borrow_to_typed_slice::<i32>();
349 let offsets = offsets.as_ref();
350 let num_values = compressed_variable_data.num_values;
351
352 let mut decompress_bytes_buf = vec![0u8; bytes.len() * 8];
355 let mut decompress_offset_buf = vec![0i32; offsets.len()];
356 fsst::fsst::decompress(
357 &self.symbol_table,
358 bytes,
359 offsets,
360 &mut decompress_bytes_buf,
361 &mut decompress_offset_buf,
362 )?;
363
364 Ok(DataBlock::VariableWidth(VariableWidthBlock {
365 data: LanceBuffer::Owned(decompress_bytes_buf),
366 offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
367 bits_per_offset: 32,
368 num_values,
369 block_info: BlockInfo::new(),
370 }))
371 }
372}
373
374#[derive(Debug)]
375pub struct FsstMiniBlockDecompressor {
376 symbol_table: LanceBuffer,
377}
378
379impl FsstMiniBlockDecompressor {
380 pub fn new(description: &pb::Fsst) -> Self {
381 Self {
382 symbol_table: LanceBuffer::from_bytes(description.symbol_table.clone(), 1),
383 }
384 }
385}
386
387impl MiniBlockDecompressor for FsstMiniBlockDecompressor {
388 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
389 let binary_decompressor =
391 Box::new(BinaryMiniBlockDecompressor::default()) as Box<dyn MiniBlockDecompressor>;
392 let compressed_data_block = binary_decompressor.decompress(data, num_values)?;
393 let DataBlock::VariableWidth(mut compressed_data_block) = compressed_data_block else {
394 panic!("BinaryMiniBlockDecompressor should output VariableWidth DataBlock")
395 };
396
397 let bytes = compressed_data_block.data.borrow_to_typed_slice::<u8>();
399 let bytes = bytes.as_ref();
400 let offsets = compressed_data_block.offsets.borrow_to_typed_slice::<i32>();
401 let offsets = offsets.as_ref();
402
403 let mut decompress_bytes_buf = vec![0u8; 4 * 1024 * 8];
407 let mut decompress_offset_buf = vec![0i32; 1024];
408 fsst::fsst::decompress(
409 &self.symbol_table,
410 bytes,
411 offsets,
412 &mut decompress_bytes_buf,
413 &mut decompress_offset_buf,
414 )?;
415
416 Ok(DataBlock::VariableWidth(VariableWidthBlock {
417 data: LanceBuffer::Owned(decompress_bytes_buf),
418 offsets: LanceBuffer::reinterpret_vec(decompress_offset_buf),
419 bits_per_offset: 32,
420 num_values,
421 block_info: BlockInfo::new(),
422 }))
423 }
424}
425
#[cfg(test)]
mod tests {

    use std::collections::HashMap;

    use lance_datagen::{ByteCount, RowCount};

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Round-trips one million generated UTF-8 strings through the V2_1 file
    /// format, once with a byte count of 32 and once with 64.
    #[test_log::test(tokio::test)]
    async fn test_fsst() {
        for byte_count in [32, 64] {
            let batch = lance_datagen::gen()
                .anon_col(lance_datagen::array::rand_utf8(
                    ByteCount::from(byte_count),
                    false,
                ))
                .into_batch_rows(RowCount::from(1_000_000))
                .unwrap();
            let arr = batch.column(0).clone();

            let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
            check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
        }
    }
}