use std::{
collections::{BinaryHeap, VecDeque},
ops::Range,
sync::Arc,
};
use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
use arrow_schema::{DataType, Fields};
use futures::{
future::BoxFuture,
stream::{FuturesOrdered, FuturesUnordered},
FutureExt, StreamExt, TryStreamExt,
};
use itertools::Itertools;
use lance_arrow::FieldExt;
use log::trace;
use snafu::{location, Location};
use crate::{
decoder::{
DecodeArrayTask, DecodedArray, DecoderReady, FieldScheduler, FilterExpression, LoadedPage,
LogicalPageDecoder, MessageType, NextDecodeTask, PageEncoding, PriorityRange,
ScheduledScanLine, SchedulerContext, SchedulingJob, StructuralDecodeArrayTask,
StructuralFieldDecoder, StructuralFieldScheduler, StructuralSchedulingJob,
},
encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
format::pb,
repdef::RepDefBuilder,
};
use lance_core::{Error, Result};
use super::{list::StructuralListDecoder, primitive::StructuralPrimitiveFieldDecoder};
/// Tracks the scheduling progress of a single child column of a struct.
///
/// Values of this type are kept in a `BinaryHeap` by `SimpleStructSchedulerJob`.
#[derive(Debug)]
struct SchedulingJobWithStatus<'a> {
/// Index of the child within the parent struct (assigned in enumeration order)
col_idx: u32,
/// Name of the child field, borrowed from the owning scheduler's field list
col_name: &'a str,
/// The child's own scheduling job
job: Box<dyn SchedulingJob + 'a>,
/// Running total of rows this child has scheduled so far
rows_scheduled: u64,
/// Rows still to be scheduled; starts at the job's row count and is
/// decremented each time the child schedules more rows
rows_remaining: u64,
}
/// Two jobs are equal when they have scheduled the same number of rows.
///
/// Equality is defined through [`Ord::cmp`] so that `==` and `cmp` can never
/// disagree, which is what the `Eq`/`Ord` contract requires of types stored
/// in ordered collections such as `BinaryHeap`.
impl PartialEq for SchedulingJobWithStatus<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for SchedulingJobWithStatus<'_> {}

impl PartialOrd for SchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Jobs are ordered by `rows_scheduled`, reversed.
impl Ord for SchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Operands are swapped on purpose: `BinaryHeap` is a max-heap, so the
        // reversed comparison puts the job with the fewest scheduled rows on top.
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}
/// A scheduling job that drives the jobs of every child of a struct column.
#[derive(Debug)]
struct SimpleStructSchedulerJob<'a> {
    /// The scheduler that created this job (source of the child fields)
    scheduler: &'a SimpleStructScheduler,
    /// Per-child progress, ordered so the child with the fewest scheduled rows is on top
    children: BinaryHeap<SchedulingJobWithStatus<'a>>,
    /// Number of rows the struct as a whole has scheduled so far
    rows_scheduled: u64,
    /// Total number of rows covered by this job
    num_rows: u64,
    /// Whether the struct decoder has already been emitted
    initialized: bool,
}

