lance_encoding/encodings/physical/fixed_size_binary.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_buffer::ScalarBuffer;
7use arrow_schema::DataType;
8use futures::{future::BoxFuture, FutureExt};
9use lance_core::Result;
10
11use crate::{
12    buffer::LanceBuffer,
13    data::{BlockInfo, DataBlock, FixedWidthDataBlock, VariableWidthBlock},
14    decoder::{PageScheduler, PrimitivePageDecoder},
15    encoder::{ArrayEncoder, EncodedArray},
16    format::ProtobufUtils,
17    EncodingsIo,
18};
19
20/// A scheduler for fixed size binary data
21#[derive(Debug)]
22pub struct FixedSizeBinaryPageScheduler {
23    bytes_scheduler: Box<dyn PageScheduler>,
24    byte_width: u32,
25    bytes_per_offset: u32,
26}
27
impl FixedSizeBinaryPageScheduler {
    /// Creates a scheduler wrapping `bytes_scheduler`.
    ///
    /// `byte_width` is the fixed size of each value in bytes.
    /// `bytes_per_offset` selects the offset width (4 or 8) of the
    /// variable-width block produced when decoding.
    pub fn new(
        bytes_scheduler: Box<dyn PageScheduler>,
        byte_width: u32,
        bytes_per_offset: u32,
    ) -> Self {
        Self {
            bytes_scheduler,
            byte_width,
            bytes_per_offset,
        }
    }
}
41
42impl PageScheduler for FixedSizeBinaryPageScheduler {
43    fn schedule_ranges(
44        &self,
45        ranges: &[std::ops::Range<u64>],
46        scheduler: &Arc<dyn EncodingsIo>,
47        top_level_row: u64,
48    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
49        let expanded_ranges = ranges
50            .iter()
51            .map(|range| {
52                (range.start * self.byte_width as u64)..(range.end * self.byte_width as u64)
53            })
54            .collect::<Vec<_>>();
55
56        let bytes_page_decoder =
57            self.bytes_scheduler
58                .schedule_ranges(&expanded_ranges, scheduler, top_level_row);
59
60        let byte_width = self.byte_width as u64;
61        let bytes_per_offset = self.bytes_per_offset;
62
63        async move {
64            let bytes_decoder = bytes_page_decoder.await?;
65            Ok(Box::new(FixedSizeBinaryDecoder {
66                bytes_decoder,
67                byte_width,
68                bytes_per_offset,
69            }) as Box<dyn PrimitivePageDecoder>)
70        }
71        .boxed()
72    }
73}
74
/// Decodes fixed size binary data scheduled by [`FixedSizeBinaryPageScheduler`],
/// producing a variable-width block whose offsets are synthesized (evenly
/// spaced at `byte_width`).
pub struct FixedSizeBinaryDecoder {
    /// Decoder for the raw value bytes.
    bytes_decoder: Box<dyn PrimitivePageDecoder>,
    /// Fixed size of each value, in bytes.
    byte_width: u64,
    /// Width of each synthesized offset: 4 (32-bit) or 8 (64-bit).
    bytes_per_offset: u32,
}
80
81impl PrimitivePageDecoder for FixedSizeBinaryDecoder {
82    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
83        let rows_to_skip = rows_to_skip * self.byte_width;
84        let num_bytes = num_rows * self.byte_width;
85        let bytes = self.bytes_decoder.decode(rows_to_skip, num_bytes)?;
86        let bytes = bytes.as_fixed_width().unwrap();
87        debug_assert_eq!(bytes.bits_per_value, self.byte_width * 8);
88
89        let offsets_buffer = match self.bytes_per_offset {
90            8 => {
91                let offsets_vec = (0..(num_rows + 1))
92                    .map(|i| i * self.byte_width)
93                    .collect::<Vec<_>>();
94
95                ScalarBuffer::from(offsets_vec).into_inner()
96            }
97            4 => {
98                let offsets_vec = (0..(num_rows as u32 + 1))
99                    .map(|i| i * self.byte_width as u32)
100                    .collect::<Vec<_>>();
101
102                ScalarBuffer::from(offsets_vec).into_inner()
103            }
104            _ => panic!("Unsupported offsets type"),
105        };
106
107        let string_data = DataBlock::VariableWidth(VariableWidthBlock {
108            bits_per_offset: (self.bytes_per_offset * 8) as u8,
109            data: bytes.data,
110            num_values: num_rows,
111            offsets: LanceBuffer::from(offsets_buffer),
112            block_info: BlockInfo::new(),
113        });
114
115        Ok(string_data)
116    }
117}
118
/// An encoder for fixed size binary data
///
/// Drops the (redundant, evenly spaced) offsets of variable-width input and
/// forwards only the raw value bytes to an inner encoder.
#[derive(Debug)]
pub struct FixedSizeBinaryEncoder {
    /// Encoder for the raw value bytes.
    bytes_encoder: Box<dyn ArrayEncoder>,
    /// Fixed size of each value, in bytes.
    byte_width: usize,
}
124
impl FixedSizeBinaryEncoder {
    /// Creates an encoder wrapping `bytes_encoder`.
    ///
    /// `byte_width` is the fixed size, in bytes, that every encoded value is
    /// expected to have.
    pub fn new(bytes_encoder: Box<dyn ArrayEncoder>, byte_width: usize) -> Self {
        Self {
            bytes_encoder,
            byte_width,
        }
    }
}
133
134impl ArrayEncoder for FixedSizeBinaryEncoder {
135    fn encode(
136        &self,
137        data: DataBlock,
138        _data_type: &DataType,
139        buffer_index: &mut u32,
140    ) -> Result<EncodedArray> {
141        let bytes_data = data.as_variable_width().unwrap();
142        let fixed_data = DataBlock::FixedWidth(FixedWidthDataBlock {
143            bits_per_value: 8 * self.byte_width as u64,
144            data: bytes_data.data,
145            num_values: bytes_data.num_values,
146            block_info: BlockInfo::new(),
147        });
148
149        let encoded_data = self.bytes_encoder.encode(
150            fixed_data,
151            &DataType::FixedSizeBinary(self.byte_width as i32),
152            buffer_index,
153        )?;
154        let encoding =
155            ProtobufUtils::fixed_size_binary(encoded_data.encoding, self.byte_width as u32);
156
157        Ok(EncodedArray {
158            data: encoded_data.data,
159            encoding,
160        })
161    }
162}
163
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow::array::LargeStringBuilder;
    use arrow_array::{Array, ArrayRef, FixedSizeBinaryArray, LargeStringArray, StringArray};
    use arrow_buffer::Buffer;
    use arrow_data::ArrayData;
    use arrow_schema::{DataType, Field};

    use crate::data::{DataBlock, FixedWidthDataBlock};
    use crate::decoder::PrimitivePageDecoder;
    use crate::encodings::physical::fixed_size_binary::FixedSizeBinaryDecoder;
    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    // Randomized round-trip coverage for each string/binary logical type.

    #[test_log::test(tokio::test)]
    async fn test_fixed_size_utf8_binary() {
        let field = Field::new("", DataType::Utf8, false);
        // This test only generates fixed size binary arrays anyway
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_fixed_size_binary() {
        let field = Field::new("", DataType::Binary, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_fixed_size_large_binary() {
        let field = Field::new("", DataType::LargeBinary, true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_fixed_size_large_utf8() {
        let field = Field::new("", DataType::LargeUtf8, true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    // Deterministic round-trip: all values are exactly 3 bytes, so the
    // fixed-size-binary path is taken.  Exercises range and index reads.
    #[test_log::test(tokio::test)]
    async fn test_simple_fixed_size_utf8() {
        let string_array = StringArray::from(vec![
            Some("abc"),
            Some("def"),
            Some("ghi"),
            Some("jkl"),
            Some("mno"),
        ]);

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4]);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    // Same as above but with nulls interleaved (and 64-bit offsets via
    // LargeStringArray).
    #[test_log::test(tokio::test)]
    async fn test_simple_fixed_size_with_nulls_utf8() {
        let string_array =
            LargeStringArray::from(vec![Some("abc"), None, Some("ghi"), None, Some("mno")]);

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4]);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    // Input array is a non-zero-offset slice of a larger array.
    #[test_log::test(tokio::test)]
    async fn test_fixed_size_sliced_utf8() {
        let string_array = StringArray::from(vec![Some("abc"), Some("def"), None, Some("fgh")]);
        let string_array = string_array.slice(1, 3);

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_range(0..2)
            .with_range(1..2);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_fixed_size_empty_strings() {
        // All strings are empty

        // When encoding an array of empty strings there are no bytes to encode
        // which is strange and we want to ensure we handle it
        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    #[ignore] // This test is quite slow in debug mode
    async fn test_jumbo_string() {
        // This is an overflow test.  We have a list of lists where each list
        // has 1Mi items.  We encode 5000 of these lists and so we have over 4Gi in the
        // offsets range
        let mut string_builder = LargeStringBuilder::new();
        // a 1 MiB string
        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
        for _ in 0..5000 {
            string_builder.append_option(Some(&giant_string));
        }
        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
        let arrs = vec![giant_array];

        // We can't validate because our validation relies on concatenating all input arrays
        let test_cases = TestCases::default().without_validation();
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }

    // Stub decoder that serves a pre-built fixed-width block, so the
    // FixedSizeBinaryDecoder can be unit-tested without real I/O.
    struct FixedWidthCloningPageDecoder {
        data_block: FixedWidthDataBlock,
    }

    impl PrimitivePageDecoder for FixedWidthCloningPageDecoder {
        // clone the given data block as decoded data block
        // (skip/row arguments are ignored — the stub always returns everything)
        fn decode(
            &self,
            _rows_to_skip: u64,
            _num_rows: u64,
        ) -> lance_core::error::Result<DataBlock> {
            Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
                data: self.data_block.data.deep_copy(),
                bits_per_value: self.data_block.bits_per_value,
                num_values: self.data_block.num_values,
                block_info: self.data_block.block_info.clone(),
            }))
        }
    }

    // Unit test: decoding two 3-byte values should yield a variable-width
    // block that converts to a valid Utf8 array with the original contents.
    #[test]
    fn test_fixed_size_binary_decoder() {
        let values: [u8; 6] = *b"aaabbb";
        let num_values = 2u64;
        let byte_width = 3;
        let array_data = ArrayData::builder(DataType::FixedSizeBinary(byte_width))
            .len(num_values as usize)
            .add_buffer(Buffer::from(&values[..]))
            .build()
            .unwrap();
        let fixed_size_binary_array = FixedSizeBinaryArray::from(array_data);
        let arrays = vec![Arc::new(fixed_size_binary_array) as ArrayRef];
        let fixed_width_data_block = DataBlock::from_arrays(&arrays, num_values);
        assert_eq!(fixed_width_data_block.name(), "FixedWidth");

        let bytes_decoder = FixedWidthCloningPageDecoder {
            data_block: fixed_width_data_block.as_fixed_width().unwrap(),
        };
        let decoder = FixedSizeBinaryDecoder {
            bytes_decoder: Box::new(bytes_decoder),
            byte_width: byte_width as u64,
            bytes_per_offset: 4, // 32-bits offset binary
        };

        let decoded_binary = decoder.decode(0, num_values).unwrap();
        let maybe_data = decoded_binary.into_arrow(DataType::Utf8, true);
        assert!(maybe_data.is_ok());
        let data = maybe_data.unwrap();
        let string_array = StringArray::from(data);
        assert_eq!(string_array.len(), num_values as usize);
        assert_eq!(string_array.value(0), "aaa");
        assert_eq!(string_array.value(1), "bbb");
    }
}
355}