// lance_encoding/encodings/physical/fixed_size_binary.rs

use std::sync::Arc;

use arrow_buffer::ScalarBuffer;
use arrow_schema::DataType;
use futures::{future::BoxFuture, FutureExt};
use lance_core::Result;

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlock, FixedWidthDataBlock, VariableWidthBlock},
    decoder::{PageScheduler, PrimitivePageDecoder},
    encoder::{ArrayEncoder, EncodedArray},
    format::ProtobufUtils,
    EncodingsIo,
};

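/// Page scheduler for fixed-size binary data.
///
/// On disk the values are a contiguous run of bytes with no offsets, so each requested
/// row range is multiplied by `byte_width` to get the byte range handed to the inner
/// bytes scheduler.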
#[derive(Debug)]
pub struct FixedSizeBinaryPageScheduler {
    bytes_scheduler: Box<dyn PageScheduler>,
    byte_width: u32,
    bytes_per_offset: u32,
}

impl FixedSizeBinaryPageScheduler {
    pub fn new(
        bytes_scheduler: Box<dyn PageScheduler>,
        byte_width: u32,
        bytes_per_offset: u32,
    ) -> Self {
        Self {
            bytes_scheduler,
            byte_width,
            bytes_per_offset,
        }
    }
}

impl PageScheduler for FixedSizeBinaryPageScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        // Every value occupies exactly `byte_width` bytes, so row ranges map directly
        // to byte ranges.
        let expanded_ranges = ranges
            .iter()
            .map(|range| {
                (range.start * self.byte_width as u64)..(range.end * self.byte_width as u64)
            })
            .collect::<Vec<_>>();

        let bytes_page_decoder =
            self.bytes_scheduler
                .schedule_ranges(&expanded_ranges, scheduler, top_level_row);

        let byte_width = self.byte_width as u64;
        let bytes_per_offset = self.bytes_per_offset;

        async move {
            let bytes_decoder = bytes_page_decoder.await?;
            Ok(Box::new(FixedSizeBinaryDecoder {
                bytes_decoder,
                byte_width,
                bytes_per_offset,
            }) as Box<dyn PrimitivePageDecoder>)
        }
        .boxed()
    }
}

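/// Decoder that turns fixed-size binary bytes back into a variable-width block.
///
/// The wrapped bytes decoder yields a fixed-width block; because every value has the
/// same length, the offsets buffer is synthesized here rather than read from disk.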
pub struct FixedSizeBinaryDecoder {
    bytes_decoder: Box<dyn PrimitivePageDecoder>,
    byte_width: u64,
    bytes_per_offset: u32,
}

impl PrimitivePageDecoder for FixedSizeBinaryDecoder {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
        let rows_to_skip = rows_to_skip * self.byte_width;
        let num_bytes = num_rows * self.byte_width;
        let bytes = self.bytes_decoder.decode(rows_to_skip, num_bytes)?;
        let bytes = bytes.as_fixed_width().unwrap();
        debug_assert_eq!(bytes.bits_per_value, self.byte_width * 8);

        // Offsets are regenerated from the fixed byte width (offset[i] = i * byte_width),
        // using either 64-bit or 32-bit offsets depending on the target layout.
        let offsets_buffer = match self.bytes_per_offset {
            8 => {
                let offsets_vec = (0..(num_rows + 1))
                    .map(|i| i * self.byte_width)
                    .collect::<Vec<_>>();

                ScalarBuffer::from(offsets_vec).into_inner()
            }
            4 => {
                let offsets_vec = (0..(num_rows as u32 + 1))
                    .map(|i| i * self.byte_width as u32)
                    .collect::<Vec<_>>();

                ScalarBuffer::from(offsets_vec).into_inner()
            }
            _ => panic!("Unsupported offsets type"),
        };

        let string_data = DataBlock::VariableWidth(VariableWidthBlock {
            bits_per_offset: (self.bytes_per_offset * 8) as u8,
            data: bytes.data,
            num_values: num_rows,
            offsets: LanceBuffer::from(offsets_buffer),
            block_info: BlockInfo::new(),
        });

        Ok(string_data)
    }
}

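/// Encoder for variable-width data whose values all have the same length.
///
/// The offsets buffer is dropped and the values are re-encoded as fixed-width data of
/// `byte_width` bytes per value; offsets are reconstructed by the decoder.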
#[derive(Debug)]
pub struct FixedSizeBinaryEncoder {
    bytes_encoder: Box<dyn ArrayEncoder>,
    byte_width: usize,
}

impl FixedSizeBinaryEncoder {
    pub fn new(bytes_encoder: Box<dyn ArrayEncoder>, byte_width: usize) -> Self {
        Self {
            bytes_encoder,
            byte_width,
        }
    }
}

impl ArrayEncoder for FixedSizeBinaryEncoder {
    fn encode(
        &self,
        data: DataBlock,
        _data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        // Re-interpret the variable-width values as fixed-width data; the offsets are
        // implicit since every value is `byte_width` bytes long.
        let bytes_data = data.as_variable_width().unwrap();
        let fixed_data = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 8 * self.byte_width as u64,
            data: bytes_data.data,
            num_values: bytes_data.num_values,
            block_info: BlockInfo::new(),
        });

        let encoded_data = self.bytes_encoder.encode(
            fixed_data,
            &DataType::FixedSizeBinary(self.byte_width as i32),
            buffer_index,
        )?;
        let encoding =
            ProtobufUtils::fixed_size_binary(encoded_data.encoding, self.byte_width as u32);

        Ok(EncodedArray {
            data: encoded_data.data,
            encoding,
        })
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow::array::LargeStringBuilder;
    use arrow_array::{Array, ArrayRef, FixedSizeBinaryArray, LargeStringArray, StringArray};
    use arrow_buffer::Buffer;
    use arrow_data::ArrayData;
    use arrow_schema::{DataType, Field};

    use crate::data::{DataBlock, FixedWidthDataBlock};
    use crate::decoder::PrimitivePageDecoder;
    use crate::encodings::physical::fixed_size_binary::FixedSizeBinaryDecoder;
    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_fixed_size_utf8_binary() {
        let field = Field::new("", DataType::Utf8, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_fixed_size_binary() {
        let field = Field::new("", DataType::Binary, false);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_fixed_size_large_binary() {
        let field = Field::new("", DataType::LargeBinary, true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_fixed_size_large_utf8() {
        let field = Field::new("", DataType::LargeUtf8, true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_fixed_size_utf8() {
        let string_array = StringArray::from(vec![
            Some("abc"),
            Some("def"),
            Some("ghi"),
            Some("jkl"),
            Some("mno"),
        ]);

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4]);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_fixed_size_with_nulls_utf8() {
        let string_array =
            LargeStringArray::from(vec![Some("abc"), None, Some("ghi"), None, Some("mno")]);

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4]);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_fixed_size_sliced_utf8() {
        let string_array = StringArray::from(vec![Some("abc"), Some("def"), None, Some("fgh")]);
        let string_array = string_array.slice(1, 3);

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_range(0..2)
            .with_range(1..2);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_fixed_size_empty_strings() {
        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    #[ignore]
    async fn test_jumbo_string() {
        // A 1 MiB string repeated 5000 times (roughly 5 GiB of data), which is too
        // large to validate value-by-value, hence `without_validation` below.
        let mut string_builder = LargeStringBuilder::new();
        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
        for _ in 0..5000 {
            string_builder.append_option(Some(&giant_string));
        }
        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
        let arrs = vec![giant_array];

        let test_cases = TestCases::default().without_validation();
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }

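    // Test-only decoder that hands back a deep copy of a pre-built fixed-width block,
    // letting the tests below drive FixedSizeBinaryDecoder without any real I/O.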
    struct FixedWidthCloningPageDecoder {
        data_block: FixedWidthDataBlock,
    }

    impl PrimitivePageDecoder for FixedWidthCloningPageDecoder {
        fn decode(
            &self,
            _rows_to_skip: u64,
            _num_rows: u64,
        ) -> lance_core::error::Result<DataBlock> {
            Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
                data: self.data_block.data.deep_copy(),
                bits_per_value: self.data_block.bits_per_value,
                num_values: self.data_block.num_values,
                block_info: self.data_block.block_info.clone(),
            }))
        }
    }

    #[test]
    fn test_fixed_size_binary_decoder() {
        let values: [u8; 6] = *b"aaabbb";
        let num_values = 2u64;
        let byte_width = 3;
        let array_data = ArrayData::builder(DataType::FixedSizeBinary(byte_width))
            .len(num_values as usize)
            .add_buffer(Buffer::from(&values[..]))
            .build()
            .unwrap();
        let fixed_size_binary_array = FixedSizeBinaryArray::from(array_data);
        let arrays = vec![Arc::new(fixed_size_binary_array) as ArrayRef];
        let fixed_width_data_block = DataBlock::from_arrays(&arrays, num_values);
        assert_eq!(fixed_width_data_block.name(), "FixedWidth");

        let bytes_decoder = FixedWidthCloningPageDecoder {
            data_block: fixed_width_data_block.as_fixed_width().unwrap(),
        };
        let decoder = FixedSizeBinaryDecoder {
            bytes_decoder: Box::new(bytes_decoder),
            byte_width: byte_width as u64,
            bytes_per_offset: 4,
        };

        let decoded_binary = decoder.decode(0, num_values).unwrap();
        let maybe_data = decoded_binary.into_arrow(DataType::Utf8, true);
        assert!(maybe_data.is_ok());
        let data = maybe_data.unwrap();
        let string_array = StringArray::from(data);
        assert_eq!(string_array.len(), num_values as usize);
        assert_eq!(string_array.value(0), "aaa");
        assert_eq!(string_array.value(1), "bbb");
    }
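
    // A sketch of the same round trip through the 64-bit offset path: `bytes_per_offset`
    // is set to 8 so the decoder synthesizes i64 offsets. This mirrors the test above and
    // assumes `DataBlock::into_arrow` accepts `DataType::LargeUtf8` for a variable-width
    // block with 64-bit offsets, just as it accepts `Utf8` for 32-bit offsets.
    #[test]
    fn test_fixed_size_binary_decoder_large_offsets() {
        let values: [u8; 6] = *b"aaabbb";
        let num_values = 2u64;
        let byte_width = 3;
        let array_data = ArrayData::builder(DataType::FixedSizeBinary(byte_width))
            .len(num_values as usize)
            .add_buffer(Buffer::from(&values[..]))
            .build()
            .unwrap();
        let fixed_size_binary_array = FixedSizeBinaryArray::from(array_data);
        let arrays = vec![Arc::new(fixed_size_binary_array) as ArrayRef];
        let fixed_width_data_block = DataBlock::from_arrays(&arrays, num_values);

        let bytes_decoder = FixedWidthCloningPageDecoder {
            data_block: fixed_width_data_block.as_fixed_width().unwrap(),
        };
        let decoder = FixedSizeBinaryDecoder {
            bytes_decoder: Box::new(bytes_decoder),
            byte_width: byte_width as u64,
            bytes_per_offset: 8,
        };

        let decoded_binary = decoder.decode(0, num_values).unwrap();
        let data = decoded_binary.into_arrow(DataType::LargeUtf8, true).unwrap();
        let string_array = LargeStringArray::from(data);
        assert_eq!(string_array.len(), num_values as usize);
        assert_eq!(string_array.value(0), "aaa");
        assert_eq!(string_array.value(1), "bbb");
    }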
}