1use arrow_buffer::bit_util;
5use arrow_schema::DataType;
6use bytes::Bytes;
7use futures::{future::BoxFuture, FutureExt};
8use log::trace;
9use snafu::location;
10use std::ops::Range;
11use std::sync::{Arc, Mutex};
12
13use crate::buffer::LanceBuffer;
14use crate::data::{
15 BlockInfo, ConstantDataBlock, DataBlock, FixedSizeListBlock, FixedWidthDataBlock,
16};
17use crate::decoder::{BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor};
18use crate::encoder::{
19 BlockCompressor, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
20 PerValueDataBlock, MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES,
21};
22use crate::format::pb::{self, ArrayEncoding};
23use crate::format::ProtobufUtils;
24use crate::{
25 decoder::{PageScheduler, PrimitivePageDecoder},
26 encoder::{ArrayEncoder, EncodedArray},
27 EncodingsIo,
28};
29
30use lance_core::{Error, Result};
31
32use super::block_compress::{CompressionConfig, CompressionScheme, GeneralBufferCompressor};
33
/// Page scheduler for "value" (plain fixed-width) encoded data.
///
/// Values are stored contiguously, `bytes_per_value` bytes each, in a single
/// buffer starting at `buffer_offset`.  When `compression_config` is not
/// `None` the whole buffer is compressed as one unit and must be fetched in
/// its entirety before any values can be decoded.
#[derive(Debug, Clone, Copy)]
pub struct ValuePageScheduler {
    /// Size of each value in bytes (values are fixed-width).
    bytes_per_value: u64,
    /// Byte offset of the start of the value buffer (added to every
    /// scheduled byte range).
    buffer_offset: u64,
    /// Total size in bytes of the (possibly compressed) buffer.
    buffer_size: u64,
    /// Compression applied to the buffer (`None` means raw values).
    compression_config: CompressionConfig,
}
44
45impl ValuePageScheduler {
46 pub fn new(
47 bytes_per_value: u64,
48 buffer_offset: u64,
49 buffer_size: u64,
50 compression_config: CompressionConfig,
51 ) -> Self {
52 Self {
53 bytes_per_value,
54 buffer_offset,
55 buffer_size,
56 compression_config,
57 }
58 }
59}
60
impl PageScheduler for ValuePageScheduler {
    /// Schedules I/O for the requested row `ranges` and returns a future
    /// resolving to a decoder over the fetched bytes.
    ///
    /// Uncompressed pages: each row range maps independently to a byte range
    /// and only those slices are read.  Compressed pages: the entire buffer
    /// must be read (it was compressed as one unit), and the row ranges are
    /// remembered as offsets into the *uncompressed* bytes so the decoder
    /// can slice after decompression.
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        // min/max track the overall byte span touched, for the trace log only.
        let (mut min, mut max) = (u64::MAX, 0);
        let byte_ranges = if self.compression_config.scheme == CompressionScheme::None {
            // Uncompressed: translate each row range to an absolute byte range.
            ranges
                .iter()
                .map(|range| {
                    let start = self.buffer_offset + (range.start * self.bytes_per_value);
                    let end = self.buffer_offset + (range.end * self.bytes_per_value);
                    min = min.min(start);
                    max = max.max(end);
                    start..end
                })
                .collect::<Vec<_>>()
        } else {
            // Compressed: the buffer can only be decompressed as a whole, so
            // schedule a single read covering the entire buffer.
            min = self.buffer_offset;
            max = self.buffer_offset + self.buffer_size;
            vec![Range {
                start: min,
                end: max,
            }]
        };

        trace!(
            "Scheduling I/O for {} ranges spread across byte range {}..{}",
            byte_ranges.len(),
            min,
            max
        );
        let bytes = scheduler.submit_request(byte_ranges, top_level_row);
        let bytes_per_value = self.bytes_per_value;

