use std::sync::Arc;
use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::ArrayRef;
use arrow_buffer::{bit_util, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
use bytemuck::{cast_slice, try_cast_slice};
use byteorder::{ByteOrder, LittleEndian};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use lance_core::utils::bit::pad_bytes;
use snafu::{location, Location};
use crate::decoder::{BlockDecompressor, LogicalPageDecoder, MiniBlockDecompressor};
use crate::encoder::{BlockCompressor, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor};
use crate::encodings::logical::primitive::PrimitiveFieldDecoder;
use crate::buffer::LanceBuffer;
use crate::data::{
BlockInfo, DataBlock, FixedWidthDataBlock, NullableDataBlock, VariableWidthBlock,
};
use crate::format::ProtobufUtils;
use crate::{
decoder::{PageScheduler, PrimitivePageDecoder},
encoder::{ArrayEncoder, EncodedArray},
EncodingsIo,
};
use arrow_array::{PrimitiveArray, UInt64Array};
use arrow_schema::DataType;
use lance_core::{Error, Result};
use super::block_compress::{BufferCompressor, CompressionConfig, GeneralBufferCompressor};
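/// Rebuilds a contiguous offsets vector and validity bitmap from decoded
/// index ranges.
///
/// On disk, an index value `v >= null_adjustment` marks a null entry whose
/// real end offset is `v - null_adjustment`; values below `null_adjustment`
/// are valid entries.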
struct IndicesNormalizer {
indices: Vec<u64>,
validity: BooleanBufferBuilder,
null_adjustment: u64,
}
impl IndicesNormalizer {
fn new(num_rows: u64, null_adjustment: u64) -> Self {
let mut indices = Vec::with_capacity(num_rows as usize + 1);
indices.push(0);
Self {
indices,
validity: BooleanBufferBuilder::new(num_rows as usize),
null_adjustment,
}
}
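/// Splits an encoded index into (is_valid, end_offset). For example, with a
/// null_adjustment of 7, the value 13 decodes to (false, 6) and the value 3
/// decodes to (true, 3).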
fn normalize(&self, val: u64) -> (bool, u64) {
if val >= self.null_adjustment {
(false, val - self.null_adjustment)
} else {
(true, val)
}
}
fn extend(&mut self, new_indices: &PrimitiveArray<UInt64Type>, is_start: bool) {
let mut last = *self.indices.last().unwrap();
if is_start {
let (is_valid, val) = self.normalize(new_indices.value(0));
self.indices.push(val);
self.validity.append(is_valid);
last += val;
}
let mut prev = self.normalize(*new_indices.values().first().unwrap()).1;
for w in new_indices.values().windows(2) {
let (is_valid, val) = self.normalize(w[1]);
let next = val - prev + last;
self.indices.push(next);
self.validity.append(is_valid);
prev = val;
last = next;
}
}
fn into_parts(mut self) -> (Vec<u64>, BooleanBuffer) {
(self.indices, self.validity.finish())
}
}
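/// A page scheduler for binary/string data.
///
/// Binary pages are stored as an indices (end-offset) buffer plus a bytes
/// buffer, so scheduling happens in two stages: the indices are scheduled and
/// decoded first, then the byte ranges they describe are scheduled.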
#[derive(Debug)]
pub struct BinaryPageScheduler {
indices_scheduler: Arc<dyn PageScheduler>,
bytes_scheduler: Arc<dyn PageScheduler>,
offsets_type: DataType,
null_adjustment: u64,
}
impl BinaryPageScheduler {
pub fn new(
indices_scheduler: Arc<dyn PageScheduler>,
bytes_scheduler: Arc<dyn PageScheduler>,
offsets_type: DataType,
null_adjustment: u64,
) -> Self {
Self {
indices_scheduler,
bytes_scheduler,
offsets_type,
null_adjustment,
}
}
fn decode_indices(decoder: Arc<dyn PrimitivePageDecoder>, num_rows: u64) -> Result<ArrayRef> {
let mut primitive_wrapper =
PrimitiveFieldDecoder::new_from_data(decoder, DataType::UInt64, num_rows, false);
let drained_task = primitive_wrapper.drain(num_rows)?;
let indices_decode_task = drained_task.task;
indices_decode_task.decode()
}
}
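/// The output of the indirect (indices) decode step: the normalized offsets
/// and validity, plus a future that resolves to the bytes decoder.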
struct IndirectData {
decoded_indices: UInt64Array,
offsets_type: DataType,
validity: BooleanBuffer,
bytes_decoder_fut: BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>,
}
impl PageScheduler for BinaryPageScheduler {
fn schedule_ranges(
&self,
ranges: &[std::ops::Range<u64>],
scheduler: &Arc<dyn EncodingsIo>,
top_level_row: u64,
) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
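// The indices are end offsets, so decoding rows [start, end) requires
// indices [start - 1, end); row 0's bytes implicitly start at offset 0.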
let indices_ranges = ranges
.iter()
.map(|range| {
if range.start != 0 {
(range.start - 1)..range.end
} else {
0..range.end
}
})
.collect::<Vec<std::ops::Range<u64>>>();
let indices_page_decoder =
self.indices_scheduler
.schedule_ranges(&indices_ranges, scheduler, top_level_row);
let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
let indices_num_rows = indices_ranges.iter().map(|r| r.end - r.start).sum::<u64>();
let ranges = ranges.to_vec();
let copy_scheduler = scheduler.clone();
let copy_bytes_scheduler = self.bytes_scheduler.clone();
let null_adjustment = self.null_adjustment;
let offsets_type = self.offsets_type.clone();
tokio::spawn(async move {
let indices_decoder = Arc::from(indices_page_decoder.await?);
let indices = Self::decode_indices(indices_decoder, indices_num_rows)?;
let decoded_indices = indices.as_primitive::<UInt64Type>();
let mut indices_builder = IndicesNormalizer::new(num_rows, null_adjustment);
let mut bytes_ranges = Vec::new();
let mut curr_offset_index = 0;
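// For each requested row range, slice out its indices, compute the byte
// range it covers, and feed the indices into the normalizer.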
for curr_row_range in ranges.iter() {
let row_start = curr_row_range.start;
let curr_range_len = (curr_row_range.end - row_start) as usize;
let curr_indices;
if row_start == 0 {
curr_indices = decoded_indices.slice(0, curr_range_len);
curr_offset_index = curr_range_len;
} else {
curr_indices = decoded_indices.slice(curr_offset_index, curr_range_len + 1);
curr_offset_index += curr_range_len + 1;
}
let first = if row_start == 0 {
0
} else {
indices_builder
.normalize(*curr_indices.values().first().unwrap())
.1
};
let last = indices_builder
.normalize(*curr_indices.values().last().unwrap())
.1;
if first != last {
bytes_ranges.push(first..last);
}
indices_builder.extend(&curr_indices, row_start == 0);
}
let (indices, validity) = indices_builder.into_parts();
let decoded_indices = UInt64Array::from(indices);
let bytes_decoder_fut =
copy_bytes_scheduler.schedule_ranges(&bytes_ranges, &copy_scheduler, top_level_row);
Ok(IndirectData {
decoded_indices,
validity,
offsets_type,
bytes_decoder_fut,
})
})
.map(|join_handle| join_handle.unwrap())
.and_then(|indirect_data| {
async move {
let bytes_decoder = indirect_data.bytes_decoder_fut.await?;
Ok(Box::new(BinaryPageDecoder {
decoded_indices: indirect_data.decoded_indices,
offsets_type: indirect_data.offsets_type,
validity: indirect_data.validity,
bytes_decoder,
}) as Box<dyn PrimitivePageDecoder>)
}
})
.boxed()
}
}
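/// Decodes binary pages by pairing the already-decoded offsets with bytes
/// fetched by the bytes decoder.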
struct BinaryPageDecoder {
decoded_indices: UInt64Array,
offsets_type: DataType,
validity: BooleanBuffer,
bytes_decoder: Box<dyn PrimitivePageDecoder>,
}
impl PrimitivePageDecoder for BinaryPageDecoder {
fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
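// Decoding a slice of this page: slice the validity bitmap, rebase the
// offsets so the first requested row starts at byte 0, then decode only
// the bytes covered by the requested rows.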
let target_validity = self
.validity
.slice(rows_to_skip as usize, num_rows as usize);
let has_nulls = target_validity.count_set_bits() < target_validity.len();
let validity_buffer = if has_nulls {
let num_validity_bits = arrow_buffer::bit_util::ceil(num_rows as usize, 8);
let mut validity_buffer = Vec::with_capacity(num_validity_bits);
if rows_to_skip == 0 {
validity_buffer.extend_from_slice(target_validity.inner().as_slice());
} else {
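// The sliced validity is no longer byte-aligned; re-collect it into a
// fresh BooleanBuffer so the bits start at offset zero.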
let target_validity = BooleanBuffer::from_iter(target_validity.iter());
validity_buffer.extend_from_slice(target_validity.inner().as_slice());
}
Some(validity_buffer)
} else {
None
};
let bytes_per_offset = match self.offsets_type {
DataType::Int32 => 4,
DataType::Int64 => 8,
_ => panic!("Unsupported offsets type {}", self.offsets_type),
};
let target_offsets = self
.decoded_indices
.slice(rows_to_skip as usize, (num_rows + 1) as usize);
let target_vec = target_offsets.values();
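// Rebase the offsets so the first requested row starts at byte 0, and
// narrow them to the width of the requested offsets type.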
let start = target_vec[0];
let offsets_buffer = match bytes_per_offset {
4 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i32))
.into_inner(),
8 => ScalarBuffer::from_iter(target_vec.iter().map(|x| (x - start) as i64))
.into_inner(),
_ => unreachable!("bytes_per_offset is always 4 or 8"),
};
let bytes_to_skip = self.decoded_indices.value(rows_to_skip as usize);
let num_bytes = self
.decoded_indices
.value((rows_to_skip + num_rows) as usize)
- bytes_to_skip;
let bytes = self.bytes_decoder.decode(bytes_to_skip, num_bytes)?;
let bytes = bytes.as_fixed_width().unwrap();
debug_assert_eq!(bytes.bits_per_value, 8);
let string_data = DataBlock::VariableWidth(VariableWidthBlock {
bits_per_offset: bytes_per_offset * 8,
data: bytes.data,
num_values: num_rows,
offsets: LanceBuffer::from(offsets_buffer),
block_info: BlockInfo::new(),
});
if let Some(validity) = validity_buffer {
Ok(DataBlock::Nullable(NullableDataBlock {
data: Box::new(string_data),
nulls: LanceBuffer::from(validity),
block_info: BlockInfo::new(),
}))
} else {
Ok(string_data)
}
}
}
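/// Encodes binary/string data as two buffers: a u64 end-offset (indices)
/// buffer, encoded by the wrapped indices encoder, and a flat bytes buffer
/// that is optionally block-compressed.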
#[derive(Debug)]
pub struct BinaryEncoder {
indices_encoder: Box<dyn ArrayEncoder>,
compression_config: Option<CompressionConfig>,
buffer_compressor: Option<Box<dyn BufferCompressor>>,
}
impl BinaryEncoder {
pub fn new(
indices_encoder: Box<dyn ArrayEncoder>,
compression_config: Option<CompressionConfig>,
) -> Self {
let buffer_compressor = compression_config.map(GeneralBufferCompressor::get_compressor);
Self {
indices_encoder,
compression_config,
buffer_compressor,
}
}
fn all_null_variable_width(data_type: &DataType, num_values: u64) -> VariableWidthBlock {
if matches!(data_type, DataType::Binary | DataType::Utf8) {
VariableWidthBlock {
bits_per_offset: 32,
data: LanceBuffer::empty(),
num_values,
offsets: LanceBuffer::reinterpret_vec(vec![0_u32; num_values as usize + 1]),
block_info: BlockInfo::new(),
}
} else {
VariableWidthBlock {
bits_per_offset: 64,
data: LanceBuffer::empty(),
num_values,
offsets: LanceBuffer::reinterpret_vec(vec![0_u64; num_values as usize + 1]),
block_info: BlockInfo::new(),
}
}
}
}
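/// Converts Arrow offsets (plus optional validity) into the on-disk index
/// encoding: one cumulative end offset per value, with nulls marked by adding
/// `null_adjustment` (one past the largest end offset).
///
/// For example, `[None, "foo", "foo", None, None, None]` has end offsets
/// `[0, 3, 6, 6, 6, 6]` and a `null_adjustment` of 7, so it is encoded as
/// `[7, 3, 6, 13, 13, 13]` (see `test_encode_indices_adjusts_nulls` below).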
fn get_indices_from_string_arrays(
mut offsets: LanceBuffer,
bits_per_offset: u8,
nulls: Option<LanceBuffer>,
num_rows: usize,
) -> (DataBlock, u64) {
let mut indices = Vec::with_capacity(num_rows);
let mut last_offset = 0_u64;
if bits_per_offset == 32 {
let offsets = offsets.borrow_to_typed_slice::<i32>();
indices.extend(offsets.as_ref().windows(2).map(|w| {
let strlen = (w[1] - w[0]) as u64;
last_offset += strlen;
last_offset
}));
} else if bits_per_offset == 64 {
let offsets = offsets.borrow_to_typed_slice::<i64>();
indices.extend(offsets.as_ref().windows(2).map(|w| {
let strlen = (w[1] - w[0]) as u64;
last_offset += strlen;
last_offset
}));
}
if indices.is_empty() {
return (
DataBlock::FixedWidth(FixedWidthDataBlock {
bits_per_value: 64,
data: LanceBuffer::empty(),
num_values: 0,
block_info: BlockInfo::new(),
}),
0,
);
}
let last_offset = *indices.last().expect("Indices array is empty");
// Nulls are encoded as index + null_adjustment, so the largest encoded value
// is roughly twice the total byte length and must still fit in a u64.
assert!(
last_offset < u64::MAX / 2,
"Total byte length must be less than 2^63 for this encoding"
);
let null_adjustment: u64 = last_offset + 1;
if let Some(nulls) = nulls {
let nulls = NullBuffer::new(BooleanBuffer::new(nulls.into_buffer(), 0, num_rows));
indices
.iter_mut()
.zip(nulls.iter())
.for_each(|(index, is_valid)| {
if !is_valid {
*index += null_adjustment;
}
});
}
let indices = DataBlock::FixedWidth(FixedWidthDataBlock {
bits_per_value: 64,
data: LanceBuffer::reinterpret_vec(indices),
num_values: num_rows as u64,
block_info: BlockInfo::new(),
});
(indices, null_adjustment)
}
impl ArrayEncoder for BinaryEncoder {
fn encode(
&self,
data: DataBlock,
data_type: &DataType,
buffer_index: &mut u32,
) -> Result<EncodedArray> {
let (mut data, nulls) = match data {
DataBlock::Nullable(nullable) => {
let data = nullable.data.as_variable_width().unwrap();
(data, Some(nullable.nulls))
}
DataBlock::VariableWidth(variable) => (variable, None),
DataBlock::AllNull(all_null) => {
let data = Self::all_null_variable_width(data_type, all_null.num_values);
let validity =
LanceBuffer::all_unset(bit_util::ceil(all_null.num_values as usize, 8));
(data, Some(validity))
}
_ => panic!("Expected variable width data block but got {}", data.name()),
};
let (indices, null_adjustment) = get_indices_from_string_arrays(
data.offsets,
data.bits_per_offset,
nulls,
data.num_values as usize,
);
let encoded_indices =
self.indices_encoder
.encode(indices, &DataType::UInt64, buffer_index)?;
let encoded_indices_data = encoded_indices.data.as_fixed_width().unwrap();
assert!(encoded_indices_data.bits_per_value <= 64);
if let Some(buffer_compressor) = &self.buffer_compressor {
let mut compressed_data = Vec::with_capacity(data.data.len());
buffer_compressor.compress(&data.data, &mut compressed_data)?;
data.data = LanceBuffer::Owned(compressed_data);
}
let data = DataBlock::VariableWidth(VariableWidthBlock {
bits_per_offset: encoded_indices_data.bits_per_value as u8,
offsets: encoded_indices_data.data,
data: data.data,
num_values: data.num_values,
block_info: BlockInfo::new(),
});
let bytes_buffer_index = *buffer_index;
*buffer_index += 1;
let bytes_encoding = ProtobufUtils::flat_encoding(
8,
bytes_buffer_index,
self.compression_config,
);
let encoding =
ProtobufUtils::binary(encoded_indices.encoding, bytes_encoding, null_adjustment);
Ok(EncodedArray { data, encoding })
}
}
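/// A mini-block compressor that splits binary data into chunks of roughly
/// AIM_MINICHUNK_SIZE bytes, each holding a power-of-two number of values
/// (except possibly the last).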
#[derive(Debug, Default)]
pub struct BinaryMiniBlockEncoder {}
const AIM_MINICHUNK_SIZE: u32 = 4 * 1024;
const BINARY_MINIBLOCK_CHUNK_ALIGNMENT: usize = 4;
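/// Searches for the offset index that ends the next chunk. The candidate
/// value count doubles each iteration, so every chunk except the last holds a
/// power-of-two number of values (required for `log_num_values`) while its
/// offsets plus bytes stay within AIM_MINICHUNK_SIZE.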
fn search_next_offset_idx(offsets: &[u32], last_offset_idx: usize) -> usize {
let mut num_values = 1;
let mut new_num_values = num_values * 2;
loop {
if last_offset_idx + new_num_values >= offsets.len() {
if (offsets[offsets.len() - 1] - offsets[last_offset_idx])
+ (offsets.len() - last_offset_idx) as u32 * 4
<= AIM_MINICHUNK_SIZE
{
return offsets.len() - 1;
} else {
return last_offset_idx + num_values;
}
}
if ((offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx])
+ ((new_num_values + 1) * 4) as u32)
<= AIM_MINICHUNK_SIZE
{
num_values = new_num_values;
new_num_values *= 2;
} else {
break;
}
}
last_offset_idx + num_values
}
impl BinaryMiniBlockEncoder {
fn chunk_data(
&self,
mut data: VariableWidthBlock,
) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) {
assert!(data.bits_per_offset == 32);
let offsets = data.offsets.borrow_to_typed_slice::<u32>();
let offsets = offsets.as_ref();
assert!(offsets.len() > 1);
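// Each chunk is laid out as [offsets..., bytes..., padding], where the
// offsets are u32 values relative to the start of the chunk (so the first
// offset equals the byte region's start position) and the chunk is padded
// to BINARY_MINIBLOCK_CHUNK_ALIGNMENT bytes.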
#[derive(Debug)]
struct ChunkInfo {
chunk_start_offset_in_orig_idx: usize,
chunk_last_offset_in_orig_idx: usize,
bytes_start_offset: usize,
padded_chunk_size: usize,
}
let mut chunks_info = vec![];
let mut chunks = vec![];
let mut last_offset_in_orig_idx = 0;
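// Padding bytes used to 4-byte-align each chunk (the fill value itself is
// never read back).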
const CHUNK_PAD_BUFFER: [u8; BINARY_MINIBLOCK_CHUNK_ALIGNMENT] =
[72; BINARY_MINIBLOCK_CHUNK_ALIGNMENT];
loop {
let this_last_offset_in_orig_idx =
search_next_offset_idx(offsets, last_offset_in_orig_idx);
if this_last_offset_in_orig_idx == offsets.len() - 1 {
let num_values_in_this_chunk =
this_last_offset_in_orig_idx - last_offset_in_orig_idx;
let this_chunk_size = (num_values_in_this_chunk + 1) * 4
+ (offsets[offsets.len() - 1] - offsets[last_offset_in_orig_idx]) as usize;
let padded_chunk_size = ((this_chunk_size + 3) / 4) * 4; // round up to 4-byte alignment
let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * 4;
chunks_info.push(ChunkInfo {
chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
bytes_start_offset: this_chunk_bytes_start_offset,
padded_chunk_size,
});
chunks.push(MiniBlockChunk {
log_num_values: 0,
num_bytes: padded_chunk_size as u16,
});
break;
} else {
let num_values_in_this_chunk =
this_last_offset_in_orig_idx - last_offset_in_orig_idx;
let this_chunk_size = (num_values_in_this_chunk + 1) * 4
+ (offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx])
as usize;
let padded_chunk_size = ((this_chunk_size + 3) / 4) * 4; // round up to 4-byte alignment
let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * 4;
chunks_info.push(ChunkInfo {
chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
bytes_start_offset: this_chunk_bytes_start_offset,
padded_chunk_size,
});
chunks.push(MiniBlockChunk {
log_num_values: num_values_in_this_chunk.trailing_zeros() as u8,
num_bytes: padded_chunk_size as u16,
});
last_offset_in_orig_idx = this_last_offset_in_orig_idx;
}
}
let output_total_bytes = chunks_info
.iter()
.map(|chunk_info| chunk_info.padded_chunk_size)
.sum::<usize>();
let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
for chunk in chunks_info {
let this_chunk_offsets = offsets
[chunk.chunk_start_offset_in_orig_idx..chunk.chunk_last_offset_in_orig_idx + 1]
.iter()
.map(|offset| {
offset - offsets[chunk.chunk_start_offset_in_orig_idx]
+ chunk.bytes_start_offset as u32
})
.collect::<Vec<_>>();
output.extend_from_slice(cast_slice(&this_chunk_offsets));
let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx];
let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx];
output.extend_from_slice(&data.data[start_in_orig as usize..end_in_orig as usize]);
output.extend_from_slice(
&CHUNK_PAD_BUFFER[..pad_bytes::<BINARY_MINIBLOCK_CHUNK_ALIGNMENT>(output.len())],
);
}
(
MiniBlockCompressed {
data: LanceBuffer::reinterpret_vec(output),
chunks,
num_values: data.num_values,
},
ProtobufUtils::binary_miniblock(),
)
}
}
impl MiniBlockCompressor for BinaryMiniBlockEncoder {
fn compress(
&self,
data: DataBlock,
) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
match data {
DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
_ => Err(Error::InvalidInput {
source: format!(
"Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
data.name()
)
.into(),
location: location!(),
}),
}
}
}
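/// Decompresses one mini-block chunk produced by BinaryMiniBlockEncoder,
/// rebasing the chunk-relative offsets back to zero.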
#[derive(Debug, Default)]
pub struct BinaryMiniBlockDecompressor {}
impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
assert!(data.len() >= 8);
let offsets: &[u32] = try_cast_slice(&data)
.expect("casting buffer failed during BinaryMiniBlock decompression");
let result_offsets = offsets[0..(num_values + 1) as usize]
.iter()
.map(|offset| offset - offsets[0])
.collect::<Vec<u32>>();
Ok(DataBlock::VariableWidth(VariableWidthBlock {
data: LanceBuffer::Owned(
data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
),
offsets: LanceBuffer::reinterpret_vec(result_offsets),
bits_per_offset: 32,
num_values,
block_info: BlockInfo::new(),
}))
}
}
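/// Compresses an entire VariableWidthBlock (32-bit offsets only) into a
/// single self-describing buffer.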
#[derive(Debug, Default)]
pub struct BinaryBlockEncoder {}
impl BlockCompressor for BinaryBlockEncoder {
fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
let num_values: u32 = data
.num_values()
.try_into()
.expect("BinaryBlockEncoder can only handle up to u32::MAX values");
match data {
DataBlock::VariableWidth(mut variable_width_data) => {
if variable_width_data.bits_per_offset != 32 {
panic!("BinaryBlockEncoder only supports VariableWidth DataBlocks with 32-bit offsets");
}
let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
let offsets = offsets.as_ref();
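// The encoded layout is:
// | num_values (u32 LE) | bytes_start_offset (u32 LE) | offsets | bytes |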
let bytes_start_offset = 4 + 4 + std::mem::size_of_val(offsets) as u32;
let output_total_bytes =
bytes_start_offset as usize + variable_width_data.data.len();
let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
output.extend_from_slice(&(num_values).to_le_bytes());
output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
output.extend_from_slice(cast_slice(offsets));
output.extend_from_slice(&variable_width_data.data);
Ok(LanceBuffer::Owned(output))
}
_ => {
panic!("BinaryBlockEncoder can only compress VariableWidth DataBlocks");
}
}
}
}
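/// Decompresses a buffer produced by BinaryBlockEncoder by reading the two
/// u32 header fields and slicing out the offsets and bytes regions.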
#[derive(Debug, Default)]
pub struct BinaryBlockDecompressor {}
impl BlockDecompressor for BinaryBlockDecompressor {
fn decompress(&self, data: LanceBuffer) -> Result<DataBlock> {
let num_values = LittleEndian::read_u32(&data[..4]) as u64;
let bytes_start_offset = LittleEndian::read_u32(&data[4..8]);
let offsets = data.slice_with_length(8, bytes_start_offset as usize - 8);
let data = data.slice_with_length(
bytes_start_offset as usize,
data.len() - bytes_start_offset as usize,
);
Ok(DataBlock::VariableWidth(VariableWidthBlock {
data,
offsets,
bits_per_offset: 32,
num_values,
block_info: BlockInfo::new(),
}))
}
}
#[cfg(test)]
pub mod tests {
use arrow_array::{
builder::{LargeStringBuilder, StringBuilder},
ArrayRef, StringArray,
};
use arrow_schema::{DataType, Field};
use rstest::rstest;
use std::{collections::HashMap, sync::Arc, vec};
use crate::{
buffer::LanceBuffer,
data::DataBlock,
testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
version::LanceFileVersion,
};
use super::get_indices_from_string_arrays;
#[rstest]
#[test_log::test(tokio::test)]
async fn test_utf8_binary(
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
) {
let field = Field::new("", DataType::Utf8, false);
check_round_trip_encoding_random(field, version).await;
}
#[test]
fn test_encode_indices_adjusts_nulls() {
let string_array = Arc::new(StringArray::from(vec![
None,
Some("foo"),
Some("foo"),
None,
None,
None,
])) as ArrayRef;
let string_data = DataBlock::from(string_array).as_nullable().unwrap();
let nulls = string_data.nulls;
let string_data = string_data.data.as_variable_width().unwrap();
let (indices, null_adjustment) = get_indices_from_string_arrays(
string_data.offsets,
string_data.bits_per_offset,
Some(nulls),
string_data.num_values as usize,
);
let indices = indices.as_fixed_width().unwrap();
assert_eq!(indices.bits_per_value, 64);
assert_eq!(
indices.data,
LanceBuffer::reinterpret_vec(vec![7_u64, 3, 6, 13, 13, 13])
);
assert_eq!(null_adjustment, 7);
}
#[rstest]
#[test_log::test(tokio::test)]
async fn test_binary(
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
) {
let field = Field::new("", DataType::Binary, false);
check_round_trip_encoding_random(field, version).await;
}
#[test_log::test(tokio::test)]
async fn test_large_binary() {
let field = Field::new("", DataType::LargeBinary, true);
check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
}
#[test_log::test(tokio::test)]
async fn test_large_utf8() {
let field = Field::new("", DataType::LargeUtf8, true);
check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
}
#[rstest]
#[test_log::test(tokio::test)]
async fn test_simple_utf8_binary(
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
) {
let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
let test_cases = TestCases::default()
.with_range(0..2)
.with_range(0..3)
.with_range(1..3)
.with_indices(vec![0, 1, 3, 4])
.with_file_version(version);
check_round_trip_encoding_of_data(
vec![Arc::new(string_array)],
&test_cases,
HashMap::new(),
)
.await;
}
#[rstest]
#[test_log::test(tokio::test)]
async fn test_sliced_utf8(
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
) {
let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
let string_array = string_array.slice(1, 3);
let test_cases = TestCases::default()
.with_range(0..1)
.with_range(0..2)
.with_range(1..2)
.with_file_version(version);
check_round_trip_encoding_of_data(
vec![Arc::new(string_array)],
&test_cases,
HashMap::new(),
)
.await;
}
#[test_log::test(tokio::test)]
async fn test_bigger_than_max_page_size() {
let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
let string_array = StringArray::from(vec![
Some(big_string),
Some("abc".to_string()),
None,
None,
Some("xyz".to_string()),
]);
let test_cases = TestCases::default().with_max_page_size(1024 * 1024);
check_round_trip_encoding_of_data(
vec![Arc::new(string_array)],
&test_cases,
HashMap::new(),
)
.await;
let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));
check_round_trip_encoding_of_data(
vec![Arc::new(string_array)],
&TestCases::default(),
HashMap::new(),
)
.await;
}
#[rstest]
#[test_log::test(tokio::test)]
async fn test_empty_strings(
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
) {
let values = [Some("abc"), Some(""), None];
for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
let mut string_builder = StringBuilder::new();
for idx in order {
string_builder.append_option(values[idx]);
}
let string_array = Arc::new(string_builder.finish());
let test_cases = TestCases::default()
.with_indices(vec![1])
.with_indices(vec![0])
.with_indices(vec![2])
.with_indices(vec![0, 1])
.with_file_version(version);
check_round_trip_encoding_of_data(
vec![string_array.clone()],
&test_cases,
HashMap::new(),
)
.await;
let test_cases = test_cases.with_batch_size(1);
check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
.await;
}
let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));
let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
.await;
let test_cases = test_cases.with_batch_size(1);
check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
}
#[test_log::test(tokio::test)]
#[ignore]
async fn test_jumbo_string() {
let mut string_builder = LargeStringBuilder::new();
let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
for _ in 0..5000 {
string_builder.append_option(Some(&giant_string));
}
let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
let arrs = vec![giant_array];
let test_cases = TestCases::default().without_validation();
check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
}
#[rstest]
#[test_log::test(tokio::test)]
async fn test_binary_miniblock(
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
) {
let field = Field::new("", DataType::Utf8, false);
check_round_trip_encoding_random(field, version).await;
}
#[test_log::test(tokio::test)]
async fn test_binary_dictionary_encoding() {
let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
let strings = [
"Hal Abelson",
"Charles Babbage",
"Vint Cerf",
"Jim Gray",
"Alonzo Church",
"Edgar F. Codd",
];
let repeated_strings: Vec<_> = strings
.iter()
.cycle()
.take(strings.len() * 10000)
.cloned()
.collect();
let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
}
}