lance_io/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
4
5use arrow::datatypes::UInt32Type;
6use arrow_array::{PrimitiveArray, UInt32Array};
7use snafu::location;
8
9use lance_core::{Error, Result};
10
11pub mod encodings;
12pub mod ffi;
13pub mod local;
14pub mod object_reader;
15pub mod object_store;
16pub mod object_writer;
17pub mod scheduler;
18pub mod stream;
19#[cfg(test)]
20pub mod testing;
21pub mod traits;
22pub mod utils;
23
24pub use scheduler::{bytes_read_counter, iops_counter};
25
26/// Defines a selection of rows to read from a file/batch
27#[derive(Debug, Clone)]
28pub enum ReadBatchParams {
29    /// Select a contiguous range of rows
30    Range(Range<usize>),
31    /// Select all rows (this is the default)
32    RangeFull,
33    /// Select all rows up to a given index
34    RangeTo(RangeTo<usize>),
35    /// Select all rows starting at a given index
36    RangeFrom(RangeFrom<usize>),
37    /// Select scattered non-contiguous rows
38    Indices(UInt32Array),
39}
40
41impl std::fmt::Display for ReadBatchParams {
42    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
43        match self {
44            Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end),
45            Self::RangeFull => write!(f, "RangeFull"),
46            Self::RangeTo(r) => write!(f, "RangeTo({})", r.end),
47            Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start),
48            Self::Indices(indices) => {
49                let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| {
50                    acc.push_str(&v.to_string());
51                    acc.push(',');
52                    acc
53                });
54                if !indices_str.is_empty() {
55                    indices_str.pop();
56                }
57                write!(f, "Indices({})", indices_str)
58            }
59        }
60    }
61}
62
63impl Default for ReadBatchParams {
64    fn default() -> Self {
65        // Default of ReadBatchParams is reading the full batch.
66        Self::RangeFull
67    }
68}
69
70impl From<&[u32]> for ReadBatchParams {
71    fn from(value: &[u32]) -> Self {
72        Self::Indices(UInt32Array::from_iter_values(value.iter().copied()))
73    }
74}
75
76impl From<UInt32Array> for ReadBatchParams {
77    fn from(value: UInt32Array) -> Self {
78        Self::Indices(value)
79    }
80}
81
82impl From<RangeFull> for ReadBatchParams {
83    fn from(_: RangeFull) -> Self {
84        Self::RangeFull
85    }
86}
87
88impl From<Range<usize>> for ReadBatchParams {
89    fn from(r: Range<usize>) -> Self {
90        Self::Range(r)
91    }
92}
93
94impl From<RangeTo<usize>> for ReadBatchParams {
95    fn from(r: RangeTo<usize>) -> Self {
96        Self::RangeTo(r)
97    }
98}
99
100impl From<RangeFrom<usize>> for ReadBatchParams {
101    fn from(r: RangeFrom<usize>) -> Self {
102        Self::RangeFrom(r)
103    }
104}
105
106impl From<&Self> for ReadBatchParams {
107    fn from(params: &Self) -> Self {
108        params.clone()
109    }
110}
111
112impl ReadBatchParams {
113    /// Validate that the selection is valid given the length of the batch
114    pub fn valid_given_len(&self, len: usize) -> bool {
115        match self {
116            Self::Indices(indices) => indices.iter().all(|i| i.unwrap_or(0) < len as u32),
117            Self::Range(r) => r.start < len && r.end <= len,
118            Self::RangeFull => true,
119            Self::RangeTo(r) => r.end <= len,
120            Self::RangeFrom(r) => r.start < len,
121        }
122    }
123
124    /// Slice the selection
125    ///
126    /// For example, given ReadBatchParams::RangeFull and slice(10, 20), the output will be
127    /// ReadBatchParams::Range(10..20)
128    ///
129    /// Given ReadBatchParams::Range(10..20) and slice(5, 3), the output will be
130    /// ReadBatchParams::Range(15..18)
131    ///
132    /// Given ReadBatchParams::RangeTo(20) and slice(10, 5), the output will be
133    /// ReadBatchParams::Range(10..15)
134    ///
135    /// Given ReadBatchParams::RangeFrom(20) and slice(10, 5), the output will be
136    /// ReadBatchParams::Range(30..35)
137    ///
138    /// Given ReadBatchParams::Indices([1, 3, 5, 7, 9]) and slice(1, 3), the output will be
139    /// ReadBatchParams::Indices([3, 5, 7])
140    ///
141    /// You cannot slice beyond the bounds of the selection and an attempt to do so will
142    /// return an error.
143    pub fn slice(&self, start: usize, length: usize) -> Result<Self> {
144        let out_of_bounds = |size: usize| {
145            Err(Error::InvalidInput {
146                source: format!(
147                    "Cannot slice from {} with length {} given a selection of size {}",
148                    start, length, size
149                )
150                .into(),
151                location: location!(),
152            })
153        };
154
155        match self {
156            Self::Indices(indices) => {
157                if start + length > indices.len() {
158                    return out_of_bounds(indices.len());
159                }
160                Ok(Self::Indices(indices.slice(start, length)))
161            }
162            Self::Range(r) => {
163                if (r.start + start + length) > r.end {
164                    return out_of_bounds(r.end - r.start);
165                }
166                Ok(Self::Range((r.start + start)..(r.start + start + length)))
167            }
168            Self::RangeFull => Ok(Self::Range(start..(start + length))),
169            Self::RangeTo(range) => {
170                if start + length > range.end {
171                    return out_of_bounds(range.end);
172                }
173                Ok(Self::Range(start..(start + length)))
174            }
175            Self::RangeFrom(r) => {
176                // No way to validate out_of_bounds, assume caller will do so
177                Ok(Self::Range((r.start + start)..(r.start + start + length)))
178            }
179        }
180    }
181
182    /// Convert a read range into a vector of row offsets
183    ///
184    /// RangeFull and RangeFrom are unbounded and cannot be converted into row offsets
185    /// and any attempt to do so will return an error.  Call slice first
186    pub fn to_offsets(&self) -> Result<PrimitiveArray<UInt32Type>> {
187        match self {
188            Self::Indices(indices) => Ok(indices.clone()),
189            Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter(
190                r.start as u32..r.end as u32,
191            ))),
192            Self::RangeFull => Err(Error::invalid_input(
193                "cannot materialize RangeFull",
194                location!(),
195            )),
196            Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))),
197            Self::RangeFrom(_) => Err(Error::invalid_input(
198                "cannot materialize RangeFrom",
199                location!(),
200            )),
201        }
202    }
203
204    pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray<UInt32Type> {
205        match self {
206            Self::Indices(indices) => indices.clone(),
207            Self::Range(r) => UInt32Array::from_iter_values(r.start as u32..r.end as u32),
208            Self::RangeFull => UInt32Array::from_iter_values(0_u32..total),
209            Self::RangeTo(r) => UInt32Array::from_iter_values(0..r.end as u32),
210            Self::RangeFrom(r) => UInt32Array::from_iter_values(r.start as u32..total),
211        }
212    }
213}
214
#[cfg(test)]
mod test {
    use std::ops::{RangeFrom, RangeTo};