impl<'a> SimpleStructSchedulerJob<'a> {
    /// Wraps each child job with fresh progress counters and collects them into the heap.
    ///
    /// `children` must be in the same order as the scheduler's child fields.
    fn new(
        scheduler: &'a SimpleStructScheduler,
        children: Vec<Box<dyn SchedulingJob + 'a>>,
        num_rows: u64,
    ) -> Self {
        let statuses: Vec<SchedulingJobWithStatus<'a>> = children
            .into_iter()
            .enumerate()
            .map(|(idx, job)| SchedulingJobWithStatus {
                col_idx: idx as u32,
                col_name: scheduler.child_fields[idx].name(),
                job,
                rows_scheduled: 0,
                rows_remaining: num_rows,
            })
            .collect();
        Self {
            scheduler,
            children: BinaryHeap::from(statuses),
            rows_scheduled: 0,
            num_rows,
            initialized: false,
        }
    }
}
impl SchedulingJob for SimpleStructSchedulerJob<'_> {
// Schedules child columns until the struct as a whole has made progress.
//
// The first call also creates the `SimpleStructDecoder` and emits it as a
// `DecoderReady` message ahead of any child decoders.
fn schedule_next(
&mut self,
mut context: &mut SchedulerContext,
priority: &dyn PriorityRange,
) -> Result<ScheduledScanLine> {
let mut decoders = Vec::new();
if !self.initialized {
// First call only: create the struct decoder and announce it
let struct_decoder = Box::new(SimpleStructDecoder::new(
self.scheduler.child_fields.clone(),
self.num_rows,
));
let struct_decoder = context.locate_decoder(struct_decoder);
decoders.push(MessageType::DecoderReady(struct_decoder));
self.initialized = true;
}
let old_rows_scheduled = self.rows_scheduled;
// Keep scheduling children until the row count at the top of the heap changes
while old_rows_scheduled == self.rows_scheduled {
// The heap's ordering is reversed, so this pops the child that has
// scheduled the fewest rows
let mut next_child = self.children.pop().unwrap();
trace!("Scheduling more rows for child {}", next_child.col_idx);
// Enter the child's scope in the context for the duration of its scheduling
let scoped = context.push(next_child.col_name, next_child.col_idx);
let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
trace!(
"Scheduled {} rows for child {}",
child_scan.rows_scheduled,
next_child.col_idx
);
next_child.rows_scheduled += child_scan.rows_scheduled;
next_child.rows_remaining -= child_scan.rows_scheduled;
decoders.extend(child_scan.decoders);
self.children.push(next_child);
// The struct has scheduled as many rows as its least-advanced child
self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
// Leave the child's scope and recover the parent context
context = scoped.pop();
}
let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
Ok(ScheduledScanLine {
decoders,
rows_scheduled: struct_rows_scheduled,
})
}
// Total number of rows this job covers
fn num_rows(&self) -> u64 {
self.num_rows
}
}
/// Field scheduler for a struct column; it delegates to one scheduler per child.
#[derive(Debug)]
pub struct SimpleStructScheduler {
    /// Schedulers for the child columns, in field order
    children: Vec<Arc<dyn FieldScheduler>>,
    /// The struct's child fields
    child_fields: Fields,
    /// Row count, taken from the first child
    num_rows: u64,
}

impl SimpleStructScheduler {
    /// Creates a scheduler from the child schedulers and their fields.
    ///
    /// Debug builds assert that `children` is non-empty and that every child
    /// reports the same number of rows.
    ///
    /// # Panics
    ///
    /// Panics if `children` is empty.
    pub fn new(children: Vec<Arc<dyn FieldScheduler>>, child_fields: Fields) -> Self {
        debug_assert!(!children.is_empty());
        let num_rows = children[0].num_rows();
        debug_assert!(!children
            .iter()
            .any(|scheduler| scheduler.num_rows() != num_rows));
        SimpleStructScheduler {
            children,
            child_fields,
            num_rows,
        }
    }
}
impl FieldScheduler for SimpleStructScheduler {
    /// Creates a scheduling job covering `ranges` by asking every child for
    /// its own job over the same ranges.
    ///
    /// The job's row count is taken from the first child's job.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a child's `schedule_ranges`.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        let child_schedulers = self
            .children
            .iter()
            .map(|child| child.schedule_ranges(ranges, filter))
            .collect::<Result<Vec<_>>>()?;
        let num_rows = child_schedulers[0].num_rows();
        Ok(Box::new(SimpleStructSchedulerJob::new(
            self,
            child_schedulers,
            num_rows,
        )))
    }

    /// Number of rows in the struct column.
    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    /// Initializes every child scheduler concurrently.
    ///
    /// # Errors
    ///
    /// Resolves to an error if any child's initialization fails.
    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let futures = self
            .children
            .iter()
            .map(|child| child.initialize(filter, context))
            .collect::<FuturesUnordered<_>>();
        async move {
            // Each child already yields `Result<()>`, so the results are
            // collected only to surface the first error.
            futures.try_collect::<Vec<()>>().await?;
            Ok(())
        }
        .boxed()
    }
}
/// Tracks the scheduling progress of a single child column of a struct
/// (structural scheduling variant).
///
/// Values of this type are kept in a `BinaryHeap` by `RepDefStructSchedulingJob`.
#[derive(Debug)]
struct StructuralSchedulingJobWithStatus<'a> {
/// Index of the child within the parent struct (assigned in enumeration order)
col_idx: u32,
/// Name of the child field, borrowed from the owning scheduler's field list
col_name: &'a str,
/// The child's own scheduling job
job: Box<dyn StructuralSchedulingJob + 'a>,
/// Running total of rows this child has scheduled so far
rows_scheduled: u64,
/// Rows still to be scheduled; starts at the job's row count and is
/// decremented each time the child schedules more rows
rows_remaining: u64,
}
/// Two jobs are equal when they have scheduled the same number of rows.
///
/// Equality is defined through [`Ord::cmp`] so that `==` and `cmp` can never
/// disagree, which is what the `Eq`/`Ord` contract requires of types stored
/// in ordered collections such as `BinaryHeap`.
impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for StructuralSchedulingJobWithStatus<'_> {}

impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Jobs are ordered by `rows_scheduled`, reversed.
impl Ord for StructuralSchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Operands are swapped on purpose: `BinaryHeap` is a max-heap, so the
        // reversed comparison puts the job with the fewest scheduled rows on top.
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}
/// A structural scheduling job that drives the jobs of every child of a struct column.
#[derive(Debug)]
struct RepDefStructSchedulingJob<'a> {
    /// Per-child progress, ordered so the child with the fewest scheduled rows is on top
    children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
    /// Number of rows the struct as a whole has scheduled so far
    rows_scheduled: u64,
}

impl<'a> RepDefStructSchedulingJob<'a> {
    /// Wraps each child job with fresh progress counters and collects them into the heap.
    ///
    /// `children` must be in the same order as the scheduler's child fields.
    fn new(
        scheduler: &'a StructuralStructScheduler,
        children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
        num_rows: u64,
    ) -> Self {
        let statuses: Vec<StructuralSchedulingJobWithStatus<'a>> = children
            .into_iter()
            .enumerate()
            .map(|(idx, job)| StructuralSchedulingJobWithStatus {
                col_idx: idx as u32,
                col_name: scheduler.child_fields[idx].name(),
                job,
                rows_scheduled: 0,
                rows_remaining: num_rows,
            })
            .collect();
        Self {
            children: BinaryHeap::from(statuses),
            rows_scheduled: 0,
        }
    }
}
impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
    /// Schedules child columns until the struct as a whole has made progress.
    ///
    /// Returns `Ok(None)` as soon as a child reports it has nothing left to
    /// schedule. Otherwise returns the decoders emitted by the children along
    /// with the number of rows newly scheduled for the struct.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by a child's `schedule_next`.
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>> {
        let mut decoders = Vec::new();
        let old_rows_scheduled = self.rows_scheduled;
        // Keep scheduling children until the row count at the top of the heap changes
        while old_rows_scheduled == self.rows_scheduled {
            // The heap's ordering is reversed, so this pops the child that
            // has scheduled the fewest rows.
            let mut next_child = self.children.pop().unwrap();
            let scoped = context.push(next_child.col_name, next_child.col_idx);
            let child_scan = match next_child.job.schedule_next(scoped.context)? {
                Some(child_scan) => child_scan,
                None => return Ok(None),
            };
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled,
                next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            // The struct has scheduled as many rows as its least-advanced child
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
            // Leave the child's scope and recover the parent context
            context = scoped.pop();
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(Some(ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        }))
    }
}
/// Structural field scheduler for a struct column; it delegates to one scheduler per child.
#[derive(Debug)]
pub struct StructuralStructScheduler {
    /// Schedulers for the child columns, in field order
    children: Vec<Box<dyn StructuralFieldScheduler>>,
    /// The struct's child fields
    child_fields: Fields,
}