        // For compressed pages, record where each requested row range lives
        // inside the uncompressed data (relative offsets, no buffer_offset).
        // Left empty for uncompressed pages — the decoder also uses
        // "non-empty" as its compression flag (see ValuePageDecoder).
        let range_offsets = if self.compression_config.scheme != CompressionScheme::None {
            ranges
                .iter()
                .map(|range| {
                    let start = (range.start * bytes_per_value) as usize;
                    let end = (range.end * bytes_per_value) as usize;
                    start..end
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let compression_config = self.compression_config;
        async move {
            let bytes = bytes.await?;

            Ok(Box::new(ValuePageDecoder {
                bytes_per_value,
                data: bytes,
                // Lazily populated on first decode (see get_uncompressed_bytes).
                uncompressed_data: Arc::new(Mutex::new(None)),
                uncompressed_range_offsets: range_offsets,
                compression_config,
            }) as Box<dyn PrimitivePageDecoder>)
        }
        .boxed()
    }
}
128
/// Decoder for a "value" encoded page whose bytes have been fetched.
struct ValuePageDecoder {
    /// Size of each value in bytes.
    bytes_per_value: u64,
    /// The scheduled buffers.  For a compressed page this is a single
    /// buffer containing the whole compressed page.
    data: Vec<Bytes>,
    /// Lazily-computed, cached decompression result (one Bytes per
    /// requested range).  `None` until the first decode of a compressed page.
    uncompressed_data: Arc<Mutex<Option<Vec<Bytes>>>>,
    /// Requested ranges as offsets into the uncompressed bytes.  Empty for
    /// uncompressed pages (doubles as the "is compressed" flag).
    uncompressed_range_offsets: Vec<std::ops::Range<usize>>,
    /// Compression used when the page was written.
    compression_config: CompressionConfig,
}
136
137impl ValuePageDecoder {
138 fn decompress(&self) -> Result<Vec<Bytes>> {
139 let bytes_u8: Vec<u8> = self.data[0].to_vec();
141 let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config);
142 let mut uncompressed_bytes: Vec<u8> = Vec::new();
143 buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?;
144
145 let mut bytes_in_ranges: Vec<Bytes> =
146 Vec::with_capacity(self.uncompressed_range_offsets.len());
147 for range in &self.uncompressed_range_offsets {
148 let start = range.start;
149 let end = range.end;
150 bytes_in_ranges.push(Bytes::from(uncompressed_bytes[start..end].to_vec()));
151 }
152 Ok(bytes_in_ranges)
153 }
154
155 fn get_uncompressed_bytes(&self) -> Result<Arc<Mutex<Option<Vec<Bytes>>>>> {
156 let mut uncompressed_bytes = self.uncompressed_data.lock().unwrap();
157 if uncompressed_bytes.is_none() {
158 *uncompressed_bytes = Some(self.decompress()?);
159 }
160 Ok(Arc::clone(&self.uncompressed_data))
161 }
162
163 fn is_compressed(&self) -> bool {
164 !self.uncompressed_range_offsets.is_empty()
165 }
166
167 fn decode_buffers<'a>(
168 &'a self,
169 buffers: impl IntoIterator<Item = &'a Bytes>,
170 mut bytes_to_skip: u64,
171 mut bytes_to_take: u64,
172 ) -> LanceBuffer {
173 let mut dest: Option<Vec<u8>> = None;
174
175 for buf in buffers.into_iter() {
176 let buf_len = buf.len() as u64;
177 if bytes_to_skip > buf_len {
178 bytes_to_skip -= buf_len;
179 } else {
180 let bytes_to_take_here = (buf_len - bytes_to_skip).min(bytes_to_take);
181 bytes_to_take -= bytes_to_take_here;
182 let start = bytes_to_skip as usize;
183 let end = start + bytes_to_take_here as usize;
184 let slice = buf.slice(start..end);
185 match (&mut dest, bytes_to_take) {
186 (None, 0) => {
187 return LanceBuffer::from_bytes(slice, self.bytes_per_value);
190 }
191 (None, _) => {
192 dest.replace(Vec::with_capacity(bytes_to_take as usize));
193 }
194 _ => {}
195 }
196 dest.as_mut().unwrap().extend_from_slice(&slice);
197 bytes_to_skip = 0;
198 }
199 }
200 LanceBuffer::from(dest.unwrap_or_default())
201 }
202}
203
204impl PrimitivePageDecoder for ValuePageDecoder {
205 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
206 let bytes_to_skip = rows_to_skip * self.bytes_per_value;
207 let bytes_to_take = num_rows * self.bytes_per_value;
208
209 let data_buffer = if self.is_compressed() {
210 let decoding_data = self.get_uncompressed_bytes()?;
211 let buffers = decoding_data.lock().unwrap();
212 self.decode_buffers(buffers.as_ref().unwrap(), bytes_to_skip, bytes_to_take)
213 } else {
214 self.decode_buffers(&self.data, bytes_to_skip, bytes_to_take)
215 };
216 Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
217 bits_per_value: self.bytes_per_value * 8,
218 data: data_buffer,
219 num_values: num_rows,
220 block_info: BlockInfo::new(),
221 }))
222 }
223}
224
/// Encoder for "value" (plain / flat) encoding of fixed-width data.
#[derive(Debug, Default)]
pub struct ValueEncoder {}
228
impl ValueEncoder {
    /// Picks how many values to pack into each mini-block chunk.
    ///
    /// Starts at 2 values and doubles until the next doubling would exceed
    /// either the mini-block byte cap or the value cap.  Returns
    /// `(log2(values_per_chunk), values_per_chunk)`.
    fn find_log_vals_per_chunk(bytes_per_value: u64) -> (u64, u64) {
        let mut size_bytes = 2 * bytes_per_value;
        let mut log_num_vals = 1;
        let mut num_vals = 2;

        // A chunk must be able to hold at least two values.
        assert!(size_bytes < MAX_MINIBLOCK_BYTES);

        while 2 * size_bytes < MAX_MINIBLOCK_BYTES && 2 * num_vals < MAX_MINIBLOCK_VALUES {
            log_num_vals += 1;
            size_bytes *= 2;
            num_vals *= 2;
        }

        (log_num_vals, num_vals)
    }

