lance_encoding/encodings/logical/
binary.rsuse std::sync::Arc;
use arrow_array::{
cast::AsArray,
types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, UInt8Type, Utf8Type},
Array, ArrayRef, GenericByteArray, GenericListArray,
};
use arrow_schema::DataType;
use futures::{future::BoxFuture, FutureExt};
use lance_core::Result;
use log::trace;
use crate::decoder::{
DecodeArrayTask, DecoderReady, FieldScheduler, FilterExpression, LogicalPageDecoder,
MessageType, NextDecodeTask, PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
};
#[derive(Debug)]
pub struct BinarySchedulingJob<'a> {
scheduler: &'a BinaryFieldScheduler,
inner: Box<dyn SchedulingJob + 'a>,
}
impl<'a> SchedulingJob for BinarySchedulingJob<'a> {
fn schedule_next(
&mut self,
context: &mut SchedulerContext,
priority: &dyn PriorityRange,
) -> Result<ScheduledScanLine> {
let inner_scan = self.inner.schedule_next(context, priority)?;
let wrapped_decoders = inner_scan
.decoders
.into_iter()
.map(|message| {
let decoder = message.into_legacy();
MessageType::DecoderReady(DecoderReady {
path: decoder.path,
decoder: Box::new(BinaryPageDecoder {
inner: decoder.decoder,
data_type: self.scheduler.data_type.clone(),
}),
})
})
.collect::<Vec<_>>();
Ok(ScheduledScanLine {
decoders: wrapped_decoders,
rows_scheduled: inner_scan.rows_scheduled,
})
}
fn num_rows(&self) -> u64 {
self.inner.num_rows()
}
}
#[derive(Debug)]
pub struct BinaryFieldScheduler {
varbin_scheduler: Arc<dyn FieldScheduler>,
data_type: DataType,
}
impl BinaryFieldScheduler {
pub fn new(varbin_scheduler: Arc<dyn FieldScheduler>, data_type: DataType) -> Self {
Self {
varbin_scheduler,
data_type,
}
}
}
impl FieldScheduler for BinaryFieldScheduler {
fn schedule_ranges<'a>(
&'a self,
ranges: &[std::ops::Range<u64>],
filter: &FilterExpression,
) -> Result<Box<dyn SchedulingJob + 'a>> {
trace!("Scheduling binary for {} ranges", ranges.len());
let varbin_job = self.varbin_scheduler.schedule_ranges(ranges, filter)?;
Ok(Box::new(BinarySchedulingJob {
scheduler: self,
inner: varbin_job,
}))
}
fn num_rows(&self) -> u64 {
self.varbin_scheduler.num_rows()
}
fn initialize<'a>(
&'a self,
_filter: &'a FilterExpression,
_context: &'a SchedulerContext,
) -> BoxFuture<'a, Result<()>> {
std::future::ready(Ok(())).boxed()
}
}
#[derive(Debug)]
pub struct BinaryPageDecoder {
inner: Box<dyn LogicalPageDecoder>,
data_type: DataType,
}
impl LogicalPageDecoder for BinaryPageDecoder {
fn wait_for_loaded(&mut self, num_rows: u64) -> BoxFuture<Result<()>> {
self.inner.wait_for_loaded(num_rows)
}
fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
let inner_task = self.inner.drain(num_rows)?;
Ok(NextDecodeTask {
has_more: inner_task.has_more,
num_rows: inner_task.num_rows,
task: Box::new(BinaryArrayDecoder {
inner: inner_task.task,
data_type: self.data_type.clone(),
}),
})
}
fn data_type(&self) -> &DataType {
&self.data_type
}
fn rows_loaded(&self) -> u64 {
self.inner.rows_loaded()
}
fn num_rows(&self) -> u64 {
self.inner.num_rows()
}
fn rows_drained(&self) -> u64 {
self.inner.rows_drained()
}
}
pub struct BinaryArrayDecoder {
inner: Box<dyn DecodeArrayTask>,
data_type: DataType,
}
impl BinaryArrayDecoder {
fn from_list_array<T: ByteArrayType>(array: &GenericListArray<T::Offset>) -> ArrayRef {
let values = array
.values()
.as_primitive::<UInt8Type>()
.values()
.inner()
.clone();
let offsets = array.offsets().clone();
Arc::new(GenericByteArray::<T>::new(
offsets,
values,
array.nulls().cloned(),
))
}
}
impl DecodeArrayTask for BinaryArrayDecoder {
fn decode(self: Box<Self>) -> Result<ArrayRef> {
let data_type = self.data_type;
let arr = self.inner.decode()?;
match data_type {
DataType::Binary => Ok(Self::from_list_array::<BinaryType>(arr.as_list::<i32>())),
DataType::LargeBinary => Ok(Self::from_list_array::<LargeBinaryType>(
arr.as_list::<i64>(),
)),
DataType::Utf8 => Ok(Self::from_list_array::<Utf8Type>(arr.as_list::<i32>())),
DataType::LargeUtf8 => Ok(Self::from_list_array::<LargeUtf8Type>(arr.as_list::<i64>())),
_ => panic!("Binary decoder does not support this data type"),
}
}
}