lance_encoding/encodings/physical/
basic.rs1use std::sync::Arc;
5
6use arrow_schema::DataType;
7use futures::{future::BoxFuture, FutureExt};
8use log::trace;
9
10use crate::{
11 data::{AllNullDataBlock, BlockInfo, DataBlock, NullableDataBlock},
12 decoder::{PageScheduler, PrimitivePageDecoder},
13 encoder::{ArrayEncoder, EncodedArray},
14 format::ProtobufUtils,
15 EncodingsIo,
16};
17
18use lance_core::Result;
19
20struct DataDecoders {
21 validity: Box<dyn PrimitivePageDecoder>,
22 values: Box<dyn PrimitivePageDecoder>,
23}
24
25enum DataNullStatus {
26 All,
28 None(Box<dyn PrimitivePageDecoder>),
30 Some(DataDecoders),
32}
33
34#[derive(Debug)]
35struct DataSchedulers {
36 validity: Box<dyn PageScheduler>,
37 values: Box<dyn PageScheduler>,
38}
39
40#[derive(Debug)]
41enum SchedulerNullStatus {
42 None(Box<dyn PageScheduler>),
44 Some(DataSchedulers),
46 All,
48}
49
50impl SchedulerNullStatus {
51 fn values_scheduler(&self) -> Option<&dyn PageScheduler> {
52 match self {
53 Self::All => None,
54 Self::None(values) => Some(values.as_ref()),
55 Self::Some(schedulers) => Some(schedulers.values.as_ref()),
56 }
57 }
58}
59
60#[derive(Debug)]
72pub struct BasicPageScheduler {
73 mode: SchedulerNullStatus,
74}
75
76impl BasicPageScheduler {
77 pub fn new_nullable(
79 validity_decoder: Box<dyn PageScheduler>,
80 values_decoder: Box<dyn PageScheduler>,
81 ) -> Self {
82 Self {
83 mode: SchedulerNullStatus::Some(DataSchedulers {
84 validity: validity_decoder,
85 values: values_decoder,
86 }),
87 }
88 }
89
90 pub fn new_non_nullable(values_decoder: Box<dyn PageScheduler>) -> Self {
92 Self {
93 mode: SchedulerNullStatus::None(values_decoder),
94 }
95 }
96
97 pub fn new_all_null() -> Self {
103 Self {
104 mode: SchedulerNullStatus::All,
105 }
106 }
107}
108
109impl PageScheduler for BasicPageScheduler {
110 fn schedule_ranges(
111 &self,
112 ranges: &[std::ops::Range<u64>],
113 scheduler: &Arc<dyn EncodingsIo>,
114 top_level_row: u64,
115 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
116 let validity_future = match &self.mode {
117 SchedulerNullStatus::None(_) | SchedulerNullStatus::All => None,
118 SchedulerNullStatus::Some(schedulers) => Some(schedulers.validity.schedule_ranges(
119 ranges,
120 scheduler,
121 top_level_row,
122 )),
123 };
124
125 let values_future = if let Some(values_scheduler) = self.mode.values_scheduler() {
126 Some(
127 values_scheduler
128 .schedule_ranges(ranges, scheduler, top_level_row)
129 .boxed(),
130 )
131 } else {
132 trace!("No values fetch needed since values all null");
133 None
134 };
135
136 async move {
137 let mode = match (values_future, validity_future) {
138 (None, None) => DataNullStatus::All,
139 (Some(values_future), None) => DataNullStatus::None(values_future.await?),
140 (Some(values_future), Some(validity_future)) => {
141 DataNullStatus::Some(DataDecoders {
142 values: values_future.await?,
143 validity: validity_future.await?,
144 })
145 }
146 _ => unreachable!(),
147 };
148 Ok(Box::new(BasicPageDecoder { mode }) as Box<dyn PrimitivePageDecoder>)
149 }
150 .boxed()
151 }
152}
153
154struct BasicPageDecoder {
155 mode: DataNullStatus,
156}
157
158impl PrimitivePageDecoder for BasicPageDecoder {
159 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
160 match &self.mode {
161 DataNullStatus::Some(decoders) => {
162 let validity = decoders.validity.decode(rows_to_skip, num_rows)?;
163 let validity = validity.as_fixed_width().unwrap();
164 let values = decoders.values.decode(rows_to_skip, num_rows)?;
165 Ok(DataBlock::Nullable(NullableDataBlock {
166 data: Box::new(values),
167 nulls: validity.data,
168 block_info: BlockInfo::new(),
169 }))
170 }
171 DataNullStatus::All => Ok(DataBlock::AllNull(AllNullDataBlock {
172 num_values: num_rows,
173 })),
174 DataNullStatus::None(values) => values.decode(rows_to_skip, num_rows),
175 }
176 }
177}
178
179#[derive(Debug)]
180pub struct BasicEncoder {
181 values_encoder: Box<dyn ArrayEncoder>,
182}
183
184impl BasicEncoder {
185 pub fn new(values_encoder: Box<dyn ArrayEncoder>) -> Self {
186 Self { values_encoder }
187 }
188}
189
190impl ArrayEncoder for BasicEncoder {
191 fn encode(
192 &self,
193 data: DataBlock,
194 data_type: &DataType,
195 buffer_index: &mut u32,
196 ) -> Result<EncodedArray> {
197 match data {
198 DataBlock::AllNull(_) => {
199 let encoding = ProtobufUtils::basic_all_null_encoding();
200 Ok(EncodedArray { data, encoding })
201 }
202 DataBlock::Nullable(nullable) => {
203 let validity_buffer_index = *buffer_index;
204 *buffer_index += 1;
205
206 let validity_desc = ProtobufUtils::flat_encoding(
207 1,
208 validity_buffer_index,
209 None,
210 );
211 let encoded_values =
212 self.values_encoder
213 .encode(*nullable.data, data_type, buffer_index)?;
214 let encoding =
215 ProtobufUtils::basic_some_null_encoding(validity_desc, encoded_values.encoding);
216 let encoded = DataBlock::Nullable(NullableDataBlock {
217 data: Box::new(encoded_values.data),
218 nulls: nullable.nulls,
219 block_info: BlockInfo::new(),
220 });
221 Ok(EncodedArray {
222 data: encoded,
223 encoding,
224 })
225 }
226 _ => {
227 let encoded_values = self.values_encoder.encode(data, data_type, buffer_index)?;
228 let encoding = ProtobufUtils::basic_no_null_encoding(encoded_values.encoding);
229 Ok(EncodedArray {
230 data: encoded_values.data,
231 encoding,
232 })
233 }
234 }
235 }
236}