// lance_table/rowids/index.rs
use std::ops::RangeInclusive;
use std::sync::Arc;
use deepsize::DeepSizeOf;
use lance_core::utils::address::RowAddress;
use lance_core::{Error, Result};
use rangemap::RangeInclusiveMap;
use snafu::{location, Location};
use super::{RowIdSequence, U64Segment};
/// An index from row id to row address.
///
/// The wrapped map is keyed by inclusive ranges of row ids. Each value is a
/// pair of segments: the first holds the row ids covered by that range, the
/// second holds the addresses they resolve to, position by position.
#[derive(Debug)]
pub struct RowIdIndex(RangeInclusiveMap<u64, (U64Segment, U64Segment)>);
impl RowIdIndex {
pub fn new(fragment_indices: &[(u32, Arc<RowIdSequence>)]) -> Result<Self> {
let mut pieces = fragment_indices
.iter()
.flat_map(|(fragment_id, sequence)| decompose_sequence(*fragment_id, sequence))
.collect::<Vec<_>>();
pieces.sort_by_key(|(range, _)| *range.start());
if pieces.windows(2).any(|w| w[0].0.end() >= w[1].0.start()) {
return Err(Error::NotSupported {
source: "Overlapping ranges are not yet supported".into(),
location: location!(),
});
}
Ok(Self(RangeInclusiveMap::from_iter(pieces)))
}
pub fn get(&self, row_id: u64) -> Option<RowAddress> {
let (row_id_segment, address_segment) = self.0.get(&row_id)?;
let pos = row_id_segment.position(row_id)?;
let address = address_segment.get(pos)?;
Some(RowAddress::from(address))
}
}
impl DeepSizeOf for RowIdIndex {
    /// Sum the size of every map entry plus whatever its two segments own.
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // Fixed cost charged per entry: two `u64`s (presumably the start and
        // end of the key range) plus the inline size of the segment pair.
        let per_entry =
            (2 * std::mem::size_of::<u64>()) + std::mem::size_of::<(U64Segment, U64Segment)>();

        let mut total: usize = 0;
        for (_, (ids, addresses)) in self.0.iter() {
            let ids_size = ids.deep_size_of_children(context);
            let addresses_size = addresses.deep_size_of_children(context);
            total += per_entry + ids_size + addresses_size;
        }
        total
    }
}
/// Split a fragment's row id sequence into index entries.
///
/// Each returned entry pairs the inclusive range of row ids a segment covers
/// with `(segment, addresses)`, where `addresses` is the contiguous block of
/// row addresses assigned to that segment. Addresses start at the fragment's
/// first row and advance by each segment's length. Segments for which
/// `range()` yields `None` produce no entry.
fn decompose_sequence(
    fragment_id: u32,
    sequence: &RowIdSequence,
) -> Vec<(RangeInclusive<u64>, (U64Segment, U64Segment))> {
    let mut next_address: u64 = RowAddress::first_row(fragment_id).into();
    let mut entries = Vec::new();

    for segment in sequence.0.iter() {
        let row_count = segment.len() as u64;
        let addresses = next_address..(next_address + row_count);
        // Advance the cursor for every segment, including skipped ones.
        next_address = addresses.end;

        if let Some(coverage) = segment.range() {
            entries.push((coverage, (segment.clone(), U64Segment::Range(addresses))));
        }
    }

    entries
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an index over two fragments whose sequences mix every segment
    /// kind, then checks lookups inside, between, and past the segments.
    #[test]
    fn test_new_index() {
        let first_sequence = RowIdSequence(vec![
            U64Segment::Range(0..10),
            U64Segment::RangeWithHoles {
                range: 10..17,
                holes: vec![12, 15].into(),
            },
            U64Segment::SortedArray(vec![20, 25, 30].into()),
        ]);
        let second_sequence = RowIdSequence(vec![
            U64Segment::RangeWithBitmap {
                range: 17..20,
                bitmap: [true, false, true].as_slice().into(),
            },
            U64Segment::Array(vec![40, 50, 60].into()),
        ]);
        let fragment_indices = vec![
            (10, Arc::new(first_sequence)),
            (20, Arc::new(second_sequence)),
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        // Row ids stored in fragment 10.
        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(10, 0)));
        assert_eq!(index.get(16), Some(RowAddress::new_from_parts(10, 14)));
        assert_eq!(index.get(25), Some(RowAddress::new_from_parts(10, 16)));

        // Row ids stored in fragment 20.
        assert_eq!(index.get(17), Some(RowAddress::new_from_parts(20, 0)));
        assert_eq!(index.get(40), Some(RowAddress::new_from_parts(20, 2)));
        assert_eq!(index.get(60), Some(RowAddress::new_from_parts(20, 4)));

        // 15 is listed as a hole, and 61 lies past the last row id.
        assert_eq!(index.get(15), None);
        assert_eq!(index.get(61), None);
    }
}