use std::{collections::VecDeque, ops::Range, sync::Arc};
use arrow_array::{
cast::AsArray,
new_empty_array,
types::{Int32Type, Int64Type, UInt64Type},
Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
use arrow_schema::{DataType, Field, Fields};
use futures::{future::BoxFuture, FutureExt};
use lance_arrow::list::ListArrayExt;
use log::trace;
use snafu::{location, Location};
use tokio::task::JoinHandle;
use lance_core::{cache::FileMetadataCache, Error, Result};
use crate::{
buffer::LanceBuffer,
data::{BlockInfo, DataBlock, FixedWidthDataBlock},
decoder::{
DecodeArrayTask, DecodeBatchScheduler, DecodedArray, FieldScheduler, FilterExpression,
ListPriorityRange, LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding,
PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
StructuralSchedulingJob,
},
encoder::{
ArrayEncoder, EncodeTask, EncodedArray, EncodedColumn, EncodedPage, FieldEncoder,
OutOfLineBuffers,
},
encodings::logical::r#struct::SimpleStructScheduler,
format::pb,
repdef::RepDefBuilder,
EncodingsIo,
};
use super::{primitive::AccumulationQueue, r#struct::SimpleStructDecoder};
/// A request to decode a contiguous run of lists from a single offsets page.
#[derive(Debug)]
struct ListRequest {
    /// How many lists this request covers
    num_lists: u64,
    /// True if an extra leading offset must be read to establish the start of
    /// the first list (i.e. the request does not begin at a page boundary)
    includes_extra_offset: bool,
    /// The per-page constant used to encode list validity: a stored offset
    /// >= this value marks a null list (see `decode_offsets`)
    null_offset_adjustment: u64,
    /// Number of items in all pages preceding this one; added to item ranges
    /// so they index the items column globally
    items_offset: u64,
}
/// Iterator-like holder pairing the list requests (consumed page by page as
/// offsets are scheduled) with the offset ranges that must be read to satisfy
/// them.
#[derive(Debug)]
struct ListRequestsIter {
    // Pending list requests, in schedule order
    list_requests: VecDeque<ListRequest>,
    // Ranges (in offset rows) to request from the offsets scheduler
    offsets_requests: Vec<Range<u64>>,
}
impl ListRequestsIter {
    /// Walks the requested row ranges against the per-page offset metadata,
    /// producing one `ListRequest` per (range, page) intersection plus the
    /// offset ranges that must be scheduled to serve them.
    ///
    /// Panics (via `unwrap`) if a range extends past the metadata in
    /// `page_infos`.
    fn new(row_ranges: &[Range<u64>], page_infos: &[OffsetPageInfo]) -> Self {
        let mut items_offset = 0;
        let mut offsets_offset = 0;
        let mut page_infos_iter = page_infos.iter();
        let mut cur_page_info = page_infos_iter.next().unwrap();
        let mut list_requests = VecDeque::new();
        let mut offsets_requests = Vec::new();
        for range in row_ranges {
            let mut range = range.clone();
            // Advance past whole pages that end before this range begins,
            // accumulating how many items those pages referenced
            while offsets_offset + (cur_page_info.offsets_in_page) <= range.start {
                trace!("Skipping null offset adjustment chunk {:?}", offsets_offset);
                offsets_offset += cur_page_info.offsets_in_page;
                items_offset += cur_page_info.num_items_referenced_by_page;
                cur_page_info = page_infos_iter.next().unwrap();
            }
            // If the range does not start exactly at a page boundary we need
            // the offset just before it to know where the first list starts
            let mut includes_extra_offset = range.start != offsets_offset;
            if includes_extra_offset {
                offsets_requests.push(range.start - 1..range.end);
            } else {
                offsets_requests.push(range.clone());
            }
            // Split the range into one request per offsets page it touches
            while !range.is_empty() {
                let end = offsets_offset + cur_page_info.offsets_in_page;
                let last = end >= range.end;
                let end = end.min(range.end);
                list_requests.push_back(ListRequest {
                    num_lists: end - range.start,
                    includes_extra_offset,
                    null_offset_adjustment: cur_page_info.null_offset_adjustment,
                    items_offset,
                });
                // Only the first request of a range can carry the extra offset
                includes_extra_offset = false;
                range.start = end;
                if !last {
                    offsets_offset += cur_page_info.offsets_in_page;
                    items_offset += cur_page_info.num_items_referenced_by_page;
                    cur_page_info = page_infos_iter.next().unwrap();
                }
            }
        }
        Self {
            list_requests,
            offsets_requests,
        }
    }
    /// Pops the list requests satisfied by the next `num_offsets` scheduled
    /// offsets, splitting the front request when the offsets only partially
    /// cover it.
    fn next(&mut self, mut num_offsets: u64) -> Vec<ListRequest> {
        let mut list_requests = Vec::new();
        while num_offsets > 0 {
            let req = self.list_requests.front_mut().unwrap();
            if req.includes_extra_offset {
                // One of the scheduled offsets is the extra leading offset and
                // does not correspond to a list
                num_offsets -= 1;
                debug_assert_ne!(num_offsets, 0);
            }
            if num_offsets >= req.num_lists {
                // The scheduled offsets fully cover this request
                num_offsets -= req.num_lists;
                list_requests.push(self.list_requests.pop_front().unwrap());
            } else {
                // Partial coverage: emit a sub-request and shrink the front
                let sub_req = ListRequest {
                    num_lists: num_offsets,
                    includes_extra_offset: req.includes_extra_offset,
                    null_offset_adjustment: req.null_offset_adjustment,
                    items_offset: req.items_offset,
                };
                list_requests.push(sub_req);
                // The remainder continues mid-page, so no extra offset needed
                req.includes_extra_offset = false;
                req.num_lists -= num_offsets;
                num_offsets = 0;
            }
        }
        list_requests
    }
}
/// Converts raw (encoded) u64 offsets into normalized offsets, item ranges,
/// and list validity.
///
/// Encoding recap (see `ListOffsetsEncoder::do_encode_u64`): a stored offset
/// `>= null_offset_adjustment` marks a null list; `% null_offset_adjustment`
/// recovers the real offset either way.
///
/// Returns:
/// * the ranges of items that must be read (rebased by each request's
///   `items_offset` into the global items column)
/// * normalized offsets, rebased to start at 0 and made contiguous across
///   requests (length = total lists + 1)
/// * a validity buffer with one bit per list
fn decode_offsets(
    offsets: &dyn Array,
    list_requests: &[ListRequest],
    null_offset_adjustment: u64,
) -> (VecDeque<Range<u64>>, Vec<u64>, BooleanBuffer) {
    // The offsets are decoded as u64 by the inner scheduler
    let numeric_offsets = offsets.as_primitive::<UInt64Type>();
    let total_num_lists = list_requests.iter().map(|req| req.num_lists).sum::<u64>() as u32;
    let mut normalized_offsets = Vec::with_capacity(total_num_lists as usize);
    let mut validity_buffer = BooleanBufferBuilder::new(total_num_lists as usize);
    // Normalized offsets always begin at zero
    normalized_offsets.push(0);
    let mut last_normalized_offset = 0;
    let offsets_values = numeric_offsets.values();
    let mut item_ranges = VecDeque::new();
    // Position within `offsets_values` as we walk request by request
    let mut offsets_offset: u32 = 0;
    debug_assert!(list_requests.iter().all(|r| r.num_lists > 0));
    for req in list_requests {
        let num_lists = req.num_lists;
        let (items_range, offsets_to_norm_start, num_offsets_to_norm) =
            if !req.includes_extra_offset {
                // The request starts at a page boundary, so the first list
                // implicitly starts at item 0 and there is no leading offset.
                // NOTE(review): this branch indexes from 0 rather than
                // `offsets_offset` — presumably such a request can only occur
                // at the start of the decoded buffer; confirm.
                let first_offset_idx = 0_usize;
                let num_offsets = num_lists as usize;
                let items_start = 0;
                let items_end = offsets_values[num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            } else {
                // The extra leading offset gives the start of the first list
                let first_offset_idx = offsets_offset as usize;
                let num_offsets = num_lists as usize + 1;
                let items_start = offsets_values[first_offset_idx] % null_offset_adjustment;
                let items_end =
                    offsets_values[first_offset_idx + num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            };
        // Validity comes from the offsets that terminate each list (the extra
        // leading offset, when present, carries no validity information)
        let validity_start = if !req.includes_extra_offset {
            0
        } else {
            offsets_to_norm_start + 1
        };
        for off in offsets_values
            .slice(validity_start, num_lists as usize)
            .iter()
        {
            // An offset at or above the adjustment encodes a null list
            validity_buffer.append(*off < null_offset_adjustment);
        }
        if !req.includes_extra_offset {
            // Without an extra offset the first stored offset is already the
            // end of the first list; emit it directly
            let first_item = offsets_values[0] % null_offset_adjustment;
            normalized_offsets.push(first_item);
            last_normalized_offset = first_item;
        }
        // Re-accumulate lengths so the normalized offsets are contiguous even
        // when the underlying requests came from disjoint ranges/pages
        normalized_offsets.extend(
            offsets_values
                .slice(offsets_to_norm_start, num_offsets_to_norm)
                .windows(2)
                .map(|w| {
                    let start = w[0] % null_offset_adjustment;
                    let end = w[1] % null_offset_adjustment;
                    if end < start {
                        panic!("End is less than start in window {:?} with null_offset_adjustment={} we get start={} and end={}", w, null_offset_adjustment, start, end);
                    }
                    let length = end - start;
                    last_normalized_offset += length;
                    last_normalized_offset
                }),
        );
        trace!(
            "List offsets range of {} lists maps to item range {:?}",
            num_lists,
            items_range
        );
        offsets_offset += num_offsets_to_norm as u32;
        if !items_range.is_empty() {
            // Rebase the page-relative item range into the global items column
            let items_range =
                items_range.start + req.items_offset..items_range.end + req.items_offset;
            item_ranges.push_back(items_range);
        }
    }
    let validity = validity_buffer.finish();
    (item_ranges, normalized_offsets, validity)
}
/// Background task that performs the "indirect" scheduling of list items.
///
/// Lists are scheduled in two steps: first the offsets are scheduled (on the
/// main scheduling thread) and then, once the offsets have been decoded, this
/// task computes which item ranges are needed and schedules them through a
/// temporary single-field struct scheduler.
///
/// Returns the decoded/normalized offsets, validity, and (if any items were
/// needed) the root decoder for the items.
#[allow(clippy::too_many_arguments)]
async fn indirect_schedule_task(
    mut offsets_decoder: Box<dyn LogicalPageDecoder>,
    list_requests: Vec<ListRequest>,
    null_offset_adjustment: u64,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_type: DataType,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<FileMetadataCache>,
    priority: Box<dyn PriorityRange>,
) -> Result<IndirectlyLoaded> {
    let num_offsets = offsets_decoder.num_rows();
    // Fully drain the offsets decoder before anything else can happen
    offsets_decoder.wait_for_loaded(num_offsets - 1).await?;
    let decode_task = offsets_decoder.drain(num_offsets)?;
    let offsets = decode_task.task.decode()?;
    let (item_ranges, offsets, validity) =
        decode_offsets(offsets.as_ref(), &list_requests, null_offset_adjustment);
    trace!(
        "Indirectly scheduling items ranges {:?} from list items column with {} rows (and priority {:?})",
        item_ranges,
        items_scheduler.num_rows(),
        priority
    );
    let offsets: Arc<[u64]> = offsets.into();
    // All lists are empty (or null); no items need to be scheduled
    if item_ranges.is_empty() {
        // NOTE(review): vacuously true on an empty collection — presumably a
        // leftover from when the condition was inverted; confirm.
        debug_assert!(item_ranges.iter().all(|r| r.start == r.end));
        return Ok(IndirectlyLoaded {
            root_decoder: None,
            offsets,
            validity,
        });
    }
    let item_ranges = item_ranges.into_iter().collect::<Vec<_>>();
    let num_items = item_ranges.iter().map(|r| r.end - r.start).sum::<u64>();
    // Wrap the items scheduler in a single-field struct so we can reuse the
    // generic DecodeBatchScheduler machinery for the indirect pass
    let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
    let indirect_root_scheduler =
        SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone());
    let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
        Arc::new(indirect_root_scheduler),
        root_fields.clone(),
        cache,
    );
    let mut root_decoder = SimpleStructDecoder::new(root_fields, num_items);
    // Item priorities are derived from the list offsets so item I/O sorts
    // correctly against other columns
    let priority = Box::new(ListPriorityRange::new(priority, offsets.clone()));
    let indirect_messages = indirect_scheduler.schedule_ranges_to_vec(
        &item_ranges,
        &FilterExpression::no_filter(),
        io,
        Some(priority),
    )?;
    // Feed every scheduled page decoder into the root decoder (the root
    // itself arrives with an empty path and is skipped)
    for message in indirect_messages {
        for decoder in message.decoders {
            let decoder = decoder.into_legacy();
            if !decoder.path.is_empty() {
                root_decoder.accept_child(decoder)?;
            }
        }
    }
    Ok(IndirectlyLoaded {
        offsets,
        validity,
        root_decoder: Some(root_decoder),
    })
}
/// A scheduling job for a list field: schedules the offsets column directly
/// and spawns indirect tasks to schedule the items once offsets are decoded.
#[derive(Debug)]
struct ListFieldSchedulingJob<'a> {
    scheduler: &'a ListFieldScheduler,
    // The child job scheduling the offsets column
    offsets: Box<dyn SchedulingJob + 'a>,
    num_rows: u64,
    // Maps scheduled offsets back to the list requests they satisfy
    list_requests_iter: ListRequestsIter,
}
impl<'a> ListFieldSchedulingJob<'a> {
    /// Builds the list requests for `ranges` and kicks off a child job that
    /// schedules the corresponding offset ranges.
    fn try_new(
        scheduler: &'a ListFieldScheduler,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Self> {
        let list_requests_iter = ListRequestsIter::new(ranges, &scheduler.offset_page_info);
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let offsets = scheduler
            .offsets_scheduler
            .schedule_ranges(&list_requests_iter.offsets_requests, filter)?;
        Ok(Self {
            scheduler,
            offsets,
            list_requests_iter,
            num_rows,
        })
    }
}
impl SchedulingJob for ListFieldSchedulingJob<'_> {
    /// Schedules the next batch of offsets and spawns the indirect task that
    /// will schedule the matching items once those offsets are decoded.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let next_offsets = self.offsets.schedule_next(context, priority)?;
        let offsets_scheduled = next_offsets.rows_scheduled;
        // Translate "offsets scheduled" into the list requests they satisfy
        let list_reqs = self.list_requests_iter.next(offsets_scheduled);
        trace!(
            "Scheduled {} offsets which maps to list requests: {:?}",
            offsets_scheduled,
            list_reqs
        );
        // All requests in one scan line come from pages sharing the same
        // null-offset adjustment
        let null_offset_adjustment = list_reqs[0].null_offset_adjustment;
        debug_assert!(list_reqs
            .iter()
            .all(|req| req.null_offset_adjustment == null_offset_adjustment));
        let num_rows = list_reqs.iter().map(|req| req.num_lists).sum::<u64>();
        // The offsets job yields exactly one decoder per scan line
        let next_offsets_decoder = next_offsets
            .decoders
            .into_iter()
            .next()
            .unwrap()
            .into_legacy()
            .decoder;
        let items_scheduler = self.scheduler.items_scheduler.clone();
        let items_type = self.scheduler.items_field.data_type().clone();
        let io = context.io().clone();
        let cache = context.cache().clone();
        // Item scheduling happens off-thread; the page decoder below waits on
        // this handle before decoding
        let indirect_fut = tokio::spawn(indirect_schedule_task(
            next_offsets_decoder,
            list_reqs,
            null_offset_adjustment,
            items_scheduler,
            items_type,
            io,
            cache,
            priority.box_clone(),
        ));
        // The decoder starts empty; offsets/validity/items are filled in when
        // the indirect future resolves (see ListPageDecoder::wait_for_loaded)
        let decoder = Box::new(ListPageDecoder {
            offsets: Arc::new([]),
            validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
            item_decoder: None,
            rows_drained: 0,
            rows_loaded: 0,
            items_field: self.scheduler.items_field.clone(),
            num_rows,
            unloaded: Some(indirect_fut),
            offset_type: self.scheduler.offset_type.clone(),
            data_type: self.scheduler.list_type.clone(),
        });
        let decoder = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder)],
            rows_scheduled: num_rows,
        })
    }
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
/// A scheduler for list fields.  Pairs a scheduler for the offsets column
/// with a scheduler for the items column.
#[derive(Debug)]
pub struct ListFieldScheduler {
    offsets_scheduler: Arc<dyn FieldScheduler>,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_field: Arc<Field>,
    // Int32 for List, Int64 for LargeList
    offset_type: DataType,
    // The full Arrow list type, derived from offset_type and items_field
    list_type: DataType,
    // Per-page offset metadata, needed to plan offset reads
    offset_page_info: Vec<OffsetPageInfo>,
}
/// Per-page metadata for an offsets column, stored in the column metadata so
/// list reads can be planned without reading the offsets themselves.
#[derive(Debug)]
pub struct OffsetPageInfo {
    /// Number of offsets stored in the page
    pub offsets_in_page: u64,
    /// The null-marker threshold used when the page was encoded
    pub null_offset_adjustment: u64,
    /// How many items in the items column this page's lists reference
    pub num_items_referenced_by_page: u64,
}
impl ListFieldScheduler {
    /// Creates a scheduler for a list field.
    ///
    /// The Arrow list type (`List` vs `LargeList`) is derived from the width
    /// of the stored offsets.  Panics on any offset type other than
    /// `Int32`/`Int64`.
    pub fn new(
        offsets_scheduler: Arc<dyn FieldScheduler>,
        items_scheduler: Arc<dyn FieldScheduler>,
        items_field: Arc<Field>,
        offset_type: DataType,
        offset_page_info: Vec<OffsetPageInfo>,
    ) -> Self {
        let list_type = if offset_type == DataType::Int32 {
            DataType::List(items_field.clone())
        } else if offset_type == DataType::Int64 {
            DataType::LargeList(items_field.clone())
        } else {
            panic!("Unexpected offset type {}", offset_type)
        };
        Self {
            offsets_scheduler,
            items_scheduler,
            items_field,
            offset_type,
            offset_page_info,
            list_type,
        }
    }
}
impl FieldScheduler for ListFieldScheduler {
    /// Creates a job that schedules the requested list row ranges.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        let job = ListFieldSchedulingJob::try_new(self, ranges, filter)?;
        Ok(Box::new(job))
    }
    /// The row count of a list column matches its offsets column.
    fn num_rows(&self) -> u64 {
        self.offsets_scheduler.num_rows()
    }
    /// No initialization work is needed; resolves immediately.
    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        std::future::ready(Ok(())).boxed()
    }
}
/// Decoder for a page of list data.
///
/// Starts out with empty offsets/validity and no item decoder; these are
/// populated from the `unloaded` join handle (the indirect scheduling task)
/// on the first call to `wait_for_loaded`.
#[derive(Debug)]
struct ListPageDecoder {
    // Handle for the in-flight indirect scheduling task (None once awaited)
    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
    // Normalized offsets (num_rows + 1 entries once loaded)
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    item_decoder: Option<SimpleStructDecoder>,
    num_rows: u64,
    rows_drained: u64,
    rows_loaded: u64,
    items_field: Arc<Field>,
    // Int32 (List) or Int64 (LargeList)
    offset_type: DataType,
    data_type: DataType,
}
/// Decode task assembling a List/LargeList array from drained offsets,
/// validity, and an (optional) items decode task.
struct ListDecodeTask {
    // Drained slice of normalized u64 offsets (length = lists + 1)
    offsets: Vec<u64>,
    validity: BooleanBuffer,
    // None when all lists in the slice are empty/null
    items: Option<Box<dyn DecodeArrayTask>>,
    items_field: Arc<Field>,
    // Determines whether a List (i32) or LargeList (i64) is built
    offset_type: DataType,
}
impl DecodeArrayTask for ListDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        // The items task yields a single-field struct wrapper (see
        // indirect_schedule_task); unwrap column 0 to get the items array
        let items = self
            .items
            .map(|items| {
                let wrapped_items = items.decode()?;
                Result::Ok(wrapped_items.as_struct().column(0).clone())
            })
            .unwrap_or_else(|| Ok(new_empty_array(self.items_field.data_type())))?;
        let offsets = UInt64Array::from(self.offsets);
        let validity = NullBuffer::new(self.validity);
        // Arrow treats Some(all-valid) and None differently in equality; drop
        // the buffer entirely when there are no nulls
        let validity = if validity.null_count() == 0 {
            None
        } else {
            Some(validity)
        };
        // Rebase offsets to start at 0 so they index the freshly decoded
        // items array rather than the full items column
        let min_offset = UInt64Array::new_scalar(offsets.value(0));
        let offsets = arrow_arith::numeric::sub(&offsets, &min_offset)?;
        match &self.offset_type {
            DataType::Int32 => {
                // NOTE(review): assumes the rebased offsets fit in i32; the
                // drain() caller caps batches to enforce this — confirm
                let offsets = arrow_cast::cast(&offsets, &DataType::Int32)?;
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let offsets = OffsetBuffer::new(offsets_i32.values().clone());
                Ok(Arc::new(ListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            DataType::Int64 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int64)?;
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let offsets = OffsetBuffer::new(offsets_i64.values().clone());
                Ok(Arc::new(LargeListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
        }
    }
}
/// Returns the index of the last element in `to_search` equal to `target` or,
/// when `target` is absent, the index of the greatest element less than it.
///
/// `to_search` must be sorted, non-empty, and contain at least one element
/// `<= target` (otherwise the index arithmetic underflows).
fn binary_search_to_end(to_search: &[u64], target: u64) -> u64 {
    // binary_search returns an arbitrary match among duplicates
    let mut idx = to_search
        .binary_search(&target)
        .unwrap_or_else(|insert_at| insert_at - 1);
    // Walk forward to the last duplicate of `target`
    while idx + 1 < to_search.len() && to_search[idx + 1] == target {
        idx += 1;
    }
    idx as u64
}
impl LogicalPageDecoder for ListPageDecoder {
    /// Waits until strictly more than `loaded_need` rows are available.
    ///
    /// On first call this awaits the indirect scheduling task and installs
    /// the decoded offsets/validity and the items decoder.
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        async move {
            if self.unloaded.is_some() {
                trace!("List scheduler needs to wait for indirect I/O to complete");
                let indirectly_loaded = self.unloaded.take().unwrap().await;
                if indirectly_loaded.is_err() {
                    // Surface panics from the spawned task as panics here
                    match indirectly_loaded.unwrap_err().try_into_panic() {
                        Ok(err) => std::panic::resume_unwind(err),
                        Err(err) => panic!("{:?}", err),
                    };
                }
                let indirectly_loaded = indirectly_loaded.unwrap()?;
                self.offsets = indirectly_loaded.offsets;
                self.validity = indirectly_loaded.validity;
                self.item_decoder = indirectly_loaded.root_decoder;
            }
            // Already have more than enough rows loaded
            if self.rows_loaded > loaded_need {
                return Ok(());
            }
            let boundary = loaded_need as usize;
            debug_assert!(boundary < self.num_rows as usize);
            // To have list `boundary` loaded we need all items up to (but not
            // including) offsets[boundary + 1]
            let items_needed = self.offsets[boundary + 1].saturating_sub(1);
            trace!(
                "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded. To satisfy this we need more than {} loaded items",
                loaded_need,
                self.rows_loaded,
                self.num_rows,
                items_needed,
            );
            let items_loaded = if let Some(item_decoder) = self.item_decoder.as_mut() {
                item_decoder.wait_for_loaded(items_needed).await?;
                item_decoder.rows_loaded()
            } else {
                // All-empty/null lists: there is no item decoder
                0
            };
            // Map loaded items back to the number of fully loaded lists
            self.rows_loaded = binary_search_to_end(&self.offsets, items_loaded);
            trace!("List decoder now has {} loaded rows", self.rows_loaded);
            Ok(())
        }
        .boxed()
    }
    /// Drains `num_rows` lists (offsets, validity, and the matching items).
    ///
    /// Errors (rather than returning a short batch) if the batch would hold
    /// more than i32::MAX items in an Int32-offset list.
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let mut actual_num_rows = num_rows;
        let item_start = self.offsets[self.rows_drained as usize];
        if self.offset_type != DataType::Int64 {
            // Shrink the batch until its item count fits in i32 offsets
            while actual_num_rows > 0 {
                let num_items =
                    self.offsets[(self.rows_drained + actual_num_rows) as usize] - item_start;
                if num_items <= i32::MAX as u64 {
                    break;
                }
                actual_num_rows -= 1;
            }
        }
        if actual_num_rows < num_rows {
            return Err(Error::NotSupported { source: format!("loading a batch of {} lists would require creating an array with over i32::MAX items and we don't yet support returning smaller than requested batches", num_rows).into(), location: location!() });
        }
        // Slice out num_rows + 1 offsets and num_rows validity bits
        let offsets = self.offsets
            [self.rows_drained as usize..(self.rows_drained + actual_num_rows + 1) as usize]
            .to_vec();
        let validity = self
            .validity
            .slice(self.rows_drained as usize, actual_num_rows as usize);
        let start = offsets[0];
        let end = offsets[offsets.len() - 1];
        let num_items_to_drain = end - start;
        // Only drain the item decoder when there are items to pull
        let item_decode = if num_items_to_drain == 0 {
            None
        } else {
            self.item_decoder
                .as_mut()
                .map(|item_decoder| Result::Ok(item_decoder.drain(num_items_to_drain)?.task))
                .transpose()?
        };
        self.rows_drained += num_rows;
        let has_more = self.rows_left() > 0;
        Ok(NextDecodeTask {
            has_more,
            num_rows,
            task: Box::new(ListDecodeTask {
                offsets,
                validity,
                items_field: self.items_field.clone(),
                items: item_decode,
                offset_type: self.offset_type.clone(),
            }) as Box<dyn DecodeArrayTask>,
        })
    }
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }
    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
