use core::panic;
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::ArrayRef;
use arrow_buffer::{bit_util, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
use bytemuck::{cast_slice, try_cast_slice};
use byteorder::{ByteOrder, LittleEndian};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use lance_core::utils::bit::pad_bytes;
use snafu::location;
use crate::decoder::{
    BlockDecompressor, LogicalPageDecoder, MiniBlockDecompressor, VariablePerValueDecompressor,
};
use crate::encoder::{
    BlockCompressor, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
    PerValueDataBlock,
};
use crate::encodings::logical::primitive::PrimitiveFieldDecoder;

use crate::buffer::LanceBuffer;
use crate::data::{
    BlockInfo, DataBlock, FixedWidthDataBlock, NullableDataBlock, VariableWidthBlock,
};
use crate::format::{pb, ProtobufUtils};
use crate::{
    decoder::{PageScheduler, PrimitivePageDecoder},
    encoder::{ArrayEncoder, EncodedArray},
    EncodingsIo,
};

use arrow_array::{PrimitiveArray, UInt64Array};
use arrow_schema::DataType;
use lance_core::{Error, Result};

use super::block_compress::{BufferCompressor, CompressionConfig, GeneralBufferCompressor};

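/// Rebuilds cumulative offsets and a validity bitmap from decoded indices in
/// which nulls were folded in by adding `null_adjustment`
/// (see `get_indices_from_string_arrays`).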
struct IndicesNormalizer {
    indices: Vec<u64>,
    validity: BooleanBufferBuilder,
    null_adjustment: u64,
}

impl IndicesNormalizer {
    fn new(num_rows: u64, null_adjustment: u64) -> Self {
        let mut indices = Vec::with_capacity(num_rows as usize);
        indices.push(0);
        Self {
            indices,
            validity: BooleanBufferBuilder::new(num_rows as usize),
            null_adjustment,
        }
    }

    fn normalize(&self, val: u64) -> (bool, u64) {
        if val >= self.null_adjustment {
            (false, val - self.null_adjustment)
        } else {
            (true, val)
        }
    }

    fn extend(&mut self, new_indices: &PrimitiveArray<UInt64Type>, is_start: bool) {
        let mut last = *self.indices.last().unwrap();
        if is_start {
            let (is_valid, val) = self.normalize(new_indices.value(0));
            self.indices.push(val);
            self.validity.append(is_valid);
            last += val;
        }
        let mut prev = self.normalize(*new_indices.values().first().unwrap()).1;
        for w in new_indices.values().windows(2) {
            let (is_valid, val) = self.normalize(w[1]);
            let next = val - prev + last;
            self.indices.push(next);
            self.validity.append(is_valid);
            prev = val;
            last = next;
        }
    }

    fn into_parts(mut self) -> (Vec<u64>, BooleanBuffer) {
        (self.indices, self.validity.finish())
    }
}

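/// Schedules pages of variable-width binary data.
///
/// Binary pages are stored as two sub-pages: the indices (cumulative end
/// offsets, with nulls encoded by adding `null_adjustment`) and the raw bytes.
/// Decoding a row range therefore requires decoding the matching index range
/// first to learn which byte range to fetch.
///
/// For example, indices [3, 10, 6] with null_adjustment 7 normalize to
/// offsets [3, 3, 6] with validity [true, false, true].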
#[derive(Debug)]
pub struct BinaryPageScheduler {
    indices_scheduler: Arc<dyn PageScheduler>,
    bytes_scheduler: Arc<dyn PageScheduler>,
    offsets_type: DataType,
    null_adjustment: u64,
}

impl BinaryPageScheduler {
    pub fn new(
        indices_scheduler: Arc<dyn PageScheduler>,
        bytes_scheduler: Arc<dyn PageScheduler>,
        offsets_type: DataType,
        null_adjustment: u64,
    ) -> Self {
        Self {
            indices_scheduler,
            bytes_scheduler,
            offsets_type,
            null_adjustment,
        }
    }

    fn decode_indices(decoder: Arc<dyn PrimitivePageDecoder>, num_rows: u64) -> Result<ArrayRef> {
        let mut primitive_wrapper =
            PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows, false);
        let drained_task = primitive_wrapper.drain(num_rows)?;
        let indices_decode_task = drained_task.task;
        indices_decode_task.decode()
    }
}

struct IndirectData {
    decoded_indices: UInt64Array,
    offsets_type: DataType,
    validity: BooleanBuffer,
    bytes_decoder_fut: BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>,
}

impl PageScheduler for BinaryPageScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
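        // The bytes for row i span offsets[i - 1]..offsets[i], so to decode
        // rows start..end we need indices (start - 1)..end (the previous
        // offset does not exist when the range starts at row 0).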
        let indices_ranges = ranges
            .iter()
            .map(|range| {
                if range.start != 0 {
                    (range.start - 1)..range.end
                } else {
                    0..range.end
                }
            })
            .collect::<Vec<std::ops::Range<u64>>>();

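        // Schedule all the index ranges in a single request; this is much
        // cheaper than scheduling them one by one.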
        let indices_page_decoder =
            self.indices_scheduler
                .schedule_ranges(&indices_ranges, scheduler, top_level_row);

        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let indices_num_rows = indices_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

        let ranges = ranges.to_vec();
        let copy_scheduler = scheduler.clone();
        let copy_bytes_scheduler = self.bytes_scheduler.clone();
        let null_adjustment = self.null_adjustment;
        let offsets_type = self.offsets_type.clone();

        tokio::spawn(async move {
            let indices_decoder = Arc::from(indices_page_decoder.await?);
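            // The indices must be decoded eagerly: their values tell us which
            // byte ranges to schedule. Each decoded index is a cumulative end
            // offset with nulls folded in via null_adjustment, so normalize
            // them back into plain offsets plus a validity bitmap while
            // collecting the byte range each row range covers.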
            let indices = Self::decode_indices(indices_decoder, indices_num_rows)?;
            let decoded_indices = indices.as_primitive::<UInt64Type>();

            let mut indices_builder = IndicesNormalizer::new(num_rows, null_adjustment);
            let mut bytes_ranges = Vec::new();
            let mut curr_offset_index = 0;

            for curr_row_range in ranges.iter() {
                let row_start = curr_row_range.start;
                let curr_range_len = (curr_row_range.end - row_start) as usize;

                let curr_indices = if row_start == 0 {
                    curr_offset_index = curr_range_len;
                    decoded_indices.slice(0, curr_range_len)
                } else {
                    let sliced = decoded_indices.slice(curr_offset_index, curr_range_len + 1);
                    curr_offset_index += curr_range_len + 1;
                    sliced
                };

                let first = if row_start == 0 {
                    0
                } else {
                    indices_builder
                        .normalize(*curr_indices.values().first().unwrap())
                        .1
                };
                let last = indices_builder
                    .normalize(*curr_indices.values().last().unwrap())
                    .1;
                if first != last {
                    bytes_ranges.push(first..last);
                }

                indices_builder.extend(&curr_indices, row_start == 0);
            }

            let (indices, validity) = indices_builder.into_parts();
            let decoded_indices = UInt64Array::from(indices);

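            // Now that the byte ranges are known, schedule the bytes I/O.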
            let bytes_decoder_fut =
                copy_bytes_scheduler.schedule_ranges(&bytes_ranges, &copy_scheduler, top_level_row);
229
230 Ok(IndirectData {
231 decoded_indices,
232 validity,
233 offsets_type,
234 bytes_decoder_fut,
235 })
236 })
237 .map(|join_handle| join_handle.unwrap())
239 .and_then(|indirect_data| {
240 async move {
241 let bytes_decoder = indirect_data.bytes_decoder_fut.await?;
244 Ok(Box::new(BinaryPageDecoder {
245 decoded_indices: indirect_data.decoded_indices,
246 offsets_type: indirect_data.offsets_type,
247 validity: indirect_data.validity,
248 bytes_decoder,
249 }) as Box<dyn PrimitivePageDecoder>)
250 }
251 })
252 .boxed()
253 }
254}
255
struct BinaryPageDecoder {
    decoded_indices: UInt64Array,
    offsets_type: DataType,
    validity: BooleanBuffer,
    bytes_decoder: Box<dyn PrimitivePageDecoder>,
}

impl PrimitivePageDecoder for BinaryPageDecoder {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
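        // The three pieces are decoded independently:
        //
        // 1. Validity: slice the bitmap computed at schedule time.
        // 2. Offsets: slice num_rows + 1 of the decoded indices and rebase
        //    them so the first offset is zero.
        // 3. Bytes: decode exactly the byte range those offsets cover.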
        let target_validity = self
            .validity
            .slice(rows_to_skip as usize, num_rows as usize);
        let has_nulls = target_validity.count_set_bits() < target_validity.len();

        let validity_buffer = if has_nulls {
            let num_validity_bytes = arrow_buffer::bit_util::ceil(num_rows as usize, 8);
            let mut validity_buffer = Vec::with_capacity(num_validity_bytes);

            if rows_to_skip == 0 {
                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
            } else {
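                // The sliced bitmap may not start on a byte boundary;
                // re-collect the bits so they are aligned to offset zero
                // before copying whole bytes.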
                let target_validity = BooleanBuffer::from_iter(target_validity.iter());
                validity_buffer.extend_from_slice(target_validity.inner().as_slice());
            }
            Some(validity_buffer)
        } else {
            None
        };
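
        // Convert the offsets to the requested width, rebasing them so the
        // first offset in the slice is zero.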
        let bytes_per_offset = match self.offsets_type {
            DataType::Int32 => 4,
            DataType::Int64 => 8,
            _ => panic!("Unsupported offsets type"),
        };

        let target_offsets = self
            .decoded_indices
            .slice(rows_to_skip as usize, (num_rows + 1) as usize);

        let target_vec = target_offsets.values();
        let start = target_vec[0];
        let offsets_buffer = match bytes_per_offset {
            4 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i32))
                .into_inner(),
            8 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i64))
                .into_inner(),
            _ => panic!("Unsupported offsets type"),
        };

        let bytes_to_skip = self.decoded_indices.value(rows_to_skip as usize);
        let num_bytes = self
            .decoded_indices
            .value((rows_to_skip + num_rows) as usize)
            - bytes_to_skip;

        let bytes = self.bytes_decoder.decode(bytes_to_skip, num_bytes)?;
        let bytes = bytes.as_fixed_width().unwrap();
        debug_assert_eq!(bytes.bits_per_value, 8);

        let string_data = DataBlock::VariableWidth(VariableWidthBlock {
            bits_per_offset: bytes_per_offset * 8,
            data: bytes.data,
            num_values: num_rows,
            offsets: LanceBuffer::from(offsets_buffer),
            block_info: BlockInfo::new(),
        });
        if let Some(validity) = validity_buffer {
            Ok(DataBlock::Nullable(NullableDataBlock {
                data: Box::new(string_data),
                nulls: LanceBuffer::from(validity),
                block_info: BlockInfo::new(),
            }))
        } else {
            Ok(string_data)
        }
    }
}

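/// Encodes variable-width binary data as two buffers: an indices buffer
/// (cumulative end offsets with nulls folded in via the null adjustment),
/// encoded by the wrapped indices encoder, and the raw bytes, optionally
/// passed through a general-purpose buffer compressor.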
#[derive(Debug)]
pub struct BinaryEncoder {
    indices_encoder: Box<dyn ArrayEncoder>,
    compression_config: Option<CompressionConfig>,
    buffer_compressor: Option<Box<dyn BufferCompressor>>,
}

impl BinaryEncoder {
    pub fn new(
        indices_encoder: Box<dyn ArrayEncoder>,
        compression_config: Option<CompressionConfig>,
    ) -> Self {
        let buffer_compressor = compression_config.map(GeneralBufferCompressor::get_compressor);
        Self {
            indices_encoder,
            compression_config,
            buffer_compressor,
        }
    }

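    // Builds an empty variable-width block (all-zero offsets) representing
    // all-null data.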
    fn all_null_variable_width(data_type: &DataType, num_values: u64) -> VariableWidthBlock {
        if matches!(data_type, DataType::Binary | DataType::Utf8) {
            VariableWidthBlock {
                bits_per_offset: 32,
                data: LanceBuffer::empty(),
                num_values,
                offsets: LanceBuffer::reinterpret_vec(vec![0_u32; num_values as usize + 1]),
                block_info: BlockInfo::new(),
            }
        } else {
            VariableWidthBlock {
                bits_per_offset: 64,
                data: LanceBuffer::empty(),
                num_values,
                offsets: LanceBuffer::reinterpret_vec(vec![0_u64; num_values as usize + 1]),
                block_info: BlockInfo::new(),
            }
        }
    }
}

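/// Converts string offsets (plus optional validity) into a single u64 array
/// of cumulative end offsets. A null row is encoded by adding
/// `null_adjustment` (one past the largest valid offset) to its index, which
/// lets the decoder recover both offsets and validity from one array.
///
/// For example, ["abc", null, "def"] has end offsets [3, 3, 6], so
/// null_adjustment is 7 and the encoded indices are [3, 10, 6].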
fn get_indices_from_string_arrays(
    mut offsets: LanceBuffer,
    bits_per_offset: u8,
    nulls: Option<LanceBuffer>,
    num_rows: usize,
) -> (DataBlock, u64) {
    let mut indices = Vec::with_capacity(num_rows);
    let mut last_offset = 0_u64;
    if bits_per_offset == 32 {
        let offsets = offsets.borrow_to_typed_slice::<i32>();
        indices.extend(offsets.as_ref().windows(2).map(|w| {
            let strlen = (w[1] - w[0]) as u64;
            last_offset += strlen;
            last_offset
        }));
    } else if bits_per_offset == 64 {
        let offsets = offsets.borrow_to_typed_slice::<i64>();
        indices.extend(offsets.as_ref().windows(2).map(|w| {
            let strlen = (w[1] - w[0]) as u64;
            last_offset += strlen;
            last_offset
        }));
    }

    if indices.is_empty() {
        return (
            DataBlock::FixedWidth(FixedWidthDataBlock {
                bits_per_value: 64,
                data: LanceBuffer::empty(),
                num_values: 0,
                block_info: BlockInfo::new(),
            }),
            0,
        );
    }

    let last_offset = *indices.last().expect("Indices array is empty");
    assert!(
        last_offset < u64::MAX / 2,
        "Total string data of 2^63 bytes or more is too large for this encoding"
    );
    let null_adjustment: u64 = last_offset + 1;

    if let Some(nulls) = nulls {
        let nulls = NullBuffer::new(BooleanBuffer::new(nulls.into_buffer(), 0, num_rows));
        indices
            .iter_mut()
            .zip(nulls.iter())
            .for_each(|(index, is_valid)| {
                if !is_valid {
                    *index += null_adjustment;
                }
            });
    }
    let indices = DataBlock::FixedWidth(FixedWidthDataBlock {
        bits_per_value: 64,
        data: LanceBuffer::reinterpret_vec(indices),
        num_values: num_rows as u64,
        block_info: BlockInfo::new(),
    });
    (indices, null_adjustment)
}

impl ArrayEncoder for BinaryEncoder {
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        let (mut data, nulls) = match data {
            DataBlock::Nullable(nullable) => {
                let data = nullable.data.as_variable_width().unwrap();
                (data, Some(nullable.nulls))
            }
            DataBlock::VariableWidth(variable) => (variable, None),
            DataBlock::AllNull(all_null) => {
                let data = Self::all_null_variable_width(data_type, all_null.num_values);
                let validity =
                    LanceBuffer::all_unset(bit_util::ceil(all_null.num_values as usize, 8));
                (data, Some(validity))
            }
            _ => panic!("Expected variable width data block but got {}", data.name()),
        };

        let (indices, null_adjustment) = get_indices_from_string_arrays(
            data.offsets,
            data.bits_per_offset,
            nulls,
            data.num_values as usize,
        );
        let encoded_indices =
            self.indices_encoder
                .encode(indices, &DataType::UInt64, buffer_index)?;

        let encoded_indices_data = encoded_indices.data.as_fixed_width().unwrap();

        assert!(encoded_indices_data.bits_per_value <= 64);

        if let Some(buffer_compressor) = &self.buffer_compressor {
            let mut compressed_data = Vec::with_capacity(data.data.len());
            buffer_compressor.compress(&data.data, &mut compressed_data)?;
            data.data = LanceBuffer::Owned(compressed_data);
        }

        let data = DataBlock::VariableWidth(VariableWidthBlock {
            bits_per_offset: encoded_indices_data.bits_per_value as u8,
            offsets: encoded_indices_data.data,
            data: data.data,
            num_values: data.num_values,
            block_info: BlockInfo::new(),
        });

        let bytes_buffer_index = *buffer_index;
        *buffer_index += 1;

        let bytes_encoding =
            ProtobufUtils::flat_encoding(8, bytes_buffer_index, self.compression_config);

        let encoding =
            ProtobufUtils::binary(encoded_indices.encoding, bytes_encoding, null_adjustment);

        Ok(EncodedArray { data, encoding })
    }
}

#[derive(Debug, Default)]
pub struct BinaryMiniBlockEncoder {}

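// Target size in bytes of each mini-block chunk, including its offsets.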
const AIM_MINICHUNK_SIZE: u32 = 4 * 1024;
const BINARY_MINIBLOCK_CHUNK_ALIGNMENT: usize = 4;

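// Finds the last offset index of the next chunk: the candidate value count is
// doubled while the chunk (offsets plus bytes) still fits in
// AIM_MINICHUNK_SIZE, so every chunk except possibly the last holds a
// power-of-two number of values.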
fn search_next_offset_idx(offsets: &[u32], last_offset_idx: usize) -> usize {
    let mut num_values = 1;
    let mut new_num_values = num_values * 2;
    loop {
        if last_offset_idx + new_num_values >= offsets.len() {
            if (offsets[offsets.len() - 1] - offsets[last_offset_idx])
                + (offsets.len() - last_offset_idx) as u32 * 4
                <= AIM_MINICHUNK_SIZE
            {
                return offsets.len() - 1;
            } else {
                return last_offset_idx + num_values;
            }
        }
        if ((offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx])
            + ((new_num_values + 1) * 4) as u32)
            <= AIM_MINICHUNK_SIZE
        {
            num_values = new_num_values;
            new_num_values *= 2;
        } else {
            break;
        }
    }
    last_offset_idx + num_values
}

impl BinaryMiniBlockEncoder {
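    // Splits the data into chunks, each laid out as
    // [chunk-relative u32 offsets][bytes][padding to 4-byte alignment].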
    fn chunk_data(
        &self,
        mut data: VariableWidthBlock,
    ) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) {
        assert!(data.bits_per_offset == 32);

        let offsets = data.offsets.borrow_to_typed_slice::<u32>();
        let offsets = offsets.as_ref();

        assert!(offsets.len() > 1);

        #[derive(Debug)]
        struct ChunkInfo {
            chunk_start_offset_in_orig_idx: usize,
            chunk_last_offset_in_orig_idx: usize,
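            // Offset within the chunk where the payload bytes begin (right
            // after the chunk-relative offsets).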
            bytes_start_offset: usize,
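            // Size of the chunk after padding to the 4-byte alignment.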
            padded_chunk_size: usize,
        }

        let mut chunks_info = vec![];
        let mut chunks = vec![];
        let mut last_offset_in_orig_idx = 0;
        const CHUNK_PAD_BUFFER: [u8; BINARY_MINIBLOCK_CHUNK_ALIGNMENT] =
            [72; BINARY_MINIBLOCK_CHUNK_ALIGNMENT];
        loop {
            let this_last_offset_in_orig_idx =
                search_next_offset_idx(offsets, last_offset_in_orig_idx);

            if this_last_offset_in_orig_idx == offsets.len() - 1 {
                let num_values_in_this_chunk =
                    this_last_offset_in_orig_idx - last_offset_in_orig_idx;

                let this_chunk_size = (num_values_in_this_chunk + 1) * 4
                    + (offsets[offsets.len() - 1] - offsets[last_offset_in_orig_idx]) as usize;

                let padded_chunk_size = ((this_chunk_size + 3) / 4) * 4;

                let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * 4;

                chunks_info.push(ChunkInfo {
                    chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
                    chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
                    bytes_start_offset: this_chunk_bytes_start_offset,
                    padded_chunk_size,
                });
                chunks.push(MiniBlockChunk {
                    log_num_values: 0,
                    num_bytes: padded_chunk_size as u16,
                });
                break;
            } else {
                let num_values_in_this_chunk =
                    this_last_offset_in_orig_idx - last_offset_in_orig_idx;

                let this_chunk_size = (num_values_in_this_chunk + 1) * 4
                    + (offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx])
                        as usize;

                let padded_chunk_size = ((this_chunk_size + 3) / 4) * 4;

                let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * 4;

                chunks_info.push(ChunkInfo {
                    chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
                    chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
                    bytes_start_offset: this_chunk_bytes_start_offset,
                    padded_chunk_size,
                });

                chunks.push(MiniBlockChunk {
                    log_num_values: num_values_in_this_chunk.trailing_zeros() as u8,
                    num_bytes: padded_chunk_size as u16,
                });

                last_offset_in_orig_idx = this_last_offset_in_orig_idx;
            }
        }
        let output_total_bytes = chunks_info
            .iter()
            .map(|chunk_info| chunk_info.padded_chunk_size)
            .sum::<usize>();

        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
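        // Write each chunk: offsets rewritten relative to the chunk's bytes
        // section, then the bytes, then padding up to the chunk alignment.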
        for chunk in chunks_info {
            let this_chunk_offsets = offsets
                [chunk.chunk_start_offset_in_orig_idx..chunk.chunk_last_offset_in_orig_idx + 1]
                .iter()
                .map(|offset| {
                    offset - offsets[chunk.chunk_start_offset_in_orig_idx]
                        + chunk.bytes_start_offset as u32
                })
                .collect::<Vec<_>>();

            output.extend_from_slice(cast_slice(&this_chunk_offsets));

            let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx];
            let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx];

            output.extend_from_slice(&data.data[start_in_orig as usize..end_in_orig as usize]);

            output.extend_from_slice(
                &CHUNK_PAD_BUFFER[..pad_bytes::<BINARY_MINIBLOCK_CHUNK_ALIGNMENT>(output.len())],
            );
        }

        (
            MiniBlockCompressed {
                data: LanceBuffer::reinterpret_vec(output),
                chunks,
                num_values: data.num_values,
            },
            ProtobufUtils::variable(32),
        )
    }
}

impl MiniBlockCompressor for BinaryMiniBlockEncoder {
    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
        match data {
            DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
                    data.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

#[derive(Debug, Default)]
pub struct BinaryMiniBlockDecompressor {}

impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
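        // A chunk stores at least two u32 offsets (one value), so it must be
        // at least 8 bytes.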
        assert!(data.len() >= 8);
        let offsets: &[u32] = try_cast_slice(&data)
            .expect("casting buffer failed during BinaryMiniBlock decompression");

        let result_offsets = offsets[0..(num_values + 1) as usize]
            .iter()
            .map(|offset| offset - offsets[0])
            .collect::<Vec<u32>>();

        Ok(DataBlock::VariableWidth(VariableWidthBlock {
            data: LanceBuffer::Owned(
                data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
            ),
            offsets: LanceBuffer::reinterpret_vec(result_offsets),
            bits_per_offset: 32,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}

#[derive(Debug, Default)]
pub struct VariableEncoder {}

impl BlockCompressor for VariableEncoder {
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
        let num_values: u32 = data
            .num_values()
            .try_into()
            .expect("BinaryBlockEncoder can handle at most u32::MAX values");

        match data {
            DataBlock::VariableWidth(mut variable_width_data) => {
                if variable_width_data.bits_per_offset != 32 {
                    panic!("BinaryBlockEncoder only works with 32 bits per offset VariableWidth DataBlock.");
                }
                let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
                let offsets = offsets.as_ref();
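                // Serialized layout:
                // [num_values: u32 LE][bytes_start_offset: u32 LE][offsets][bytes]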
                let bytes_start_offset = 4 + 4 + std::mem::size_of_val(offsets) as u32;

                let output_total_bytes =
                    bytes_start_offset as usize + variable_width_data.data.len();
                let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

                output.extend_from_slice(&num_values.to_le_bytes());
                output.extend_from_slice(&bytes_start_offset.to_le_bytes());
                output.extend_from_slice(cast_slice(offsets));
                output.extend_from_slice(&variable_width_data.data);

                Ok(LanceBuffer::Owned(output))
            }
            _ => {
                panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
            }
        }
    }
}

impl PerValueCompressor for VariableEncoder {
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
        let DataBlock::VariableWidth(variable) = data else {
            panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
        };

        let encoding = ProtobufUtils::variable(variable.bits_per_offset);
        Ok((PerValueDataBlock::Variable(variable), encoding))
    }
}

#[derive(Debug, Default)]
pub struct VariableDecoder {}

impl VariablePerValueDecompressor for VariableDecoder {
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
        Ok(DataBlock::VariableWidth(data))
    }
}

#[derive(Debug, Default)]
pub struct BinaryBlockDecompressor {}

impl BlockDecompressor for BinaryBlockDecompressor {
    fn decompress(&self, data: LanceBuffer) -> Result<DataBlock> {
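        // Mirrors the layout written by VariableEncoder:
        // [num_values: u32 LE][bytes_start_offset: u32 LE][offsets][bytes]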
        let num_values = LittleEndian::read_u32(&data[..4]) as u64;
        let bytes_start_offset = LittleEndian::read_u32(&data[4..8]);

        let offsets = data.slice_with_length(8, bytes_start_offset as usize - 8);
        let data = data.slice_with_length(
            bytes_start_offset as usize,
            data.len() - bytes_start_offset as usize,
        );

        Ok(DataBlock::VariableWidth(VariableWidthBlock {
            data,
            offsets,
            bits_per_offset: 32,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}

#[cfg(test)]
pub mod tests {
    use arrow_array::{
        builder::{LargeStringBuilder, StringBuilder},
        ArrayRef, StringArray,
    };
    use arrow_schema::{DataType, Field};

    use lance_core::datatypes::{
        COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
        STRUCTURAL_ENCODING_MINIBLOCK,
    };
    use rstest::rstest;
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::{
        buffer::LanceBuffer,
        data::DataBlock,
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    use super::get_indices_from_string_arrays;

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_utf8_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let field = Field::new("", DataType::Utf8, false);
        check_round_trip_encoding_random(field, version).await;
    }

    #[test]
    fn test_encode_indices_adjusts_nulls() {
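        // A null row is encoded as its cumulative offset plus null_adjustment
        // (the last offset + 1).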
        let string_array = Arc::new(StringArray::from(vec![
            None,
            Some("foo"),
            Some("foo"),
            None,
            None,
            None,
        ])) as ArrayRef;
        let string_data = DataBlock::from(string_array).as_nullable().unwrap();
        let nulls = string_data.nulls;
        let string_data = string_data.data.as_variable_width().unwrap();

        let (indices, null_adjustment) = get_indices_from_string_arrays(
            string_data.offsets,
            string_data.bits_per_offset,
            Some(nulls),
            string_data.num_values as usize,
        );

        let indices = indices.as_fixed_width().unwrap();
        assert_eq!(indices.bits_per_value, 64);
        assert_eq!(
            indices.data,
            LanceBuffer::reinterpret_vec(vec![7_u64, 3, 6, 13, 13, 13])
        );
        assert_eq!(null_adjustment, 7);
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        use lance_core::datatypes::STRUCTURAL_ENCODING_META_KEY;

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let field = Field::new("", data_type, false).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, version).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());

        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_1).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_binary() {
        let field = Field::new("", DataType::LargeBinary, true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_utf8() {
        let field = Field::new("", DataType::LargeUtf8, true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        use lance_core::datatypes::STRUCTURAL_ENCODING_META_KEY;

        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4])
            .with_file_version(version);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            field_metadata,
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_sliced_utf8(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
        let string_array = string_array.slice(1, 3);

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_range(0..2)
            .with_range(1..2)
            .with_file_version(version);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_bigger_than_max_page_size() {
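        // A single 32MiB string, far larger than the 1MiB max page size used
        // below.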
        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
        let string_array = StringArray::from(vec![
            Some(big_string),
            Some("abc".to_string()),
            None,
            None,
            Some("xyz".to_string()),
        ]);

        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;

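        // A second case: each value is below the page size limit, but the
        // total (90 strings of ~1MB each) far exceeds the default page size.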
        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_empty_strings(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
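        // Test empty strings mixed with non-empty and null values in every
        // relative order.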
        let values = [Some("abc"), Some(""), None];
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let mut string_builder = StringBuilder::new();
            for idx in order {
                string_builder.append_option(values[idx]);
            }
            let string_array = Arc::new(string_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1])
                .with_file_version(version);
            check_round_trip_encoding_of_data(
                vec![string_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
                .await;
        }
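
        // Also test empty strings adjacent to nulls at the array boundaries.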
        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    #[ignore]
    async fn test_jumbo_string() {
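        // Ignored by default: this builds roughly 5 GB of string data.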
        let mut string_builder = LargeStringBuilder::new();
        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
        for _ in 0..5000 {
            string_builder.append_option(Some(&giant_string));
        }
        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
        let arrs = vec![giant_array];

        let test_cases = TestCases::default().without_validation();
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_binary_dictionary_encoding() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
        let strings = [
            "Hal Abelson",
            "Charles Babbage",
            "Vint Cerf",
            "Jim Gray",
            "Alonzo Church",
            "Edgar F. Codd",
        ];
        let repeated_strings: Vec<_> = strings
            .iter()
            .cycle()
            .take(strings.len() * 10000)
            .cloned()
            .collect();
        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }
}