impl StructuralStructScheduler {
    /// Creates a scheduler from the child schedulers and their fields.
    ///
    /// Debug builds assert that `children` is non-empty.
    pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
        debug_assert!(!children.is_empty());
        StructuralStructScheduler {
            children,
            child_fields,
        }
    }
}
impl StructuralFieldScheduler for StructuralStructScheduler {
    /// Builds a job covering `ranges` by requesting a job over the same ranges
    /// from each child. The job's row count is the summed length of `ranges`.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let num_rows: u64 = ranges.iter().map(|range| range.end - range.start).sum();
        let mut child_jobs = Vec::with_capacity(self.children.len());
        for child in self.children.iter() {
            child_jobs.push(child.schedule_ranges(ranges, filter)?);
        }
        let job = RepDefStructSchedulingJob::new(self, child_jobs, num_rows);
        Ok(Box::new(job))
    }

    /// Initializes every child scheduler concurrently, failing if any child fails.
    fn initialize<'a>(
        &'a mut self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let pending = self
            .children
            .iter_mut()
            .map(|child| child.initialize(filter, context))
            .collect::<FuturesUnordered<_>>();
        async move {
            pending.try_collect::<Vec<()>>().await?;
            Ok(())
        }
        .boxed()
    }
}
/// Decode-side bookkeeping for one child column of a `SimpleStructDecoder`.
#[derive(Debug)]
struct ChildState {
/// Page decoders received for this child that are not yet fully drained.
/// New decoders are pushed at the back; drained ones are popped from the front.
scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
/// Loaded row count as computed by the most recent `wait_for_loaded` call
rows_loaded: u64,
/// Total number of rows requested through `drain` so far
rows_drained: u64,
/// Total rows of the pages that were fully drained and removed from `scheduled`
rows_popped: u64,
/// Total number of rows expected for this child (used to compute `has_more`)
num_rows: u64,
/// Index of this child within the struct (used in trace messages)
field_index: u32,
}
struct CompositeDecodeTask {
tasks: Vec<Box<dyn DecodeArrayTask>>,
num_rows: u64,
has_more: bool,
}
impl CompositeDecodeTask {
fn decode(self) -> Result<ArrayRef> {
let arrays = self
.tasks
.into_iter()
.map(|task| task.decode())
.collect::<Result<Vec<_>>>()?;
let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
Ok(arrow_select::concat::concat(&array_refs)?)
}
}
impl ChildState {
fn new(num_rows: u64, field_index: u32) -> Self {
Self {
scheduled: VecDeque::new(),
rows_loaded: 0,
rows_drained: 0,
rows_popped: 0,
num_rows,
field_index,
}
}
async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
trace!(
"Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
self.field_index,
loaded_need,
self.rows_loaded,
);
let mut fully_loaded = self.rows_popped;
for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
if next_decoder.rows_unloaded() > 0 {
let mut current_need = loaded_need;
current_need -= fully_loaded;
let rows_in_page = next_decoder.num_rows();
let need_for_page = (rows_in_page - 1).min(current_need);
trace!(
"Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
self.field_index,
page_idx,
need_for_page,
rows_in_page,
);
next_decoder.wait_for_loaded(need_for_page).await?;
let now_loaded = next_decoder.rows_loaded();
fully_loaded += now_loaded;
trace!(
"Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
self.field_index,
page_idx,
now_loaded,
fully_loaded
);
} else {
fully_loaded += next_decoder.num_rows();
}
if fully_loaded > loaded_need {
break;
}
}
self.rows_loaded = fully_loaded;
trace!(
"Struct child {} loaded {} new rows and now {} are loaded",
self.field_index,
fully_loaded,
self.rows_loaded
);
Ok(())
}
fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
trace!("Struct draining {} rows", num_rows);
trace!(
"Draining {} rows from struct page with {} rows already drained",
num_rows,
self.rows_drained
);
let mut remaining = num_rows;
let mut composite = CompositeDecodeTask {
tasks: Vec::new(),
num_rows: 0,
has_more: true,
};
while remaining > 0 {
let next = self.scheduled.front_mut().unwrap();
let rows_to_take = remaining.min(next.rows_left());
let next_task = next.drain(rows_to_take)?;
if next.rows_left() == 0 {
trace!("Completely drained page");
self.rows_popped += next.num_rows();
self.scheduled.pop_front();
}
remaining -= rows_to_take;
composite.tasks.push(next_task.task);
composite.num_rows += next_task.num_rows;
}
self.rows_drained += num_rows;
composite.has_more = self.rows_drained != self.num_rows;
Ok(composite)
}
}
/// Heap entry that orders children by how many rows they have loaded, with
/// the least-loaded child on top of a `BinaryHeap`.
struct WaitOrder<'a>(&'a mut ChildState);

impl Eq for WaitOrder<'_> {}

impl PartialEq for WaitOrder<'_> {
    fn eq(&self, other: &Self) -> bool {
        let (mine, theirs) = (self.0.rows_loaded, other.0.rows_loaded);
        mine == theirs
    }
}

impl Ord for WaitOrder<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reversed so the max-heap yields the smallest `rows_loaded` first
        self.0.rows_loaded.cmp(&other.0.rows_loaded).reverse()
    }
}

