1use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
4
5use arrow::datatypes::UInt32Type;
6use arrow_array::{PrimitiveArray, UInt32Array};
7use snafu::location;
8
9use lance_core::{Error, Result};
10
11pub mod encodings;
12pub mod ffi;
13pub mod local;
14pub mod object_reader;
15pub mod object_store;
16pub mod object_writer;
17pub mod scheduler;
18pub mod stream;
19#[cfg(test)]
20pub mod testing;
21pub mod traits;
22pub mod utils;
23
24pub use scheduler::{bytes_read_counter, iops_counter};
25
26#[derive(Debug, Clone)]
28pub enum ReadBatchParams {
29 Range(Range<usize>),
31 RangeFull,
33 RangeTo(RangeTo<usize>),
35 RangeFrom(RangeFrom<usize>),
37 Indices(UInt32Array),
39}
40
41impl std::fmt::Display for ReadBatchParams {
42 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
43 match self {
44 Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end),
45 Self::RangeFull => write!(f, "RangeFull"),
46 Self::RangeTo(r) => write!(f, "RangeTo({})", r.end),
47 Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start),
48 Self::Indices(indices) => {
49 let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| {
50 acc.push_str(&v.to_string());
51 acc.push(',');
52 acc
53 });
54 if !indices_str.is_empty() {
55 indices_str.pop();
56 }
57 write!(f, "Indices({})", indices_str)
58 }
59 }
60 }
61}
62
63impl Default for ReadBatchParams {
64 fn default() -> Self {
65 Self::RangeFull
67 }
68}
69
70impl From<&[u32]> for ReadBatchParams {
71 fn from(value: &[u32]) -> Self {
72 Self::Indices(UInt32Array::from_iter_values(value.iter().copied()))
73 }
74}
75
76impl From<UInt32Array> for ReadBatchParams {
77 fn from(value: UInt32Array) -> Self {
78 Self::Indices(value)
79 }
80}
81
82impl From<RangeFull> for ReadBatchParams {
83 fn from(_: RangeFull) -> Self {
84 Self::RangeFull
85 }
86}
87
88impl From<Range<usize>> for ReadBatchParams {
89 fn from(r: Range<usize>) -> Self {
90 Self::Range(r)
91 }
92}
93
94impl From<RangeTo<usize>> for ReadBatchParams {
95 fn from(r: RangeTo<usize>) -> Self {
96 Self::RangeTo(r)
97 }
98}
99
100impl From<RangeFrom<usize>> for ReadBatchParams {
101 fn from(r: RangeFrom<usize>) -> Self {
102 Self::RangeFrom(r)
103 }
104}
105
106impl From<&Self> for ReadBatchParams {
107 fn from(params: &Self) -> Self {
108 params.clone()
109 }
110}
111
112impl ReadBatchParams {
113 pub fn valid_given_len(&self, len: usize) -> bool {
115 match self {
116 Self::Indices(indices) => indices.iter().all(|i| i.unwrap_or(0) < len as u32),
117 Self::Range(r) => r.start < len && r.end <= len,
118 Self::RangeFull => true,
119 Self::RangeTo(r) => r.end <= len,
120 Self::RangeFrom(r) => r.start < len,
121 }
122 }
123
124 pub fn slice(&self, start: usize, length: usize) -> Result<Self> {
144 let out_of_bounds = |size: usize| {
145 Err(Error::InvalidInput {
146 source: format!(
147 "Cannot slice from {} with length {} given a selection of size {}",
148 start, length, size
149 )
150 .into(),
151 location: location!(),
152 })
153 };
154
155 match self {
156 Self::Indices(indices) => {
157 if start + length > indices.len() {
158 return out_of_bounds(indices.len());
159 }
160 Ok(Self::Indices(indices.slice(start, length)))
161 }
162 Self::Range(r) => {
163 if (r.start + start + length) > r.end {
164 return out_of_bounds(r.end - r.start);
165 }
166 Ok(Self::Range((r.start + start)..(r.start + start + length)))
167 }
168 Self::RangeFull => Ok(Self::Range(start..(start + length))),
169 Self::RangeTo(range) => {
170 if start + length > range.end {
171 return out_of_bounds(range.end);
172 }
173 Ok(Self::Range(start..(start + length)))
174 }
175 Self::RangeFrom(r) => {
176 Ok(Self::Range((r.start + start)..(r.start + start + length)))
178 }
179 }
180 }
181
182 pub fn to_offsets(&self) -> Result<PrimitiveArray<UInt32Type>> {
187 match self {
188 Self::Indices(indices) => Ok(indices.clone()),
189 Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter(
190 r.start as u32..r.end as u32,
191 ))),
192 Self::RangeFull => Err(Error::invalid_input(
193 "cannot materialize RangeFull",
194 location!(),
195 )),
196 Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))),
197 Self::RangeFrom(_) => Err(Error::invalid_input(
198 "cannot materialize RangeFrom",
199 location!(),
200 )),
201 }
202 }
203
204 pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray<UInt32Type> {
205 match self {
206 Self::Indices(indices) => indices.clone(),
207 Self::Range(r) => UInt32Array::from_iter_values(r.start as u32..r.end as u32),
208 Self::RangeFull => UInt32Array::from_iter_values(0_u32..total),
209 Self::RangeTo(r) => UInt32Array::from_iter_values(0..r.end as u32),
210 Self::RangeFrom(r) => UInt32Array::from_iter_values(r.start as u32..total),
211 }
212 }
213}
214
#[cfg(test)]
mod test {
    use std::ops::{RangeFrom, RangeTo};

