lance_encoding/encodings/physical/
fixed_size_list.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_schema::DataType;
7use futures::{future::BoxFuture, FutureExt};
8use lance_core::Result;
9use log::trace;
10
11use crate::{
12    data::{DataBlock, FixedSizeListBlock},
13    decoder::{PageScheduler, PrimitivePageDecoder},
14    encoder::{ArrayEncoder, EncodedArray},
15    format::ProtobufUtils,
16    EncodingsIo,
17};
18
19/// A scheduler for fixed size lists of primitive values
20///
21/// This scheduler is, itself, primitive
22#[derive(Debug)]
23pub struct FixedListScheduler {
24    items_scheduler: Box<dyn PageScheduler>,
25    dimension: u32,
26}
27
28impl FixedListScheduler {
29    pub fn new(items_scheduler: Box<dyn PageScheduler>, dimension: u32) -> Self {
30        Self {
31            items_scheduler,
32            dimension,
33        }
34    }
35}
36
37impl PageScheduler for FixedListScheduler {
38    fn schedule_ranges(
39        &self,
40        ranges: &[std::ops::Range<u64>],
41        scheduler: &Arc<dyn EncodingsIo>,
42        top_level_row: u64,
43    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
44        let expanded_ranges = ranges
45            .iter()
46            .map(|range| (range.start * self.dimension as u64)..(range.end * self.dimension as u64))
47            .collect::<Vec<_>>();
48        trace!(
49            "Expanding {} fsl ranges across {}..{} to item ranges across {}..{}",
50            ranges.len(),
51            ranges[0].start,
52            ranges[ranges.len() - 1].end,
53            expanded_ranges[0].start,
54            expanded_ranges[expanded_ranges.len() - 1].end
55        );
56        let inner_page_decoder =
57            self.items_scheduler
58                .schedule_ranges(&expanded_ranges, scheduler, top_level_row);
59        let dimension = self.dimension;
60        async move {
61            let items_decoder = inner_page_decoder.await?;
62            Ok(Box::new(FixedListDecoder {
63                items_decoder,
64                dimension: dimension as u64,
65            }) as Box<dyn PrimitivePageDecoder>)
66        }
67        .boxed()
68    }
69}
70
71pub struct FixedListDecoder {
72    items_decoder: Box<dyn PrimitivePageDecoder>,
73    dimension: u64,
74}
75
76impl PrimitivePageDecoder for FixedListDecoder {
77    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
78        let rows_to_skip = rows_to_skip * self.dimension;
79        let num_child_rows = num_rows * self.dimension;
80        let child_data = self.items_decoder.decode(rows_to_skip, num_child_rows)?;
81        Ok(DataBlock::FixedSizeList(FixedSizeListBlock {
82            child: Box::new(child_data),
83            dimension: self.dimension,
84        }))
85    }
86}
87
88#[derive(Debug)]
89pub struct FslEncoder {
90    items_encoder: Box<dyn ArrayEncoder>,
91    dimension: u32,
92}
93
94impl FslEncoder {
95    pub fn new(items_encoder: Box<dyn ArrayEncoder>, dimension: u32) -> Self {
96        Self {
97            items_encoder,
98            dimension,
99        }
100    }
101}
102
103impl ArrayEncoder for FslEncoder {
104    fn encode(
105        &self,
106        data: DataBlock,
107        data_type: &DataType,
108        buffer_index: &mut u32,
109    ) -> Result<EncodedArray> {
110        let inner_type = match data_type {
111            DataType::FixedSizeList(inner_field, _) => inner_field.data_type().clone(),
112            _ => panic!("Expected fixed size list data type and got {}", data_type),
113        };
114        let data = data.as_fixed_size_list().unwrap();
115        let child = *data.child;
116
117        let encoded_data = self
118            .items_encoder
119            .encode(child, &inner_type, buffer_index)?;
120
121        let data = DataBlock::FixedSizeList(FixedSizeListBlock {
122            child: Box::new(encoded_data.data),
123            dimension: self.dimension as u64,
124        });
125
126        let encoding = ProtobufUtils::fixed_size_list(encoded_data.encoding, self.dimension as u64);
127        Ok(EncodedArray { data, encoding })
128    }
129}
130
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow::datatypes::Int32Type;
    use arrow_array::{FixedSizeListArray, Int32Array};
    use arrow_buffer::{BooleanBuffer, NullBuffer};
    use arrow_schema::{DataType, Field};
    use lance_datagen::{array, gen_array, ArrayGeneratorExt, RowCount};
    use rstest::rstest;

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    /// Item types exercised by the randomized round-trip test
    const PRIMITIVE_TYPES: &[DataType] = &[DataType::Int8, DataType::Float32, DataType::Float64];

    /// Ranges and indices shared by the non-nested fixed size list tests
    fn simple_fsl_test_cases() -> TestCases {
        TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1)
    }

    /// Round-trips random lists of 16 nullable items for each primitive type
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_value_fsl_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for item_type in PRIMITIVE_TYPES {
            let item_field = Arc::new(Field::new("item", item_type.clone(), true));
            let fsl_type = DataType::FixedSizeList(item_field, 16);
            check_round_trip_encoding_random(Field::new("", fsl_type, false), version).await;
        }
    }

    /// Round-trips three lists of two items with nulls at both levels
    #[test_log::test(tokio::test)]
    async fn test_simple_fsl() {
        let values = Arc::new(Int32Array::from(vec![
            Some(0),
            None,
            Some(2),
            Some(3),
            Some(4),
            Some(5),
        ]));
        let value_field = Arc::new(Field::new("item", DataType::Int32, true));
        let validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
        let fsl = Arc::new(FixedSizeListArray::new(
            value_field,
            2,
            values,
            Some(validity),
        ));

        let cases = simple_fsl_test_cases();

        check_round_trip_encoding_of_data(vec![fsl], &cases, HashMap::default()).await;
    }

    /// Round-trips four lists of 1024 random items (currently ignored)
    #[test_log::test(tokio::test)]
    #[ignore]
    async fn test_simple_wide_fsl() {
        let values = gen_array(array::rand::<Int32Type>().with_random_nulls(0.1))
            .into_array_rows(RowCount::from(4096))
            .unwrap();
        let value_field = Arc::new(Field::new("item", DataType::Int32, true));
        let validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true, false]));
        let fsl = Arc::new(FixedSizeListArray::new(
            value_field,
            1024,
            values,
            Some(validity),
        ));

        let cases = simple_fsl_test_cases();

        check_round_trip_encoding_of_data(vec![fsl], &cases, HashMap::default()).await;
    }

    /// Round-trips a fixed size list of fixed size lists with nulls at
    /// every level
    #[test_log::test(tokio::test)]
    async fn test_nested_fsl() {
        // [[0, 1], NULL], NULL, [[8, 9], [NULL, 11]]
        let values = Arc::new(Int32Array::from(vec![
            Some(0),
            Some(1),
            None,
            None,
            None,
            None,
            None,
            None,
            Some(8),
            Some(9),
            None,
            Some(11),
        ]));
        let value_field = Arc::new(Field::new("item", DataType::Int32, true));

        // Inner level: six lists of two items each
        let inner_validity = NullBuffer::new(BooleanBuffer::from(vec![
            true, false, false, false, true, true,
        ]));
        let inner_fsl = Arc::new(FixedSizeListArray::new(
            value_field.clone(),
            2,
            values,
            Some(inner_validity),
        ));
        let inner_fsl_field = Arc::new(Field::new(
            "item",
            DataType::FixedSizeList(value_field, 2),
            true,
        ));

        // Outer level: three lists of two inner lists each
        let outer_validity = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
        let outer_fsl = Arc::new(FixedSizeListArray::new(
            inner_fsl_field,
            2,
            inner_fsl,
            Some(outer_validity),
        ));

        let cases = TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![outer_fsl], &cases, HashMap::default()).await;
    }
}
270}