lance_encoding/encodings/logical/
binary.rs1use std::sync::Arc;
5
6use arrow_array::{
7 cast::AsArray,
8 types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, UInt8Type, Utf8Type},
9 Array, ArrayRef, GenericByteArray, GenericListArray,
10};
11
12use arrow_schema::DataType;
13use futures::{future::BoxFuture, FutureExt};
14use lance_core::Result;
15use log::trace;
16
17use crate::decoder::{
18 DecodeArrayTask, DecoderReady, FieldScheduler, FilterExpression, LogicalPageDecoder,
19 MessageType, NextDecodeTask, PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
20};
21
22#[derive(Debug)]
25pub struct BinarySchedulingJob<'a> {
26 scheduler: &'a BinaryFieldScheduler,
27 inner: Box<dyn SchedulingJob + 'a>,
28}
29
30impl SchedulingJob for BinarySchedulingJob<'_> {
31 fn schedule_next(
32 &mut self,
33 context: &mut SchedulerContext,
34 priority: &dyn PriorityRange,
35 ) -> Result<ScheduledScanLine> {
36 let inner_scan = self.inner.schedule_next(context, priority)?;
37 let wrapped_decoders = inner_scan
38 .decoders
39 .into_iter()
40 .map(|message| {
41 let decoder = message.into_legacy();
42 MessageType::DecoderReady(DecoderReady {
43 path: decoder.path,
44 decoder: Box::new(BinaryPageDecoder {
45 inner: decoder.decoder,
46 data_type: self.scheduler.data_type.clone(),
47 }),
48 })
49 })
50 .collect::<Vec<_>>();
51 Ok(ScheduledScanLine {
52 decoders: wrapped_decoders,
53 rows_scheduled: inner_scan.rows_scheduled,
54 })
55 }
56
57 fn num_rows(&self) -> u64 {
58 self.inner.num_rows()
59 }
60}
61
62#[derive(Debug)]
64pub struct BinaryFieldScheduler {
65 varbin_scheduler: Arc<dyn FieldScheduler>,
66 data_type: DataType,
67}
68
69impl BinaryFieldScheduler {
70 pub fn new(varbin_scheduler: Arc<dyn FieldScheduler>, data_type: DataType) -> Self {
72 Self {
73 varbin_scheduler,
74 data_type,
75 }
76 }
77}
78
79impl FieldScheduler for BinaryFieldScheduler {
80 fn schedule_ranges<'a>(
81 &'a self,
82 ranges: &[std::ops::Range<u64>],
83 filter: &FilterExpression,
84 ) -> Result<Box<dyn SchedulingJob + 'a>> {
85 trace!("Scheduling binary for {} ranges", ranges.len());
86 let varbin_job = self.varbin_scheduler.schedule_ranges(ranges, filter)?;
87 Ok(Box::new(BinarySchedulingJob {
88 scheduler: self,
89 inner: varbin_job,
90 }))
91 }
92
93 fn num_rows(&self) -> u64 {
94 self.varbin_scheduler.num_rows()
95 }
96
97 fn initialize<'a>(
98 &'a self,
99 _filter: &'a FilterExpression,
100 _context: &'a SchedulerContext,
101 ) -> BoxFuture<'a, Result<()>> {
102 std::future::ready(Ok(())).boxed()
104 }
105}
106
107#[derive(Debug)]
108pub struct BinaryPageDecoder {
109 inner: Box<dyn LogicalPageDecoder>,
110 data_type: DataType,
111}
112
113impl LogicalPageDecoder for BinaryPageDecoder {
114 fn wait_for_loaded(&mut self, num_rows: u64) -> BoxFuture<Result<()>> {
115 self.inner.wait_for_loaded(num_rows)
116 }
117
118 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
119 let inner_task = self.inner.drain(num_rows)?;
120 Ok(NextDecodeTask {
121 num_rows: inner_task.num_rows,
122 task: Box::new(BinaryArrayDecoder {
123 inner: inner_task.task,
124 data_type: self.data_type.clone(),
125 }),
126 })
127 }
128
129 fn data_type(&self) -> &DataType {
130 &self.data_type
131 }
132
133 fn rows_loaded(&self) -> u64 {
134 self.inner.rows_loaded()
135 }
136
137 fn num_rows(&self) -> u64 {
138 self.inner.num_rows()
139 }
140
141 fn rows_drained(&self) -> u64 {
142 self.inner.rows_drained()
143 }
144}
145
146pub struct BinaryArrayDecoder {
147 inner: Box<dyn DecodeArrayTask>,
148 data_type: DataType,
149}
150
151impl BinaryArrayDecoder {
152 fn from_list_array<T: ByteArrayType>(array: &GenericListArray<T::Offset>) -> ArrayRef {
153 let values = array
154 .values()
155 .as_primitive::<UInt8Type>()
156 .values()
157 .inner()
158 .clone();
159 let offsets = array.offsets().clone();
160 Arc::new(GenericByteArray::<T>::new(
161 offsets,
162 values,
163 array.nulls().cloned(),
164 ))
165 }
166}
167
168impl DecodeArrayTask for BinaryArrayDecoder {
169 fn decode(self: Box<Self>) -> Result<ArrayRef> {
170 let data_type = self.data_type;
171 let arr = self.inner.decode()?;
172 match data_type {
173 DataType::Binary => Ok(Self::from_list_array::<BinaryType>(arr.as_list::<i32>())),
174 DataType::LargeBinary => Ok(Self::from_list_array::<LargeBinaryType>(
175 arr.as_list::<i64>(),
176 )),
177 DataType::Utf8 => Ok(Self::from_list_array::<Utf8Type>(arr.as_list::<i32>())),
178 DataType::LargeUtf8 => Ok(Self::from_list_array::<LargeUtf8Type>(arr.as_list::<i64>())),
179 _ => panic!("Binary decoder does not support this data type"),
180 }
181 }
182}