impl PartialOrd for WaitOrder<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Structural decoder for a struct column; it forwards pages and drain
/// requests to one decoder per child field.
#[derive(Debug)]
pub struct StructuralStructDecoder {
/// One decoder per child field, in field order
children: Vec<Box<dyn StructuralFieldDecoder>>,
/// `DataType::Struct` built from the child fields
data_type: DataType,
/// The struct's child fields
child_fields: Fields,
/// When true, decoded struct arrays are built without a validity buffer
is_root: bool,
}
impl StructuralStructDecoder {
    /// Creates a decoder for a struct with the given child `fields`.
    ///
    /// A child decoder is created for each field (see `field_to_decoder`).
    /// `should_validate` is passed on to the child decoders; `is_root`
    /// controls whether decoded arrays carry a validity buffer.
    pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Self {
        let children = fields
            .iter()
            .map(|field| Self::field_to_decoder(field, should_validate))
            .collect();
        let data_type = DataType::Struct(fields.clone());
        Self {
            data_type,
            children,
            child_fields: fields,
            is_root,
        }
    }

    /// Picks the decoder implementation for a single field from its data type.
    ///
    /// - Packed structs and all types not listed below use the primitive decoder
    /// - Other structs recurse into a (non-root) `StructuralStructDecoder`
    /// - Lists and large lists wrap the decoder of their item field
    ///
    /// # Panics
    ///
    /// Panics (`todo!`) for run-end-encoded, list-view, map and union types.
    fn field_to_decoder(
        field: &Arc<arrow_schema::Field>,
        should_validate: bool,
    ) -> Box<dyn StructuralFieldDecoder> {
        match field.data_type() {
            DataType::Struct(fields) => {
                if field.is_packed_struct() {
                    // `field` is already a `&Arc<Field>`, so it is passed
                    // through as-is, as in the primitive arm below
                    let decoder = StructuralPrimitiveFieldDecoder::new(field, should_validate);
                    Box::new(decoder)
                } else {
                    Box::new(Self::new(fields.clone(), should_validate, false))
                }
            }
            DataType::List(child_field) | DataType::LargeList(child_field) => {
                let child_decoder = Self::field_to_decoder(child_field, should_validate);
                Box::new(StructuralListDecoder::new(
                    child_decoder,
                    field.data_type().clone(),
                ))
            }
            DataType::RunEndEncoded(_, _) => todo!(),
            DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
            DataType::Map(_, _) => todo!(),
            DataType::Union(_, _) => todo!(),
            _ => Box::new(StructuralPrimitiveFieldDecoder::new(field, should_validate)),
        }
    }
}
impl StructuralFieldDecoder for StructuralStructDecoder {
    /// Routes a loaded page to the child named by the first element of its path.
    ///
    /// # Panics
    ///
    /// Panics if the path is empty or the child index is out of range.
    fn accept_page(&mut self, mut child: LoadedPage) -> Result<()> {
        let child_idx = child.path.pop_front().unwrap();
        let target = &mut self.children[child_idx as usize];
        target.accept_page(child)
    }

    /// Drains `num_rows` rows from every child and bundles the resulting
    /// tasks into a single struct decode task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let mut children = Vec::with_capacity(self.children.len());
        for child in self.children.iter_mut() {
            children.push(child.drain(num_rows)?);
        }
        let task = RepDefStructDecodeTask {
            children,
            child_fields: self.child_fields.clone(),
            is_root: self.is_root,
        };
        Ok(Box::new(task))
    }

    /// The struct data type produced by this decoder.
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
/// Decode task that assembles a struct array from its children's decode tasks.
#[derive(Debug)]
struct RepDefStructDecodeTask {
    /// One decode task per child field, in field order
    children: Vec<Box<dyn StructuralDecodeArrayTask>>,
    /// The struct's child fields
    child_fields: Fields,
    /// When true, the struct array is built without a validity buffer
    is_root: bool,
}

impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
    /// Decodes every child and combines the results into a `StructArray`.
    ///
    /// The rep/def information of the first child is used to derive the
    /// struct's validity (unless this is the root) and is returned alongside
    /// the array.
    ///
    /// # Panics
    ///
    /// Panics if there are no children.
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        let RepDefStructDecodeTask {
            children: tasks,
            child_fields,
            is_root,
        } = *self;
        let num_children = tasks.len();
        let mut decoded = Vec::with_capacity(num_children);
        for task in tasks {
            decoded.push(task.decode()?);
        }
        let mut decoded = decoded.into_iter();
        let DecodedArray {
            array: first_column,
            mut repdef,
        } = decoded.next().unwrap();
        let length = first_column.len();
        let mut columns = Vec::with_capacity(num_children);
        columns.push(first_column);
        for sibling in decoded {
            debug_assert_eq!(length, sibling.array.len());
            columns.push(sibling.array);
        }
        let validity = if !is_root {
            repdef.unravel_validity(length)
        } else {
            None
        };
        let struct_array = StructArray::new(child_fields, columns, validity);
        Ok(DecodedArray {
            array: Arc::new(struct_array),
            repdef,
        })
    }
}
/// Logical page decoder for a struct column; it tracks one `ChildState` per
/// child field.
#[derive(Debug)]
pub struct SimpleStructDecoder {
/// Per-child decode state, in field order
children: Vec<ChildState>,
/// The struct's child fields
child_fields: Fields,
/// `DataType::Struct` built from the child fields
data_type: DataType,
/// Total number of rows this decoder covers
num_rows: u64,
}
impl SimpleStructDecoder {
pub fn new(child_fields: Fields, num_rows: u64) -> Self {
let data_type = DataType::Struct(child_fields.clone());
Self {
children: child_fields
.iter()
.enumerate()
.map(|(idx, _)| ChildState::new(num_rows, idx as u32))
.collect(),
child_fields,
data_type,
num_rows,
}
}
async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
let mut wait_orders = self
.children
.iter_mut()
.filter_map(|child| {
if child.rows_loaded <= loaded_need {
Some(WaitOrder(child))
} else {
None
}
})
.collect::<BinaryHeap<_>>();
while !wait_orders.is_empty() {
let next_waiter = wait_orders.pop().unwrap();
let next_highest = wait_orders
.peek()
.map(|w| w.0.rows_loaded)
.unwrap_or(u64::MAX);
let limit = loaded_need.min(next_highest);
next_waiter.0.wait_for_loaded(limit).await?;
log::trace!(
"Struct child {} finished await pass and now {} are loaded",
next_waiter.0.field_index,
next_waiter.0.rows_loaded
);
if next_waiter.0.rows_loaded <= loaded_need {
wait_orders.push(next_waiter);
}
}
Ok(())
}
}
impl LogicalPageDecoder for SimpleStructDecoder {
fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
let child_idx = child.path.pop_front().unwrap();
if child.path.is_empty() {
self.children[child_idx as usize]
.scheduled
.push_back(child.decoder);
} else {
let intended = self.children[child_idx as usize].scheduled.back_mut().ok_or_else(|| Error::Internal { message: format!("Decoder scheduled for child at index {} but we don't have any child at that index yet", child_idx), location: location!() })?;
intended.accept_child(child)?;
}
Ok(())
}
fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
self.do_wait_for_loaded(loaded_need).boxed()
}
fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
let child_tasks = self
.children
.iter_mut()
.map(|child| child.drain(num_rows))
.collect::<Result<Vec<_>>>()?;
let num_rows = child_tasks[0].num_rows;
let has_more = child_tasks[0].has_more;
debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
debug_assert!(child_tasks.iter().all(|task| task.has_more == has_more));
Ok(NextDecodeTask {
task: Box::new(SimpleStructDecodeTask {
children: child_tasks,
child_fields: self.child_fields.clone(),
}),
num_rows,
has_more,
})
}
fn rows_loaded(&self) -> u64 {
self.children.iter().map(|c| c.rows_loaded).min().unwrap()
}
fn rows_drained(&self) -> u64 {
debug_assert!(self
.children
.iter()
.all(|c| c.rows_drained == self.children[0].rows_drained));
self.children[0].rows_drained
}
fn num_rows(&self) -> u64 {
self.num_rows
}
fn data_type(&self) -> &DataType {
&self.data_type
}
}
/// Decode task that assembles a struct array from per-child composite tasks.
struct SimpleStructDecodeTask {
    /// One composite task per child field, in field order
    children: Vec<CompositeDecodeTask>,
    /// The struct's child fields
    child_fields: Fields,
}

