lance_encoding/encodings/physical/
struct_encoding.rsuse arrow::datatypes::UInt64Type;
use lance_core::{Error, Result};
use snafu::{location, Location};
use crate::{
buffer::LanceBuffer,
data::{BlockInfo, DataBlock, FixedWidthDataBlock, StructDataBlock},
decoder::MiniBlockDecompressor,
encoder::{MiniBlockCompressed, MiniBlockCompressor},
format::{
pb::{self},
ProtobufUtils,
},
statistics::{GetStat, Stat},
};
use super::value::{ValueDecompressor, ValueEncoder};
/// Interleaves the children of a struct block into a single fixed-width block.
///
/// Row `i` of the output is the concatenation, in field order, of row `i` of every child,
/// so the output's `bits_per_value` is the sum of `bits_per_values`.
///
/// `bits_per_values[j]` is the bit width of child `j`. Every width must be a whole number
/// of bytes; the caller is responsible for validating that.
///
/// # Panics
///
/// Panics if the struct has no children or if any child is not a fixed-width block.
fn struct_data_block_to_fixed_width_data_block(
    struct_data_block: StructDataBlock,
    bits_per_values: &[u32],
) -> DataBlock {
    debug_assert_eq!(struct_data_block.children.len(), bits_per_values.len());
    let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
    let mut output = Vec::with_capacity(data_size as usize);
    let num_values = struct_data_block.children[0].num_values();

    // Resolve each child's raw bytes and byte width once, up front. Slicing a LanceBuffer
    // per value (as opposed to slicing a borrowed `&[u8]`) copies the bytes into a fresh
    // allocation when the buffer is owned.
    let columns: Vec<(&[u8], usize)> = struct_data_block
        .children
        .iter()
        .zip(bits_per_values)
        .map(|(child, bits)| {
            let fixed_width = child
                .as_fixed_width_ref()
                .expect("packed struct children must be fixed-width data blocks");
            let bytes: &[u8] = &fixed_width.data;
            (bytes, (*bits / 8) as usize)
        })
        .collect();

    for row in 0..num_values as usize {
        for &(bytes, width) in &columns {
            output.extend_from_slice(&bytes[row * width..(row + 1) * width]);
        }
    }

    DataBlock::FixedWidth(FixedWidthDataBlock {
        bits_per_value: bits_per_values
            .iter()
            .map(|bits_per_value| *bits_per_value as u64)
            .sum(),
        data: LanceBuffer::Owned(output),
        num_values,
        block_info: BlockInfo::default(),
    })
}
#[derive(Debug, Default)]
pub struct PackedStructFixedWidthMiniBlockEncoder {}
impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
fn compress(
&self,
data: DataBlock,
) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
match data {
DataBlock::Struct(struct_data_block) => {
let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value as u32).collect::<Vec<_>>();
let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
let (value_miniblock_compressed, value_array_encoding) =
value_miniblock_compressor.compress(data_block)?;
Ok((
value_miniblock_compressed,
ProtobufUtils::packed_struct_fixed_width_mini_block(value_array_encoding, bits_per_values),
))
}
_ => Err(Error::InvalidInput {
source: format!(
"Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
data.name()
)
.into(),
location: location!(),
}),
}
}
}
/// Decompressor for mini-blocks of packed struct rows.
///
/// Each decoded row holds every child's value back to back in field order. The decoded
/// rows are split into one fixed-width child block per field.
#[derive(Debug)]
pub struct PackedStructFixedWidthMiniBlockDecompressor {
    /// Bit width of each struct child, in field order.
    bits_per_values: Vec<u32>,
    /// Decompressor for the packed rows (a `ValueDecompressor` when built by `new`).
    array_encoding: Box<dyn MiniBlockDecompressor>,
}

impl PackedStructFixedWidthMiniBlockDecompressor {
    /// Builds a decompressor from its protobuf description.
    ///
    /// # Panics
    ///
    /// Panics if the description has no inner array encoding, or if that encoding is not
    /// `ArrayEncoding::Flat`, which is the only inner encoding currently supported.
    pub fn new(description: &pb::PackedStructFixedWidthMiniBlock) -> Self {
        let inner_encoding = description
            .flat
            .as_ref()
            .expect("PackedStructFixedWidthMiniBlock is missing its `flat` array encoding")
            .array_encoding
            .as_ref()
            .expect("PackedStructFixedWidthMiniBlock inner ArrayEncoding has no encoding set");
        let array_encoding: Box<dyn MiniBlockDecompressor> = match inner_encoding {
            pb::array_encoding::ArrayEncoding::Flat(flat) => Box::new(ValueDecompressor::new(flat)),
            _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."),
        };
        Self {
            bits_per_values: description.bits_per_values.clone(),
            array_encoding,
        }
    }
}
impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
panic!("ValueDecompressor should output FixedWidth DataBlock")
};
let bytes_per_values = self
.bits_per_values
.iter()
.map(|bits_per_value| *bits_per_value as usize / 8)
.collect::<Vec<_>>();
assert!(encoded_data_block.bits_per_value % 8 == 0);
let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
let mut prefix_sum = vec![0; self.bits_per_values.len()];
for i in 0..(self.bits_per_values.len() - 1) {
prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
}
let mut children_data_block = vec![];
for i in 0..self.bits_per_values.len() {
let child_buf_size = bytes_per_values[i] * num_values as usize;
let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
for j in 0..num_values as usize {
let this_value = encoded_data_block.data.slice_with_length(
prefix_sum[i] + (j * encoded_bytes_per_row),
bytes_per_values[i],
);
child_buf.extend_from_slice(&this_value);
}
let child = DataBlock::FixedWidth(FixedWidthDataBlock {
data: LanceBuffer::Owned(child_buf),
bits_per_value: self.bits_per_values[i] as u64,
num_values,
block_info: BlockInfo::default(),
});
children_data_block.push(child);
}
Ok(DataBlock::Struct(StructDataBlock {
children: children_data_block,
block_info: BlockInfo::default(),
}))
}
}