    use arrow_array::UInt32Array;

    use crate::ReadBatchParams;

    #[test]
    fn test_params_to_offsets() {
        let check = |params: ReadBatchParams, base_offset, length, expected: Vec<u32>| {
            let offsets = params
                .slice(base_offset, length)
                .unwrap()
                .to_offsets()
                .unwrap();
            let expected = UInt32Array::from(expected);
            assert_eq!(offsets, expected);
        };

        check(ReadBatchParams::RangeFull, 0, 100, (0..100).collect());
        check(ReadBatchParams::RangeFull, 50, 100, (50..150).collect());
        check(
            ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
            0,
            100,
            (500..600).collect(),
        );
        check(
            ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
            100,
            100,
            (600..700).collect(),
        );
        check(
            ReadBatchParams::RangeTo(RangeTo { end: 800 }),
            0,
            100,
            (0..100).collect(),
        );
        check(
            ReadBatchParams::RangeTo(RangeTo { end: 800 }),
            200,
            100,
            (200..300).collect(),
        );
        // A range that does not start at zero is shifted by its own start.
        check(ReadBatchParams::Range(10..20), 5, 3, vec![15, 16, 17]);
        check(
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9])),
            0,
            2,
            vec![1, 3],
        );
        check(
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9])),
            2,
            2,
            vec![5, 7],
        );

        let check_error = |params: ReadBatchParams, base_offset, length| {
            assert!(params.slice(base_offset, length).is_err());
        };

        check_error(ReadBatchParams::Indices(UInt32Array::from(vec![1])), 0, 2);
        check_error(ReadBatchParams::Indices(UInt32Array::from(vec![1])), 1, 1);
        check_error(ReadBatchParams::Range(0..10), 5, 6);
        check_error(ReadBatchParams::Range(10..20), 5, 6);
        check_error(ReadBatchParams::RangeTo(RangeTo { end: 10 }), 5, 6);

        assert!(ReadBatchParams::RangeFull.to_offsets().is_err());
        assert!(ReadBatchParams::RangeFrom(RangeFrom { start: 10 })
            .to_offsets()
            .is_err());
    }

    #[test]
    fn test_to_offsets_total() {
        let total = 5;
        assert_eq!(
            ReadBatchParams::RangeFull.to_offsets_total(total),
            UInt32Array::from(vec![0, 1, 2, 3, 4])
        );
        assert_eq!(
            ReadBatchParams::RangeFrom(RangeFrom { start: 3 }).to_offsets_total(total),
            UInt32Array::from(vec![3, 4])
        );
        // Starting past the end yields an empty selection.
        assert_eq!(
            ReadBatchParams::RangeFrom(RangeFrom { start: 7 }).to_offsets_total(total),
            UInt32Array::from(Vec::<u32>::new())
        );
        assert_eq!(
            ReadBatchParams::RangeTo(RangeTo { end: 2 }).to_offsets_total(total),
            UInt32Array::from(vec![0, 1])
        );
        assert_eq!(
            ReadBatchParams::Range(1..3).to_offsets_total(total),
            UInt32Array::from(vec![1, 2])
        );
        assert_eq!(
            ReadBatchParams::Indices(UInt32Array::from(vec![4, 0])).to_offsets_total(total),
            UInt32Array::from(vec![4, 0])
        );
    }

    #[test]
    fn test_valid_given_len() {
        assert!(ReadBatchParams::RangeFull.valid_given_len(0));
        assert!(ReadBatchParams::Range(0..10).valid_given_len(10));
        assert!(!ReadBatchParams::Range(0..11).valid_given_len(10));
        assert!(ReadBatchParams::RangeTo(RangeTo { end: 10 }).valid_given_len(10));
        assert!(!ReadBatchParams::RangeTo(RangeTo { end: 11 }).valid_given_len(10));
        assert!(ReadBatchParams::RangeFrom(RangeFrom { start: 9 }).valid_given_len(10));
        assert!(!ReadBatchParams::RangeFrom(RangeFrom { start: 10 }).valid_given_len(10));
        assert!(ReadBatchParams::Indices(UInt32Array::from(vec![0, 9])).valid_given_len(10));
        assert!(!ReadBatchParams::Indices(UInt32Array::from(vec![0, 10])).valid_given_len(10));
    }

    #[test]
    fn test_display() {
        assert_eq!(ReadBatchParams::RangeFull.to_string(), "RangeFull");
        assert_eq!(ReadBatchParams::Range(2..5).to_string(), "Range(2..5)");
        assert_eq!(
            ReadBatchParams::RangeTo(RangeTo { end: 4 }).to_string(),
            "RangeTo(4)"
        );
        assert_eq!(
            ReadBatchParams::RangeFrom(RangeFrom { start: 7 }).to_string(),
            "RangeFrom(7)"
        );
        assert_eq!(
            ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5])).to_string(),
            "Indices(1,3,5)"
        );
        assert_eq!(
            ReadBatchParams::Indices(UInt32Array::from(Vec::<u32>::new())).to_string(),
            "Indices()"
        );
    }
}