    /// Splits a fixed-width block into mini-block chunks.
    ///
    /// All chunks except the last are "full" chunks of `2^log_num_values`
    /// values; the final chunk is marked with `log_num_values == 0` and
    /// holds whatever bytes remain (possibly zero when the value count
    /// divides evenly).
    fn chunk_data(data: FixedWidthDataBlock) -> MiniBlockCompressed {
        // Mini-block chunking assumes byte-aligned values.
        debug_assert!(data.bits_per_value % 8 == 0);
        let bytes_per_value = data.bits_per_value / 8;

        let (log_vals_per_chunk, vals_per_chunk) = Self::find_log_vals_per_chunk(bytes_per_value);
        let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize);
        let bytes_per_chunk = bytes_per_value * vals_per_chunk;
        // find_log_vals_per_chunk caps chunk size below MAX_MINIBLOCK_BYTES,
        // so this conversion is expected to fit in u16.
        let bytes_per_chunk = u16::try_from(bytes_per_chunk).unwrap();

        let data_buffer = data.data;

        let mut row_offset = 0;
        let mut chunks = Vec::with_capacity(num_chunks);

        // Bytes consumed by the full chunks emitted so far.
        let mut bytes_counter = 0;
        loop {
            if row_offset + vals_per_chunk <= data.num_values {
                // Full chunk of 2^log_vals_per_chunk values.
                chunks.push(MiniBlockChunk {
                    log_num_values: log_vals_per_chunk as u8,
                    num_bytes: bytes_per_chunk,
                });
                row_offset += vals_per_chunk;
                bytes_counter += bytes_per_chunk as u64;
            } else {
                // Trailing chunk: log_num_values == 0 marks the last chunk;
                // its value count is presumably derived from the block total
                // by readers (see MiniBlockChunk) — its size is just whatever
                // bytes remain in the buffer.
                let num_bytes = data_buffer.len() as u64 - bytes_counter;
                let num_bytes = u16::try_from(num_bytes).unwrap();
                chunks.push(MiniBlockChunk {
                    log_num_values: 0,
                    num_bytes,
                });
                break;
            }
        }

