use std::{collections::HashMap, env, sync::Arc};

use arrow::array::AsArray;
use arrow::datatypes::UInt64Type;
use arrow_array::{Array, ArrayRef, RecordBatch, UInt8Array};
use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_core::datatypes::{
    Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_LEVEL_META_KEY,
    COMPRESSION_META_KEY, PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
};
use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
use lance_core::{Error, Result};
use snafu::location;

use crate::buffer::LanceBuffer;
use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
use crate::decoder::PageEncoding;
use crate::encodings::logical::blob::BlobFieldEncoder;
use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructFieldEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::encodings::physical::binary::{BinaryMiniBlockEncoder, VariableEncoder};
use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
use crate::encodings::physical::bitpack_fastlanes::{
    compute_compressed_bit_width_for_non_neg, BitpackMiniBlockEncoder,
};
use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme};
use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fsst::{
    FsstArrayEncoder, FsstMiniBlockEncoder, FsstPerValueEncoder,
};
use crate::encodings::physical::packed_struct::PackedStructEncoder;
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockEncoder;
use crate::format::ProtobufUtils;
use crate::repdef::RepDefBuilder;
use crate::statistics::{GetStat, Stat};
use crate::version::LanceFileVersion;
use crate::{
    decoder::{ColumnInfo, PageInfo},
    encodings::{
        logical::{list::ListFieldEncoder, primitive::PrimitiveFieldEncoder},
        physical::{
            basic::BasicEncoder, binary::BinaryEncoder, dictionary::DictionaryEncoder,
            fixed_size_binary::FixedSizeBinaryEncoder, fixed_size_list::FslEncoder,
            value::ValueEncoder,
        },
    },
    format::pb,
};
use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};

use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use std::collections::hash_map::RandomState;

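/// The minimum alignment, in bytes, allowed for page buffers.
/// [`EncodingOptions::buffer_alignment`] must be a power of two at least this large.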
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;

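/// An encoded array, paired with a description of the encoding that was applied
/// (needed to decode the data later).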
#[derive(Debug)]
pub struct EncodedArray {
    /// The encoded data
    pub data: DataBlock,
    /// A description of the encoding applied to the data
    pub encoding: pb::ArrayEncoding,
}

impl EncodedArray {
    pub fn new(data: DataBlock, encoding: pb::ArrayEncoding) -> Self {
        Self { data, encoding }
    }

    pub fn into_buffers(self) -> (Vec<LanceBuffer>, pb::ArrayEncoding) {
        let buffers = self.data.into_buffers();
        (buffers, self.encoding)
    }
}

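/// A page of encoded data along with the metadata (encoding description, row range,
/// and column index) needed to write it.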
#[derive(Debug)]
pub struct EncodedPage {
    /// The encoded buffers for the page
    pub data: Vec<LanceBuffer>,
    /// A description of the encoding, needed to decode the page later
    pub description: PageEncoding,
    /// The number of rows in the page
    pub num_rows: u64,
    /// The row number of the first row in the page (also used as the page's priority)
    pub row_number: u64,
    /// The index of the column this page belongs to
    pub column_idx: u32,
}

#[derive(Debug)]
pub struct EncodedBufferMeta {
    pub bits_per_value: u64,

    pub bitpacking: Option<BitpackingBufferMeta>,

    pub compression_scheme: Option<CompressionScheme>,
}

#[derive(Debug)]
pub struct BitpackingBufferMeta {
    pub bits_per_value: u64,

    pub signed: bool,
}

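/// Encodes a block of data from one format into another (e.g. plain values, bitpacked,
/// dictionary encoded), returning the encoded data together with a protobuf description
/// of the encoding.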
pub trait ArrayEncoder: std::fmt::Debug + Send + Sync {
    /// Encode the data block, returning the encoded data and a description of the encoding.
    ///
    /// `buffer_index` is a running counter used to assign indices to the buffers
    /// referenced by the returned encoding.
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray>;
}

/// The maximum size, in bytes, of a mini-block chunk
pub const MAX_MINIBLOCK_BYTES: u64 = 8 * 1024 - 6;
/// The maximum number of values in a mini-block chunk
pub const MAX_MINIBLOCK_VALUES: u64 = 4096;

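/// Page data that has been compressed into the mini-block layout: one buffer divided
/// into a series of small chunks.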
pub struct MiniBlockCompressed {
    /// The compressed data
    pub data: LanceBuffer,
    /// Metadata describing each chunk in the buffer
    pub chunks: Vec<MiniBlockChunk>,
    /// The total number of values across all chunks
    pub num_values: u64,
}

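/// Describes a single chunk within a [`MiniBlockCompressed`] buffer.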
#[derive(Debug)]
pub struct MiniBlockChunk {
    /// The size of the chunk, in bytes
    pub num_bytes: u16,
    /// The log (base 2) of the number of values in the chunk.  A value of 0 indicates
    /// this is the last chunk, which holds whatever values remain.
    pub log_num_values: u8,
}

impl MiniBlockChunk {
    /// Gets the number of values in this chunk, given the number of values in all
    /// previous chunks and the total number of values in the page.
    pub fn num_values(&self, vals_in_prev_blocks: u64, total_num_values: u64) -> u64 {
        if self.log_num_values == 0 {
            total_num_values - vals_in_prev_blocks
        } else {
            1 << self.log_num_values
        }
    }
}

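/// Trait for compression schemes that can compress a page of data into mini-block
/// chunks, returning the chunks along with a description of the encoding.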
pub trait MiniBlockCompressor: std::fmt::Debug + Send + Sync {
    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)>;
}

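/// The output of per-value compression: either a fixed-width or a variable-width
/// data block.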
#[derive(Debug)]
pub enum PerValueDataBlock {
    Fixed(FixedWidthDataBlock),
    Variable(VariableWidthBlock),
}

impl PerValueDataBlock {
    pub fn data_size(&self) -> u64 {
        match self {
            Self::Fixed(fixed) => fixed.data_size(),
            Self::Variable(variable) => variable.data_size(),
        }
    }
}

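/// Trait for compression schemes that compress a block of data value-by-value,
/// producing either fixed-width or variable-width output along with a description
/// of the encoding.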
pub trait PerValueCompressor: std::fmt::Debug + Send + Sync {
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)>;
}

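/// Trait for compression schemes that compress an entire block of data into a single
/// opaque buffer.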
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}

pub fn values_column_encoding() -> pb::ColumnEncoding {
    pb::ColumnEncoding {
        column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
    }
}

pub struct EncodedColumn {
    pub column_buffers: Vec<LanceBuffer>,
    pub encoding: pb::ColumnEncoding,
    pub final_pages: Vec<EncodedPage>,
}

impl Default for EncodedColumn {
    fn default() -> Self {
        Self {
            column_buffers: Default::default(),
            encoding: pb::ColumnEncoding {
                column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
            },
            final_pages: Default::default(),
        }
    }
}

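/// A collection of buffers that will be written to the file outside of any page.
///
/// The collection tracks a running file position so that each added buffer can be
/// assigned the offset at which it will eventually be written.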
pub struct OutOfLineBuffers {
    position: u64,
    buffer_alignment: u64,
    buffers: Vec<LanceBuffer>,
}

impl OutOfLineBuffers {
    pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
        Self {
            position: base_position,
            buffer_alignment,
            buffers: Vec::new(),
        }
    }

    /// Adds a buffer and returns the file position it was assigned.  The running position
    /// is advanced by the buffer length plus any padding needed to keep the next buffer
    /// aligned.
    pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
        let position = self.position;
        self.position += buffer.len() as u64;
        self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
        self.buffers.push(buffer);
        position
    }

    pub fn take_buffers(self) -> Vec<LanceBuffer> {
        self.buffers
    }

    pub fn reset_position(&mut self, position: u64) {
        self.position = position;
    }
}

/// A task that, when awaited, yields an encoded page
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;
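
/// Top-level encoder for a single field.
///
/// A field encoder buffers incoming arrays and emits encode tasks once enough data has
/// accumulated.  Fields such as structs and lists may encode to more than one column.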
341
342pub trait FieldEncoder: Send {
353 fn maybe_encode(
373 &mut self,
374 array: ArrayRef,
375 external_buffers: &mut OutOfLineBuffers,
376 repdef: RepDefBuilder,
377 row_number: u64,
378 num_rows: u64,
379 ) -> Result<Vec<EncodeTask>>;
380 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
389 fn finish(
395 &mut self,
396 external_buffers: &mut OutOfLineBuffers,
397 ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;
398
399 fn num_columns(&self) -> u32;
401}
402
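/// A strategy for choosing an [`ArrayEncoder`] for a set of arrays (used by the
/// 2.0 style [`CoreFieldEncodingStrategy`]).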
pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug {
    fn create_array_encoder(
        &self,
        arrays: &[ArrayRef],
        field: &Field,
    ) -> Result<Box<dyn ArrayEncoder>>;
}

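/// A strategy for choosing block, per-value, or mini-block compressors for a field's
/// data (used by the 2.1 style [`StructuralEncodingStrategy`]).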
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)>;

    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}

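/// The default encoding strategy, which picks encodings based on the data type, the
/// data itself, and the targeted file version.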
#[derive(Debug, Default)]
pub struct CoreArrayEncodingStrategy {
    pub version: LanceFileVersion,
}

const BINARY_DATATYPES: [DataType; 4] = [
    DataType::Binary,
    DataType::LargeBinary,
    DataType::Utf8,
    DataType::LargeUtf8,
];

impl CoreArrayEncodingStrategy {
    fn can_use_fsst(data_type: &DataType, data_size: u64, version: LanceFileVersion) -> bool {
        version >= LanceFileVersion::V2_1
            && matches!(data_type, DataType::Utf8 | DataType::Binary)
            && data_size > 4 * 1024 * 1024
    }

    fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionConfig> {
        let compression = field_meta.get(COMPRESSION_META_KEY)?;
        let compression_scheme = compression.parse::<CompressionScheme>();
        match compression_scheme {
            Ok(compression_scheme) => Some(CompressionConfig::new(
                compression_scheme,
                field_meta
                    .get(COMPRESSION_LEVEL_META_KEY)
                    .and_then(|level| level.parse().ok()),
            )),
            Err(_) => None,
        }
    }

    fn default_binary_encoder(
        arrays: &[ArrayRef],
        data_type: &DataType,
        field_meta: Option<&HashMap<String, String>>,
        data_size: u64,
        version: LanceFileVersion,
    ) -> Result<Box<dyn ArrayEncoder>> {
        let bin_indices_encoder =
            Self::choose_array_encoder(arrays, &DataType::UInt64, data_size, false, version, None)?;

        if let Some(compression) = field_meta.and_then(Self::get_field_compression) {
            if compression.scheme == CompressionScheme::Fsst {
                let raw_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
                Ok(Box::new(FsstArrayEncoder::new(raw_encoder)))
            } else {
                Ok(Box::new(BinaryEncoder::new(
                    bin_indices_encoder,
                    Some(compression),
                )))
            }
        } else {
            let bin_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
            if Self::can_use_fsst(data_type, data_size, version) {
                Ok(Box::new(FsstArrayEncoder::new(bin_encoder)))
            } else {
                Ok(bin_encoder)
            }
        }
    }

    fn choose_array_encoder(
        arrays: &[ArrayRef],
        data_type: &DataType,
        data_size: u64,
        use_dict_encoding: bool,
        version: LanceFileVersion,
        field_meta: Option<&HashMap<String, String>>,
    ) -> Result<Box<dyn ArrayEncoder>> {
        match data_type {
            DataType::FixedSizeList(inner, dimension) => {
                Ok(Box::new(BasicEncoder::new(Box::new(FslEncoder::new(
                    Self::choose_array_encoder(
                        arrays,
                        inner.data_type(),
                        data_size,
                        use_dict_encoding,
                        version,
                        None,
                    )?,
                    *dimension as u32,
                )))))
            }
            DataType::Dictionary(key_type, value_type) => {
                let key_encoder =
                    Self::choose_array_encoder(arrays, key_type, data_size, false, version, None)?;
                let value_encoder = Self::choose_array_encoder(
                    arrays, value_type, data_size, false, version, None,
                )?;

                Ok(Box::new(AlreadyDictionaryEncoder::new(
                    key_encoder,
                    value_encoder,
                )))
            }
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
                if use_dict_encoding {
                    let dict_indices_encoder = Self::choose_array_encoder(
                        // The dictionary indices have not been computed yet, so pass a
                        // stand-in array covering the full u8 range when picking the
                        // index encoding.
                        &[Arc::new(UInt8Array::from_iter_values(0_u8..255_u8))],
                        &DataType::UInt8,
                        data_size,
                        false,
                        version,
                        None,
                    )?;
                    let dict_items_encoder = Self::choose_array_encoder(
                        arrays,
                        &DataType::Utf8,
                        data_size,
                        false,
                        version,
                        None,
                    )?;

                    Ok(Box::new(DictionaryEncoder::new(
                        dict_indices_encoder,
                        dict_items_encoder,
                    )))
                } else if BINARY_DATATYPES.contains(arrays[0].data_type()) {
                    if let Some(byte_width) = check_fixed_size_encoding(arrays, version) {
                        // All values share the same length, so fixed-size-binary encoding applies
                        let bytes_encoder = Self::choose_array_encoder(
                            arrays,
                            &DataType::UInt8,
                            data_size,
                            false,
                            version,
                            None,
                        )?;

                        Ok(Box::new(BasicEncoder::new(Box::new(
                            FixedSizeBinaryEncoder::new(bytes_encoder, byte_width as usize),
                        ))))
                    } else {
                        Self::default_binary_encoder(
                            arrays, data_type, field_meta, data_size, version,
                        )
                    }
                } else {
                    Self::default_binary_encoder(arrays, data_type, field_meta, data_size, version)
                }
            }
            DataType::Struct(fields) => {
                let num_fields = fields.len();
                let mut inner_encoders = Vec::new();

                for i in 0..num_fields {
                    let inner_datatype = fields[i].data_type();
                    let inner_encoder = Self::choose_array_encoder(
                        arrays,
                        inner_datatype,
                        data_size,
                        use_dict_encoding,
                        version,
                        None,
                    )?;
                    inner_encoders.push(inner_encoder);
                }

                Ok(Box::new(PackedStructEncoder::new(inner_encoders)))
            }
            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
                        compressed_bit_width as usize,
                        data_type.clone(),
                    )))
                } else {
                    Ok(Box::new(BasicEncoder::new(Box::new(
                        ValueEncoder::default(),
                    ))))
                }
            }

            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
                        compressed_bit_width as usize,
                        data_type.clone(),
                    )))
                } else {
                    Ok(Box::new(BasicEncoder::new(Box::new(
                        ValueEncoder::default(),
                    ))))
                }
            }
            _ => Ok(Box::new(BasicEncoder::new(Box::new(
                ValueEncoder::default(),
            )))),
        }
    }
}

/// Returns the cardinality threshold below which dictionary encoding is considered,
/// configurable via the `LANCE_DICT_ENCODING_THRESHOLD` environment variable (default 100).
fn get_dict_encoding_threshold() -> u64 {
    env::var("LANCE_DICT_ENCODING_THRESHOLD")
        .ok()
        .and_then(|val| val.parse().ok())
        .unwrap_or(100)
}

/// Checks whether dictionary encoding should be applied by estimating the cardinality of
/// the string arrays with HyperLogLog and comparing it against `threshold`.
fn check_dict_encoding(arrays: &[ArrayRef], threshold: u64) -> bool {
    let num_total_rows = arrays.iter().map(|arr| arr.len()).sum::<usize>();
    if num_total_rows < threshold as usize {
        return false;
    }
    const PRECISION: u8 = 12;

    let mut hll: HyperLogLogPlus<String, RandomState> =
        HyperLogLogPlus::new(PRECISION, RandomState::new()).unwrap();

    for arr in arrays {
        let string_array = arrow_array::cast::as_string_array(arr);
        for value in string_array.iter().flatten() {
            hll.insert(value);
            let estimated_cardinality = hll.count() as u64;
            if estimated_cardinality >= threshold {
                return false;
            }
        }
    }

    true
}

/// Checks whether fixed-size binary encoding can be used, returning the byte width if all
/// non-null values share the same non-zero length (only considered for file version 2.1+).
fn check_fixed_size_encoding(arrays: &[ArrayRef], version: LanceFileVersion) -> Option<u64> {
    if version < LanceFileVersion::V2_1 || arrays.is_empty() {
        return None;
    }

    // No array may contain an empty value
    if !arrays.iter().all(|arr| {
        if let Some(arr) = arr.as_string_opt::<i32>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_binary_opt::<i32>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_string_opt::<i64>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_binary_opt::<i64>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else {
            panic!("wrong dtype");
        }
    }) {
        return None;
    }

    let lengths = arrays
        .iter()
        .flat_map(|arr| {
            if let Some(arr) = arr.as_string_opt::<i32>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_binary_opt::<i32>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_string_opt::<i64>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_binary_opt::<i64>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else {
                panic!("wrong dtype");
            }
        })
        .collect::<Vec<_>>();

    // Zero-length entries (e.g. nulls) are ignored; all remaining lengths must match
    let first_non_zero = lengths.iter().position(|&x| x != 0);
    if let Some(first_non_zero) = first_non_zero {
        if !lengths
            .iter()
            .all(|&x| x == 0 || x == lengths[first_non_zero])
        {
            return None;
        }

        Some(lengths[first_non_zero])
    } else {
        None
    }
}

impl ArrayEncodingStrategy for CoreArrayEncodingStrategy {
    fn create_array_encoder(
        &self,
        arrays: &[ArrayRef],
        field: &Field,
    ) -> Result<Box<dyn ArrayEncoder>> {
        let data_size = arrays
            .iter()
            .map(|arr| arr.get_buffer_memory_size() as u64)
            .sum::<u64>();
        let data_type = arrays[0].data_type();

        let use_dict_encoding = data_type == &DataType::Utf8
            && check_dict_encoding(arrays, get_dict_encoding_threshold());

        Self::choose_array_encoder(
            arrays,
            data_type,
            data_size,
            use_dict_encoding,
            self.version,
            Some(&field.metadata),
        )
    }
}

impl CompressionStrategy for CoreArrayEncodingStrategy {
    fn create_miniblock_compressor(
        &self,
        _field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>> {
        match data {
            DataBlock::FixedWidth(fixed_width_data) => {
                let bit_widths = data.expect_stat(Stat::BitWidth);
                // Bit-packing is only used when no chunk reports a bit width of zero and the
                // values have a standard integer width.
                let has_all_zeros = bit_widths
                    .as_primitive::<UInt64Type>()
                    .values()
                    .iter()
                    .any(|v| *v == 0);
                if !has_all_zeros
                    && (fixed_width_data.bits_per_value == 8
                        || fixed_width_data.bits_per_value == 16
                        || fixed_width_data.bits_per_value == 32
                        || fixed_width_data.bits_per_value == 64)
                {
                    Ok(Box::new(BitpackMiniBlockEncoder::default()))
                } else {
                    Ok(Box::new(ValueEncoder::default()))
                }
            }
            DataBlock::VariableWidth(variable_width_data) => {
                if variable_width_data.bits_per_offset == 32 {
                    let data_size =
                        variable_width_data.expect_single_stat::<UInt64Type>(Stat::DataSize);
                    let max_len =
                        variable_width_data.expect_single_stat::<UInt64Type>(Stat::MaxLength);

                    if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
                        && data_size >= FSST_LEAST_INPUT_SIZE as u64
                    {
                        Ok(Box::new(FsstMiniBlockEncoder::default()))
                    } else {
                        Ok(Box::new(BinaryMiniBlockEncoder::default()))
                    }
                } else {
                    todo!("Implement MiniBlockCompression for VariableWidth DataBlock with 64-bit offsets.")
                }
            }
            DataBlock::Struct(struct_data_block) => {
                // Packed struct encoding requires that every child is fixed-width.
                if struct_data_block
                    .children
                    .iter()
                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
                {
                    panic!("packed struct encoding currently only supports fixed-width fields.")
                }
                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
            }
            DataBlock::FixedSizeList(_) => {
                if data.is_variable() {
                    todo!("Implement MiniBlockCompression for variable width FSL")
                } else {
                    Ok(Box::new(ValueEncoder::default()))
                }
            }
            _ => Err(Error::NotSupported {
                source: format!(
                    "Mini-block compression not yet supported for block type {}",
                    data.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }

    fn create_per_value(
        &self,
        _field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>> {
        match data {
            DataBlock::FixedWidth(_) => {
                let encoder = Box::new(ValueEncoder::default());
                Ok(encoder)
            }
            DataBlock::VariableWidth(variable_width) => {
                if variable_width.bits_per_offset == 32 {
                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);

                    let variable_compression = Box::new(VariableEncoder::default());

                    if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
                        && data_size >= FSST_LEAST_INPUT_SIZE as u64
                    {
                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
                    } else {
                        Ok(variable_compression)
                    }
                } else {
                    todo!("Implement per-value compression for VariableWidth DataBlock with 64-bit offsets.")
                }
            }
            _ => unreachable!(),
        }
    }

    fn create_block_compressor(
        &self,
        _field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)> {
        match data {
            DataBlock::FixedWidth(fixed_width) => {
                let encoder = Box::new(ValueEncoder::default());
                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
                Ok((encoder, encoding))
            }
            DataBlock::VariableWidth(variable_width) => {
                let encoder = Box::new(VariableEncoder::default());
                let encoding = ProtobufUtils::variable(variable_width.bits_per_offset);
                Ok((encoder, encoding))
            }
            _ => unreachable!(),
        }
    }
}
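
/// Hands out column indices in order and records the mapping from field id to
/// column index.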
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    current_index: u32,
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let idx = self.current_index;
        self.current_index += 1;
        self.mapping.push((field_id, idx));
        idx
    }

    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}

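/// Options that control how data is encoded.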
pub struct EncodingOptions {
    /// The approximate number of bytes to accumulate per column before a page is written
    pub cache_bytes_per_column: u64,
    /// The maximum size, in bytes, of a single page
    pub max_page_bytes: u64,
    /// If true, the encoder holds on to the arrays it is given rather than copying out the
    /// data it needs (uses more memory, less CPU)
    pub keep_original_array: bool,
    /// The alignment, in bytes, for buffers written to the file.  Must be a power of two
    /// that is at least [`MIN_PAGE_BUFFER_ALIGNMENT`].
    pub buffer_alignment: u64,
}

impl Default for EncodingOptions {
    fn default() -> Self {
        Self {
            cache_bytes_per_column: 8 * 1024 * 1024,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
        }
    }
}

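/// A strategy for creating a [`FieldEncoder`] for each field of a schema.
///
/// Implementations recurse into child fields through `encoding_strategy_root` rather
/// than `self`, so a custom strategy can wrap the default one.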
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}

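/// Creates the default [`FieldEncodingStrategy`] for the given file version:
/// [`CoreFieldEncodingStrategy`] for 2.0 files and [`StructuralEncodingStrategy`] for
/// newer versions.  Panics for the legacy (pre-2.0) version.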
pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
    match version.resolve() {
        LanceFileVersion::Legacy => panic!(),
        LanceFileVersion::V2_0 => Box::new(CoreFieldEncodingStrategy::default()),
        _ => Box::new(StructuralEncodingStrategy::default()),
    }
}

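/// The field encoding strategy used for 2.0 files.  Array-level encoding decisions are
/// delegated to an [`ArrayEncodingStrategy`].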
#[derive(Debug)]
pub struct CoreFieldEncodingStrategy {
    pub array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    pub version: LanceFileVersion,
}

#[allow(clippy::derivable_impls)]
impl Default for CoreFieldEncodingStrategy {
    fn default() -> Self {
        Self {
            array_encoding_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
            version: LanceFileVersion::default(),
        }
    }
}

impl CoreFieldEncodingStrategy {
    fn is_primitive_type(data_type: &DataType) -> bool {
        matches!(
            data_type,
            DataType::Boolean
                | DataType::Date32
                | DataType::Date64
                | DataType::Decimal128(_, _)
                | DataType::Decimal256(_, _)
                | DataType::Duration(_)
                | DataType::Float16
                | DataType::Float32
                | DataType::Float64
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Int8
                | DataType::Interval(_)
                | DataType::Null
                | DataType::Time32(_)
                | DataType::Time64(_)
                | DataType::Timestamp(_, _)
                | DataType::UInt16
                | DataType::UInt32
                | DataType::UInt64
                | DataType::UInt8
                | DataType::FixedSizeBinary(_)
                | DataType::FixedSizeList(_, _)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8,
        )
    }
}

impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();
        if Self::is_primitive_type(&data_type) {
            let column_index = column_index.next_column_index(field.id as u32);
            if field.metadata.contains_key(BLOB_META_KEY) {
                let mut packed_meta = HashMap::new();
                packed_meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
                let desc_field =
                    Field::try_from(BLOB_DESC_FIELD.clone().with_metadata(packed_meta)).unwrap();
                let desc_encoder = Box::new(PrimitiveFieldEncoder::try_new(
                    options,
                    self.array_encoding_strategy.clone(),
                    column_index,
                    desc_field,
                )?);
                Ok(Box::new(BlobFieldEncoder::new(desc_encoder)))
            } else {
                Ok(Box::new(PrimitiveFieldEncoder::try_new(
                    options,
                    self.array_encoding_strategy.clone(),
                    column_index,
                    field.clone(),
                )?))
            }
        } else {
            match data_type {
                DataType::List(_child) | DataType::LargeList(_child) => {
                    let list_idx = column_index.next_column_index(field.id as u32);
                    let inner_encoding = encoding_strategy_root.create_field_encoder(
                        encoding_strategy_root,
                        &field.children[0],
                        column_index,
                        options,
                    )?;
                    let offsets_encoder =
                        Arc::new(BasicEncoder::new(Box::new(ValueEncoder::default())));
                    Ok(Box::new(ListFieldEncoder::new(
                        inner_encoding,
                        offsets_encoder,
                        options.cache_bytes_per_column,
                        options.keep_original_array,
                        list_idx,
                    )))
                }
                DataType::Struct(_) => {
                    let field_metadata = &field.metadata;
                    if field_metadata
                        .get(PACKED_STRUCT_LEGACY_META_KEY)
                        .map(|v| v == "true")
                        .unwrap_or(field_metadata.contains_key(PACKED_STRUCT_META_KEY))
                    {
                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
                            options,
                            self.array_encoding_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                        )?))
                    } else {
                        let header_idx = column_index.next_column_index(field.id as u32);
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.create_field_encoder(
                                    encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructFieldEncoder::new(
                            children_encoders,
                            header_idx,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
                            options,
                            self.array_encoding_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                        )?))
                    } else {
                        Err(Error::NotSupported {
                            source: format!(
                                "cannot encode a dictionary column whose value type is a logical type ({})",
                                value_type
                            )
                            .into(),
                            location: location!(),
                        })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}

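/// The field encoding strategy used for 2.1+ files.  It builds structural encoders and
/// delegates compression decisions to a [`CompressionStrategy`].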
#[derive(Debug)]
pub struct StructuralEncodingStrategy {
    pub compression_strategy: Arc<dyn CompressionStrategy>,
    pub version: LanceFileVersion,
}

#[allow(clippy::derivable_impls)]
impl Default for StructuralEncodingStrategy {
    fn default() -> Self {
        Self {
            compression_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
            version: LanceFileVersion::default(),
        }
    }
}

impl StructuralEncodingStrategy {
    fn is_primitive_type(data_type: &DataType) -> bool {
        matches!(
            data_type,
            DataType::Boolean
                | DataType::Date32
                | DataType::Date64
                | DataType::Decimal128(_, _)
                | DataType::Decimal256(_, _)
                | DataType::Duration(_)
                | DataType::Float16
                | DataType::Float32
                | DataType::Float64
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Int8
                | DataType::Interval(_)
                | DataType::Null
                | DataType::Time32(_)
                | DataType::Time64(_)
                | DataType::Timestamp(_, _)
                | DataType::UInt16
                | DataType::UInt32
                | DataType::UInt64
                | DataType::UInt8
                | DataType::FixedSizeBinary(_)
                | DataType::FixedSizeList(_, _)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8,
        )
    }

    fn do_create_field_encoder(
        &self,
        _encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
        root_field_metadata: &HashMap<String, String>,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();
        if Self::is_primitive_type(&data_type) {
            Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                options,
                self.compression_strategy.clone(),
                column_index.next_column_index(field.id as u32),
                field.clone(),
                Arc::new(root_field_metadata.clone()),
            )?))
        } else {
            match data_type {
                DataType::List(_) | DataType::LargeList(_) => {
                    let child = field.children.first().expect("List should have a child");
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(ListStructuralEncoder::new(child_encoder)))
                }
                DataType::Struct(_) => {
                    if field.is_packed_struct() {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.do_create_field_encoder(
                                    _encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                    root_field_metadata,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructStructuralEncoder::new(children_encoders)))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        Err(Error::NotSupported {
                            source: format!(
                                "cannot encode a dictionary column whose value type is a logical type ({})",
                                value_type
                            )
                            .into(),
                            location: location!(),
                        })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}

impl FieldEncodingStrategy for StructuralEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        self.do_create_field_encoder(
            encoding_strategy_root,
            field,
            column_index,
            options,
            &field.metadata,
        )
    }
}

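/// Creates and holds one [`FieldEncoder`] per top-level field in a schema, along with
/// the mapping from field id to column index.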
pub struct BatchEncoder {
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    pub field_id_to_column_index: Vec<(u32, u32)>,
}

impl BatchEncoder {
    pub fn try_new(
        schema: &Schema,
        strategy: &dyn FieldEncodingStrategy,
        options: &EncodingOptions,
    ) -> Result<Self> {
        let mut col_idx = 0;
        let mut col_idx_sequence = ColumnIndexSequence::default();
        let field_encoders = schema
            .fields
            .iter()
            .map(|field| {
                let encoder = strategy.create_field_encoder(
                    strategy,
                    field,
                    &mut col_idx_sequence,
                    options,
                )?;
                col_idx += encoder.as_ref().num_columns();
                Ok(encoder)
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            field_encoders,
            field_id_to_column_index: col_idx_sequence.mapping,
        })
    }

    pub fn num_columns(&self) -> u32 {
        self.field_encoders
            .iter()
            .map(|field_encoder| field_encoder.num_columns())
            .sum::<u32>()
    }
}

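/// A fully encoded batch of data held in memory, together with the page table and
/// schema needed to decode it.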
#[derive(Debug)]
pub struct EncodedBatch {
    pub data: Bytes,
    pub page_table: Vec<Arc<ColumnInfo>>,
    pub schema: Arc<Schema>,
    pub top_level_columns: Vec<u32>,
    pub num_rows: u64,
}

fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
    let buffers = page.data;
    let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
    for buffer in buffers {
        let buffer_offset = data_buffer.len() as u64;
        data_buffer.extend_from_slice(&buffer);
        let size = data_buffer.len() as u64 - buffer_offset;
        buffer_offsets_and_sizes.push((buffer_offset, size));
    }

    PageInfo {
        buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
        encoding: page.description,
        num_rows: page.num_rows,
        priority: page.row_number,
    }
}

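/// Encodes the given batch into memory using the given encoding strategy and options.
///
/// # Example (a sketch; assumes `batch` is an existing Arrow `RecordBatch`)
/// ```ignore
/// let schema = Arc::new(Schema::try_from(batch.schema().as_ref())?);
/// let strategy = default_encoding_strategy(LanceFileVersion::default());
/// let options = EncodingOptions::default();
/// let encoded = encode_batch(&batch, schema, strategy.as_ref(), &options).await?;
/// ```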
pub async fn encode_batch(
    batch: &RecordBatch,
    schema: Arc<Schema>,
    encoding_strategy: &dyn FieldEncodingStrategy,
    options: &EncodingOptions,
) -> Result<EncodedBatch> {
    if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
    {
        return Err(Error::InvalidInput {
            source: format!(
                "buffer_alignment must be a power of two and at least {}",
                MIN_PAGE_BUFFER_ALIGNMENT
            )
            .into(),
            location: location!(),
        });
    }

    let mut data_buffer = BytesMut::new();
    let lance_schema = Schema::try_from(batch.schema().as_ref())?;
    let options = EncodingOptions {
        keep_original_array: true,
        ..*options
    };
    let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
    let mut page_table = Vec::new();
    let mut col_idx_offset = 0;
    for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let repdef = RepDefBuilder::default();
        let encoder = encoder.as_mut();
        let num_rows = arr.len() as u64;
        let mut tasks =
            encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
        tasks.extend(encoder.flush(&mut external_buffers)?);
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
        for task in tasks {
            let encoded_page = task.await?;
            pages
                .entry(encoded_page.column_idx)
                .or_default()
                .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
        }
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let encoded_columns = encoder.finish(&mut external_buffers).await?;
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let num_columns = encoded_columns.len();
        for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
            let col_idx = col_idx + col_idx_offset;
            let mut col_buffer_offsets_and_sizes = Vec::new();
            for buffer in encoded_column.column_buffers {
                let buffer_offset = data_buffer.len() as u64;
                data_buffer.extend_from_slice(&buffer);
                let size = data_buffer.len() as u64 - buffer_offset;
                col_buffer_offsets_and_sizes.push((buffer_offset, size));
            }
            for page in encoded_column.final_pages {
                pages
                    .entry(page.column_idx)
                    .or_default()
                    .push(write_page_to_data_buffer(page, &mut data_buffer));
            }
            let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
            page_table.push(Arc::new(ColumnInfo {
                index: col_idx as u32,
                buffer_offsets_and_sizes: Arc::from(
                    col_buffer_offsets_and_sizes.into_boxed_slice(),
                ),
                page_infos: Arc::from(col_pages.into_boxed_slice()),
                encoding: encoded_column.encoding,
            }))
        }
        col_idx_offset += num_columns;
    }
    let top_level_columns = batch_encoder
        .field_id_to_column_index
        .iter()
        .map(|(_, idx)| *idx)
        .collect();
    Ok(EncodedBatch {
        data: data_buffer.freeze(),
        top_level_columns,
        page_table,
        schema,
        num_rows: batch.num_rows() as u64,
    })
}

#[cfg(test)]
pub mod tests {
    use crate::version::LanceFileVersion;
    use arrow_array::{ArrayRef, StringArray};
    use arrow_schema::Field;
    use lance_core::datatypes::{COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY};
    use std::collections::HashMap;
    use std::sync::Arc;

    use super::check_fixed_size_encoding;
    use super::{check_dict_encoding, ArrayEncodingStrategy, CoreArrayEncodingStrategy};

    fn is_dict_encoding_applicable(arr: Vec<Option<&str>>, threshold: u64) -> bool {
        let arr = StringArray::from(arr);
        let arr = Arc::new(arr) as ArrayRef;
        check_dict_encoding(&[arr], threshold)
    }

    #[test]
    fn test_dict_encoding_should_be_applied_if_cardinality_less_than_threshold() {
        assert!(is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("a"), Some("b")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_larger_than_threshold() {
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("d")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_equal_to_threshold() {
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("a")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_empty_arrays() {
        assert!(!is_dict_encoding_applicable(vec![], 3));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_smaller_than_threshold_arrays() {
        assert!(!is_dict_encoding_applicable(vec![Some("a"), Some("a")], 3));
    }

    fn is_fixed_size_encoding_applicable(
        arrays: Vec<Vec<Option<&str>>>,
        version: LanceFileVersion,
    ) -> bool {
        let mut final_arrays = Vec::new();
        for arr in arrays {
            let arr = StringArray::from(arr);
            let arr = Arc::new(arr) as ArrayRef;
            final_arrays.push(arr);
        }

        check_fixed_size_encoding(&final_arrays.clone(), version).is_some()
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable() {
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("abc"), Some("de")]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), None]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), Some("")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), Some("")]],
            LanceFileVersion::V2_1
        ));
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable_multiple_arrays() {
        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), Some("bc")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), None], vec![None, Some("")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![None, None], vec![None, None]],
            LanceFileVersion::V2_1
        ));
    }

    fn verify_array_encoder(
        array: ArrayRef,
        field_meta: Option<HashMap<String, String>>,
        version: LanceFileVersion,
        expected_encoder: &str,
    ) {
        let encoding_strategy = CoreArrayEncodingStrategy { version };
        let mut field = Field::new("test_field", array.data_type().clone(), true);
        if let Some(field_meta) = field_meta {
            field.set_metadata(field_meta);
        }
        let lance_field = lance_core::datatypes::Field::try_from(field).unwrap();
        let encoder_result = encoding_strategy.create_array_encoder(&[array], &lance_field);
        assert!(encoder_result.is_ok());
        let encoder = encoder_result.unwrap();
        assert_eq!(format!("{:?}", encoder).as_str(), expected_encoder);
    }

    #[test]
    fn test_choose_encoder_for_zstd_compressed_string_field() {
        verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(HashMap::from([(COMPRESSION_META_KEY.to_string(), "zstd".to_string())])),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: None }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 0 }) }");
    }

    #[test]
    fn test_choose_encoder_for_zstd_compression_level() {
        verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(HashMap::from([
                (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
                (COMPRESSION_LEVEL_META_KEY.to_string(), "22".to_string())
            ])),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: Some(22) }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 22 }) }");
    }
}