lance_encoding/encodings/physical/value.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use arrow_buffer::bit_util;
use arrow_schema::DataType;
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use log::trace;
use snafu::location;
use std::ops::Range;
use std::sync::{Arc, Mutex};

use crate::buffer::LanceBuffer;
use crate::data::{
    BlockInfo, ConstantDataBlock, DataBlock, FixedSizeListBlock, FixedWidthDataBlock,
};
use crate::decoder::{BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor};
use crate::encoder::{
    BlockCompressor, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
    PerValueDataBlock, MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES,
};
use crate::format::pb::{self, ArrayEncoding};
use crate::format::ProtobufUtils;
use crate::{
    decoder::{PageScheduler, PrimitivePageDecoder},
    encoder::{ArrayEncoder, EncodedArray},
    EncodingsIo,
};

use lance_core::{Error, Result};

use super::block_compress::{CompressionConfig, CompressionScheme, GeneralBufferCompressor};

/// Scheduler for a simple encoding where buffers of fixed-size items are stored as-is on disk
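/// For example (illustrative numbers, not taken from any real file): with
/// `bytes_per_value = 4` and `buffer_offset = 100`, a request for rows `10..20`
/// is scheduled as the on-disk byte range `140..180`.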
#[derive(Debug, Clone, Copy)]
pub struct ValuePageScheduler {
    // TODO: do we really support values greater than 2^32 bytes per value?
    // I think we want to, in theory, but will need to test this case.
    bytes_per_value: u64,
    buffer_offset: u64,
    buffer_size: u64,
    compression_config: CompressionConfig,
}

impl ValuePageScheduler {
    pub fn new(
        bytes_per_value: u64,
        buffer_offset: u64,
        buffer_size: u64,
        compression_config: CompressionConfig,
    ) -> Self {
        Self {
            bytes_per_value,
            buffer_offset,
            buffer_size,
            compression_config,
        }
    }
}

impl PageScheduler for ValuePageScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        let (mut min, mut max) = (u64::MAX, 0);
        let byte_ranges = if self.compression_config.scheme == CompressionScheme::None {
            ranges
                .iter()
                .map(|range| {
                    let start = self.buffer_offset + (range.start * self.bytes_per_value);
                    let end = self.buffer_offset + (range.end * self.bytes_per_value);
                    min = min.min(start);
                    max = max.max(end);
                    start..end
                })
                .collect::<Vec<_>>()
        } else {
            min = self.buffer_offset;
            max = self.buffer_offset + self.buffer_size;
            // For a compressed page the request always covers the entire page, and
            // it is guaranteed that only one range is passed in
            vec![Range {
                start: min,
                end: max,
            }]
        };

        trace!(
            "Scheduling I/O for {} ranges spread across byte range {}..{}",
            byte_ranges.len(),
            min,
            max
        );
        let bytes = scheduler.submit_request(byte_ranges, top_level_row);
        let bytes_per_value = self.bytes_per_value;

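        // When the page is compressed we always read the whole page, so record the
        // per-range byte offsets (relative to the uncompressed buffer) that will be
        // sliced out after decompression.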
        let range_offsets = if self.compression_config.scheme != CompressionScheme::None {
            ranges
                .iter()
                .map(|range| {
                    let start = (range.start * bytes_per_value) as usize;
                    let end = (range.end * bytes_per_value) as usize;
                    start..end
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let compression_config = self.compression_config;
        async move {
            let bytes = bytes.await?;

            Ok(Box::new(ValuePageDecoder {
                bytes_per_value,
                data: bytes,
                uncompressed_data: Arc::new(Mutex::new(None)),
                uncompressed_range_offsets: range_offsets,
                compression_config,
            }) as Box<dyn PrimitivePageDecoder>)
        }
        .boxed()
    }
}

struct ValuePageDecoder {
    bytes_per_value: u64,
    data: Vec<Bytes>,
    uncompressed_data: Arc<Mutex<Option<Vec<Bytes>>>>,
    uncompressed_range_offsets: Vec<std::ops::Range<usize>>,
    compression_config: CompressionConfig,
}

impl ValuePageDecoder {
    fn decompress(&self) -> Result<Vec<Bytes>> {
        // For a compressed page it is guaranteed that only one range was scheduled,
        // so the first (and only) buffer holds the entire compressed page
        let bytes_u8: Vec<u8> = self.data[0].to_vec();
        let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config);
        let mut uncompressed_bytes: Vec<u8> = Vec::new();
        buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?;

        let mut bytes_in_ranges: Vec<Bytes> =
            Vec::with_capacity(self.uncompressed_range_offsets.len());
        for range in &self.uncompressed_range_offsets {
            let start = range.start;
            let end = range.end;
            bytes_in_ranges.push(Bytes::from(uncompressed_bytes[start..end].to_vec()));
        }
        Ok(bytes_in_ranges)
    }

    fn get_uncompressed_bytes(&self) -> Result<Arc<Mutex<Option<Vec<Bytes>>>>> {
        let mut uncompressed_bytes = self.uncompressed_data.lock().unwrap();
        if uncompressed_bytes.is_none() {
            *uncompressed_bytes = Some(self.decompress()?);
        }
        Ok(Arc::clone(&self.uncompressed_data))
    }

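    // A page is compressed exactly when range offsets were recorded at scheduling
    // time, so a non-empty offset list implies compression.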
    fn is_compressed(&self) -> bool {
        !self.uncompressed_range_offsets.is_empty()
    }

    fn decode_buffers<'a>(
        &'a self,
        buffers: impl IntoIterator<Item = &'a Bytes>,
        mut bytes_to_skip: u64,
        mut bytes_to_take: u64,
    ) -> LanceBuffer {
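        // Walk the buffers in order: first skip `bytes_to_skip` bytes, then copy out
        // `bytes_to_take` bytes.  If the entire request lands in a single buffer we
        // return a (potentially zero-copy) slice instead of allocating a new vector.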
        let mut dest: Option<Vec<u8>> = None;

        for buf in buffers.into_iter() {
            let buf_len = buf.len() as u64;
            if bytes_to_skip > buf_len {
                bytes_to_skip -= buf_len;
            } else {
                let bytes_to_take_here = (buf_len - bytes_to_skip).min(bytes_to_take);
                bytes_to_take -= bytes_to_take_here;
                let start = bytes_to_skip as usize;
                let end = start + bytes_to_take_here as usize;
                let slice = buf.slice(start..end);
                match (&mut dest, bytes_to_take) {
                    (None, 0) => {
                        // The entire request is contained in one buffer so we can maybe zero-copy
                        // if the slice is aligned properly
                        return LanceBuffer::from_bytes(slice, self.bytes_per_value);
                    }
                    (None, _) => {
                        dest.replace(Vec::with_capacity(bytes_to_take as usize));
                    }
                    _ => {}
                }
                dest.as_mut().unwrap().extend_from_slice(&slice);
                bytes_to_skip = 0;
            }
        }
        LanceBuffer::from(dest.unwrap_or_default())
    }
}

impl PrimitivePageDecoder for ValuePageDecoder {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
        let bytes_to_skip = rows_to_skip * self.bytes_per_value;
        let bytes_to_take = num_rows * self.bytes_per_value;

        let data_buffer = if self.is_compressed() {
            let decoding_data = self.get_uncompressed_bytes()?;
            let buffers = decoding_data.lock().unwrap();
            self.decode_buffers(buffers.as_ref().unwrap(), bytes_to_skip, bytes_to_take)
        } else {
            self.decode_buffers(&self.data, bytes_to_skip, bytes_to_take)
        };
        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bytes_per_value * 8,
            data: data_buffer,
            num_values: num_rows,
            block_info: BlockInfo::new(),
        }))
    }
}

/// A compression strategy that writes fixed-width data as-is (no compression)
#[derive(Debug, Default)]
pub struct ValueEncoder {}

impl ValueEncoder {
    /// Use the largest power-of-two chunk we can that stays under 4KiB
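    ///
    /// For example (purely illustrative, assuming a 4096-byte cap): with 4-byte
    /// values the loop settles on 512 values per chunk (2048 bytes), since doubling
    /// again would reach the cap.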
    fn find_log_vals_per_chunk(bytes_per_value: u64) -> (u64, u64) {
        let mut size_bytes = 2 * bytes_per_value;
        let mut log_num_vals = 1;
        let mut num_vals = 2;

        // If the type is so wide that we can't even fit 2 values we shouldn't be here
        assert!(size_bytes < MAX_MINIBLOCK_BYTES);

        while 2 * size_bytes < MAX_MINIBLOCK_BYTES && 2 * num_vals < MAX_MINIBLOCK_VALUES {
            log_num_vals += 1;
            size_bytes *= 2;
            num_vals *= 2;
        }

        (log_num_vals, num_vals)
    }

    fn chunk_data(data: FixedWidthDataBlock) -> MiniBlockCompressed {
        // For now, only support byte-sized data
        debug_assert!(data.bits_per_value % 8 == 0);
        let bytes_per_value = data.bits_per_value / 8;

        // Aim for 4KiB chunks
        let (log_vals_per_chunk, vals_per_chunk) = Self::find_log_vals_per_chunk(bytes_per_value);
        let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize);
        let bytes_per_chunk = bytes_per_value * vals_per_chunk;
        let bytes_per_chunk = u16::try_from(bytes_per_chunk).unwrap();

        let data_buffer = data.data;

        let mut row_offset = 0;
        let mut chunks = Vec::with_capacity(num_chunks);

        let mut bytes_counter = 0;
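        // Emit full chunks of `vals_per_chunk` values; whatever is left over goes
        // into a single, possibly smaller, final chunk.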
        loop {
            if row_offset + vals_per_chunk <= data.num_values {
                chunks.push(MiniBlockChunk {
                    log_num_values: log_vals_per_chunk as u8,
                    num_bytes: bytes_per_chunk,
                });
                row_offset += vals_per_chunk;
                bytes_counter += bytes_per_chunk as u64;
            } else {
                // Final chunk: the special log_num_values of 0 marks the chunk that
                // holds whatever values remain
                let num_bytes = data_buffer.len() as u64 - bytes_counter;
                let num_bytes = u16::try_from(num_bytes).unwrap();
                chunks.push(MiniBlockChunk {
                    log_num_values: 0,
                    num_bytes,
                });
                break;
            }
        }

        MiniBlockCompressed {
            chunks,
            data: data_buffer,
            num_values: data.num_values,
        }
    }

    fn make_fsl_encoding(data: &FixedSizeListBlock) -> ArrayEncoding {
        let inner_encoding = match data.child.as_ref() {
            DataBlock::FixedWidth(fixed_width) => {
                ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None)
            }
            DataBlock::FixedSizeList(fsl) => Self::make_fsl_encoding(fsl),
            _ => unreachable!(),
        };
        ProtobufUtils::fixed_size_list(inner_encoding, data.dimension)
    }
}

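// As a block compressor, ValueEncoder is a pass-through: fixed-width values are
// already in their final on-disk form, so the backing buffer is returned as-is.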
impl BlockCompressor for ValueEncoder {
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
        let data = match data {
            DataBlock::FixedWidth(fixed_width) => fixed_width.data,
            _ => unimplemented!(
                "Cannot compress block of type {} with ValueEncoder",
                data.name()
            ),
        };
        Ok(data)
    }
}

impl ArrayEncoder for ValueEncoder {
    fn encode(
        &self,
        data: DataBlock,
        _data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        let index = *buffer_index;
        *buffer_index += 1;

        let encoding = match &data {
            DataBlock::FixedWidth(fixed_width) => Ok(ProtobufUtils::flat_encoding(
                fixed_width.bits_per_value,
                index,
                None,
            )),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot encode a data block of type {} with ValueEncoder",
                    data.name()
                )
                .into(),
                location: location!(),
            }),
        }?;
        Ok(EncodedArray { data, encoding })
    }
}

impl MiniBlockCompressor for ValueEncoder {
    fn compress(
        &self,
        chunk: DataBlock,
    ) -> Result<(
        crate::encoder::MiniBlockCompressed,
        crate::format::pb::ArrayEncoding,
    )> {
        match chunk {
            DataBlock::FixedWidth(fixed_width) => {
                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
                Ok((Self::chunk_data(fixed_width), encoding))
            }
            DataBlock::FixedSizeList(mut fixed_size_list) => {
                let flattened = fixed_size_list.flatten_as_fixed();
                let encoding = Self::make_fsl_encoding(&fixed_size_list);
                Ok((Self::chunk_data(flattened), encoding))
            }
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with ValueEncoder",
                    chunk.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

/// A decompressor for constant-encoded data
#[derive(Debug)]
pub struct ConstantDecompressor {
    scalar: LanceBuffer,
    num_values: u64,
}

impl ConstantDecompressor {
    pub fn new(scalar: LanceBuffer, num_values: u64) -> Self {
        Self {
            scalar: scalar.into_borrowed(),
            num_values,
        }
    }
}

impl BlockDecompressor for ConstantDecompressor {
    fn decompress(&self, _data: LanceBuffer) -> Result<DataBlock> {
        Ok(DataBlock::Constant(ConstantDataBlock {
            data: self.scalar.try_clone().unwrap(),
            num_values: self.num_values,
        }))
    }
}

/// A decompressor for fixed-width data that has been written, as-is, to disk in
/// a single contiguous array
#[derive(Debug)]
pub struct ValueDecompressor {
    bytes_per_value: u64,
}

impl ValueDecompressor {
    pub fn new(description: &pb::Flat) -> Self {
        assert!(description.bits_per_value % 8 == 0);
        Self {
            bytes_per_value: description.bits_per_value / 8,
        }
    }

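    /// Wrap a raw buffer in a FixedWidthDataBlock, inferring the number of values
    /// from the buffer length.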
    fn buffer_to_block(&self, data: LanceBuffer) -> DataBlock {
        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bytes_per_value * 8,
            num_values: data.len() as u64 / self.bytes_per_value,
            data,
            block_info: BlockInfo::new(),
        })
    }
}

impl BlockDecompressor for ValueDecompressor {
    fn decompress(&self, data: LanceBuffer) -> Result<DataBlock> {
        Ok(self.buffer_to_block(data))
    }
}

impl MiniBlockDecompressor for ValueDecompressor {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        assert_eq!(data.len() as u64, num_values * self.bytes_per_value);
        Ok(self.buffer_to_block(data))
    }
}

impl FixedPerValueDecompressor for ValueDecompressor {
    fn decompress(&self, data: FixedWidthDataBlock) -> Result<DataBlock> {
        Ok(DataBlock::FixedWidth(data))
    }

    fn bits_per_value(&self) -> u64 {
        self.bytes_per_value * 8
    }
}

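// Per-value compression of fixed-width data is likewise a pass-through: every value
// already occupies exactly `bits_per_value` bits, so the block is forwarded unchanged.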
impl PerValueCompressor for ValueEncoder {
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, ArrayEncoding)> {
        let (data, encoding) = match data {
            DataBlock::FixedWidth(fixed_width) => {
                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
                (PerValueDataBlock::Fixed(fixed_width), encoding)
            }
            _ => unimplemented!(
                "Cannot compress block of type {} with ValueEncoder",
                data.name()
            ),
        };
        Ok((data, encoding))
    }
}

// public tests module because we share the PRIMITIVE_TYPES constant with fixed_size_list
#[cfg(test)]
pub(crate) mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{Array, ArrayRef, Decimal128Array, Int32Array};
    use arrow_schema::{DataType, Field, TimeUnit};
    use rstest::rstest;

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    const PRIMITIVE_TYPES: &[DataType] = &[
        DataType::Null,
        DataType::FixedSizeBinary(2),
        DataType::Date32,
        DataType::Date64,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal128(10, 10),
        DataType::Decimal256(10, 10),
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Duration(TimeUnit::Second),
        // The Interval type is supported by the reader, but the writer currently goes
        // through the Lance schema, which can't represent intervals yet
        // DataType::Interval(IntervalUnit::DayTime),
    ];

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_value_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for data_type in PRIMITIVE_TYPES {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    lazy_static::lazy_static! {
        static ref LARGE_TYPES: Vec<DataType> = vec![DataType::FixedSizeList(
            Arc::new(Field::new("", DataType::Int32, false)),
            128,
        )];
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for data_type in LARGE_TYPES.iter() {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_decimal128_dictionary_encoding() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
        let decimals: Vec<i32> = (0..100).collect();
        let repeated_decimals: Vec<_> = decimals
            .iter()
            .cycle()
            .take(decimals.len() * 10000)
            .map(|&v| Some(v as i128))
            .collect();
        let decimal_array = Arc::new(Decimal128Array::from(repeated_decimals)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![decimal_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_miniblock_stress() {
        // Tests for strange page sizes and batch sizes and validity scenarios for miniblock

        // 10K integers, 100 per array, all valid
        let data1 = (0..100)
            .map(|_| Arc::new(Int32Array::from_iter_values(0..100)) as Arc<dyn Array>)
            .collect::<Vec<_>>();

        // Same as above but with mixed validity
        let data2 = (0..100)
            .map(|_| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if i % 2 == 0 {
                        Some(i)
                    } else {
                        None
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        // Same as above but with all null for first half then all valid
        // TODO: Re-enable once the all-null path is complete
        let _data3 = (0..100)
            .map(|chunk_idx| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if chunk_idx < 50 {
                        None
                    } else {
                        Some(i)
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        for data in [data1, data2 /*data3*/] {
            for batch_size in [10, 100, 1500, 15000] {
                // 40000 bytes of data
                let test_cases = TestCases::default()
                    .with_page_sizes(vec![1000, 2000, 3000, 60000])
                    .with_batch_size(batch_size)
                    .with_file_version(LanceFileVersion::V2_1);

                check_round_trip_encoding_of_data(data.clone(), &test_cases, HashMap::new()).await;
            }
        }
    }
}