lance_encoding/encodings/physical/
basic.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_schema::DataType;
7use futures::{future::BoxFuture, FutureExt};
8use log::trace;
9
10use crate::{
11    data::{AllNullDataBlock, BlockInfo, DataBlock, NullableDataBlock},
12    decoder::{PageScheduler, PrimitivePageDecoder},
13    encoder::{ArrayEncoder, EncodedArray},
14    format::ProtobufUtils,
15    EncodingsIo,
16};
17
18use lance_core::Result;
19
20struct DataDecoders {
21    validity: Box<dyn PrimitivePageDecoder>,
22    values: Box<dyn PrimitivePageDecoder>,
23}
24
25enum DataNullStatus {
26    // Neither validity nor values
27    All,
28    // Values only
29    None(Box<dyn PrimitivePageDecoder>),
30    // Validity and values
31    Some(DataDecoders),
32}
33
34#[derive(Debug)]
35struct DataSchedulers {
36    validity: Box<dyn PageScheduler>,
37    values: Box<dyn PageScheduler>,
38}
39
40#[derive(Debug)]
41enum SchedulerNullStatus {
42    // Values only
43    None(Box<dyn PageScheduler>),
44    // Validity and values
45    Some(DataSchedulers),
46    // Neither validity nor values
47    All,
48}
49
50impl SchedulerNullStatus {
51    fn values_scheduler(&self) -> Option<&dyn PageScheduler> {
52        match self {
53            Self::All => None,
54            Self::None(values) => Some(values.as_ref()),
55            Self::Some(schedulers) => Some(schedulers.values.as_ref()),
56        }
57    }
58}
59
60/// A physical scheduler for "basic" fields.  These are fields that have an optional
61/// validity bitmap and some kind of values buffer.
62///
63/// No actual decoding happens here, we are simply aggregating the two buffers.
64///
65/// If everything is null then there are no data buffers at all.
66// TODO: Add support/tests for primitive nulls
67// TODO: Add tests for the all-null case
68//
69// Right now this is always present on primitive fields.  In the future we may use a
70// sentinel encoding instead.
71#[derive(Debug)]
72pub struct BasicPageScheduler {
73    mode: SchedulerNullStatus,
74}
75
76impl BasicPageScheduler {
77    /// Creates a new instance that expects a validity bitmap
78    pub fn new_nullable(
79        validity_decoder: Box<dyn PageScheduler>,
80        values_decoder: Box<dyn PageScheduler>,
81    ) -> Self {
82        Self {
83            mode: SchedulerNullStatus::Some(DataSchedulers {
84                validity: validity_decoder,
85                values: values_decoder,
86            }),
87        }
88    }
89
90    /// Create a new instance that does not need a validity bitmap because no item is null
91    pub fn new_non_nullable(values_decoder: Box<dyn PageScheduler>) -> Self {
92        Self {
93            mode: SchedulerNullStatus::None(values_decoder),
94        }
95    }
96
97    /// Create a new instance where all values are null
98    ///
99    /// It may seem strange we need `values_decoder` here but Arrow requires that value
100    /// buffers still be allocated / sized even if everything is null.  So we need the value
101    /// decoder to calculate the capacity of the garbage buffer.
102    pub fn new_all_null() -> Self {
103        Self {
104            mode: SchedulerNullStatus::All,
105        }
106    }
107}
108
109impl PageScheduler for BasicPageScheduler {
110    fn schedule_ranges(
111        &self,
112        ranges: &[std::ops::Range<u64>],
113        scheduler: &Arc<dyn EncodingsIo>,
114        top_level_row: u64,
115    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
116        let validity_future = match &self.mode {
117            SchedulerNullStatus::None(_) | SchedulerNullStatus::All => None,
118            SchedulerNullStatus::Some(schedulers) => Some(schedulers.validity.schedule_ranges(
119                ranges,
120                scheduler,
121                top_level_row,
122            )),
123        };
124
125        let values_future = if let Some(values_scheduler) = self.mode.values_scheduler() {
126            Some(
127                values_scheduler
128                    .schedule_ranges(ranges, scheduler, top_level_row)
129                    .boxed(),
130            )
131        } else {
132            trace!("No values fetch needed since values all null");
133            None
134        };
135
136        async move {
137            let mode = match (values_future, validity_future) {
138                (None, None) => DataNullStatus::All,
139                (Some(values_future), None) => DataNullStatus::None(values_future.await?),
140                (Some(values_future), Some(validity_future)) => {
141                    DataNullStatus::Some(DataDecoders {
142                        values: values_future.await?,
143                        validity: validity_future.await?,
144                    })
145                }
146                _ => unreachable!(),
147            };
148            Ok(Box::new(BasicPageDecoder { mode }) as Box<dyn PrimitivePageDecoder>)
149        }
150        .boxed()
151    }
152}
153
154struct BasicPageDecoder {
155    mode: DataNullStatus,
156}
157
158impl PrimitivePageDecoder for BasicPageDecoder {
159    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
160        match &self.mode {
161            DataNullStatus::Some(decoders) => {
162                let validity = decoders.validity.decode(rows_to_skip, num_rows)?;
163                let validity = validity.as_fixed_width().unwrap();
164                let values = decoders.values.decode(rows_to_skip, num_rows)?;
165                Ok(DataBlock::Nullable(NullableDataBlock {
166                    data: Box::new(values),
167                    nulls: validity.data,
168                    block_info: BlockInfo::new(),
169                }))
170            }
171            DataNullStatus::All => Ok(DataBlock::AllNull(AllNullDataBlock {
172                num_values: num_rows,
173            })),
174            DataNullStatus::None(values) => values.decode(rows_to_skip, num_rows),
175        }
176    }
177}
178
179#[derive(Debug)]
180pub struct BasicEncoder {
181    values_encoder: Box<dyn ArrayEncoder>,
182}
183
184impl BasicEncoder {
185    pub fn new(values_encoder: Box<dyn ArrayEncoder>) -> Self {
186        Self { values_encoder }
187    }
188}
189
190impl ArrayEncoder for BasicEncoder {
191    fn encode(
192        &self,
193        data: DataBlock,
194        data_type: &DataType,
195        buffer_index: &mut u32,
196    ) -> Result<EncodedArray> {
197        match data {
198            DataBlock::AllNull(_) => {
199                let encoding = ProtobufUtils::basic_all_null_encoding();
200                Ok(EncodedArray { data, encoding })
201            }
202            DataBlock::Nullable(nullable) => {
203                let validity_buffer_index = *buffer_index;
204                *buffer_index += 1;
205
206                let validity_desc = ProtobufUtils::flat_encoding(
207                    1,
208                    validity_buffer_index,
209                    /*compression=*/ None,
210                );
211                let encoded_values =
212                    self.values_encoder
213                        .encode(*nullable.data, data_type, buffer_index)?;
214                let encoding =
215                    ProtobufUtils::basic_some_null_encoding(validity_desc, encoded_values.encoding);
216                let encoded = DataBlock::Nullable(NullableDataBlock {
217                    data: Box::new(encoded_values.data),
218                    nulls: nullable.nulls,
219                    block_info: BlockInfo::new(),
220                });
221                Ok(EncodedArray {
222                    data: encoded,
223                    encoding,
224                })
225            }
226            _ => {
227                let encoded_values = self.values_encoder.encode(data, data_type, buffer_index)?;
228                let encoding = ProtobufUtils::basic_no_null_encoding(encoded_values.encoding);
229                Ok(EncodedArray {
230                    data: encoded_values.data,
231                    encoding,
232                })
233            }
234        }
235    }
236}