impl DecodeArrayTask for SimpleStructDecodeTask {
    /// Decodes every child and combines the arrays into a `StructArray`
    /// that has no validity buffer.
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let SimpleStructDecodeTask {
            children,
            child_fields,
        } = *self;
        let mut child_arrays = Vec::with_capacity(children.len());
        for child in children {
            child_arrays.push(child.decode()?);
        }
        let struct_array = StructArray::try_new(child_fields, child_arrays, None)?;
        Ok(Arc::new(struct_array))
    }
}
/// Field encoder for struct columns that records struct validity in the
/// rep/def builder and emits no column of its own.
pub struct StructStructuralEncoder {
    /// Encoders for the child fields, in field order
    children: Vec<Box<dyn FieldEncoder>>,
}

impl StructStructuralEncoder {
    /// Creates an encoder that delegates to the given child encoders.
    pub fn new(children: Vec<Box<dyn FieldEncoder>>) -> Self {
        StructStructuralEncoder { children }
    }
}
impl FieldEncoder for StructStructuralEncoder {
    /// Records the struct's validity in `repdef`, then forwards each column
    /// of the struct to the matching child encoder.
    ///
    /// Every child receives its own clone of the rep/def builder. Returns the
    /// children's encode tasks, concatenated in child order.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let struct_array = array.as_struct();
        match struct_array.nulls() {
            Some(validity) => {
                repdef.add_validity_bitmap(validity.clone());
            }
            None => {
                repdef.add_no_null(struct_array.len());
            }
        }
        let mut tasks = Vec::new();
        for (encoder, column) in self.children.iter_mut().zip(struct_array.columns().iter()) {
            let child_tasks = encoder.maybe_encode(
                column.clone(),
                external_buffers,
                repdef.clone(),
                row_number,
                num_rows,
            )?;
            tasks.extend(child_tasks);
        }
        Ok(tasks)
    }

    /// Flushes every child encoder and concatenates the resulting tasks.
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let flushed = self
            .children
            .iter_mut()
            .map(|child| child.flush(external_buffers))
            .flatten_ok();
        flushed.collect()
    }

    /// Number of columns written by the children (the struct adds none).
    fn num_columns(&self) -> u32 {
        self.children.iter().map(|child| child.num_columns()).sum()
    }

    /// Finishes every child encoder and returns their columns in child order.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut pending = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        async move {
            let mut encoded_columns = Vec::with_capacity(pending.len());
            while let Some(finished) = pending.next().await {
                let columns = finished?;
                encoded_columns.extend(columns);
            }
            Ok(encoded_columns)
        }
        .boxed()
    }
}
/// Field encoder for struct columns that writes a header column for the
/// struct itself, followed by the children's columns.
pub struct StructFieldEncoder {
    /// Encoders for the child fields, in field order
    children: Vec<Box<dyn FieldEncoder>>,
    /// Column index written into the struct's header page
    column_index: u32,
    /// Number of rows passed to `maybe_encode` so far
    num_rows_seen: u64,
}

