lance_encoding/encodings/physical/
binary.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use core::panic;
5use std::sync::Arc;
6
7use arrow_array::cast::AsArray;
8use arrow_array::types::UInt64Type;
9use arrow_array::ArrayRef;
10use arrow_buffer::{bit_util, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
11use bytemuck::{cast_slice, try_cast_slice};
12use byteorder::{ByteOrder, LittleEndian};
13use futures::TryFutureExt;
14use lance_core::utils::bit::pad_bytes;
15use snafu::location;
16
17use futures::{future::BoxFuture, FutureExt};
18
19use crate::decoder::{
20    BlockDecompressor, LogicalPageDecoder, MiniBlockDecompressor, VariablePerValueDecompressor,
21};
22use crate::encoder::{
23    BlockCompressor, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
24    PerValueDataBlock,
25};
26use crate::encodings::logical::primitive::PrimitiveFieldDecoder;
27
28use crate::buffer::LanceBuffer;
29use crate::data::{
30    BlockInfo, DataBlock, FixedWidthDataBlock, NullableDataBlock, VariableWidthBlock,
31};
32use crate::format::{pb, ProtobufUtils};
33use crate::{
34    decoder::{PageScheduler, PrimitivePageDecoder},
35    encoder::{ArrayEncoder, EncodedArray},
36    EncodingsIo,
37};
38
39use arrow_array::{PrimitiveArray, UInt64Array};
40use arrow_schema::DataType;
41use lance_core::{Error, Result};
42
43use super::block_compress::{BufferCompressor, CompressionConfig, GeneralBufferCompressor};
44
45struct IndicesNormalizer {
46    indices: Vec<u64>,
47    validity: BooleanBufferBuilder,
48    null_adjustment: u64,
49}
50
51impl IndicesNormalizer {
52    fn new(num_rows: u64, null_adjustment: u64) -> Self {
53        let mut indices = Vec::with_capacity(num_rows as usize);
54        indices.push(0);
55        Self {
56            indices,
57            validity: BooleanBufferBuilder::new(num_rows as usize),
58            null_adjustment,
59        }
60    }
61
62    fn normalize(&self, val: u64) -> (bool, u64) {
63        if val >= self.null_adjustment {
64            (false, val - self.null_adjustment)
65        } else {
66            (true, val)
67        }
68    }
69
70    fn extend(&mut self, new_indices: &PrimitiveArray<UInt64Type>, is_start: bool) {
71        let mut last = *self.indices.last().unwrap();
72        if is_start {
73            let (is_valid, val) = self.normalize(new_indices.value(0));
74            self.indices.push(val);
75            self.validity.append(is_valid);
76            last += val;
77        }
78        let mut prev = self.normalize(*new_indices.values().first().unwrap()).1;
79        for w in new_indices.values().windows(2) {
80            let (is_valid, val) = self.normalize(w[1]);
81            let next = val - prev + last;
82            self.indices.push(next);
83            self.validity.append(is_valid);
84            prev = val;
85            last = next;
86        }
87    }
88
89    fn into_parts(mut self) -> (Vec<u64>, BooleanBuffer) {
90        (self.indices, self.validity.finish())
91    }
92}
93
94#[derive(Debug)]
95pub struct BinaryPageScheduler {
96    indices_scheduler: Arc<dyn PageScheduler>,
97    bytes_scheduler: Arc<dyn PageScheduler>,
98    offsets_type: DataType,
99    null_adjustment: u64,
100}
101
102impl BinaryPageScheduler {
103    pub fn new(
104        indices_scheduler: Arc<dyn PageScheduler>,
105        bytes_scheduler: Arc<dyn PageScheduler>,
106        offsets_type: DataType,
107        null_adjustment: u64,
108    ) -> Self {
109        Self {
110            indices_scheduler,
111            bytes_scheduler,
112            offsets_type,
113            null_adjustment,
114        }
115    }
116
117    fn decode_indices(decoder: Arc<dyn PrimitivePageDecoder>, num_rows: u64) -> Result<ArrayRef> {
118        let mut primitive_wrapper =
119            PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows, false);
120        let drained_task = primitive_wrapper.drain(num_rows)?;
121        let indices_decode_task = drained_task.task;
122        indices_decode_task.decode()
123    }
124}
125
126struct IndirectData {
127    decoded_indices: UInt64Array,
128    offsets_type: DataType,
129    validity: BooleanBuffer,
130    bytes_decoder_fut: BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>,
131}
132
133impl PageScheduler for BinaryPageScheduler {
134    fn schedule_ranges(
135        &self,
136        ranges: &[std::ops::Range<u64>],
137        scheduler: &Arc<dyn EncodingsIo>,
138        top_level_row: u64,
139    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
140        // ranges corresponds to row ranges that the user wants to fetch.
141        // if user wants row range a..b
142        // Case 1: if a != 0, we need indices a-1..b to decode
143        // Case 2: if a = 0, we need indices 0..b to decode
144        let indices_ranges = ranges
145            .iter()
146            .map(|range| {
147                if range.start != 0 {
148                    (range.start - 1)..range.end
149                } else {
150                    0..range.end
151                }
152            })
153            .collect::<Vec<std::ops::Range<u64>>>();
154
155        // We schedule all the indices for decoding together
156        // This is more efficient compared to scheduling them one by one (reduces speed significantly for random access)
157        let indices_page_decoder =
158            self.indices_scheduler
159                .schedule_ranges(&indices_ranges, scheduler, top_level_row);
160
161        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
162        let indices_num_rows = indices_ranges.iter().map(|r| r.end - r.start).sum::<u64>();
163
164        let ranges = ranges.to_vec();
165        let copy_scheduler = scheduler.clone();
166        let copy_bytes_scheduler = self.bytes_scheduler.clone();
167        let null_adjustment = self.null_adjustment;
168        let offsets_type = self.offsets_type.clone();
169
170        tokio::spawn(async move {
171            // For the following data:
172            // "abcd", "hello", "abcd", "apple", "hello", "abcd"
173            //   4,        9,     13,      18,      23,     27
174            // e.g. want to scan rows 0, 2, 4
175            // i.e. offsets are 4 | 9, 13 | 18, 23
176            // Normalization is required for decoding later on
177            // Normalize each part: 0, 4 | 0, 4 | 0, 5
178            // Remove leading zeros except first one: 0, 4 | 4 | 5
179            // Cumulative sum: 0, 4 | 8 | 13
180            // These are the normalized offsets stored in decoded_indices
181            // Rest of the workflow is continued later in BinaryPageDecoder
182            let indices_decoder = Arc::from(indices_page_decoder.await?);
183            let indices = Self::decode_indices(indices_decoder, indices_num_rows)?;
184            let decoded_indices = indices.as_primitive::<UInt64Type>();
185
186            let mut indices_builder = IndicesNormalizer::new(num_rows, null_adjustment);
187            let mut bytes_ranges = Vec::new();
188            let mut curr_offset_index = 0;
189
190            for curr_row_range in ranges.iter() {
191                let row_start = curr_row_range.start;
192                let curr_range_len = (curr_row_range.end - row_start) as usize;
193
194                let curr_indices;
195
196                if row_start == 0 {
197                    curr_indices = decoded_indices.slice(0, curr_range_len);
198                    curr_offset_index = curr_range_len;
199                } else {
200                    curr_indices = decoded_indices.slice(curr_offset_index, curr_range_len + 1);
201                    curr_offset_index += curr_range_len + 1;
202                }
203
204                let first = if row_start == 0 {
205                    0
206                } else {
207                    indices_builder
208                        .normalize(*curr_indices.values().first().unwrap())
209                        .1
210                };
211                let last = indices_builder
212                    .normalize(*curr_indices.values().last().unwrap())
213                    .1;
214                if first != last {
215                    bytes_ranges.push(first..last);
216                }
217
218                indices_builder.extend(&curr_indices, row_start == 0);
219            }
220
221            let (indices, validity) = indices_builder.into_parts();
222            let decoded_indices = UInt64Array::from(indices);
223
224            // In the indirect task we schedule the bytes, but we do not await them.  We don't want to
225            // await the bytes until the decoder is ready for them so that we don't release the backpressure
226            // too early
227            let bytes_decoder_fut =
228                copy_bytes_scheduler.schedule_ranges(&bytes_ranges, &copy_scheduler, top_level_row);
229
230            Ok(IndirectData {
231                decoded_indices,
232                validity,
233                offsets_type,
234                bytes_decoder_fut,
235            })
236        })
237        // Propagate join panic
238        .map(|join_handle| join_handle.unwrap())
239        .and_then(|indirect_data| {
240            async move {
241                // Later, this will be called once the decoder actually starts polling.  At that point
242                // we await the bytes (releasing the backpressure)
243                let bytes_decoder = indirect_data.bytes_decoder_fut.await?;
244                Ok(Box::new(BinaryPageDecoder {
245                    decoded_indices: indirect_data.decoded_indices,
246                    offsets_type: indirect_data.offsets_type,
247                    validity: indirect_data.validity,
248                    bytes_decoder,
249                }) as Box<dyn PrimitivePageDecoder>)
250            }
251        })
252        .boxed()
253    }
254}
255
256struct BinaryPageDecoder {
257    decoded_indices: UInt64Array,
258    offsets_type: DataType,
259    validity: BooleanBuffer,
260    bytes_decoder: Box<dyn PrimitivePageDecoder>,
261}
262
263impl PrimitivePageDecoder for BinaryPageDecoder {
264    // Continuing the example from BinaryPageScheduler
265    // Suppose batch_size = 2. Then first, rows_to_skip=0, num_rows=2
266    // Need to scan 2 rows
267    // First row will be 4-0=4 bytes, second also 8-4=4 bytes.
268    // Allocate 8 bytes capacity.
269    // Next rows_to_skip=2, num_rows=1
270    // Skip 8 bytes. Allocate 5 bytes capacity.
271    //
272    // The normalized offsets are [0, 4, 8, 13]
273    // We only need [8, 13] to decode in this case.
274    // These need to be normalized in order to build the string later
275    // So return [0, 5]
276    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
277        // STEP 1: validity buffer
278        let target_validity = self
279            .validity
280            .slice(rows_to_skip as usize, num_rows as usize);
281        let has_nulls = target_validity.count_set_bits() < target_validity.len();
282
283        let validity_buffer = if has_nulls {
284            let num_validity_bits = arrow_buffer::bit_util::ceil(num_rows as usize, 8);
285            let mut validity_buffer = Vec::with_capacity(num_validity_bits);
286
287            if rows_to_skip == 0 {
288                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
289            } else {
290                // Need to copy the buffer because there may be a bit offset in first byte
291                let target_validity = BooleanBuffer::from_iter(target_validity.iter());
292                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
293            }
294            Some(validity_buffer)
295        } else {
296            None
297        };
298
299        // STEP 2: offsets buffer
300        // Currently we always do a copy here, we need to cast to the appropriate type
301        // and we go ahead and normalize so the starting offset is 0 (though we could skip
302        // this)
303        let bytes_per_offset = match self.offsets_type {
304            DataType::Int32 => 4,
305            DataType::Int64 => 8,
306            _ => panic!("Unsupported offsets type"),
307        };
308
309        let target_offsets = self
310            .decoded_indices
311            .slice(rows_to_skip as usize, (num_rows + 1) as usize);
312
313        // Normalize and cast (TODO: could fuse these into one pass for micro-optimization)
314        let target_vec = target_offsets.values();
315        let start = target_vec[0];
316        let offsets_buffer =
317            match bytes_per_offset {
318                4 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i32))
319                    .into_inner(),
320                8 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i64))
321                    .into_inner(),
322                _ => panic!("Unsupported offsets type"),
323            };
324
325        let bytes_to_skip = self.decoded_indices.value(rows_to_skip as usize);
326        let num_bytes = self
327            .decoded_indices
328            .value((rows_to_skip + num_rows) as usize)
329            - bytes_to_skip;
330
331        let bytes = self.bytes_decoder.decode(bytes_to_skip, num_bytes)?;
332        let bytes = bytes.as_fixed_width().unwrap();
333        debug_assert_eq!(bytes.bits_per_value, 8);
334
335        let string_data = DataBlock::VariableWidth(VariableWidthBlock {
336            bits_per_offset: bytes_per_offset * 8,
337            data: bytes.data,
338            num_values: num_rows,
339            offsets: LanceBuffer::from(offsets_buffer),
340            block_info: BlockInfo::new(),
341        });
342        if let Some(validity) = validity_buffer {
343            Ok(DataBlock::Nullable(NullableDataBlock {
344                data: Box::new(string_data),
345                nulls: LanceBuffer::from(validity),
346                block_info: BlockInfo::new(),
347            }))
348        } else {
349            Ok(string_data)
350        }
351    }
352}
353
354#[derive(Debug)]
355pub struct BinaryEncoder {
356    indices_encoder: Box<dyn ArrayEncoder>,
357    compression_config: Option<CompressionConfig>,
358    buffer_compressor: Option<Box<dyn BufferCompressor>>,
359}
360
361impl BinaryEncoder {
362    pub fn new(
363        indices_encoder: Box<dyn ArrayEncoder>,
364        compression_config: Option<CompressionConfig>,
365    ) -> Self {
366        let buffer_compressor = compression_config.map(GeneralBufferCompressor::get_compressor);
367        Self {
368            indices_encoder,
369            compression_config,
370            buffer_compressor,
371        }
372    }
373
374    // In 2.1 we will materialize nulls higher up (in the primitive encoder).  Unfortunately,
375    // in 2.0 we actually need to write the offsets.
376    fn all_null_variable_width(data_type: &DataType, num_values: u64) -> VariableWidthBlock {
377        if matches!(data_type, DataType::Binary | DataType::Utf8) {
378            VariableWidthBlock {
379                bits_per_offset: 32,
380                data: LanceBuffer::empty(),
381                num_values,
382                offsets: LanceBuffer::reinterpret_vec(vec![0_u32; num_values as usize + 1]),
383                block_info: BlockInfo::new(),
384            }
385        } else {
386            VariableWidthBlock {
387                bits_per_offset: 64,
388                data: LanceBuffer::empty(),
389                num_values,
390                offsets: LanceBuffer::reinterpret_vec(vec![0_u64; num_values as usize + 1]),
391                block_info: BlockInfo::new(),
392            }
393        }
394    }
395}
396
397// Creates indices arrays from string arrays
398// Strings are a vector of arrays corresponding to each record batch
399// Zero offset is removed from the start of the offsets array
400// The indices array is computed across all arrays in the vector
401fn get_indices_from_string_arrays(
402    mut offsets: LanceBuffer,
403    bits_per_offset: u8,
404    nulls: Option<LanceBuffer>,
405    num_rows: usize,
406) -> (DataBlock, u64) {
407    let mut indices = Vec::with_capacity(num_rows);
408    let mut last_offset = 0_u64;
409    if bits_per_offset == 32 {
410        let offsets = offsets.borrow_to_typed_slice::<i32>();
411        indices.extend(offsets.as_ref().windows(2).map(|w| {
412            let strlen = (w[1] - w[0]) as u64;
413            last_offset += strlen;
414            last_offset
415        }));
416    } else if bits_per_offset == 64 {
417        let offsets = offsets.borrow_to_typed_slice::<i64>();
418        indices.extend(offsets.as_ref().windows(2).map(|w| {
419            let strlen = (w[1] - w[0]) as u64;
420            last_offset += strlen;
421            last_offset
422        }));
423    }
424
425    if indices.is_empty() {
426        return (
427            DataBlock::FixedWidth(FixedWidthDataBlock {
428                bits_per_value: 64,
429                data: LanceBuffer::empty(),
430                num_values: 0,
431                block_info: BlockInfo::new(),
432            }),
433            0,
434        );
435    }
436
437    let last_offset = *indices.last().expect("Indices array is empty");
438    // 8 exabytes in a single array seems unlikely but...just in case
439    assert!(
440        last_offset < u64::MAX / 2,
441        "Indices array with strings up to 2^63 is too large for this encoding"
442    );
443    let null_adjustment: u64 = *indices.last().expect("Indices array is empty") + 1;
444
445    if let Some(nulls) = nulls {
446        let nulls = NullBuffer::new(BooleanBuffer::new(nulls.into_buffer(), 0, num_rows));
447        indices
448            .iter_mut()
449            .zip(nulls.iter())
450            .for_each(|(index, is_valid)| {
451                if !is_valid {
452                    *index += null_adjustment;
453                }
454            });
455    }
456    let indices = DataBlock::FixedWidth(FixedWidthDataBlock {
457        bits_per_value: 64,
458        data: LanceBuffer::reinterpret_vec(indices),
459        num_values: num_rows as u64,
460        block_info: BlockInfo::new(),
461    });
462    (indices, null_adjustment)
463}
464
465impl ArrayEncoder for BinaryEncoder {
466    fn encode(
467        &self,
468        data: DataBlock,
469        data_type: &DataType,
470        buffer_index: &mut u32,
471    ) -> Result<EncodedArray> {
472        let (mut data, nulls) = match data {
473            DataBlock::Nullable(nullable) => {
474                let data = nullable.data.as_variable_width().unwrap();
475                (data, Some(nullable.nulls))
476            }
477            DataBlock::VariableWidth(variable) => (variable, None),
478            DataBlock::AllNull(all_null) => {
479                let data = Self::all_null_variable_width(data_type, all_null.num_values);
480                let validity =
481                    LanceBuffer::all_unset(bit_util::ceil(all_null.num_values as usize, 8));
482                (data, Some(validity))
483            }
484            _ => panic!("Expected variable width data block but got {}", data.name()),
485        };
486
487        let (indices, null_adjustment) = get_indices_from_string_arrays(
488            data.offsets,
489            data.bits_per_offset,
490            nulls,
491            data.num_values as usize,
492        );
493        let encoded_indices =
494            self.indices_encoder
495                .encode(indices, &DataType::UInt64, buffer_index)?;
496
497        let encoded_indices_data = encoded_indices.data.as_fixed_width().unwrap();
498
499        assert!(encoded_indices_data.bits_per_value <= 64);
500
501        if let Some(buffer_compressor) = &self.buffer_compressor {
502            let mut compressed_data = Vec::with_capacity(data.data.len());
503            buffer_compressor.compress(&data.data, &mut compressed_data)?;
504            data.data = LanceBuffer::Owned(compressed_data);
505        }
506
507        let data = DataBlock::VariableWidth(VariableWidthBlock {
508            bits_per_offset: encoded_indices_data.bits_per_value as u8,
509            offsets: encoded_indices_data.data,
510            data: data.data,
511            num_values: data.num_values,
512            block_info: BlockInfo::new(),
513        });
514
515        let bytes_buffer_index = *buffer_index;
516        *buffer_index += 1;
517
518        let bytes_encoding = ProtobufUtils::flat_encoding(
519            /*bits_per_value=*/ 8,
520            bytes_buffer_index,
521            self.compression_config,
522        );
523
524        let encoding =
525            ProtobufUtils::binary(encoded_indices.encoding, bytes_encoding, null_adjustment);
526
527        Ok(EncodedArray { data, encoding })
528    }
529}
530
531#[derive(Debug, Default)]
532pub struct BinaryMiniBlockEncoder {}
533
534const AIM_MINICHUNK_SIZE: u32 = 4 * 1024;
535const BINARY_MINIBLOCK_CHUNK_ALIGNMENT: usize = 4;
536
537// search for the next offset index to cut the values into a chunk.
538// this function incrementally peek the number of values in a chunk,
539// each time multiplies the number of values by 2.
540// It returns the offset_idx in `offsets` that belongs to this chunk.
541fn search_next_offset_idx(offsets: &[u32], last_offset_idx: usize) -> usize {
542    let mut num_values = 1;
543    let mut new_num_values = num_values * 2;
544    loop {
545        if last_offset_idx + new_num_values >= offsets.len() {
546            if (offsets[offsets.len() - 1] - offsets[last_offset_idx])
547                + (offsets.len() - last_offset_idx) as u32 * 4
548                <= AIM_MINICHUNK_SIZE
549            {
550                // case 1: can fit the rest of all data into a miniblock
551                return offsets.len() - 1;
552            } else {
553                // case 2: can only fit the last tried `num_values` into a miniblock
554                return last_offset_idx + num_values;
555            }
556        }
557        if ((offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx])
558            + ((new_num_values + 1) * 4) as u32)
559            <= AIM_MINICHUNK_SIZE
560        {
561            num_values = new_num_values;
562            new_num_values *= 2;
563        } else {
564            break;
565        }
566    }
567    last_offset_idx + num_values
568}
569
570impl BinaryMiniBlockEncoder {
571    // put binary data into chunks, every chunk is less than or equal to `AIM_MINICHUNK_SIZE`.
572    // In each chunk, offsets are put first then followed by binary bytes data, each chunk is padded to 8 bytes.
573    // the offsets in the chunk points to the bytes offset in this chunk.
574    fn chunk_data(
575        &self,
576        mut data: VariableWidthBlock,
577    ) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) {
578        assert!(data.bits_per_offset == 32);
579
580        let offsets = data.offsets.borrow_to_typed_slice::<u32>();
581        let offsets = offsets.as_ref();
582
583        assert!(offsets.len() > 1);
584
585        #[derive(Debug)]
586        struct ChunkInfo {
587            chunk_start_offset_in_orig_idx: usize,
588            chunk_last_offset_in_orig_idx: usize,
589            // the bytes in every chunk starts at `chunk.bytes_start_offset`
590            bytes_start_offset: usize,
591            // every chunk is padded to 8 bytes.
592            // we need to interpret every chunk as &[u32] so we need it to padded at least to 4 bytes,
593            // this field can actually be eliminated and I can use `num_bytes` in `MiniBlockChunk` to compute
594            // the `output_total_bytes`.
595            padded_chunk_size: usize,
596        }
597
598        let mut chunks_info = vec![];
599        let mut chunks = vec![];
600        let mut last_offset_in_orig_idx = 0;
601        const CHUNK_PAD_BUFFER: [u8; BINARY_MINIBLOCK_CHUNK_ALIGNMENT] =
602            [72; BINARY_MINIBLOCK_CHUNK_ALIGNMENT];
603        loop {
604            let this_last_offset_in_orig_idx =
605                search_next_offset_idx(offsets, last_offset_in_orig_idx);
606
607            // case 1: last chunk
608            if this_last_offset_in_orig_idx == offsets.len() - 1 {
609                let num_values_in_this_chunk =
610                    this_last_offset_in_orig_idx - last_offset_in_orig_idx;
611
612                let this_chunk_size = (num_values_in_this_chunk + 1) * 4
613                    + (offsets[offsets.len() - 1] - offsets[last_offset_in_orig_idx]) as usize;
614
615                let padded_chunk_size = ((this_chunk_size + 3) / 4) * 4;
616
617                // the bytes are put after the offsets
618                let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * 4;
619                chunks_info.push(ChunkInfo {
620                    chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
621                    chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
622                    bytes_start_offset: this_chunk_bytes_start_offset,
623                    padded_chunk_size,
624                });
625                chunks.push(MiniBlockChunk {
626                    log_num_values: 0,
627                    num_bytes: padded_chunk_size as u16,
628                });
629                break;
630            } else {
631                // case 2: not the last chunk
632                let num_values_in_this_chunk =
633                    this_last_offset_in_orig_idx - last_offset_in_orig_idx;
634
635                let this_chunk_size = (num_values_in_this_chunk + 1) * 4
636                    + (offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx])
637                        as usize;
638
639                let padded_chunk_size = ((this_chunk_size + 3) / 4) * 4;
640
641                // the bytes are put after the offsets
642                let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * 4;
643
644                chunks_info.push(ChunkInfo {
645                    chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
646                    chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
647                    bytes_start_offset: this_chunk_bytes_start_offset,
648                    padded_chunk_size,
649                });
650
651                chunks.push(MiniBlockChunk {
652                    log_num_values: num_values_in_this_chunk.trailing_zeros() as u8,
653                    num_bytes: padded_chunk_size as u16,
654                });
655
656                last_offset_in_orig_idx = this_last_offset_in_orig_idx;
657            }
658        }
659        let output_total_bytes = chunks_info
660            .iter()
661            .map(|chunk_info| chunk_info.padded_chunk_size)
662            .sum::<usize>();
663
664        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
665        for chunk in chunks_info {
666            // `this_chunk_offsets` are offsets that points to bytes in this chunk,
667            let this_chunk_offsets = offsets
668                [chunk.chunk_start_offset_in_orig_idx..chunk.chunk_last_offset_in_orig_idx + 1]
669                .iter()
670                .map(|offset| {
671                    offset - offsets[chunk.chunk_start_offset_in_orig_idx]
672                        + chunk.bytes_start_offset as u32
673                })
674                .collect::<Vec<_>>();
675
676            output.extend_from_slice(cast_slice(&this_chunk_offsets));
677
678            let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx];
679            let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx];
680
681            output.extend_from_slice(&data.data[start_in_orig as usize..end_in_orig as usize]);
682
683            // pad this chunk to make it align to 4 bytes.
684            output.extend_from_slice(
685                &CHUNK_PAD_BUFFER[..pad_bytes::<BINARY_MINIBLOCK_CHUNK_ALIGNMENT>(output.len())],
686            );
687        }
688
689        (
690            MiniBlockCompressed {
691                data: LanceBuffer::reinterpret_vec(output),
692                chunks,
693                num_values: data.num_values,
694            },
695            ProtobufUtils::variable(/*bits_per_value=*/ 32),
696        )
697    }
698}
699
700impl MiniBlockCompressor for BinaryMiniBlockEncoder {
701    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
702        match data {
703            DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
704            _ => Err(Error::InvalidInput {
705                source: format!(
706                    "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
707                    data.name()
708                )
709                .into(),
710                location: location!(),
711            }),
712        }
713    }
714}
715
716#[derive(Debug, Default)]
717pub struct BinaryMiniBlockDecompressor {}
718
719impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
720    // decompress a MiniBlock of binary data, the num_values must be less than or equal
721    // to the number of values this MiniBlock has, BinaryMiniBlock doesn't store `the number of values`
722    // it has so assertion can not be done here and the caller of `decompress` must ensure `num_values` <= number of values in the chunk.
723    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
724        assert!(data.len() >= 8);
725        let offsets: &[u32] = try_cast_slice(&data)
726            .expect("casting buffer failed during BinaryMiniBlock decompression");
727
728        let result_offsets = offsets[0..(num_values + 1) as usize]
729            .iter()
730            .map(|offset| offset - offsets[0])
731            .collect::<Vec<u32>>();
732
733        Ok(DataBlock::VariableWidth(VariableWidthBlock {
734            data: LanceBuffer::Owned(
735                data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
736            ),
737            offsets: LanceBuffer::reinterpret_vec(result_offsets),
738            bits_per_offset: 32,
739            num_values,
740            block_info: BlockInfo::new(),
741        }))
742    }
743}
744
745/// Most basic encoding for variable-width data which does no compression at all
746#[derive(Debug, Default)]
747pub struct VariableEncoder {}
748
749impl BlockCompressor for VariableEncoder {
750    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
751        let num_values: u32 = data
752            .num_values()
753            .try_into()
754            .expect("The Maximum number of values BinaryBlockEncoder can work with is u32::MAX");
755
756        match data {
757            DataBlock::VariableWidth(mut variable_width_data) => {
758                if variable_width_data.bits_per_offset != 32 {
759                    panic!("BinaryBlockEncoder only works with 32 bits per offset VariableWidth DataBlock.");
760                }
761                let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
762                let offsets = offsets.as_ref();
763                // the first 4 bytes store the number of values, then 4 bytes for bytes_start_offset,
764                // then offsets data, then bytes data.
765                let bytes_start_offset = 4 + 4 + std::mem::size_of_val(offsets) as u32;
766
767                let output_total_bytes =
768                    bytes_start_offset as usize + variable_width_data.data.len();
769                let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
770
771                // store `num_values` in the first 4 bytes of output buffer
772                output.extend_from_slice(&(num_values).to_le_bytes());
773
774                // store `bytes_start_offset` in the next 4 bytes of output buffer
775                output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
776
777                // store offsets
778                output.extend_from_slice(cast_slice(offsets));
779
780                // store bytes
781                output.extend_from_slice(&variable_width_data.data);
782
783                Ok(LanceBuffer::Owned(output))
784            }
785            _ => {
786                panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
787            }
788        }
789    }
790}
791
792impl PerValueCompressor for VariableEncoder {
793    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
794        let DataBlock::VariableWidth(variable) = data else {
795            panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
796        };
797
798        let encoding = ProtobufUtils::variable(variable.bits_per_offset);
799        Ok((PerValueDataBlock::Variable(variable), encoding))
800    }
801}
802
803#[derive(Debug, Default)]
804pub struct VariableDecoder {}
805
806impl VariablePerValueDecompressor for VariableDecoder {
807    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
808        Ok(DataBlock::VariableWidth(data))
809    }
810}
811
812#[derive(Debug, Default)]
813pub struct BinaryBlockDecompressor {}
814
815impl BlockDecompressor for BinaryBlockDecompressor {
816    fn decompress(&self, data: LanceBuffer) -> Result<DataBlock> {
817        // the first 4 bytes in the BinaryBlock compressed buffer stores the num_values this block has.
818        let num_values = LittleEndian::read_u32(&data[..4]) as u64;
819
820        // the next 4 bytes in the BinaryBlock compressed buffer stores the bytes_start_offset.
821        let bytes_start_offset = LittleEndian::read_u32(&data[4..8]);
822
823        // the next `bytes_start_offset - 8` stores the offsets.
824        let offsets = data.slice_with_length(8, bytes_start_offset as usize - 8);
825
826        // the rest are the binary bytes.
827        let data = data.slice_with_length(
828            bytes_start_offset as usize,
829            data.len() - bytes_start_offset as usize,
830        );
831
832        Ok(DataBlock::VariableWidth(VariableWidthBlock {
833            data,
834            offsets,
835            bits_per_offset: 32,
836            num_values,
837            block_info: BlockInfo::new(),
838        }))
839    }
840}
841
842#[cfg(test)]
843pub mod tests {
844    use arrow_array::{
845        builder::{LargeStringBuilder, StringBuilder},
846        ArrayRef, StringArray,
847    };
848    use arrow_schema::{DataType, Field};
849
850    use lance_core::datatypes::{
851        COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
852        STRUCTURAL_ENCODING_MINIBLOCK,
853    };
854    use rstest::rstest;
855    use std::{collections::HashMap, sync::Arc, vec};
856
857    use crate::{
858        buffer::LanceBuffer,
859        data::DataBlock,
860        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
861        version::LanceFileVersion,
862    };
863
864    use super::get_indices_from_string_arrays;
865    #[rstest]
866    #[test_log::test(tokio::test)]
867    async fn test_utf8_binary(
868        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
869    ) {
870        let field = Field::new("", DataType::Utf8, false);
871        check_round_trip_encoding_random(field, version).await;
872    }
873
874    #[test]
875    fn test_encode_indices_adjusts_nulls() {
876        // Null entries in string arrays should be adjusted
877        let string_array = Arc::new(StringArray::from(vec![
878            None,
879            Some("foo"),
880            Some("foo"),
881            None,
882            None,
883            None,
884        ])) as ArrayRef;
885        let string_data = DataBlock::from(string_array).as_nullable().unwrap();
886        let nulls = string_data.nulls;
887        let string_data = string_data.data.as_variable_width().unwrap();
888
889        let (indices, null_adjustment) = get_indices_from_string_arrays(
890            string_data.offsets,
891            string_data.bits_per_offset,
892            Some(nulls),
893            string_data.num_values as usize,
894        );
895
896        let indices = indices.as_fixed_width().unwrap();
897        assert_eq!(indices.bits_per_value, 64);
898        assert_eq!(
899            indices.data,
900            LanceBuffer::reinterpret_vec(vec![7_u64, 3, 6, 13, 13, 13])
901        );
902        assert_eq!(null_adjustment, 7);
903    }
904
905    #[rstest]
906    #[test_log::test(tokio::test)]
907    async fn test_binary(
908        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
909        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
910        structural_encoding: &str,
911        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
912    ) {
913        use lance_core::datatypes::STRUCTURAL_ENCODING_META_KEY;
914
915        let mut field_metadata = HashMap::new();
916        field_metadata.insert(
917            STRUCTURAL_ENCODING_META_KEY.to_string(),
918            structural_encoding.into(),
919        );
920
921        let field = Field::new("", data_type, false).with_metadata(field_metadata);
922        check_round_trip_encoding_random(field, version).await;
923    }
924
925    #[rstest]
926    #[test_log::test(tokio::test)]
927    async fn test_binary_fsst(
928        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
929        structural_encoding: &str,
930    ) {
931        let mut field_metadata = HashMap::new();
932        field_metadata.insert(
933            STRUCTURAL_ENCODING_META_KEY.to_string(),
934            structural_encoding.into(),
935        );
936        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
937
938        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
939        check_round_trip_encoding_random(field, LanceFileVersion::V2_1).await;
940    }
941
942    #[test_log::test(tokio::test)]
943    async fn test_large_binary() {
944        let field = Field::new("", DataType::LargeBinary, true);
945        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
946    }
947
948    #[test_log::test(tokio::test)]
949    async fn test_large_utf8() {
950        let field = Field::new("", DataType::LargeUtf8, true);
951        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
952    }
953
954    #[rstest]
955    #[test_log::test(tokio::test)]
956    async fn test_simple_binary(
957        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
958        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
959        structural_encoding: &str,
960        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
961    ) {
962        use lance_core::datatypes::STRUCTURAL_ENCODING_META_KEY;
963
964        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
965        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();
966
967        let mut field_metadata = HashMap::new();
968        field_metadata.insert(
969            STRUCTURAL_ENCODING_META_KEY.to_string(),
970            structural_encoding.into(),
971        );
972
973        let test_cases = TestCases::default()
974            .with_range(0..2)
975            .with_range(0..3)
976            .with_range(1..3)
977            .with_indices(vec![0, 1, 3, 4])
978            .with_file_version(version);
979        check_round_trip_encoding_of_data(
980            vec![Arc::new(string_array)],
981            &test_cases,
982            field_metadata,
983        )
984        .await;
985    }
986
987    #[rstest]
988    #[test_log::test(tokio::test)]
989    async fn test_sliced_utf8(
990        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
991    ) {
992        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
993        let string_array = string_array.slice(1, 3);
994
995        let test_cases = TestCases::default()
996            .with_range(0..1)
997            .with_range(0..2)
998            .with_range(1..2)
999            .with_file_version(version);
1000        check_round_trip_encoding_of_data(
1001            vec![Arc::new(string_array)],
1002            &test_cases,
1003            HashMap::new(),
1004        )
1005        .await;
1006    }
1007
1008    #[test_log::test(tokio::test)]
1009    async fn test_bigger_than_max_page_size() {
1010        // Create an array with one single 32MiB string
1011        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
1012        let string_array = StringArray::from(vec![
1013            Some(big_string),
1014            Some("abc".to_string()),
1015            None,
1016            None,
1017            Some("xyz".to_string()),
1018        ]);
1019
1020        // Drop the max page size to 1MiB
1021        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);
1022
1023        check_round_trip_encoding_of_data(
1024            vec![Arc::new(string_array)],
1025            &test_cases,
1026            HashMap::new(),
1027        )
1028        .await;
1029
1030        // This is a regression testing the case where a page with X rows is split into Y parts
1031        // where the number of parts is not evenly divisible by the number of rows.  In this
1032        // case we are splitting 90 rows into 4 parts.
1033        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
1034        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));
1035
1036        check_round_trip_encoding_of_data(
1037            vec![Arc::new(string_array)],
1038            &TestCases::default(),
1039            HashMap::new(),
1040        )
1041        .await;
1042    }
1043
1044    #[rstest]
1045    #[test_log::test(tokio::test)]
1046    async fn test_empty_strings(
1047        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1048    ) {
1049        // Scenario 1: Some strings are empty
1050
1051        let values = [Some("abc"), Some(""), None];
1052        // Test empty list at beginning, middle, and end
1053        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
1054            let mut string_builder = StringBuilder::new();
1055            for idx in order {
1056                string_builder.append_option(values[idx]);
1057            }
1058            let string_array = Arc::new(string_builder.finish());
1059            let test_cases = TestCases::default()
1060                .with_indices(vec![1])
1061                .with_indices(vec![0])
1062                .with_indices(vec![2])
1063                .with_indices(vec![0, 1])
1064                .with_file_version(version);
1065            check_round_trip_encoding_of_data(
1066                vec![string_array.clone()],
1067                &test_cases,
1068                HashMap::new(),
1069            )
1070            .await;
1071            let test_cases = test_cases.with_batch_size(1);
1072            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
1073                .await;
1074        }
1075
1076        // Scenario 2: All strings are empty
1077
1078        // When encoding an array of empty strings there are no bytes to encode
1079        // which is strange and we want to ensure we handle it
1080        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));
1081
1082        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
1083        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
1084            .await;
1085        let test_cases = test_cases.with_batch_size(1);
1086        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
1087    }
1088
1089    #[test_log::test(tokio::test)]
1090    #[ignore] // This test is quite slow in debug mode
1091    async fn test_jumbo_string() {
1092        // This is an overflow test.  We have a list of lists where each list
1093        // has 1Mi items.  We encode 5000 of these lists and so we have over 4Gi in the
1094        // offsets range
1095        let mut string_builder = LargeStringBuilder::new();
1096        // a 1 MiB string
1097        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
1098        for _ in 0..5000 {
1099            string_builder.append_option(Some(&giant_string));
1100        }
1101        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
1102        let arrs = vec![giant_array];
1103
1104        // // We can't validate because our validation relies on concatenating all input arrays
1105        let test_cases = TestCases::default().without_validation();
1106        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
1107    }
1108
1109    #[test_log::test(tokio::test)]
1110    async fn test_binary_dictionary_encoding() {
1111        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
1112        let strings = [
1113            "Hal Abelson",
1114            "Charles Babbage",
1115            "Vint Cerf",
1116            "Jim Gray",
1117            "Alonzo Church",
1118            "Edgar F. Codd",
1119        ];
1120        let repeated_strings: Vec<_> = strings
1121            .iter()
1122            .cycle()
1123            .take(strings.len() * 10000)
1124            .cloned()
1125            .collect();
1126        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
1127        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
1128    }
1129}