/// The result of the indirect scheduling task: decoded offsets/validity and,
/// when any items were referenced, the items root decoder.
struct IndirectlyLoaded {
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    // None when every requested list was empty or null
    root_decoder: Option<SimpleStructDecoder>,
}
// Manual Debug impl: the root decoder is intentionally omitted from output
impl std::fmt::Debug for IndirectlyLoaded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndirectlyLoaded")
            .field("offsets", &self.offsets)
            .field("validity", &self.validity)
            .finish()
    }
}
/// Encoder for the offsets (and validity) column extracted from list arrays.
///
/// Offsets and validity are accumulated until the queue decides to flush,
/// then encoded into pages via `inner_encoder`.
#[derive(Debug)]
struct ListOffsetsEncoder {
    // Buffers alternating offsets/validity arrays until a page is full
    accumulation_queue: AccumulationQueue,
    // Encoder applied to the normalized u64 offsets
    inner_encoder: Arc<dyn ArrayEncoder>,
    column_index: u32,
}
impl ListOffsetsEncoder {
    fn new(
        cache_bytes: u64,
        keep_original_array: bool,
        column_index: u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Self {
        Self {
            accumulation_queue: AccumulationQueue::new(
                cache_bytes,
                column_index,
                keep_original_array,
            ),
            inner_encoder,
            column_index,
        }
    }
    /// Pulls the raw offsets buffer out of a List/LargeList array as a
    /// primitive Int32/Int64 array (validity intentionally stripped).
    fn extract_offsets(list_arr: &dyn Array) -> ArrayRef {
        match list_arr.data_type() {
            DataType::List(_) => {
                let offsets = list_arr.as_list::<i32>().offsets().clone();
                Arc::new(Int32Array::new(offsets.into_inner(), None))
            }
            DataType::LargeList(_) => {
                let offsets = list_arr.as_list::<i64>().offsets().clone();
                Arc::new(Int64Array::new(offsets.into_inner(), None))
            }
            _ => panic!(),
        }
    }
    /// Pulls the list validity out as a BooleanArray; an empty array stands
    /// in for "no nulls" (see the None mapping in `do_encode`).
    fn extract_validity(list_arr: &dyn Array) -> ArrayRef {
        if let Some(validity) = list_arr.nulls() {
            Arc::new(BooleanArray::new(validity.inner().clone(), None))
        } else {
            new_empty_array(&DataType::Boolean)
        }
    }
    /// Spawns the encode of the accumulated arrays into a single page.
    ///
    /// `arrays` alternates offsets (even indices) and validity (odd indices)
    /// in the order they were inserted into the queue.
    fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
        let inner_encoder = self.inner_encoder.clone();
        let column_idx = self.column_index;
        let offset_arrays = arrays.iter().step_by(2).cloned().collect::<Vec<_>>();
        let validity_arrays = arrays.into_iter().skip(1).step_by(2).collect::<Vec<_>>();
        tokio::task::spawn(async move {
            // Each offsets array has (num_lists + 1) entries; subtracting the
            // array count removes the leading offset of each batch
            let num_rows =
                offset_arrays.iter().map(|arr| arr.len()).sum::<usize>() - offset_arrays.len();
            let num_rows = num_rows as u64;
            let mut buffer_index = 0;
            let array = Self::do_encode(
                offset_arrays,
                validity_arrays,
                &mut buffer_index,
                num_rows,
                inner_encoder,
            )?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows,
                column_idx,
                row_number: 0,
            })
        })
        .map(|res_res| res_res.unwrap())
        .boxed()
    }
    /// Extracts offsets + validity from a list array and feeds both into the
    /// accumulation queue, returning an encode task if the queue fills.
    fn maybe_encode_offsets_and_validity(&mut self, list_arr: &dyn Array) -> Option<EncodeTask> {
        let offsets = Self::extract_offsets(list_arr);
        let validity = Self::extract_validity(list_arr);
        // NOTE(review): num_rows here is offsets.len() (lists + 1) and the
        // row number passed is 0 — presumably only used for flush accounting;
        // confirm against AccumulationQueue
        let num_rows = offsets.len() as u64;
        if let Some(mut arrays) = self
            .accumulation_queue
            .insert(offsets, 0, num_rows)
        {
            // Queue flushed on the offsets insert; tack the matching validity
            // on so the pair stays together
            arrays.0.push(validity);
            Some(self.make_encode_task(arrays.0))
        } else if let Some(arrays) = self
            .accumulation_queue
            .insert(validity, 0, num_rows)
        {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }
    /// Flushes any accumulated offsets/validity into a final encode task.
    fn flush(&mut self) -> Option<EncodeTask> {
        if let Some(arrays) = self.accumulation_queue.flush() {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }
    /// Number of items referenced by an offsets array (last minus first).
    fn get_offset_span(array: &dyn Array) -> u64 {
        match array.data_type() {
            DataType::Int32 => {
                let arr_i32 = array.as_primitive::<Int32Type>();
                (arr_i32.value(arr_i32.len() - 1) - arr_i32.value(0)) as u64
            }
            DataType::Int64 => {
                let arr_i64 = array.as_primitive::<Int64Type>();
                (arr_i64.value(arr_i64.len() - 1) - arr_i64.value(0)) as u64
            }
            _ => panic!(),
        }
    }
    /// Appends one batch of offsets to `dest` as normalized u64 values.
    ///
    /// The first offset of the batch is dropped and the rest are rebased by
    /// `base` so consecutive batches concatenate seamlessly.  A null list's
    /// offset gets `null_offset_adjustment` added (the decoder reverses this
    /// with a modulo).
    fn extend_offsets_vec_u64(
        dest: &mut Vec<u64>,
        offsets: &dyn Array,
        validity: Option<&BooleanArray>,
        base: u64,
        null_offset_adjustment: u64,
    ) {
        match offsets.data_type() {
            DataType::Int32 => {
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let start = offsets_i32.value(0) as u64;
                // Shift so this batch continues from `base`
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off as i64 + modifier) as u64
                                    + (!valid as u64 * null_offset_adjustment)
                            }),
                    );
                } else {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v as i64 + modifier) as u64),
                    );
                }
            }
            DataType::Int64 => {
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let start = offsets_i64.value(0) as u64;
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off + modifier) as u64 + (!valid as u64 * null_offset_adjustment)
                            }),
                    )
                } else {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v + modifier) as u64),
                    );
                }
            }
            _ => panic!("Invalid list offsets data type {:?}", offsets.data_type()),
        }
    }
    /// Normalizes all accumulated offset batches into one contiguous u64
    /// vector (with nulls encoded via `null_offset_adjustment`) and encodes
    /// it with the inner encoder.
    fn do_encode_u64(
        offset_arrays: Vec<ArrayRef>,
        validity: Vec<Option<&BooleanArray>>,
        num_offsets: u64,
        null_offset_adjustment: u64,
        buffer_index: &mut u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let mut offsets = Vec::with_capacity(num_offsets as usize);
        for (offsets_arr, validity_arr) in offset_arrays.iter().zip(validity) {
            // Strip any null marker before using the last offset as the base
            let last_prev_offset = offsets.last().copied().unwrap_or(0) % null_offset_adjustment;
            Self::extend_offsets_vec_u64(
                &mut offsets,
                &offsets_arr,
                validity_arr,
                last_prev_offset,
                null_offset_adjustment,
            );
        }
        let offsets_data = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 64,
            data: LanceBuffer::reinterpret_vec(offsets),
            num_values: num_offsets,
            block_info: BlockInfo::new(),
        });
        inner_encoder.encode(offsets_data, &DataType::UInt64, buffer_index)
    }
    /// Encodes the accumulated offsets + validity pairs into a single page.
    fn do_encode(
        offset_arrays: Vec<ArrayRef>,
        validity_arrays: Vec<ArrayRef>,
        buffer_index: &mut u32,
        num_offsets: u64,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        // Empty boolean array was the stand-in for "no nulls"; map it back
        let validity_arrays = validity_arrays
            .iter()
            .map(|v| {
                if v.is_empty() {
                    None
                } else {
                    Some(v.as_boolean())
                }
            })
            .collect::<Vec<_>>();
        debug_assert_eq!(offset_arrays.len(), validity_arrays.len());
        let total_span = offset_arrays
            .iter()
            .map(|arr| Self::get_offset_span(arr.as_ref()))
            .sum::<u64>();
        // Any stored offset >= total_span + 1 marks a null list; since no
        // real offset can exceed total_span, this is unambiguous
        let null_offset_adjustment = total_span + 1;
        let encoded_offsets = Self::do_encode_u64(
            offset_arrays,
            validity_arrays,
            num_offsets,
            null_offset_adjustment,
            buffer_index,
            inner_encoder,
        )?;
        Ok(EncodedArray {
            data: encoded_offsets.data,
            encoding: pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
                    pb::List {
                        offsets: Some(Box::new(encoded_offsets.encoding)),
                        null_offset_adjustment,
                        num_items: total_span,
                    },
                ))),
            },
        })
    }
}
/// Field encoder for list fields: encodes the offsets/validity into one
/// column and delegates the items to a child encoder.
pub struct ListFieldEncoder {
    offsets_encoder: ListOffsetsEncoder,
    items_encoder: Box<dyn FieldEncoder>,
}
impl ListFieldEncoder {
    /// Creates a new list field encoder.
    ///
    /// `items_encoder` encodes the child items column; `inner_offsets_encoder`
    /// encodes the offsets/validity column that this wrapper extracts from
    /// incoming list arrays.  `cache_bytes_per_columns` and
    /// `keep_original_array` configure the offsets accumulation queue.
    pub fn new(
        items_encoder: Box<dyn FieldEncoder>,
        inner_offsets_encoder: Arc<dyn ArrayEncoder>,
        cache_bytes_per_columns: u64,
        keep_original_array: bool,
        column_index: u32,
    ) -> Self {
        Self {
            offsets_encoder: ListOffsetsEncoder::new(
                cache_bytes_per_columns,
                keep_original_array,
                column_index,
                inner_offsets_encoder,
            ),
            items_encoder,
        }
    }
    /// Concatenates offsets tasks and item tasks (offsets first) into one
    /// task list.
    fn combine_tasks(
        offsets_tasks: Vec<EncodeTask>,
        item_tasks: Vec<EncodeTask>,
    ) -> Result<Vec<EncodeTask>> {
        let mut all_tasks = offsets_tasks;
        all_tasks.extend(item_tasks);
        Ok(all_tasks)
    }
}
impl FieldEncoder for ListFieldEncoder {
    /// Splits the list array into offsets/validity (encoded here) and items
    /// (delegated to the child encoder).
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Slice out only the items actually referenced by this array's lists
        // NOTE(review): indexing value_offsets() by list_arr.offset() assumes
        // the offsets slice is not already rebased by the array offset —
        // confirm against the arrow-rs version in use
        let items = match array.data_type() {
            DataType::List(_) => {
                let list_arr = array.as_list::<i32>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            DataType::LargeList(_) => {
                let list_arr = array.as_list::<i64>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            _ => panic!(),
        };
        let offsets_tasks = self
            .offsets_encoder
            .maybe_encode_offsets_and_validity(array.as_ref())
            .map(|task| vec![task])
            .unwrap_or_default();
        let mut item_tasks = self.items_encoder.maybe_encode(
            items,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )?;
        if !offsets_tasks.is_empty() && item_tasks.is_empty() {
            // An offsets page was cut; force the items out too so item pages
            // do not straddle offsets pages
            item_tasks = self.items_encoder.flush(external_buffers)?;
        }
        Self::combine_tasks(offsets_tasks, item_tasks)
    }
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let offsets_tasks = self
            .offsets_encoder
            .flush()
            .map(|task| vec![task])
            .unwrap_or_default();
        let item_tasks = self.items_encoder.flush(external_buffers)?;
        Self::combine_tasks(offsets_tasks, item_tasks)
    }
    /// One column for the offsets plus the child's columns.
    fn num_columns(&self) -> u32 {
        self.items_encoder.num_columns() + 1
    }
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        let inner_columns = self.items_encoder.finish(external_buffers);
        async move {
            // The offsets column's pages were already emitted as encode
            // tasks; a default column record stands in for it here
            let mut columns = vec![EncodedColumn::default()];
            let inner_columns = inner_columns.await?;
            columns.extend(inner_columns);
            Ok(columns)
        }
        .boxed()
    }
}
/// Structural (2.1-style) list encoder: list structure is captured in the
/// repetition/definition builder and only the child values are encoded.
pub struct ListStructuralEncoder {
    child: Box<dyn FieldEncoder>,
}
impl ListStructuralEncoder {
    /// Wraps the child (items) encoder.
    pub fn new(child: Box<dyn FieldEncoder>) -> Self {
        Self { child }
    }
}
impl FieldEncoder for ListStructuralEncoder {
    /// Records the list offsets/nulls in `repdef` and forwards the trimmed
    /// item values to the child encoder.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let values = if let Some(list_arr) = array.as_list_opt::<i32>() {
            // add_offsets reports whether null lists hide garbage item values
            // that must be filtered out before trimming
            let has_garbage_values =
                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned());
            if has_garbage_values {
                list_arr.filter_garbage_nulls().trimmed_values()
            } else {
                list_arr.trimmed_values()
            }
        } else if let Some(list_arr) = array.as_list_opt::<i64>() {
            let has_garbage_values =
                repdef.add_offsets(list_arr.offsets().clone(), array.nulls().cloned());
            if has_garbage_values {
                list_arr.filter_garbage_nulls().trimmed_values()
            } else {
                list_arr.trimmed_values()
            }
        } else {
            panic!("List encoder used for non-list data")
        };
        self.child
            .maybe_encode(values, external_buffers, repdef, row_number, num_rows)
    }
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.child.flush(external_buffers)
    }
    // The list adds no column of its own in the structural encoding
    fn num_columns(&self) -> u32 {
        self.child.num_columns()
    }
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        self.child.finish(external_buffers)
    }
}
/// Structural scheduler for lists: scheduling is entirely delegated to the
/// child since list structure lives in the repetition levels.
#[derive(Debug)]
pub struct StructuralListScheduler {
    child: Box<dyn StructuralFieldScheduler>,
}
impl StructuralListScheduler {
    /// Wraps the child (items) scheduler.
    pub fn new(child: Box<dyn StructuralFieldScheduler>) -> Self {
        Self { child }
    }
}
impl StructuralFieldScheduler for StructuralListScheduler {
    /// Delegates scheduling to the child, wrapped in a list job.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let child = self.child.schedule_ranges(ranges, filter)?;
        Ok(Box::new(StructuralListSchedulingJob::new(child)))
    }
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        self.child.initialize(filter, context)
    }
}
/// Scheduling job for structural lists; a thin wrapper over the child job.
#[derive(Debug)]
struct StructuralListSchedulingJob<'a> {
    child: Box<dyn StructuralSchedulingJob + 'a>,
}
impl<'a> StructuralListSchedulingJob<'a> {
    /// Wraps the child job.
    fn new(child: Box<dyn StructuralSchedulingJob + 'a>) -> Self {
        Self { child }
    }
}
impl StructuralSchedulingJob for StructuralListSchedulingJob<'_> {
    // Pure delegation; the list adds no scheduling work of its own
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>> {
        self.child.schedule_next(context)
    }
}
/// Structural decoder for lists: delegates to the child decoder and rebuilds
/// the list structure from repetition/definition levels at decode time.
#[derive(Debug)]
pub struct StructuralListDecoder {
    child: Box<dyn StructuralFieldDecoder>,
    // The List/LargeList type to reconstruct
    data_type: DataType,
}
impl StructuralListDecoder {
    /// Wraps the child (items) decoder with the target list data type.
    pub fn new(child: Box<dyn StructuralFieldDecoder>, data_type: DataType) -> Self {
        Self { child, data_type }
    }
}
impl StructuralFieldDecoder for StructuralListDecoder {
    fn accept_page(&mut self, child: crate::decoder::LoadedPage) -> Result<()> {
        // Pages belong to the child; the list has no pages of its own
        self.child.accept_page(child)
    }
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let child_task = self.child.drain(num_rows)?;
        Ok(Box::new(StructuralListDecodeTask::new(
            child_task,
            self.data_type.clone(),
        )))
    }
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
/// Decode task that rebuilds a List/LargeList array around the child's
/// decoded values using the unraveled repetition levels.
#[derive(Debug)]
struct StructuralListDecodeTask {
    child_task: Box<dyn StructuralDecodeArrayTask>,
    data_type: DataType,
}
impl StructuralListDecodeTask {
    /// Pairs the child decode task with the list type to reconstruct.
    fn new(child_task: Box<dyn StructuralDecodeArrayTask>, data_type: DataType) -> Self {
        Self {
            child_task,
            data_type,
        }
    }
}
impl StructuralDecodeArrayTask for StructuralListDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        let DecodedArray { array, mut repdef } = self.child_task.decode()?;
        // Offsets and validity are recovered from the repetition/definition
        // levels; the offset width follows the list type
        match &self.data_type {
            DataType::List(child_field) => {
                let (offsets, validity) = repdef.unravel_offsets::<i32>()?;
                let list_array = ListArray::try_new(child_field.clone(), offsets, array, validity)?;
                Ok(DecodedArray {
                    array: Arc::new(list_array),
                    repdef,
                })
            }
            DataType::LargeList(child_field) => {
                let (offsets, validity) = repdef.unravel_offsets::<i64>()?;
                let list_array =
                    LargeListArray::try_new(child_field.clone(), offsets, array, validity)?;
                Ok(DecodedArray {
                    array: Arc::new(list_array),
                    repdef,
                })
            }
            _ => panic!("List decoder did not have a list field"),
        }
    }
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};
use arrow::array::{Int64Builder, LargeListBuilder, StringBuilder};
use arrow_array::{
builder::{Int32Builder, ListBuilder},
Array, ArrayRef, BooleanArray, ListArray, StructArray, UInt64Array,
};
use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field, Fields};
use rstest::rstest;
use crate::{
testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
version::LanceFileVersion,
};
/// Builds a `List<inner_type>` type with a nullable "item" child field.
fn make_list_type(inner_type: DataType) -> DataType {
    DataType::List(Arc::new(Field::new("item", inner_type, true)))
}
/// Builds a `LargeList<inner_type>` type with a nullable "item" child field.
fn make_large_list_type(inner_type: DataType) -> DataType {
    DataType::LargeList(Arc::new(Field::new("item", inner_type, true)))
}
// Round-trips random List<Int32> data through both file versions
#[rstest]
#[test_log::test(tokio::test)]
async fn test_list(
    #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
) {
    let field = Field::new("", make_list_type(DataType::Int32), true);
    check_round_trip_encoding_random(field, version).await;
}
// Round-trips random LargeList<Int32> data (i64 offsets path)
#[test_log::test(tokio::test)]
async fn test_large_list() {
    let field = Field::new("", make_large_list_type(DataType::Int32), true);
    check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
}
// Round-trips lists with variable-width (string) items
#[test_log::test(tokio::test)]
async fn test_nested_strings() {
    let field = Field::new("", make_list_type(DataType::Utf8), true);
    check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
}
// Round-trips list-of-list data (nested offsets)
#[test_log::test(tokio::test)]
async fn test_nested_list() {
    let field = Field::new("", make_list_type(make_list_type(DataType::Int32)), true);
    check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
}
/// Round-trips `List<Struct<inner_str: Utf8>>` data.
#[test_log::test(tokio::test)]
async fn test_list_struct_list() {
    let inner_field = Field::new("inner_str", DataType::Utf8, false);
    let struct_type = DataType::Struct(Fields::from(vec![inner_field]));
    let list_field = Field::new("", make_list_type(struct_type), true);
    check_round_trip_encoding_random(list_field, LanceFileVersion::V2_0).await;
}
/// Encodes ~2Mi lists that are all empty (struct child has zero items) to
/// exercise the all-empty-lists edge case.
#[test_log::test(tokio::test)]
async fn test_list_struct_empty() {
    // Number of (empty) lists in the array
    const NUM_LISTS: usize = 2 * 1024 * 1024;
    let struct_fields = Fields::from(vec![Field::new("inner", DataType::UInt64, true)]);
    let empty_items = UInt64Array::from(Vec::<u64>::new());
    let structs = StructArray::new(struct_fields, vec![Arc::new(empty_items)], None);
    // All-zero offsets => every list is empty
    let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0; NUM_LISTS + 1]));
    let item_field = Arc::new(Field::new("item", structs.data_type().clone(), true));
    let lists = ListArray::new(item_field, offsets, Arc::new(structs), None);
    check_round_trip_encoding_of_data(
        vec![Arc::new(lists)],
        &TestCases::default(),
        HashMap::new(),
    )
    .await;
}
/// Round-trips a small hand-built `List<Int32>` (including one null list)
/// across both file versions, covering several range and take patterns.
#[rstest]
#[test_log::test(tokio::test)]
async fn test_simple_list(
    #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
) {
    let mut builder = ListBuilder::new(Int32Builder::new());
    builder.append_value((1..=3).map(Some));
    builder.append_value((4..=5).map(Some));
    builder.append_null();
    builder.append_value((6..=8).map(Some));
    let lists = builder.finish();
    let cases = TestCases::default()
        .with_range(0..2)
        .with_range(0..3)
        .with_range(1..3)
        .with_indices(vec![1, 3])
        .with_indices(vec![2])
        .with_file_version(version);
    check_round_trip_encoding_of_data(vec![Arc::new(lists)], &cases, HashMap::new()).await;
}
/// Round-trips a sliced view (rows 1..3) of a small `List<Int32>` array to
/// make sure non-zero offsets into the offsets buffer are handled.
//
// Fix: dropped the redundant `#[rstest]` attribute — this test takes no
// `#[values]` parameters, so rstest contributed nothing.
#[test_log::test(tokio::test)]
async fn test_simple_sliced_list() {
    let items_builder = Int32Builder::new();
    let mut list_builder = ListBuilder::new(items_builder);
    list_builder.append_value([Some(1), Some(2), Some(3)]);
    list_builder.append_value([Some(4), Some(5)]);
    list_builder.append_null();
    list_builder.append_value([Some(6), Some(7), Some(8)]);
    let list_array = list_builder.finish();
    // Keep only the middle two lists ([4, 5] and the null list)
    let list_array = list_array.slice(1, 2);
    let test_cases = TestCases::default()
        .with_range(0..2)
        .with_range(1..2)
        .with_indices(vec![0])
        .with_indices(vec![1])
        .with_file_version(LanceFileVersion::V2_1);
    check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
        .await;
}
/// Round-trips a list array whose null list still has a non-empty offset
/// range (items 5..8 are "garbage" covered by a null) — the encoder must not
/// choke on values hidden behind validity.
//
// Fix: dropped the redundant `#[rstest]` attribute — this test takes no
// `#[values]` parameters, so rstest contributed nothing.
#[test_log::test(tokio::test)]
async fn test_list_with_garbage_nulls() {
    let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let offsets = ScalarBuffer::<i32>::from(vec![0, 5, 8, 10]);
    let offsets = OffsetBuffer::new(offsets);
    // Middle list is null even though its offsets span items 5..8
    let list_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
    let list_arr = ListArray::new(
        Arc::new(Field::new("item", DataType::UInt64, true)),
        offsets,
        Arc::new(items),
        Some(list_validity),
    );
    let test_cases = TestCases::default()
        .with_range(0..3)
        .with_range(1..2)
        .with_indices(vec![1])
        .with_indices(vec![2])
        .with_file_version(LanceFileVersion::V2_1);
    check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, HashMap::new())
        .await;
}
/// Round-trips two 512-list chunks with a tiny page size so the offsets span
/// multiple pages, then reads a range (800..900) that crosses the chunk split.
//
// Fix: the two builder loops were copy-paste duplicates differing only by a
// `let i = i + 512;` shadowing hack; factored into one helper closure.
#[test_log::test(tokio::test)]
async fn test_simple_two_page_list() {
    // Builds 512 lists [i, i * 2] for i in start..start + 512
    let make_chunk = |start: i64| {
        let mut builder = ListBuilder::new(Int64Builder::new());
        for i in start..start + 512 {
            builder.append_value([Some(i), Some(i * 2)]);
        }
        builder.finish()
    };
    let list_array_1 = make_chunk(0);
    let list_array_2 = make_chunk(512);
    let test_cases = TestCases::default()
        .with_file_version(LanceFileVersion::V2_1)
        .with_page_sizes(vec![100])
        .with_range(800..900);
    check_round_trip_encoding_of_data(
        vec![Arc::new(list_array_1), Arc::new(list_array_2)],
        &test_cases,
        HashMap::new(),
    )
    .await;
}
/// Round-trips a small hand-built `LargeList<Int32>` (including one null
/// list) with a few range and take patterns.
#[test_log::test(tokio::test)]
async fn test_simple_large_list() {
    let mut builder = LargeListBuilder::new(Int32Builder::new());
    builder.append_value((1..=3).map(Some));
    builder.append_value((4..=5).map(Some));
    builder.append_null();
    builder.append_value((6..=8).map(Some));
    let lists = builder.finish();
    let cases = TestCases::default()
        .with_range(0..2)
        .with_range(0..3)
        .with_range(1..3)
        .with_indices(vec![1, 3]);
    check_round_trip_encoding_of_data(vec![Arc::new(lists)], &cases, HashMap::new()).await;
}
/// Exercises empty and null lists: every ordering of {non-empty, empty,
/// [null item]} lists, then {non-empty, null, non-empty} for both Int32 and
/// Utf8 items — each also re-run with batch size 1.
#[test_log::test(tokio::test)]
async fn test_empty_lists() {
    // One non-empty list, one empty list, one list holding a single null item
    let values = [vec![Some(1), Some(2), Some(3)], vec![], vec![None]];
    for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
        let mut builder = ListBuilder::new(Int32Builder::new());
        for idx in order {
            builder.append_value(values[idx].clone());
        }
        let lists = Arc::new(builder.finish());
        let cases = TestCases::default()
            .with_indices(vec![1])
            .with_indices(vec![0])
            .with_indices(vec![2])
            .with_indices(vec![0, 1]);
        check_round_trip_encoding_of_data(vec![lists.clone()], &cases, HashMap::new()).await;
        let cases = cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![lists], &cases, HashMap::new()).await;
    }
    // Empty, null, empty — integer items
    let mut builder = ListBuilder::new(Int32Builder::new());
    builder.append(true);
    builder.append_null();
    builder.append(true);
    let lists = Arc::new(builder.finish());
    let cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
    check_round_trip_encoding_of_data(vec![lists.clone()], &cases, HashMap::new()).await;
    let cases = cases.with_batch_size(1);
    check_round_trip_encoding_of_data(vec![lists], &cases, HashMap::new()).await;
    // Empty, null, empty — string items
    let mut builder = ListBuilder::new(StringBuilder::new());
    builder.append(true);
    builder.append_null();
    builder.append(true);
    let lists = Arc::new(builder.finish());
    let cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
    check_round_trip_encoding_of_data(vec![lists.clone()], &cases, HashMap::new()).await;
    let cases = cases.with_batch_size(1);
    check_round_trip_encoding_of_data(vec![lists], &cases, HashMap::new()).await;
}
/// Encodes 5000 copies of a 1Mi-element all-null boolean list (~5Gi items in
/// total) to stress very large offsets; skips value validation for speed.
//
// Fix: `#[ignore]` was jammed onto the same line as the fn declaration
// (non-rustfmt); moved to its own line so the ignored status is visible.
#[test_log::test(tokio::test)]
#[ignore]
async fn test_jumbo_list() {
    let items = BooleanArray::new_null(1024 * 1024);
    let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1024 * 1024]));
    let list_arr = Arc::new(ListArray::new(
        Arc::new(Field::new("item", DataType::Boolean, true)),
        offsets,
        Arc::new(items),
        None,
    )) as ArrayRef;
    let arrs = vec![list_arr; 5000];
    let test_cases = TestCases::default().without_validation();
    check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
}
}