lance_encoding/
encoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
use std::{collections::HashMap, env, sync::Arc};

use arrow::array::AsArray;
use arrow::datatypes::UInt64Type;
use arrow_array::{Array, ArrayRef, RecordBatch, UInt8Array};
use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_core::datatypes::{
    Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_LEVEL_META_KEY,
    COMPRESSION_META_KEY, PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
};
use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
use lance_core::{Error, Result};
use snafu::location;

use crate::buffer::LanceBuffer;
use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
use crate::decoder::PageEncoding;
use crate::encodings::logical::blob::BlobFieldEncoder;
use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructFieldEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::encodings::physical::binary::{BinaryMiniBlockEncoder, VariableEncoder};
use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
use crate::encodings::physical::bitpack_fastlanes::{
    compute_compressed_bit_width_for_non_neg, BitpackMiniBlockEncoder,
};
use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme};
use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fsst::{
    FsstArrayEncoder, FsstMiniBlockEncoder, FsstPerValueEncoder,
};
use crate::encodings::physical::packed_struct::PackedStructEncoder;
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockEncoder;
use crate::format::ProtobufUtils;
use crate::repdef::RepDefBuilder;
use crate::statistics::{GetStat, Stat};
use crate::version::LanceFileVersion;
use crate::{
    decoder::{ColumnInfo, PageInfo},
    encodings::{
        logical::{list::ListFieldEncoder, primitive::PrimitiveFieldEncoder},
        physical::{
            basic::BasicEncoder, binary::BinaryEncoder, dictionary::DictionaryEncoder,
            fixed_size_binary::FixedSizeBinaryEncoder, fixed_size_list::FslEncoder,
            value::ValueEncoder,
        },
    },
    format::pb,
};
use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};

use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use std::collections::hash_map::RandomState;

/// The minimum alignment for a page buffer.  Writers must respect this.
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;

/// An encoded array
///
/// Maps to a single Arrow array
///
/// This contains the encoded data as well as a description of the encoding that was applied which
/// can be used to decode the data later.
#[derive(Debug)]
pub struct EncodedArray {
    /// The encoded buffers
    pub data: DataBlock,
    /// A description of the encoding used to encode the array
    pub encoding: pb::ArrayEncoding,
}

impl EncodedArray {
    pub fn new(data: DataBlock, encoding: pb::ArrayEncoding) -> Self {
        Self { data, encoding }
    }

    pub fn into_buffers(self) -> (Vec<LanceBuffer>, pb::ArrayEncoding) {
        let buffers = self.data.into_buffers();
        (buffers, self.encoding)
    }
}

/// An encoded page of data
///
/// Maps to a top-level array
///
/// For example, FixedSizeList<Int32> will have two EncodedArray instances and one EncodedPage
#[derive(Debug)]
pub struct EncodedPage {
    // The encoded page buffers
    pub data: Vec<LanceBuffer>,
    // A description of the encoding used to encode the page
    pub description: PageEncoding,
    /// The number of rows in the encoded page
    pub num_rows: u64,
    /// The top-level row number of the first row in the page
    ///
    /// Generally the number of "top-level" rows and the number of rows are the same.  However,
    /// when there is repetition (list/fixed-size-list) there will be more or fewer items than rows.
    ///
    /// A top-level row can never be split across a page boundary.
    pub row_number: u64,
    /// The index of the column
    pub column_idx: u32,
}

#[derive(Debug)]
pub struct EncodedBufferMeta {
    pub bits_per_value: u64,

    pub bitpacking: Option<BitpackingBufferMeta>,

    pub compression_scheme: Option<CompressionScheme>,
}

#[derive(Debug)]
pub struct BitpackingBufferMeta {
    pub bits_per_value: u64,

    pub signed: bool,
}

/// Encodes data from one format to another (hopefully more compact or useful) format
///
/// The array encoder must be Send + Sync.  Encoding is always done on its own
/// thread task in the background and there could potentially be multiple encode
/// tasks running for a column at once.
pub trait ArrayEncoder: std::fmt::Debug + Send + Sync {
    /// Encode data
    ///
    /// The result should contain a description of the encoding that was chosen.
    /// This can be used to decode the data later.
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray>;
}

/// The maximum number of bytes in a mini-block chunk (slightly less than 8 KiB; see [`MiniBlockChunk`])
pub const MAX_MINIBLOCK_BYTES: u64 = 8 * 1024 - 6;
/// The maximum number of values in a mini-block chunk
pub const MAX_MINIBLOCK_VALUES: u64 = 4096;

/// Page data that has been compressed into a series of chunks put into
/// a single buffer.
pub struct MiniBlockCompressed {
    /// The buffer of compressed data
    pub data: LanceBuffer,
    /// Describes the size of each chunk
    pub chunks: Vec<MiniBlockChunk>,
    /// The number of values in the entire page
    pub num_values: u64,
}

/// Describes the size of a mini-block chunk of data
///
/// Mini-block chunks are designed to be small (just a few disk sectors)
/// and contain a power-of-two number of values (except for the last chunk)
///
/// To enforce this we limit a chunk to 4Ki values and slightly less than
/// 8KiB of compressed data.  This means that even in the extreme case
/// where we have 4 bytes of rep/def then we will have at most 24KiB of
/// data (values, repetition, and definition) per mini-block.
#[derive(Debug)]
pub struct MiniBlockChunk {
    // The number of bytes that make up the chunk
    //
    // This value must be less than or equal to 8Ki - 6 (8186)
    pub num_bytes: u16,
    // The log (base 2) of the number of values in the chunk.  If this is the final chunk
    // then this should be 0 (the number of values will be calculated by subtracting the
    // size of all other chunks from the total size of the page)
    //
    // For example, 1 would mean there are 2 values in the chunk and 12 would mean there
    // are 4Ki values in the chunk.
    //
    // This must be <= 12 (i.e. <= 4096 values)
    pub log_num_values: u8,
}

impl MiniBlockChunk {
    /// Gets the number of values in this block
    ///
    /// This requires `vals_in_prev_blocks` and `total_num_values` because the
    /// last block in a page is a special case which stores 0 for log_num_values
    /// and, in that case, the number of values is determined by subtracting
    /// `vals_in_prev_blocks` from `total_num_values`
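    ///
    /// A minimal sketch of both cases (hypothetical chunk sizes):
    ///
    /// ```ignore
    /// // A non-final chunk stores the log2 of its value count: 2^10 = 1024 values
    /// let chunk = MiniBlockChunk { num_bytes: 4096, log_num_values: 10 };
    /// assert_eq!(chunk.num_values(0, 5000), 1024);
    /// // The final chunk stores 0 and holds whatever remains in the page
    /// let last = MiniBlockChunk { num_bytes: 1000, log_num_values: 0 };
    /// assert_eq!(last.num_values(4096, 5000), 904);
    /// ```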
    pub fn num_values(&self, vals_in_prev_blocks: u64, total_num_values: u64) -> u64 {
        if self.log_num_values == 0 {
            total_num_values - vals_in_prev_blocks
        } else {
            1 << self.log_num_values
        }
    }
}

/// Trait for compression algorithms that are suitable for use in the miniblock structural encoding
///
/// These compression algorithms should be capable of encoding the data into small chunks
/// where each chunk (except the last) has 2^N values (N can vary between chunks)
pub trait MiniBlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compress a `page` of data into multiple chunks
    ///
    /// See [`MiniBlockCompressed`] for details on how chunks should be sized.
    ///
    /// This method also returns a description of the encoding applied that will be
    /// used at decode time to read the data.
    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)>;
}

/// Per-value compression must produce one of:
///
/// - A single buffer of fixed-width values
/// - A single buffer of value data and a buffer of offsets
///
/// TODO: In the future we may allow metadata buffers
#[derive(Debug)]
pub enum PerValueDataBlock {
    Fixed(FixedWidthDataBlock),
    Variable(VariableWidthBlock),
}

impl PerValueDataBlock {
    pub fn data_size(&self) -> u64 {
        match self {
            Self::Fixed(fixed) => fixed.data_size(),
            Self::Variable(variable) => variable.data_size(),
        }
    }
}

/// Trait for compression algorithms that are suitable for use in the zipped structural encoding
///
/// This compression must return either a FixedWidthDataBlock or a VariableWidthBlock.  This is because
/// we need to zip the data and those are the only two blocks we know how to zip today.
///
/// In addition, the compressed data must be able to be decompressed in a random-access fashion.
/// This means that the decompression algorithm must be able to decompress any value without
/// decompressing all values before it.
pub trait PerValueCompressor: std::fmt::Debug + Send + Sync {
    /// Compress the data into a single buffer
    ///
    /// Also returns a description of the compression that can be used to decompress when reading the data back
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)>;
}

/// Trait for compression algorithms that compress an entire block of data into one opaque
/// and self-described chunk.
///
/// This is the most general type of compression.  There are no constraints on the method
/// of compression; it is assumed that the entire block of data will be present at decompression.
///
/// This is the least appropriate strategy for random access because we must load the entire
/// block to access any single value.  This should only be used for cases where random access is never
/// required (e.g. when encoding metadata buffers like a dictionary or for encoding rep/def
/// mini-block chunks)
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compress the data into a single buffer
    ///
    /// Also returns a description of the compression that can be used to decompress
    /// when reading the data back
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}

/// Creates a column encoding describing a plain values column
pub fn values_column_encoding() -> pb::ColumnEncoding {
    pb::ColumnEncoding {
        column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
    }
}

/// The encoded output of a single column: any column-level buffers, the column
/// encoding, and the encoded pages
pub struct EncodedColumn {
    pub column_buffers: Vec<LanceBuffer>,
    pub encoding: pb::ColumnEncoding,
    pub final_pages: Vec<EncodedPage>,
}

impl Default for EncodedColumn {
    fn default() -> Self {
        Self {
            column_buffers: Default::default(),
            encoding: pb::ColumnEncoding {
                column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
            },
            final_pages: Default::default(),
        }
    }
}

/// A tool to reserve space for buffers that are not in-line with the data
///
/// In most cases, buffers are stored in the page and referred to in the encoding
/// metadata by their index in the page.  This keeps all buffers within a page together.
/// As a result, most encoders should not need to use this structure.
///
/// In some cases (currently only the large binary encoding) there is a need to access
/// buffers that are not in the page (because storing the position / offset of every page
/// in the page metadata would be too expensive).
///
/// To do this you can add a buffer with `add_buffer` and then use the returned position
/// in some way (in the large binary encoding the returned position is stored in the page
/// data as a position / size array).
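///
/// A minimal sketch (hypothetical sizes; assumes a `LanceBuffer` built from an
/// owned `Vec<u8>`):
///
/// ```ignore
/// let mut buffers = OutOfLineBuffers::new(/*base_position=*/ 100, /*buffer_alignment=*/ 8);
/// // The first buffer lands at the base position...
/// let position = buffers.add_buffer(LanceBuffer::Owned(vec![0u8; 10]));
/// assert_eq!(position, 100);
/// // ...and the next buffer would start at 100 + 10 + 6 bytes of padding = 116
/// ```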
pub struct OutOfLineBuffers {
    position: u64,
    buffer_alignment: u64,
    buffers: Vec<LanceBuffer>,
}

impl OutOfLineBuffers {
    pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
        Self {
            position: base_position,
            buffer_alignment,
            buffers: Vec::new(),
        }
    }

    pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
        let position = self.position;
        // Advance past the buffer plus any padding needed to keep the next buffer aligned
        self.position += buffer.len() as u64;
        self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
        self.buffers.push(buffer);
        position
    }

    pub fn take_buffers(self) -> Vec<LanceBuffer> {
        self.buffers
    }

    pub fn reset_position(&mut self, position: u64) {
        self.position = position;
    }
}

/// A task to create a page of data
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;

/// Top-level encoding trait to encode any Arrow array type into one or more pages.
///
/// The field encoder implements buffering and encoding of a single input column
/// but it may map to multiple output columns.  For example, a list array or struct
/// array will be encoded into multiple columns.
///
/// Also, fields may be encoded at different speeds.  For example, given a struct
/// column with three fields (a boolean field, an int32 field, and a 4096-dimension
/// tensor field) the tensor field is likely to emit encoded pages much more frequently
/// than the boolean field.
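///
/// A minimal sketch of how a writer might drive a field encoder (a hypothetical
/// driver loop; the real writer also spawns the returned tasks):
///
/// ```ignore
/// let mut external = OutOfLineBuffers::new(0, MIN_PAGE_BUFFER_ALIGNMENT);
/// for (row_number, array) in arrays {
///     tasks.extend(encoder.maybe_encode(
///         array, &mut external, RepDefBuilder::default(), row_number, num_rows,
///     )?);
/// }
/// tasks.extend(encoder.flush(&mut external)?);
/// let columns = encoder.finish(&mut external).await?;
/// ```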
pub trait FieldEncoder: Send {
    /// Buffer the data and, if there is enough data in the buffer to form a page, return
    /// an encoding task to encode the data.
    ///
    /// This may return more than one task because a single column may be mapped to multiple
    /// output columns.  For example, if encoding a struct column with three children then
    /// up to three tasks may be returned from each call to maybe_encode.
    ///
    /// It may also return multiple tasks for a single column if the input array is larger
    /// than a single disk page.
    ///
    /// It could also return an empty Vec if there is not enough data yet to encode any pages.
    ///
    /// The `row_number` must be passed, which is the top-level row number currently being encoded.
    /// This is stored in any pages produced by this call so that we can know the priority of the
    /// page.
    ///
    /// The `num_rows` is the number of top level rows.  It is initially the same as `array.len()`,
    /// however it is passed separately because array will become flattened over time (if there is
    /// repetition) and we need to know the original number of rows for various purposes.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;
    /// Flush any remaining data from the buffers into encoding tasks
    ///
    /// Each encode task produces a single page.  The order of these pages will be maintained
    /// in the file (we do not worry about order between columns but all pages in the same
    /// column should maintain order)
    ///
    /// This may be called intermittently throughout encoding but will always be called
    /// once at the end of encoding just before calling finish
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
    /// Finish encoding and return column metadata
    ///
    /// This is called only once, after all encode tasks have completed
    ///
    /// This returns a Vec because a single field may have created multiple columns
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// The number of output columns this encoding will create
    fn num_columns(&self) -> u32;
}

/// A trait to pick which encoding strategy to use for a single page
/// of data
///
/// Presumably, implementations will make encoding decisions based on
/// array statistics.
pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug {
    fn create_array_encoder(
        &self,
        arrays: &[ArrayRef],
        field: &Field,
    ) -> Result<Box<dyn ArrayEncoder>>;
}

/// A trait to pick which compression to use for given data
///
/// There are several different kinds of compression.
///
/// - Block compression is the most generic, but most difficult to use efficiently
/// - Per-value compression results in either a fixed width data block or a variable
///   width data block.  In other words, there is some number of bits per value.
///   In addition, each value should be independently decompressible.
/// - Mini-block compression results in a small block of opaque data for chunks
///   of rows.  Each block is somewhere between 0 and 16KiB in size.  This is
///   used for narrow data types (both fixed and variable length) where we can
///   fit many values into a 16KiB block.
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Create a block compressor for the given data
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)>;

    /// Create a per-value compressor for the given data
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Create a mini-block compressor for the given data
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}

/// The core array encoding strategy is a set of basic encodings that
/// are generally applicable in most scenarios.
#[derive(Debug, Default)]
pub struct CoreArrayEncodingStrategy {
    pub version: LanceFileVersion,
}

const BINARY_DATATYPES: [DataType; 4] = [
    DataType::Binary,
    DataType::LargeBinary,
    DataType::Utf8,
    DataType::LargeUtf8,
];

impl CoreArrayEncodingStrategy {
    fn can_use_fsst(data_type: &DataType, data_size: u64, version: LanceFileVersion) -> bool {
        version >= LanceFileVersion::V2_1
            && matches!(data_type, DataType::Utf8 | DataType::Binary)
            && data_size > 4 * 1024 * 1024
    }

    fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionConfig> {
        let compression = field_meta.get(COMPRESSION_META_KEY)?;
        let compression_scheme = compression.parse::<CompressionScheme>();
        match compression_scheme {
            Ok(compression_scheme) => Some(CompressionConfig::new(
                compression_scheme,
                field_meta
                    .get(COMPRESSION_LEVEL_META_KEY)
                    .and_then(|level| level.parse().ok()),
            )),
            Err(_) => None,
        }
    }

    fn default_binary_encoder(
        arrays: &[ArrayRef],
        data_type: &DataType,
        field_meta: Option<&HashMap<String, String>>,
        data_size: u64,
        version: LanceFileVersion,
    ) -> Result<Box<dyn ArrayEncoder>> {
        let bin_indices_encoder =
            Self::choose_array_encoder(arrays, &DataType::UInt64, data_size, false, version, None)?;

        if let Some(compression) = field_meta.and_then(Self::get_field_compression) {
            if compression.scheme == CompressionScheme::Fsst {
                // User requested FSST
                let raw_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
                Ok(Box::new(FsstArrayEncoder::new(raw_encoder)))
            } else {
                // Generic compression
                Ok(Box::new(BinaryEncoder::new(
                    bin_indices_encoder,
                    Some(compression),
                )))
            }
        } else {
            // No user-specified compression, use FSST if we can
            let bin_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
            if Self::can_use_fsst(data_type, data_size, version) {
                Ok(Box::new(FsstArrayEncoder::new(bin_encoder)))
            } else {
                Ok(bin_encoder)
            }
        }
    }

    fn choose_array_encoder(
        arrays: &[ArrayRef],
        data_type: &DataType,
        data_size: u64,
        use_dict_encoding: bool,
        version: LanceFileVersion,
        field_meta: Option<&HashMap<String, String>>,
    ) -> Result<Box<dyn ArrayEncoder>> {
        match data_type {
            DataType::FixedSizeList(inner, dimension) => {
                Ok(Box::new(BasicEncoder::new(Box::new(FslEncoder::new(
                    Self::choose_array_encoder(
                        arrays,
                        inner.data_type(),
                        data_size,
                        use_dict_encoding,
                        version,
                        None,
                    )?,
                    *dimension as u32,
                )))))
            }
            DataType::Dictionary(key_type, value_type) => {
                let key_encoder =
                    Self::choose_array_encoder(arrays, key_type, data_size, false, version, None)?;
                let value_encoder = Self::choose_array_encoder(
                    arrays, value_type, data_size, false, version, None,
                )?;

                Ok(Box::new(AlreadyDictionaryEncoder::new(
                    key_encoder,
                    value_encoder,
                )))
            }
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
                if use_dict_encoding {
                    let dict_indices_encoder = Self::choose_array_encoder(
                        // We need to pass arrays to this method to figure out what kind of compression to
                        // use but we haven't actually calculated the indices yet.  For now, we just assume
                        // worst case and use the full range.  In the future maybe we can pass in statistics
                        // instead of the actual data
                        &[Arc::new(UInt8Array::from_iter_values(0_u8..255_u8))],
                        &DataType::UInt8,
                        data_size,
                        false,
                        version,
                        None,
                    )?;
                    let dict_items_encoder = Self::choose_array_encoder(
                        arrays,
                        &DataType::Utf8,
                        data_size,
                        false,
                        version,
                        None,
                    )?;

                    Ok(Box::new(DictionaryEncoder::new(
                        dict_indices_encoder,
                        dict_items_encoder,
                    )))
                }
                // The parent datatype should be binary or utf8 to use the fixed size encoding
                // The variable 'data_type' is passed through recursion so comparing with it would be incorrect
                else if BINARY_DATATYPES.contains(arrays[0].data_type()) {
                    if let Some(byte_width) = check_fixed_size_encoding(arrays, version) {
                        // use FixedSizeBinaryEncoder
                        let bytes_encoder = Self::choose_array_encoder(
                            arrays,
                            &DataType::UInt8,
                            data_size,
                            false,
                            version,
                            None,
                        )?;

                        Ok(Box::new(BasicEncoder::new(Box::new(
                            FixedSizeBinaryEncoder::new(bytes_encoder, byte_width as usize),
                        ))))
                    } else {
                        Self::default_binary_encoder(
                            arrays, data_type, field_meta, data_size, version,
                        )
                    }
                } else {
                    Self::default_binary_encoder(arrays, data_type, field_meta, data_size, version)
                }
            }
            DataType::Struct(fields) => {
                let num_fields = fields.len();
                let mut inner_encoders = Vec::new();

                for i in 0..num_fields {
                    let inner_datatype = fields[i].data_type();
                    let inner_encoder = Self::choose_array_encoder(
                        arrays,
                        inner_datatype,
                        data_size,
                        use_dict_encoding,
                        version,
                        None,
                    )?;
                    inner_encoders.push(inner_encoder);
                }

                Ok(Box::new(PackedStructEncoder::new(inner_encoders)))
            }
            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
                        compressed_bit_width as usize,
                        data_type.clone(),
                    )))
                } else {
                    Ok(Box::new(BasicEncoder::new(Box::new(
                        ValueEncoder::default(),
                    ))))
                }
            }

            // TODO: for signed integers, I intend to make it a cascaded encoding: a sparse array for the negative values and very wide (bit-width) values,
            // then a bitpacked array for the narrow (bit-width) values.  I need `BitpackedForNeg` to be merged first.  I am
            // thinking about putting this sparse array in the metadata so bitpacking remains using one page buffer only.
            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
                        compressed_bit_width as usize,
                        data_type.clone(),
                    )))
                } else {
                    Ok(Box::new(BasicEncoder::new(Box::new(
                        ValueEncoder::default(),
                    ))))
                }
            }
            _ => Ok(Box::new(BasicEncoder::new(Box::new(
                ValueEncoder::default(),
            )))),
        }
    }
}

/// Returns the cardinality threshold below which dictionary encoding is used
/// (can be overridden with the `LANCE_DICT_ENCODING_THRESHOLD` environment variable)
fn get_dict_encoding_threshold() -> u64 {
    env::var("LANCE_DICT_ENCODING_THRESHOLD")
        .ok()
        .and_then(|val| val.parse().ok())
        .unwrap_or(100)
}

// Checks whether we want to use dictionary encoding or not by applying a
// threshold on cardinality.  Returns true if the estimated cardinality is
// below the threshold, but false if the total number of rows is itself below
// the threshold.  The choice to use 100 is just a heuristic for now.
// HyperLogLog is used for cardinality estimation.  Its error rate is
// 1.04 / sqrt(2^p), where p is the precision; with p = 12 the error rate is
// 1.04 / sqrt(2^12) ≈ 1.63%
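//
// A minimal sketch (hypothetical data, assuming `arrow_array::StringArray` is in
// scope): 1000 rows drawn from 3 distinct values clears the row-count floor and
// stays well under a threshold of 100, so dictionary encoding would be chosen:
//
//     let arr: ArrayRef = Arc::new(StringArray::from_iter_values(
//         (0..1000).map(|i| ["a", "b", "c"][i % 3]),
//     ));
//     assert!(check_dict_encoding(&[arr], 100));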
fn check_dict_encoding(arrays: &[ArrayRef], threshold: u64) -> bool {
    let num_total_rows = arrays.iter().map(|arr| arr.len()).sum::<usize>();
    if num_total_rows < threshold as usize {
        return false;
    }
    const PRECISION: u8 = 12;

    let mut hll: HyperLogLogPlus<String, RandomState> =
        HyperLogLogPlus::new(PRECISION, RandomState::new()).unwrap();

    for arr in arrays {
        let string_array = arrow_array::cast::as_string_array(arr);
        for value in string_array.iter().flatten() {
            hll.insert(value);
            let estimated_cardinality = hll.count() as u64;
            if estimated_cardinality >= threshold {
                return false;
            }
        }
    }

    true
}

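// Returns the fixed byte width if every non-null value in `arrays` has the same
// nonzero length (any empty value disqualifies the encoding).
//
// A minimal sketch (hypothetical data, assuming `arrow_array::StringArray`):
//
//     // "ab" and "cd" are both 2 bytes wide, so the result is Some(2)
//     let arr: ArrayRef = Arc::new(StringArray::from(vec!["ab", "cd"]));
//     assert_eq!(check_fixed_size_encoding(&[arr], LanceFileVersion::V2_1), Some(2));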
fn check_fixed_size_encoding(arrays: &[ArrayRef], version: LanceFileVersion) -> Option<u64> {
    if version < LanceFileVersion::V2_1 || arrays.is_empty() {
        return None;
    }

    // make sure no array has an empty string
    if !arrays.iter().all(|arr| {
        if let Some(arr) = arr.as_string_opt::<i32>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_binary_opt::<i32>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_string_opt::<i64>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_binary_opt::<i64>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else {
            panic!("wrong dtype");
        }
    }) {
        return None;
    }

    let lengths = arrays
        .iter()
        .flat_map(|arr| {
            if let Some(arr) = arr.as_string_opt::<i32>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_binary_opt::<i32>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_string_opt::<i64>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_binary_opt::<i64>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else {
                panic!("wrong dtype");
            }
        })
        .collect::<Vec<_>>();

    // find first non-zero value in lengths (zero-length entries correspond to
    // nulls since empty values were ruled out above)
    let first_non_zero = lengths.iter().position(|&x| x != 0);
    if let Some(first_non_zero) = first_non_zero {
        // make sure all lengths are equal to first_non_zero length or zero
        if !lengths
            .iter()
            .all(|&x| x == 0 || x == lengths[first_non_zero])
        {
            return None;
        }

        // set the byte width
        Some(lengths[first_non_zero])
    } else {
        None
    }
}

impl ArrayEncodingStrategy for CoreArrayEncodingStrategy {
    fn create_array_encoder(
        &self,
        arrays: &[ArrayRef],
        field: &Field,
    ) -> Result<Box<dyn ArrayEncoder>> {
        let data_size = arrays
            .iter()
            .map(|arr| arr.get_buffer_memory_size() as u64)
            .sum::<u64>();
        let data_type = arrays[0].data_type();

        let use_dict_encoding = data_type == &DataType::Utf8
            && check_dict_encoding(arrays, get_dict_encoding_threshold());

        Self::choose_array_encoder(
            arrays,
            data_type,
            data_size,
            use_dict_encoding,
            self.version,
            Some(&field.metadata),
        )
    }
}

impl CompressionStrategy for CoreArrayEncodingStrategy {
    fn create_miniblock_compressor(
        &self,
        _field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>> {
        match data {
            DataBlock::FixedWidth(fixed_width_data) => {
                let bit_widths = data.expect_stat(Stat::BitWidth);
                // Temporary hack to work around https://github.com/lancedb/lance/issues/3102
                // Ideally we should still be able to bit-pack here (either to 0 or 1 bit per value)
                let has_any_zero = bit_widths
                    .as_primitive::<UInt64Type>()
                    .values()
                    .iter()
                    .any(|v| *v == 0);
                if !has_any_zero
                    && (fixed_width_data.bits_per_value == 8
                        || fixed_width_data.bits_per_value == 16
                        || fixed_width_data.bits_per_value == 32
                        || fixed_width_data.bits_per_value == 64)
                {
                    Ok(Box::new(BitpackMiniBlockEncoder::default()))
                } else {
                    Ok(Box::new(ValueEncoder::default()))
                }
            }
            DataBlock::VariableWidth(variable_width_data) => {
                if variable_width_data.bits_per_offset == 32 {
                    let data_size =
                        variable_width_data.expect_single_stat::<UInt64Type>(Stat::DataSize);
                    let max_len =
                        variable_width_data.expect_single_stat::<UInt64Type>(Stat::MaxLength);

                    if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
                        && data_size >= FSST_LEAST_INPUT_SIZE as u64
                    {
                        Ok(Box::new(FsstMiniBlockEncoder::default()))
                    } else {
                        Ok(Box::new(BinaryMiniBlockEncoder::default()))
                    }
                } else {
                    todo!("Implement MiniBlockCompression for VariableWidth DataBlock with 64 bits offsets.")
                }
            }
            DataBlock::Struct(struct_data_block) => {
                // this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
                // just being cautious here.
                if struct_data_block
                    .children
                    .iter()
                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
                {
                    panic!("packed struct encoding currently only supports fixed-width fields.")
                }
                Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
            }
            DataBlock::FixedSizeList(_) => {
                // In theory we could use something like bitpacking here but it's not clear it would
                // be very effective.  At most we would shave a few bytes off the first item in the
                // list.  It might be more sophisticated to treat the FSL as a table and bitpack each
                // column but that would be expensive as well so it's not clear that would be a win.  For
                // now we just don't compress FSL
                if data.is_variable() {
                    todo!("Implement MiniBlockCompression for variable width FSL")
                } else {
                    Ok(Box::new(ValueEncoder::default()))
                }
            }
            _ => Err(Error::NotSupported {
                source: format!(
                    "Mini-block compression not yet supported for block type {}",
                    data.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }

    fn create_per_value(
        &self,
        _field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>> {
        match data {
            DataBlock::FixedWidth(_) => {
                let encoder = Box::new(ValueEncoder::default());
                Ok(encoder)
            }
            DataBlock::VariableWidth(variable_width) => {
                if variable_width.bits_per_offset == 32 {
                    let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
                    let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);

                    let variable_compression = Box::new(VariableEncoder::default());

                    if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
                        && data_size >= FSST_LEAST_INPUT_SIZE as u64
                    {
                        Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
                    } else {
                        Ok(variable_compression)
                    }
                } else {
                    todo!("Implement per-value compression for VariableWidth DataBlock with 64 bits offsets.")
                }
            }
            _ => unreachable!(),
        }
    }

    fn create_block_compressor(
        &self,
        _field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)> {
        match data {
            // Right now we only need block compressors for rep/def which is u16.  Will need to expand
            // this if we need block compression of other types.
            DataBlock::FixedWidth(fixed_width) => {
                let encoder = Box::new(ValueEncoder::default());
                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
                Ok((encoder, encoding))
            }
            DataBlock::VariableWidth(variable_width) => {
                let encoder = Box::new(VariableEncoder::default());
                let encoding = ProtobufUtils::variable(variable_width.bits_per_offset);
                Ok((encoder, encoding))
            }
            _ => unreachable!(),
        }
    }
}

/// Keeps track of the current column index and makes a mapping
/// from field id to column index
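///
/// A minimal sketch (hypothetical field ids):
///
/// ```ignore
/// let mut columns = ColumnIndexSequence::default();
/// assert_eq!(columns.next_column_index(7), 0);
/// columns.skip(); // reserve a column without mapping a field to it
/// assert_eq!(columns.next_column_index(9), 2);
/// ```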
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    current_index: u32,
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let idx = self.current_index;
        self.current_index += 1;
        self.mapping.push((field_id, idx));
        idx
    }

    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}

/// Options that control the encoding process
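///
/// A minimal sketch of overriding one default (hypothetical value):
///
/// ```ignore
/// let options = EncodingOptions {
///     cache_bytes_per_column: 4 * 1024 * 1024,
///     ..Default::default()
/// };
/// ```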
pub struct EncodingOptions {
    /// How much data (in bytes) to cache in-memory before writing a page
    ///
    /// This cache is applied on a per-column basis
    pub cache_bytes_per_column: u64,
    /// The maximum size of a page in bytes.  If a single array would create
    /// a page larger than this then it will be split into multiple pages
    pub max_page_bytes: u64,
    /// If false then arrays will be copied (deeply) before
    /// being cached.  This ensures any data kept alive by the array can
    /// be discarded safely and helps avoid writer accumulation.  However,
    /// there is an associated cost.
    pub keep_original_array: bool,
    /// The alignment that the writer is applying to buffers
    ///
    /// The encoder needs to know this so that it can correctly compute the
    /// positions of out-of-line buffers
    pub buffer_alignment: u64,
}

impl Default for EncodingOptions {
    fn default() -> Self {
        Self {
            cache_bytes_per_column: 8 * 1024 * 1024,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
        }
    }
}

/// A trait to pick which kind of field encoding to use for a field
///
/// Unlike the ArrayEncodingStrategy, the field encoding strategy is
/// chosen before any data is generated and the same field encoder is
/// used for all data in the field.
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Choose and create an appropriate field encoder for the given
    /// field.
    ///
    /// The field encoder can be chosen on the data type as well as
    /// any metadata that is attached to the field.
    ///
    /// The `encoding_strategy_root` is the encoder that should be
    /// used to encode any inner data in struct / list / etc. fields.
    ///
    /// Initially it is the same as `self` and generally should be
    /// forwarded to any inner encoding strategy.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}

/// Creates the default field encoding strategy for the given file version
pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
    match version.resolve() {
        LanceFileVersion::Legacy => panic!(),
        LanceFileVersion::V2_0 => Box::new(CoreFieldEncodingStrategy::default()),
        _ => Box::new(StructuralEncodingStrategy::default()),
    }
}

/// The core field encoding strategy is a set of basic encodings that
/// are generally applicable in most scenarios.
#[derive(Debug)]
pub struct CoreFieldEncodingStrategy {
    pub array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    pub version: LanceFileVersion,
}

// For some reason clippy has a false positive and thinks this can be derived but
// it can't because ArrayEncodingStrategy has no default implementation
#[allow(clippy::derivable_impls)]
impl Default for CoreFieldEncodingStrategy {
    fn default() -> Self {
        Self {
            array_encoding_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
            version: LanceFileVersion::default(),
        }
    }
}

impl CoreFieldEncodingStrategy {
    fn is_primitive_type(data_type: &DataType) -> bool {
        matches!(
            data_type,
            DataType::Boolean
                | DataType::Date32
                | DataType::Date64
                | DataType::Decimal128(_, _)
                | DataType::Decimal256(_, _)
                | DataType::Duration(_)
                | DataType::Float16
                | DataType::Float32
                | DataType::Float64
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Int8
                | DataType::Interval(_)
                | DataType::Null
                | DataType::Time32(_)
                | DataType::Time64(_)
                | DataType::Timestamp(_, _)
                | DataType::UInt16
                | DataType::UInt32
                | DataType::UInt64
                | DataType::UInt8
                | DataType::FixedSizeBinary(_)
                | DataType::FixedSizeList(_, _)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8,
        )
    }
}

impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();
        if Self::is_primitive_type(&data_type) {
            let column_index = column_index.next_column_index(field.id as u32);
            if field.metadata.contains_key(BLOB_META_KEY) {
                let mut packed_meta = HashMap::new();
                packed_meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
                let desc_field =
                    Field::try_from(BLOB_DESC_FIELD.clone().with_metadata(packed_meta)).unwrap();
                let desc_encoder = Box::new(PrimitiveFieldEncoder::try_new(
                    options,
                    self.array_encoding_strategy.clone(),
                    column_index,
                    desc_field,
                )?);
                Ok(Box::new(BlobFieldEncoder::new(desc_encoder)))
            } else {
                Ok(Box::new(PrimitiveFieldEncoder::try_new(
                    options,
                    self.array_encoding_strategy.clone(),
                    column_index,
                    field.clone(),
                )?))
            }
        } else {
            match data_type {
                DataType::List(_child) | DataType::LargeList(_child) => {
                    let list_idx = column_index.next_column_index(field.id as u32);
                    let inner_encoding = encoding_strategy_root.create_field_encoder(
                        encoding_strategy_root,
                        &field.children[0],
                        column_index,
                        options,
                    )?;
                    let offsets_encoder =
                        Arc::new(BasicEncoder::new(Box::new(ValueEncoder::default())));
                    Ok(Box::new(ListFieldEncoder::new(
                        inner_encoding,
                        offsets_encoder,
                        options.cache_bytes_per_column,
                        options.keep_original_array,
                        list_idx,
                    )))
                }
                DataType::Struct(_) => {
                    let field_metadata = &field.metadata;
                    if field_metadata
                        .get(PACKED_STRUCT_LEGACY_META_KEY)
                        .map(|v| v == "true")
                        .unwrap_or(field_metadata.contains_key(PACKED_STRUCT_META_KEY))
                    {
                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
                            options,
                            self.array_encoding_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                        )?))
                    } else {
                        let header_idx = column_index.next_column_index(field.id as u32);
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.create_field_encoder(
                                    encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructFieldEncoder::new(
                            children_encoders,
                            header_idx,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    // A dictionary of primitive is, itself, primitive
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
                            options,
                            self.array_encoding_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                        )?))
                    } else {
                        // A dictionary of logical is, itself, logical and we don't support that today
                        // It could be possible (e.g. store indices in one column and values in remaining columns)
                        // but would be a significant amount of work
                        //
                        // An easier fallback implementation would be to decode-on-write and encode-on-read
                        Err(Error::NotSupported { source: format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into(), location: location!() })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}

/// An encoding strategy used for 2.1+ files
#[derive(Debug)]
pub struct StructuralEncodingStrategy {
    pub compression_strategy: Arc<dyn CompressionStrategy>,
    pub version: LanceFileVersion,
}

// For some reason, clippy thinks we can add Default to the above derive but
// rustc doesn't agree (no default for Arc<dyn Trait>)
#[allow(clippy::derivable_impls)]
impl Default for StructuralEncodingStrategy {
    fn default() -> Self {
        Self {
            compression_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
            version: LanceFileVersion::default(),
        }
    }
}

1204impl StructuralEncodingStrategy {
1205    fn is_primitive_type(data_type: &DataType) -> bool {
1206        matches!(
1207            data_type,
1208            DataType::Boolean
1209                | DataType::Date32
1210                | DataType::Date64
1211                | DataType::Decimal128(_, _)
1212                | DataType::Decimal256(_, _)
1213                | DataType::Duration(_)
1214                | DataType::Float16
1215                | DataType::Float32
1216                | DataType::Float64
1217                | DataType::Int16
1218                | DataType::Int32
1219                | DataType::Int64
1220                | DataType::Int8
1221                | DataType::Interval(_)
1222                | DataType::Null
1223                | DataType::Time32(_)
1224                | DataType::Time64(_)
1225                | DataType::Timestamp(_, _)
1226                | DataType::UInt16
1227                | DataType::UInt32
1228                | DataType::UInt64
1229                | DataType::UInt8
1230                | DataType::FixedSizeBinary(_)
1231                | DataType::FixedSizeList(_, _)
1232                | DataType::Binary
1233                | DataType::LargeBinary
1234                | DataType::Utf8
1235                | DataType::LargeUtf8,
1236        )
1237    }
1238
1239    fn do_create_field_encoder(
1240        &self,
1241        _encoding_strategy_root: &dyn FieldEncodingStrategy,
1242        field: &Field,
1243        column_index: &mut ColumnIndexSequence,
1244        options: &EncodingOptions,
1245        root_field_metadata: &HashMap<String, String>,
1246    ) -> Result<Box<dyn FieldEncoder>> {
1247        let data_type = field.data_type();
1248        if Self::is_primitive_type(&data_type) {
1249            Ok(Box::new(PrimitiveStructuralEncoder::try_new(
1250                options,
1251                self.compression_strategy.clone(),
1252                column_index.next_column_index(field.id as u32),
1253                field.clone(),
1254                Arc::new(root_field_metadata.clone()),
1255            )?))
1256        } else {
1257            match data_type {
1258                DataType::List(_) | DataType::LargeList(_) => {
1259                    let child = field.children.first().expect("List should have a child");
1260                    let child_encoder = self.do_create_field_encoder(
1261                        _encoding_strategy_root,
1262                        child,
1263                        column_index,
1264                        options,
1265                        root_field_metadata,
1266                    )?;
1267                    Ok(Box::new(ListStructuralEncoder::new(child_encoder)))
1268                }
1269                DataType::Struct(_) => {
1270                    if field.is_packed_struct() {
1271                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
1272                            options,
1273                            self.compression_strategy.clone(),
1274                            column_index.next_column_index(field.id as u32),
1275                            field.clone(),
1276                            Arc::new(root_field_metadata.clone()),
1277                        )?))
1278                    } else {
1279                        let children_encoders = field
1280                            .children
1281                            .iter()
1282                            .map(|field| {
1283                                self.do_create_field_encoder(
1284                                    _encoding_strategy_root,
1285                                    field,
1286                                    column_index,
1287                                    options,
1288                                    root_field_metadata,
1289                                )
1290                            })
1291                            .collect::<Result<Vec<_>>>()?;
1292                        Ok(Box::new(StructStructuralEncoder::new(children_encoders)))
1293                    }
1294                }
                DataType::Dictionary(_, value_type) => {
                    // A dictionary of primitive values is, itself, primitive
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        // A dictionary of logical values is, itself, logical, and we don't
                        // support that today.  It could be made possible (e.g. store the
                        // indices in one column and the values in the remaining columns)
                        // but would be a significant amount of work.
                        //
                        // An easier fallback implementation would be to decode-on-write
                        // and encode-on-read.
                        Err(Error::NotSupported {
                            source: format!(
                                "cannot encode a dictionary column whose value type is a logical type ({})",
                                value_type
                            )
                            .into(),
                            location: location!(),
                        })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}

impl FieldEncodingStrategy for StructuralEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        self.do_create_field_encoder(
            encoding_strategy_root,
            field,
            column_index,
            options,
            &field.metadata,
        )
    }
}

/// A batch encoder that encodes RecordBatch objects by delegating
/// to field encoders for each top-level field in the batch.
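///
/// A minimal construction sketch (hedged: assumes `CoreFieldEncodingStrategy`
/// implements `Default` and that `EncodingOptions` exposes the fields shown;
/// adapt to the strategy and options you actually use):
///
/// ```ignore
/// let strategy = CoreFieldEncodingStrategy::default();
/// let options = EncodingOptions {
///     cache_bytes_per_column: 8 * 1024 * 1024,
///     max_page_bytes: 32 * 1024 * 1024,
///     keep_original_array: true,
///     buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT,
/// };
/// // One field encoder is created per top-level field in the schema
/// let encoder = BatchEncoder::try_new(&lance_schema, &strategy, &options)?;
/// assert_eq!(encoder.field_encoders.len(), lance_schema.fields.len());
/// ```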
pub struct BatchEncoder {
    /// One encoder per top-level field in the schema
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    /// Mapping from field id to the column index assigned to that field
    pub field_id_to_column_index: Vec<(u32, u32)>,
}

impl BatchEncoder {
    pub fn try_new(
        schema: &Schema,
        strategy: &dyn FieldEncodingStrategy,
        options: &EncodingOptions,
    ) -> Result<Self> {
        let mut col_idx_sequence = ColumnIndexSequence::default();
        let field_encoders = schema
            .fields
            .iter()
            .map(|field| {
                strategy.create_field_encoder(strategy, field, &mut col_idx_sequence, options)
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            field_encoders,
            field_id_to_column_index: col_idx_sequence.mapping,
        })
    }

    pub fn num_columns(&self) -> u32 {
        self.field_encoders
            .iter()
            .map(|field_encoder| field_encoder.num_columns())
            .sum::<u32>()
    }
}

/// An encoded batch of data and a page table describing it
///
/// This is returned by [`crate::encoder::encode_batch`]
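///
/// A hedged sketch of how the pieces relate (the `encoded` binding is
/// hypothetical; the names used are the fields of this struct):
///
/// ```ignore
/// let encoded: EncodedBatch = /* from encode_batch */;
/// // Every page recorded in the page table points into `encoded.data` via
/// // (offset, size) pairs, so the whole batch round-trips through a single
/// // contiguous buffer.
/// for col in &encoded.page_table {
///     for page in col.page_infos.iter() {
///         for (offset, size) in page.buffer_offsets_and_sizes.iter() {
///             let end = (offset + size) as usize;
///             assert!(end <= encoded.data.len());
///         }
///     }
/// }
/// ```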
#[derive(Debug)]
pub struct EncodedBatch {
    /// The encoded bytes for all pages and column buffers
    pub data: Bytes,
    /// Per-column descriptions of where each page lives within `data`
    pub page_table: Vec<Arc<ColumnInfo>>,
    /// The schema of the encoded batch
    pub schema: Arc<Schema>,
    /// The column index assigned to each top-level field
    pub top_level_columns: Vec<u32>,
    /// The number of rows in the batch
    pub num_rows: u64,
}

/// Appends a page's buffers to the end of `data_buffer` and returns a
/// `PageInfo` recording where each buffer landed.
fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
    let buffers = page.data;
    let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
    for buffer in buffers {
        let buffer_offset = data_buffer.len() as u64;
        data_buffer.extend_from_slice(&buffer);
        let size = data_buffer.len() as u64 - buffer_offset;
        buffer_offsets_and_sizes.push((buffer_offset, size));
    }

    PageInfo {
        buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
        encoding: page.description,
        num_rows: page.num_rows,
        priority: page.row_number,
    }
}

/// Helper function to encode a batch of data into memory
///
/// This is primarily for testing and benchmarking but could be useful in other
/// niche situations like IPC.
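///
/// A hedged usage sketch (see [`BatchEncoder`] for one way to build `strategy`
/// and `options`; `batch` is any Arrow `RecordBatch`):
///
/// ```ignore
/// let lance_schema = Arc::new(Schema::try_from(batch.schema().as_ref())?);
/// let encoded = encode_batch(&batch, lance_schema, &strategy, &options).await?;
/// // All encoded bytes live in one buffer, described by the page table
/// assert_eq!(encoded.num_rows, batch.num_rows() as u64);
/// ```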
pub async fn encode_batch(
    batch: &RecordBatch,
    schema: Arc<Schema>,
    encoding_strategy: &dyn FieldEncodingStrategy,
    options: &EncodingOptions,
) -> Result<EncodedBatch> {
    if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
    {
        return Err(Error::InvalidInput {
            source: format!(
                "buffer_alignment must be a power of two and at least {}",
                MIN_PAGE_BUFFER_ALIGNMENT
            )
            .into(),
            location: location!(),
        });
    }

    let mut data_buffer = BytesMut::new();
    let lance_schema = Schema::try_from(batch.schema().as_ref())?;
    let options = EncodingOptions {
        keep_original_array: true,
        ..*options
    };
    let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
    let mut page_table = Vec::new();
    let mut col_idx_offset = 0;
    for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let repdef = RepDefBuilder::default();
        let encoder = encoder.as_mut();
        let num_rows = arr.len() as u64;
        let mut tasks =
            encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
        tasks.extend(encoder.flush(&mut external_buffers)?);
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
        for task in tasks {
            let encoded_page = task.await?;
            // The external buffers were already written above; now write this
            // page's buffers and record where they landed, grouped by column
            pages
                .entry(encoded_page.column_idx)
                .or_default()
                .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
        }
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let encoded_columns = encoder.finish(&mut external_buffers).await?;
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let num_columns = encoded_columns.len();
        for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
            let col_idx = col_idx + col_idx_offset;
            let mut col_buffer_offsets_and_sizes = Vec::new();
            for buffer in encoded_column.column_buffers {
                let buffer_offset = data_buffer.len() as u64;
                data_buffer.extend_from_slice(&buffer);
                let size = data_buffer.len() as u64 - buffer_offset;
                col_buffer_offsets_and_sizes.push((buffer_offset, size));
            }
            for page in encoded_column.final_pages {
                pages
                    .entry(page.column_idx)
                    .or_default()
                    .push(write_page_to_data_buffer(page, &mut data_buffer));
            }
            let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
            page_table.push(Arc::new(ColumnInfo {
                index: col_idx as u32,
                buffer_offsets_and_sizes: Arc::from(
                    col_buffer_offsets_and_sizes.into_boxed_slice(),
                ),
                page_infos: Arc::from(col_pages.into_boxed_slice()),
                encoding: encoded_column.encoding,
            }))
        }
        col_idx_offset += num_columns;
    }
    let top_level_columns = batch_encoder
        .field_id_to_column_index
        .iter()
        .map(|(_, idx)| *idx)
        .collect();
    Ok(EncodedBatch {
        data: data_buffer.freeze(),
        top_level_columns,
        page_table,
        schema,
        num_rows: batch.num_rows() as u64,
    })
}

#[cfg(test)]
pub mod tests {
    use crate::version::LanceFileVersion;
    use arrow_array::{ArrayRef, StringArray};
    use arrow_schema::Field;
    use lance_core::datatypes::{COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY};
    use std::collections::HashMap;
    use std::sync::Arc;

    use super::check_fixed_size_encoding;
    use super::{check_dict_encoding, ArrayEncodingStrategy, CoreArrayEncodingStrategy};

    fn is_dict_encoding_applicable(arr: Vec<Option<&str>>, threshold: u64) -> bool {
        let arr = StringArray::from(arr);
        let arr = Arc::new(arr) as ArrayRef;
        check_dict_encoding(&[arr], threshold)
    }

    #[test]
    fn test_dict_encoding_should_be_applied_if_cardinality_less_than_threshold() {
        assert!(is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("a"), Some("b")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_larger_than_threshold() {
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("d")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_equal_to_threshold() {
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("a")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_empty_arrays() {
        assert!(!is_dict_encoding_applicable(vec![], 3));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_smaller_than_threshold_arrays() {
        assert!(!is_dict_encoding_applicable(vec![Some("a"), Some("a")], 3));
    }

    fn is_fixed_size_encoding_applicable(
        arrays: Vec<Vec<Option<&str>>>,
        version: LanceFileVersion,
    ) -> bool {
        let mut final_arrays = Vec::new();
        for arr in arrays {
            let arr = StringArray::from(arr);
            let arr = Arc::new(arr) as ArrayRef;
            final_arrays.push(arr);
        }

        check_fixed_size_encoding(&final_arrays, version).is_some()
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable() {
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("abc"), Some("de")]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), None]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), Some("")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), Some("")]],
            LanceFileVersion::V2_1
        ));
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable_multiple_arrays() {
        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), Some("bc")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), None], vec![None, Some("")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![None, None], vec![None, None]],
            LanceFileVersion::V2_1
        ));
    }

    fn verify_array_encoder(
        array: ArrayRef,
        field_meta: Option<HashMap<String, String>>,
        version: LanceFileVersion,
        expected_encoder: &str,
    ) {
        let encoding_strategy = CoreArrayEncodingStrategy { version };
        let mut field = Field::new("test_field", array.data_type().clone(), true);
        if let Some(field_meta) = field_meta {
            field.set_metadata(field_meta);
        }
        let lance_field = lance_core::datatypes::Field::try_from(field).unwrap();
        let encoder_result = encoding_strategy.create_array_encoder(&[array], &lance_field);
        assert!(encoder_result.is_ok());
        let encoder = encoder_result.unwrap();
        assert_eq!(format!("{:?}", encoder).as_str(), expected_encoder);
    }

    #[test]
    fn test_choose_encoder_for_zstd_compressed_string_field() {
        verify_array_encoder(
            Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(HashMap::from([(
                COMPRESSION_META_KEY.to_string(),
                "zstd".to_string(),
            )])),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: None }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 0 }) }",
        );
    }

    #[test]
    fn test_choose_encoder_for_zstd_compression_level() {
        verify_array_encoder(
            Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(HashMap::from([
                (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
                (COMPRESSION_LEVEL_META_KEY.to_string(), "22".to_string()),
            ])),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: Some(22) }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 22 }) }",
        );
    }
}