    use arrow_array::UInt32Array;

    use crate::ReadBatchParams;

    #[test]
    fn test_params_to_offsets() {
        let check = |params: ReadBatchParams, base_offset, length, expected: Vec<u32>| {
            let offsets = params
                .slice(base_offset, length)
                .unwrap()
                .to_offsets()
                .unwrap();
            let expected = UInt32Array::from(expected);
            assert_eq!(offsets, expected);
        };

        check(ReadBatchParams::RangeFull, 0, 100, (0..100).collect());
        check(ReadBatchParams::RangeFull, 50, 100, (50..150).collect());
        check(
            ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
            0,
            100,
            (500..600).collect(),
        );
        check(
            ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
            100,
            100,
            (600..700).collect(),
        );
        check(
            ReadBatchParams::RangeTo(RangeTo { end: 800 }),
            0,
            100,
            (0..100).collect(),
        );
        check(
            ReadBatchParams::RangeTo(RangeTo { end: 800 }),
            200,
            100,
            (200..300).collect(),
        );
        check(
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9])),
            0,
            2,
            vec![1, 3],
        );
        check(
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9])),
            2,
            2,
            vec![5, 7],
        );

        let check_error = |params: ReadBatchParams, base_offset, length| {
            assert!(params.slice(base_offset, length).is_err());
        };

        check_error(ReadBatchParams::Indices(UInt32Array::from(vec![1])), 0, 2);
        check_error(ReadBatchParams::Indices(UInt32Array::from(vec![1])), 1, 1);
        check_error(ReadBatchParams::Range(0..10), 5, 6);
        check_error(ReadBatchParams::RangeTo(RangeTo { end: 10 }), 5, 6);

        assert!(ReadBatchParams::RangeFull.to_offsets().is_err());
        assert!(ReadBatchParams::RangeFrom(RangeFrom { start: 10 })
            .to_offsets()
            .is_err());
    }

    #[test]
    fn test_params_display() {
        assert_eq!(ReadBatchParams::Range(2..5).to_string(), "Range(2..5)");
        assert_eq!(ReadBatchParams::RangeFull.to_string(), "RangeFull");
        assert_eq!(
            ReadBatchParams::RangeTo(RangeTo { end: 4 }).to_string(),
            "RangeTo(4)"
        );
        assert_eq!(
            ReadBatchParams::RangeFrom(RangeFrom { start: 7 }).to_string(),
            "RangeFrom(7)"
        );
        assert_eq!(
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 2, 3])).to_string(),
            "Indices(1,2,3)"
        );
        assert_eq!(
            ReadBatchParams::Indices(UInt32Array::from(Vec::<u32>::new())).to_string(),
            "Indices()"
        );
    }

    #[test]
    fn test_params_valid_given_len() {
        assert!(ReadBatchParams::RangeFull.valid_given_len(0));
        assert!(ReadBatchParams::Range(0..10).valid_given_len(10));
        assert!(!ReadBatchParams::Range(0..11).valid_given_len(10));
        assert!(ReadBatchParams::RangeTo(RangeTo { end: 10 }).valid_given_len(10));
        assert!(!ReadBatchParams::RangeTo(RangeTo { end: 11 }).valid_given_len(10));
        assert!(ReadBatchParams::RangeFrom(RangeFrom { start: 9 }).valid_given_len(10));
        assert!(!ReadBatchParams::RangeFrom(RangeFrom { start: 10 }).valid_given_len(10));
        assert!(ReadBatchParams::Indices(UInt32Array::from(vec![0, 9])).valid_given_len(10));
        assert!(!ReadBatchParams::Indices(UInt32Array::from(vec![0, 10])).valid_given_len(10));
    }

    #[test]
    fn test_params_to_offsets_total() {
        let check = |params: ReadBatchParams, total: u32, expected: Vec<u32>| {
            assert_eq!(params.to_offsets_total(total), UInt32Array::from(expected));
        };

        check(ReadBatchParams::RangeFull, 3, vec![0, 1, 2]);
        check(ReadBatchParams::RangeFrom(RangeFrom { start: 1 }), 4, vec![1, 2, 3]);
        check(ReadBatchParams::RangeTo(RangeTo { end: 2 }), 10, vec![0, 1]);
        check(ReadBatchParams::Range(3..5), 10, vec![3, 4]);
        check(
            ReadBatchParams::Indices(UInt32Array::from(vec![4, 2])),
            10,
            vec![4, 2],
        );
    }
}