        MiniBlockCompressed {
            chunks,
            data: data_buffer,
            num_values: data.num_values,
        }
    }

    /// Builds the (possibly nested) encoding description for a
    /// fixed-size-list block: FSL wrappers down to a flat inner encoding.
    fn make_fsl_encoding(data: &FixedSizeListBlock) -> ArrayEncoding {
        let inner_encoding = match data.child.as_ref() {
            DataBlock::FixedWidth(fixed_width) => {
                ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None)
            }
            // Nested fixed-size lists recurse until a fixed-width leaf.
            DataBlock::FixedSizeList(fsl) => Self::make_fsl_encoding(fsl),
            // Callers only pass FSL blocks whose leaves are fixed-width.
            _ => unreachable!(),
        };
        ProtobufUtils::fixed_size_list(inner_encoding, data.dimension)
    }
}
303
304impl BlockCompressor for ValueEncoder {
305 fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
306 let data = match data {
307 DataBlock::FixedWidth(fixed_width) => fixed_width.data,
308 _ => unimplemented!(
309 "Cannot compress block of type {} with ValueEncoder",
310 data.name()
311 ),
312 };
313 Ok(data)
314 }
315}
316
317impl ArrayEncoder for ValueEncoder {
318 fn encode(
319 &self,
320 data: DataBlock,
321 _data_type: &DataType,
322 buffer_index: &mut u32,
323 ) -> Result<EncodedArray> {
324 let index = *buffer_index;
325 *buffer_index += 1;
326
327 let encoding = match &data {
328 DataBlock::FixedWidth(fixed_width) => Ok(ProtobufUtils::flat_encoding(
329 fixed_width.bits_per_value,
330 index,
331 None,
332 )),
333 _ => Err(Error::InvalidInput {
334 source: format!(
335 "Cannot encode a data block of type {} with ValueEncoder",
336 data.name()
337 )
338 .into(),
339 location: location!(),
340 }),
341 }?;
342 Ok(EncodedArray { data, encoding })
343 }
344}
345
impl MiniBlockCompressor for ValueEncoder {
    /// "Compresses" a block by passing its bytes through unchanged and
    /// splitting them into mini-block chunks.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` for any block type other than fixed-width or
    /// fixed-size-list.
    fn compress(
        &self,
        chunk: DataBlock,
    ) -> Result<(
        crate::encoder::MiniBlockCompressed,
        crate::format::pb::ArrayEncoding,
    )> {
        match chunk {
            DataBlock::FixedWidth(fixed_width) => {
                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
                Ok((Self::chunk_data(fixed_width), encoding))
            }
            DataBlock::FixedSizeList(mut fixed_size_list) => {
                // NOTE(review): the encoding description is built from the
                // list *after* flatten_as_fixed() has run — assumes flattening
                // leaves the structure metadata (child, dimension) intact;
                // confirm against FixedSizeListBlock::flatten_as_fixed.
                let flattened = fixed_size_list.flatten_as_fixed();
                let encoding = Self::make_fsl_encoding(&fixed_size_list);
                Ok((Self::chunk_data(flattened), encoding))
            }
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with ValueEncoder",
                    chunk.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}
375
/// Decompressor for constant-encoded data: every value in the block is the
/// same scalar, captured at construction time.
#[derive(Debug)]
pub struct ConstantDecompressor {
    /// The single repeated scalar value.
    scalar: LanceBuffer,
    /// Number of logical values the block represents.
    num_values: u64,
}
382
383impl ConstantDecompressor {
384 pub fn new(scalar: LanceBuffer, num_values: u64) -> Self {
385 Self {
386 scalar: scalar.into_borrowed(),
387 num_values,
388 }
389 }
390}
391
392impl BlockDecompressor for ConstantDecompressor {
393 fn decompress(&self, _data: LanceBuffer) -> Result<DataBlock> {
394 Ok(DataBlock::Constant(ConstantDataBlock {
395 data: self.scalar.try_clone().unwrap(),
396 num_values: self.num_values,
397 }))
398 }
399}
400
/// Decompressor for "value" (flat) encoded fixed-width data.
#[derive(Debug)]
pub struct ValueDecompressor {
    /// Size of each value in bytes.
    bytes_per_value: u64,
}
407
impl ValueDecompressor {
    /// Builds a decompressor from the flat-encoding protobuf description.
    ///
    /// # Panics
    ///
    /// Panics if `bits_per_value` is not byte-aligned.
    pub fn new(description: &pb::Flat) -> Self {
        assert!(description.bits_per_value % 8 == 0);
        Self {
            bytes_per_value: description.bits_per_value / 8,
        }
    }

    /// Wraps a raw buffer as a fixed-width block, inferring the value count
    /// from the buffer length.
    // NOTE(review): assumes bytes_per_value > 0; a pb::Flat with
    // bits_per_value == 0 passes the assert in `new` but would divide by
    // zero here — confirm whether that case can occur.
    fn buffer_to_block(&self, data: LanceBuffer) -> DataBlock {
        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bytes_per_value * 8,
            num_values: data.len() as u64 / self.bytes_per_value,
            data,
            block_info: BlockInfo::new(),
        })
    }
}
425
impl BlockDecompressor for ValueDecompressor {
    /// Value encoding stores bytes verbatim, so decompression is just a
    /// reinterpretation of the buffer as fixed-width values.
    fn decompress(&self, data: LanceBuffer) -> Result<DataBlock> {
        Ok(self.buffer_to_block(data))
    }
}
431
impl MiniBlockDecompressor for ValueDecompressor {
    /// Decompresses one mini-block chunk (a verbatim slice of values).
    ///
    /// # Panics
    ///
    /// Panics if the chunk length disagrees with `num_values` values of
    /// `bytes_per_value` bytes each.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        assert_eq!(data.len() as u64, num_values * self.bytes_per_value);
        Ok(self.buffer_to_block(data))
    }
}
438
impl FixedPerValueDecompressor for ValueDecompressor {
    /// Per-value decompression is the identity for value encoding.
    fn decompress(&self, data: FixedWidthDataBlock) -> Result<DataBlock> {
        Ok(DataBlock::FixedWidth(data))
    }