impl StructFieldEncoder {
    /// Creates an encoder that delegates to `children` and uses
    /// `column_index` for the struct's own header column.
    #[allow(dead_code)]
    pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
        StructFieldEncoder {
            children,
            column_index,
            num_rows_seen: 0,
        }
    }
}
impl FieldEncoder for StructFieldEncoder {
    /// Counts the incoming rows, then forwards each column of the struct to
    /// the matching child encoder.
    ///
    /// Every child receives its own clone of the rep/def builder. Returns the
    /// children's encode tasks, concatenated in child order.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        self.num_rows_seen += array.len() as u64;
        let struct_array = array.as_struct();
        let mut tasks = Vec::new();
        for (encoder, column) in self.children.iter_mut().zip(struct_array.columns().iter()) {
            let child_tasks = encoder.maybe_encode(
                column.clone(),
                external_buffers,
                repdef.clone(),
                row_number,
                num_rows,
            )?;
            tasks.extend(child_tasks);
        }
        Ok(tasks)
    }

    /// Flushes every child encoder and concatenates the resulting tasks.
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let mut tasks = Vec::new();
        for encoder in self.children.iter_mut() {
            tasks.extend(encoder.flush(external_buffers)?);
        }
        Ok(tasks)
    }

    /// Number of columns written: the children's columns plus one header
    /// column for the struct itself.
    fn num_columns(&self) -> u32 {
        let child_columns: u32 = self.children.iter().map(|child| child.num_columns()).sum();
        child_columns + 1
    }

    /// Finishes every child encoder and returns the struct's header column
    /// followed by the children's columns in child order.
    ///
    /// The header column holds a single data-less page that records the
    /// struct encoding, the number of rows seen and the column index.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let mut pending = self
            .children
            .iter_mut()
            .map(|child| child.finish(external_buffers))
            .collect::<FuturesOrdered<_>>();
        let num_rows = self.num_rows_seen;
        let column_idx = self.column_index;
        async move {
            let header_page = EncodedPage {
                data: Vec::new(),
                description: PageEncoding::Legacy(pb::ArrayEncoding {
                    array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
                        pb::SimpleStruct {},
                    )),
                }),
                num_rows,
                column_idx,
                row_number: 0,
            };
            let mut header = EncodedColumn::default();
            header.final_pages.push(header_page);
            let mut columns = vec![header];
            while let Some(finished) = pending.next().await {
                columns.extend(finished?);
            }
            Ok(columns)
        }
        .boxed()
    }
}
// Round-trip tests for struct encoding and decoding.
#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};
use arrow_array::{
builder::{Int32Builder, ListBuilder},
Array, ArrayRef, Int32Array, StructArray,
};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, Fields};
use crate::{
testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
version::LanceFileVersion,
};
// Random round trip (file version 2.0) of a struct with two non-nullable Int32 children
#[test_log::test(tokio::test)]
async fn test_simple_struct() {
let data_type = DataType::Struct(Fields::from(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let field = Field::new("", data_type, false);
check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
}
// Round trip (file version 2.1) of hand-built data with nulls at three levels:
// the outer struct, the nested `location` struct, and individual Int32 values
#[test_log::test(tokio::test)]
async fn test_nullable_struct() {
let inner_fields = Fields::from(vec![
Field::new("x", DataType::Int32, false),
Field::new("y", DataType::Int32, true),
]);
let inner_struct = DataType::Struct(inner_fields.clone());
let outer_fields = Fields::from(vec![
Field::new("score", DataType::Int32, true),
Field::new("location", inner_struct, true),
]);
let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);
// The fourth `location` is null
let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
let locations = StructArray::new(
inner_fields,
vec![Arc::new(x_vals), Arc::new(y_vals)],
Some(location_validity),
);
// The fifth row is null
let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
let rows = StructArray::new(
outer_fields,
vec![Arc::new(scores), Arc::new(locations)],
Some(rows_validity),
);
let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
}
// Random round trip (file version 2.0) of a struct holding a nullable list and a nullable int
#[test_log::test(tokio::test)]
async fn test_struct_list() {
let data_type = DataType::Struct(Fields::from(vec![
Field::new(
"inner_list",
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
true,
),
Field::new("outer_int", DataType::Int32, true),
]));
let field = Field::new("row", data_type, false);
check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
}
// Random round trip (file version 2.0) of a struct containing an int, a nested
// struct (with an int and a list) and a binary column
#[test_log::test(tokio::test)]
async fn test_complicated_struct() {
let data_type = DataType::Struct(Fields::from(vec![
Field::new("int", DataType::Int32, true),
Field::new(
"inner",
DataType::Struct(Fields::from(vec![
Field::new("inner_int", DataType::Int32, true),
Field::new(
"inner_list",
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
true,
),
])),
true,
),
Field::new("outer_binary", DataType::Binary, true),
]));
let field = Field::new("row", data_type, false);
check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
}
// Round trip of a struct pairing 10,000 null lists with the ints 0..10000,
// fed to the encoder in slices of at most 437 rows
#[test_log::test(tokio::test)]
async fn test_ragged_scheduling() {
let items_builder = Int32Builder::new();
let mut list_builder = ListBuilder::new(items_builder);
for _ in 0..10000 {
list_builder.append_null();
}
let list_array = Arc::new(list_builder.finish());
let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
let fields = vec![
Field::new("", list_array.data_type().clone(), true),
Field::new("", int_array.data_type().clone(), true),
];
let struct_array = Arc::new(StructArray::new(
Fields::from(fields),
vec![list_array, int_array],
None,
)) as ArrayRef;
// The last slice is shorter because 10000 is not a multiple of 437
let struct_arrays = (0..10000)
.step_by(437)
.map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
.collect::<Vec<_>>();
check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
.await;
}
}