lance_encoding/encodings/physical/
fixed_size_list.rs1use std::sync::Arc;
5
6use arrow_schema::DataType;
7use futures::{future::BoxFuture, FutureExt};
8use lance_core::Result;
9use log::trace;
10
11use crate::{
12 data::{DataBlock, FixedSizeListBlock},
13 decoder::{PageScheduler, PrimitivePageDecoder},
14 encoder::{ArrayEncoder, EncodedArray},
15 format::ProtobufUtils,
16 EncodingsIo,
17};
18
19#[derive(Debug)]
23pub struct FixedListScheduler {
24 items_scheduler: Box<dyn PageScheduler>,
25 dimension: u32,
26}
27
28impl FixedListScheduler {
29 pub fn new(items_scheduler: Box<dyn PageScheduler>, dimension: u32) -> Self {
30 Self {
31 items_scheduler,
32 dimension,
33 }
34 }
35}
36
37impl PageScheduler for FixedListScheduler {
38 fn schedule_ranges(
39 &self,
40 ranges: &[std::ops::Range<u64>],
41 scheduler: &Arc<dyn EncodingsIo>,
42 top_level_row: u64,
43 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
44 let expanded_ranges = ranges
45 .iter()
46 .map(|range| (range.start * self.dimension as u64)..(range.end * self.dimension as u64))
47 .collect::<Vec<_>>();
48 trace!(
49 "Expanding {} fsl ranges across {}..{} to item ranges across {}..{}",
50 ranges.len(),
51 ranges[0].start,
52 ranges[ranges.len() - 1].end,
53 expanded_ranges[0].start,
54 expanded_ranges[expanded_ranges.len() - 1].end
55 );
56 let inner_page_decoder =
57 self.items_scheduler
58 .schedule_ranges(&expanded_ranges, scheduler, top_level_row);
59 let dimension = self.dimension;
60 async move {
61 let items_decoder = inner_page_decoder.await?;
62 Ok(Box::new(FixedListDecoder {
63 items_decoder,
64 dimension: dimension as u64,
65 }) as Box<dyn PrimitivePageDecoder>)
66 }
67 .boxed()
68 }
69}
70
71pub struct FixedListDecoder {
72 items_decoder: Box<dyn PrimitivePageDecoder>,
73 dimension: u64,
74}
75
76impl PrimitivePageDecoder for FixedListDecoder {
77 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
78 let rows_to_skip = rows_to_skip * self.dimension;
79 let num_child_rows = num_rows * self.dimension;
80 let child_data = self.items_decoder.decode(rows_to_skip, num_child_rows)?;
81 Ok(DataBlock::FixedSizeList(FixedSizeListBlock {
82 child: Box::new(child_data),
83 dimension: self.dimension,
84 }))
85 }
86}
87
88#[derive(Debug)]
89pub struct FslEncoder {
90 items_encoder: Box<dyn ArrayEncoder>,
91 dimension: u32,
92}
93
94impl FslEncoder {
95 pub fn new(items_encoder: Box<dyn ArrayEncoder>, dimension: u32) -> Self {
96 Self {
97 items_encoder,
98 dimension,
99 }
100 }
101}
102
103impl ArrayEncoder for FslEncoder {
104 fn encode(
105 &self,
106 data: DataBlock,
107 data_type: &DataType,
108 buffer_index: &mut u32,
109 ) -> Result<EncodedArray> {
110 let inner_type = match data_type {
111 DataType::FixedSizeList(inner_field, _) => inner_field.data_type().clone(),
112 _ => panic!("Expected fixed size list data type and got {}", data_type),
113 };
114 let data = data.as_fixed_size_list().unwrap();
115 let child = *data.child;
116
117 let encoded_data = self
118 .items_encoder
119 .encode(child, &inner_type, buffer_index)?;
120
121 let data = DataBlock::FixedSizeList(FixedSizeListBlock {
122 child: Box::new(encoded_data.data),
123 dimension: self.dimension as u64,
124 });
125
126 let encoding = ProtobufUtils::fixed_size_list(encoded_data.encoding, self.dimension as u64);
127 Ok(EncodedArray { data, encoding })
128 }
129}
130
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow::datatypes::Int32Type;
    use arrow_array::{FixedSizeListArray, Int32Array};
    use arrow_buffer::{BooleanBuffer, NullBuffer};
    use arrow_schema::{DataType, Field};
    use lance_datagen::{array, gen_array, ArrayGeneratorExt, RowCount};
    use rstest::rstest;

    use crate::{
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    /// Item types exercised by `test_value_fsl_primitive`.
    const PRIMITIVE_TYPES: &[DataType] = &[DataType::Int8, DataType::Float32, DataType::Float64];

    /// Runs `check_round_trip_encoding_random` on a `FixedSizeList<_, 16>`
    /// field (nullable items, non-nullable list) for every type in
    /// `PRIMITIVE_TYPES`, under file versions 2.0 and 2.1.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_value_fsl_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for data_type in PRIMITIVE_TYPES {
            let inner_field = Field::new("item", data_type.clone(), true);
            let data_type = DataType::FixedSizeList(Arc::new(inner_field), 16);
            let field = Field::new("", data_type, false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    /// Hand-built FSL of dimension 2 with both null items and a null list,
    /// checked over several ranges and index sets with file version 2.1.
    #[test_log::test(tokio::test)]
    async fn test_simple_fsl() {
        // Logical value: [[0, NULL], NULL, [4, 5]]
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            None,
            Some(2),
            Some(3),
            Some(4),
            Some(5),
        ]));
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        // Row 1 is a null list; its underlying items [2, 3] are still present.
        let list_nulls = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
        let list = Arc::new(FixedSizeListArray::new(
            items_field,
            2,
            items,
            Some(list_nulls),
        ));

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![list], &test_cases, HashMap::default()).await;
    }

    /// Wide FSL: 4096 random Int32 items (about 10% null) grouped into
    /// 4 lists of 1024, with rows 1 and 3 marked null.
    // NOTE(review): ignored without a stated reason in this file — confirm why
    // before re-enabling. The test cases below only reach rows 0..3, not row 3.
    #[test_log::test(tokio::test)]
    #[ignore]
    async fn test_simple_wide_fsl() {
        let items = gen_array(array::rand::<Int32Type>().with_random_nulls(0.1))
            .into_array_rows(RowCount::from(4096))
            .unwrap();
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        let list_nulls = NullBuffer::new(BooleanBuffer::from(vec![true, false, true, false]));
        let list = Arc::new(FixedSizeListArray::new(
            items_field,
            1024,
            items,
            Some(list_nulls),
        ));

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![list], &test_cases, HashMap::default()).await;
    }

    /// FSL nested inside an FSL (both dimension 2), with nulls at the item,
    /// inner-list and outer-list levels, checked with file version 2.1.
    #[test_log::test(tokio::test)]
    async fn test_nested_fsl() {
        // Logical value: [[[0, 1], NULL], NULL, [[8, 9], [NULL, 11]]]
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            Some(1),
            None,
            None,
            None,
            None,
            None,
            None,
            Some(8),
            Some(9),
            None,
            Some(11),
        ]));
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        // Six inner lists: [0, 1], NULL, NULL, NULL, [8, 9], [NULL, 11]
        let inner_list_nulls = NullBuffer::new(BooleanBuffer::from(vec![
            true, false, false, false, true, true,
        ]));
        let inner_list = Arc::new(FixedSizeListArray::new(
            items_field.clone(),
            2,
            items,
            Some(inner_list_nulls),
        ));
        let inner_list_field = Arc::new(Field::new(
            "item",
            DataType::FixedSizeList(items_field, 2),
            true,
        ));
        // Three outer lists, each holding two inner lists; the middle one is null.
        let outer_list_nulls = NullBuffer::new(BooleanBuffer::from(vec![true, false, true]));
        let outer_list = Arc::new(FixedSizeListArray::new(
            inner_list_field,
            2,
            inner_list,
            Some(outer_list_nulls),
        ));

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![outer_list], &test_cases, HashMap::default()).await;
    }
}