    /// Width of each decoded value in bits.
    fn bits_per_value(&self) -> u64 {
        self.bytes_per_value * 8
    }
}
448
449impl PerValueCompressor for ValueEncoder {
450 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, ArrayEncoding)> {
451 let (data, encoding) = match data {
452 DataBlock::FixedWidth(fixed_width) => {
453 let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
454 (PerValueDataBlock::Fixed(fixed_width), encoding)
455 }
456 _ => unimplemented!(
457 "Cannot compress block of type {} with ValueEncoder",
458 data.name()
459 ),
460 };
461 Ok((data, encoding))
462 }
463}
464
#[cfg(test)]
pub(crate) mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{Array, ArrayRef, Decimal128Array, Int32Array};
    use arrow_schema::{DataType, Field, TimeUnit};
    use rstest::rstest;

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    /// Fixed-width primitive types that value encoding must round-trip.
    const PRIMITIVE_TYPES: &[DataType] = &[
        DataType::Null,
        DataType::FixedSizeBinary(2),
        DataType::Date32,
        DataType::Date64,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal128(10, 10),
        DataType::Decimal256(10, 10),
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Duration(TimeUnit::Second),
    ];

    /// Round-trips random data for every primitive type, in both file versions.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_value_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for data_type in PRIMITIVE_TYPES {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    lazy_static::lazy_static! {
        /// Types with wide per-row values, to stress chunking logic.
        static ref LARGE_TYPES: Vec<DataType> = vec![DataType::FixedSizeList(
            Arc::new(Field::new("", DataType::Int32, false)),
            128,
        )];
    }

    /// Round-trips random data for wide fixed-size-list values.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for data_type in LARGE_TYPES.iter() {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    /// Highly repetitive decimal data should round-trip in V2.1.
    #[test_log::test(tokio::test)]
    async fn test_decimal128_dictionary_encoding() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
        let decimals: Vec<i32> = (0..100).collect();
        // 100 distinct decimal values, each repeated 10,000 times.
        let repeated_decimals: Vec<_> = decimals
            .iter()
            .cycle()
            .take(decimals.len() * 10000)
            .map(|&v| Some(v as i128))
            .collect();
        let decimal_array = Arc::new(Decimal128Array::from(repeated_decimals)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![decimal_array], &test_cases, HashMap::new()).await;
    }

    /// Stresses mini-block chunking with small pages and varied batch sizes.
    #[test_log::test(tokio::test)]
    async fn test_miniblock_stress() {
        // 100 chunks of 100 fully-valid values.
        let data1 = (0..100)
            .map(|_| Arc::new(Int32Array::from_iter_values(0..100)) as Arc<dyn Array>)
            .collect::<Vec<_>>();

        // 100 chunks of 100 alternating valid/null values.
        let data2 = (0..100)
            .map(|_| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if i % 2 == 0 {
                        Some(i)
                    } else {
                        None
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        // 50 all-null chunks followed by 50 all-valid chunks.
        // NOTE(review): intentionally built but not exercised by the loop
        // below — either add it to the test matrix or remove it.
        let _data3 = (0..100)
            .map(|chunk_idx| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if chunk_idx < 50 {
                        None
                    } else {
                        Some(i)
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        for data in [data1, data2] {
            for batch_size in [10, 100, 1500, 15000] {
                let test_cases = TestCases::default()
                    .with_page_sizes(vec![1000, 2000, 3000, 60000])
                    .with_batch_size(batch_size)
                    .with_file_version(LanceFileVersion::V2_1);

                check_round_trip_encoding_of_data(data.clone(), &test_cases, HashMap::new()).await;
            }
        }
    }
}