use std::ops::Range;
mod bitmap;
mod encoded_array;
mod index;
mod segment;
mod serde;
use deepsize::DeepSizeOf;
pub use index::RowIdIndex;
use lance_core::{utils::mask::RowIdTreeMap, Error, Result};
use lance_io::ReadBatchParams;
pub use serde::{read_row_ids, write_row_ids};
use snafu::{location, Location};
use segment::U64Segment;
use crate::utils::LanceIteratorExtension;
/// An ordered sequence of row ids, stored as a list of [`U64Segment`]s.
///
/// Iterating the sequence walks the segments in the order they are stored
/// and yields each segment's values in turn.
#[derive(Debug, Clone, DeepSizeOf, PartialEq, Eq)]
pub struct RowIdSequence(Vec<U64Segment>);
/// Renders a short preview of the sequence: up to the first ten and the last
/// ten row ids, comma separated, with `...` standing in for any ids that lie
/// between the two ends.
impl std::fmt::Display for RowIdSequence {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Number of row ids shown at each end of the sequence.
        const PREVIEW_LEN: usize = 10;

        let mut iter = self.iter();

        // Pull the leading ids first, then whatever is left from the back, so
        // that short sequences are printed in full without duplicates.
        let head: Vec<u64> = iter.by_ref().take(PREVIEW_LEN).collect();
        let mut tail = Vec::new();
        while tail.len() < PREVIEW_LEN {
            match iter.next_back() {
                Some(row_id) => tail.push(row_id),
                None => break,
            }
        }
        // `tail` was filled back-to-front; restore sequence order.
        tail.reverse();

        // Anything still left in the iterator sits between head and tail.
        let theres_more = iter.next().is_some();

        write!(f, "[")?;
        for (idx, row_id) in head.iter().enumerate() {
            if idx > 0 {
                write!(f, ", ")?;
            }
            write!(f, "{}", row_id)?;
        }
        // `head` is non-empty whenever anything follows it, so a leading
        // separator is always correct from here on.
        if theres_more {
            write!(f, ", ...")?;
        }
        for row_id in &tail {
            write!(f, ", {}", row_id)?;
        }
        write!(f, "]")
    }
}
/// Builds a sequence made of a single contiguous range segment.
impl From<Range<u64>> for RowIdSequence {
    fn from(range: Range<u64>) -> Self {
        let only_segment = U64Segment::Range(range);
        Self(vec![only_segment])
    }
}
impl RowIdSequence {
/// Returns a double-ended iterator over every row id, walking the segments
/// in storage order.
pub fn iter(&self) -> impl DoubleEndedIterator<Item = u64> + '_ {
    let segments = self.0.iter();
    segments.flat_map(|seg| seg.iter())
}
/// Returns the total number of row ids, summed over all segments.
pub fn len(&self) -> u64 {
    self.0
        .iter()
        .fold(0, |total, seg| total + seg.len() as u64)
}
/// Returns `true` when the sequence holds no row ids.
///
/// A sequence can keep segments that contain nothing (for example a
/// `Range(0..0)`), so this checks segment contents rather than only the
/// segment list; the result therefore always agrees with `len() == 0`.
pub fn is_empty(&self) -> bool {
    self.0.iter().all(|segment| segment.len() == 0)
}
/// Appends all segments of `other` to this sequence.
///
/// When this sequence ends with a plain range and `other` starts with a
/// plain range that begins exactly where ours ends, the two are fused into
/// one range segment instead of being stored side by side.
pub fn extend(&mut self, other: Self) {
    // Work out the fused range up front so the borrows of both segment
    // lists end before either list is modified.
    let fused = match (self.0.last(), other.0.first()) {
        (Some(U64Segment::Range(tail)), Some(U64Segment::Range(head)))
            if tail.end == head.start =>
        {
            Some(tail.start..head.end)
        }
        _ => None,
    };

    match fused {
        Some(range) => {
            if let Some(last) = self.0.last_mut() {
                *last = U64Segment::Range(range);
            }
            // The first segment of `other` is already part of the fused range.
            self.0.extend(other.0.into_iter().skip(1));
        }
        None => self.0.extend(other.0),
    }
}
pub fn delete(&mut self, row_ids: impl IntoIterator<Item = u64>) {
let (row_ids, offsets) = self.find_ids(row_ids);
let capacity = self.0.capacity();
let old_segments = std::mem::replace(&mut self.0, Vec::with_capacity(capacity));
let mut remaining_segments = old_segments.as_slice();
for (segment_idx, range) in offsets {
let segments_handled = old_segments.len() - remaining_segments.len();
let segments_to_add = segment_idx - segments_handled;
self.0
.extend_from_slice(&remaining_segments[..segments_to_add]);
remaining_segments = &remaining_segments[segments_to_add..];
let segment;
(segment, remaining_segments) = remaining_segments.split_first().unwrap();
let segment_ids = &row_ids[range];
self.0.push(segment.delete(segment_ids));
}
self.0.extend_from_slice(remaining_segments);
}
/// Removes the row ids found at the given positions of the sequence.
///
/// `positions` are offsets into the flattened sequence and are consumed in a
/// single forward pass, so they are expected in ascending order. Segments
/// that end up with no ids are dropped. The current implementation always
/// returns `Ok(())`.
pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
    let mut pending = positions.into_iter();
    // One-element lookahead: the next position that has not been assigned
    // to a segment yet.
    let mut lookahead = pending.next();
    // Position in the flattened sequence where the current segment begins.
    let mut segment_start = 0;
    // Scratch buffer of segment-relative positions, reused across segments.
    let mut segment_positions = Vec::new();

    for segment in self.0.iter_mut() {
        let segment_end = segment_start + segment.len() as u32;

        while let Some(position) = lookahead.filter(|position| *position < segment_end) {
            segment_positions.push(position - segment_start);
            lookahead = pending.next();
        }

        if !segment_positions.is_empty() {
            segment.mask(&segment_positions);
            segment_positions.clear();
        }
        segment_start = segment_end;
    }

    self.0.retain(|segment| segment.len() != 0);
    Ok(())
}
fn find_ids(
&self,
row_ids: impl IntoIterator<Item = u64>,
) -> (Vec<u64>, Vec<(usize, Range<usize>)>) {
let mut segment_iter = self.0.iter().enumerate().cycle();
let mut segment_matches = vec![Vec::new(); self.0.len()];
row_ids.into_iter().for_each(|row_id| {
let mut i = 0;
while i < self.0.len() {
let (segment_idx, segment) = segment_iter.next().unwrap();
if segment
.range()
.map_or(false, |range| range.contains(&row_id))
{
if let Some(offset) = segment.position(row_id) {
segment_matches.get_mut(segment_idx).unwrap().push(offset);
}
}
i += 1;
}
});
for matches in &mut segment_matches {
matches.sort_unstable();
}
let mut offset = 0;
let segment_ranges = segment_matches
.iter()
.enumerate()
.filter(|(_, matches)| !matches.is_empty())
.map(|(segment_idx, matches)| {
let range = offset..offset + matches.len();
offset += matches.len();
(segment_idx, range)
})
.collect();
let row_ids = segment_matches
.into_iter()
.enumerate()
.flat_map(|(segment_idx, offset)| {
offset
.into_iter()
.map(move |offset| self.0[segment_idx].get(offset).unwrap())
})
.collect();
(row_ids, segment_ranges)
}
/// Returns a borrowed view of `len` row ids starting at position `offset`.
///
/// A zero `len` yields an empty view regardless of `offset`. The window is
/// expected to lie within the sequence; one that runs past the end makes the
/// final segment indexing panic.
pub fn slice(&self, offset: usize, len: usize) -> RowIdSeqSlice<'_> {
    if len == 0 {
        return RowIdSeqSlice {
            segments: &[],
            offset_start: 0,
            offset_last: 0,
        };
    }

    // Skip whole segments until `offset_start` falls inside one.
    let mut first_idx = 0;
    let mut offset_start = offset;
    while let Some(segment) = self.0.get(first_idx) {
        let segment_len = segment.len();
        if offset_start < segment_len {
            break;
        }
        offset_start -= segment_len;
        first_idx += 1;
    }

    // From there, walk forward until the end of the window falls inside
    // (or exactly at the end of) a segment.
    let mut last_idx = first_idx;
    let mut offset_last = offset_start + len;
    while let Some(segment) = self.0.get(last_idx) {
        let segment_len = segment.len();
        if offset_last <= segment_len {
            break;
        }
        offset_last -= segment_len;
        last_idx += 1;
    }

    RowIdSeqSlice {
        segments: &self.0[first_idx..=last_idx],
        offset_start,
        offset_last,
    }
}
/// Returns the row id stored at position `index` of the flattened sequence,
/// or `None` when `index` is past the end.
pub fn get(&self, index: usize) -> Option<u64> {
    // Position still to travel, relative to the segment being inspected.
    let mut local_index = index;
    for segment in self.0.iter() {
        let segment_len = segment.len();
        if local_index < segment_len {
            return segment.get(local_index);
        }
        local_index -= segment_len;
    }
    None
}
}
/// Collects every row id of a sequence into a [`RowIdTreeMap`].
impl From<&RowIdSequence> for RowIdTreeMap {
    fn from(row_ids: &RowIdSequence) -> Self {
        let mut ids = Self::new();
        row_ids.0.iter().for_each(|segment| match segment {
            U64Segment::Range(range) => {
                ids.insert_range(range.clone());
            }
            U64Segment::RangeWithBitmap { range, bitmap } => {
                // Insert the whole range, then take out the values whose
                // bitmap entry is unset.
                ids.insert_range(range.clone());
                let unset = range
                    .clone()
                    .enumerate()
                    .filter(|(bit, _)| !bitmap.get(*bit));
                for (_, row_id) in unset {
                    ids.remove(row_id);
                }
            }
            U64Segment::RangeWithHoles { range, holes } => {
                // Insert the whole range, then punch out the holes.
                ids.insert_range(range.clone());
                holes.iter().for_each(|hole| {
                    ids.remove(hole);
                });
            }
            U64Segment::SortedArray(values) | U64Segment::Array(values) => {
                values.iter().for_each(|row_id| {
                    ids.insert(row_id);
                });
            }
        });
        ids
    }
}
/// A borrowed window into a [`RowIdSequence`], as produced by
/// [`RowIdSequence::slice`].
#[derive(Debug)]
pub struct RowIdSeqSlice<'a> {
/// The segments the window touches, in sequence order.
segments: &'a [U64Segment],
/// Number of leading values of the first segment that lie before the window.
offset_start: usize,
/// Number of values of the last segment, counted from its start, that lie
/// before the end of the window (an exclusive end position).
offset_last: usize,
}
impl<'a> RowIdSeqSlice<'a> {
    /// Returns an iterator over the row ids inside the window.
    ///
    /// The number of ids is computed up front and attached to the iterator
    /// through `exact_size`.
    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
        // Everything in the touched segments, minus what is cut off at the
        // front of the first segment and at the back of the last one.
        let total_len: usize = self.segments.iter().map(|segment| segment.len()).sum();
        let after_front_cut = total_len - self.offset_start;
        let back_cut = self.segments.last().map_or(0, |segment| segment.len()) - self.offset_last;
        let known_size = after_front_cut - back_cut;

        let last_idx = self.segments.len().saturating_sub(1);
        self.segments
            .iter()
            .enumerate()
            .flat_map(move |(idx, segment)| {
                match (idx == 0, idx == last_idx) {
                    // A single segment is trimmed at both ends.
                    (true, true) => {
                        let count = self.offset_last - self.offset_start;
                        Box::new(segment.iter().skip(self.offset_start).take(count))
                            as Box<dyn Iterator<Item = u64>>
                    }
                    // First of several: trim the front only.
                    (true, false) => Box::new(segment.iter().skip(self.offset_start)),
                    // Last of several: trim the back only.
                    (false, true) => Box::new(segment.iter().take(self.offset_last)),
                    // Interior segments are yielded whole.
                    (false, false) => Box::new(segment.iter()),
                }
            })
            .exact_size(known_size)
    }
}
/// Re-partitions a stream of row id sequences into new sequences whose
/// lengths are given by `chunk_sizes`.
///
/// The segments of all input sequences are treated as one continuous stream
/// and cut at the requested chunk boundaries. Segments that contain no row
/// ids contribute nothing and are skipped.
///
/// # Errors
///
/// Returns an invalid-input error when the inputs hold fewer row ids than
/// the chunk sizes ask for, or when row ids are left over after the last
/// chunk has been filled.
pub fn rechunk_sequences(
    sequences: impl IntoIterator<Item = RowIdSequence>,
    chunk_sizes: impl IntoIterator<Item = u64>,
) -> Result<Vec<RowIdSequence>> {
    let chunk_size_iter = chunk_sizes.into_iter();
    let mut chunked_sequences = Vec::with_capacity(chunk_size_iter.size_hint().0);
    let mut segment_iter = sequences
        .into_iter()
        .flat_map(|sequence| sequence.0.into_iter())
        .peekable();

    // How many row ids of the segment at the front of `segment_iter` have
    // already been handed to an earlier chunk.
    let mut segment_offset = 0_u64;
    for chunk_size in chunk_size_iter {
        let mut sequence = RowIdSequence(Vec::new());
        let mut remaining = chunk_size;

        while remaining > 0 {
            // The chunk still needs rows; running out of segments here means
            // the input is shorter than the chunk sizes require.
            let segment = segment_iter.peek().ok_or_else(|| {
                Error::invalid_input(
                    "Got too few segments for the provided chunk lengths",
                    location!(),
                )
            })?;
            let remaining_in_segment = segment.len() as u64 - segment_offset;

            if remaining_in_segment > remaining {
                // Only part of the segment fits; it stays at the front of the
                // iterator for the next chunk.
                let head = segment.slice(segment_offset as usize, remaining as usize);
                sequence.extend(RowIdSequence(vec![head]));
                segment_offset += remaining;
                remaining = 0;
            } else {
                // The rest of the segment fits entirely. An empty segment has
                // nothing to add and must not end the chunk early.
                if remaining_in_segment > 0 {
                    let rest =
                        segment.slice(segment_offset as usize, remaining_in_segment as usize);
                    sequence.extend(RowIdSequence(vec![rest]));
                }
                segment_iter.next();
                segment_offset = 0;
                remaining -= remaining_in_segment;
            }
        }

        chunked_sequences.push(sequence);
    }

    // A partially consumed segment is never empty, so any non-empty segment
    // left here holds row ids that no chunk claimed.
    if segment_iter.any(|segment| segment.len() != 0) {
        return Err(Error::invalid_input(
            "Got too many segments for the provided chunk lengths",
            location!(),
        ));
    }

    Ok(chunked_sequences)
}
/// Selects row ids from `sequence` by position.
///
/// `offsets` describes which positions of the flattened sequence to return:
/// explicit indices (returned in the order given), a bounded or half-open
/// range, or the full sequence. A `Range` whose start lies after its end
/// selects nothing.
///
/// # Errors
///
/// Returns an invalid-input error when an index, a range end, or the start
/// of a `RangeFrom` lies beyond the end of the sequence.
pub fn select_row_ids<'a>(
    sequence: &'a RowIdSequence,
    offsets: &'a ReadBatchParams,
) -> Result<Vec<u64>> {
    let out_of_bounds_err = |offset: u32| {
        Error::invalid_input(
            format!(
                "Index out of bounds: {} for sequence of length {}",
                offset,
                sequence.len()
            ),
            location!(),
        )
    };
    let total_len = sequence.len() as usize;

    match offsets {
        ReadBatchParams::Indices(indices) => indices
            .values()
            .iter()
            .map(|index| {
                sequence
                    .get(*index as usize)
                    .ok_or_else(|| out_of_bounds_err(*index))
            })
            .collect(),
        ReadBatchParams::Range(range) => {
            if range.end > total_len {
                return Err(out_of_bounds_err(range.end as u32));
            }
            // A reversed range is empty; saturate instead of underflowing.
            let len = range.end.saturating_sub(range.start);
            let sequence = sequence.slice(range.start, len);
            Ok(sequence.iter().collect())
        }
        ReadBatchParams::RangeFull => Ok(sequence.iter().collect()),
        ReadBatchParams::RangeTo(to) => {
            if to.end > total_len {
                return Err(out_of_bounds_err(to.end as u32));
            }
            let sequence = sequence.slice(0, to.end);
            Ok(sequence.iter().collect())
        }
        ReadBatchParams::RangeFrom(from) => {
            // Without this check the length computation below underflows.
            if from.start > total_len {
                return Err(out_of_bounds_err(from.start as u32));
            }
            let sequence = sequence.slice(from.start, total_len - from.start);
            Ok(sequence.iter().collect())
        }
    }
}
#[cfg(test)]
mod test {
use super::*;
use pretty_assertions::assert_eq;
use test::bitmap::Bitmap;
#[test]
fn test_row_id_sequence_from_range() {
    // A sequence built from a range reports the range's length and yields
    // exactly the range's values.
    let sequence = RowIdSequence::from(0..10);
    assert_eq!(sequence.len(), 10);
    assert!(!sequence.is_empty());

    let collected: Vec<u64> = sequence.iter().collect();
    let expected: Vec<u64> = (0..10).collect();
    assert_eq!(collected, expected);
}
#[test]
fn test_row_id_sequence_extend() {
    // Ranges that touch are fused into a single segment.
    let mut contiguous = RowIdSequence::from(0..10);
    contiguous.extend(RowIdSequence::from(10..20));
    assert_eq!(contiguous.0, vec![U64Segment::Range(0..20)]);

    // Ranges separated by a gap stay as two segments.
    let mut gapped = RowIdSequence::from(0..10);
    gapped.extend(RowIdSequence::from(20..30));
    let expected = vec![U64Segment::Range(0..10), U64Segment::Range(20..30)];
    assert_eq!(gapped.0, expected);
}
// Checks `RowIdSequence::delete` on three shapes of input: scattered ids,
// ids at segment edges (including ids that are absent), and every id.
#[test]
fn test_row_id_sequence_delete() {
// Deleting every other id is expected to leave a bitmap-backed segment
// over 0..9 with only the even offsets set.
let mut sequence = RowIdSequence::from(0..10);
sequence.delete(vec![1, 3, 5, 7, 9]);
let mut expected_bitmap = Bitmap::new_empty(9);
for i in [0, 2, 4, 6, 8] {
expected_bitmap.set(i as usize);
}
assert_eq!(
sequence.0,
vec![U64Segment::RangeWithBitmap {
range: 0..9,
bitmap: expected_bitmap
},]
);
// Deleting ids at the edges of two ranges shrinks the ranges; 10 and 11
// are in neither range and are expected to be ignored.
let mut sequence = RowIdSequence::from(0..10);
sequence.extend(RowIdSequence::from(12..20));
sequence.delete(vec![0, 9, 10, 11, 12, 13]);
assert_eq!(
sequence.0,
vec![U64Segment::Range(1..9), U64Segment::Range(14..20),]
);
// Deleting everything leaves an empty range segment behind rather than
// removing the segment.
let mut sequence = RowIdSequence::from(0..10);
sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
assert_eq!(sequence.0, vec![U64Segment::Range(0..0)]);
}
// Compares `RowIdSequence::slice` against `iter().skip().take()` for every
// in-bounds (offset, len) pair tried, over a sequence that mixes all
// segment kinds.
#[test]
fn test_row_id_slice() {
let sequence = RowIdSequence(vec![
U64Segment::Range(30..35), U64Segment::RangeWithHoles {
range: 50..60,
holes: vec![53, 54].into(),
},
U64Segment::SortedArray(vec![7, 9].into()), U64Segment::RangeWithBitmap {
range: 0..5,
bitmap: [true, false, true, false, true].as_slice().into(),
},
U64Segment::Array(vec![35, 39].into()),
U64Segment::Range(40..50),
]);
for offset in 0..sequence.len() as usize {
for len in 0..sequence.len() as usize {
// Skip windows that would run past the end of the sequence.
if offset + len > sequence.len() as usize {
continue;
}
let slice = sequence.slice(offset, len);
let actual = slice.iter().collect::<Vec<_>>();
let expected = sequence.iter().skip(offset).take(len).collect::<Vec<_>>();
assert_eq!(
actual, expected,
"Failed for offset {} and len {}",
offset, len
);
// The size hint must be exact and match the number of ids yielded.
let (claimed_size, claimed_max) = slice.iter().size_hint();
assert_eq!(claimed_max, Some(claimed_size)); assert_eq!(claimed_size, actual.len()); }
}
}
#[test]
fn test_row_id_slice_empty() {
    // A zero-length slice taken right at the end of the sequence yields nothing.
    let sequence = RowIdSequence::from(0..10);
    let row_ids: Vec<u64> = sequence.slice(10, 0).iter().collect();
    assert_eq!(row_ids, Vec::<u64>::new());
}
// Checks `rechunk_sequences` by converting between two partitionings of the
// same row ids, and by feeding it chunk sizes that do not add up.
#[test]
fn test_row_id_sequence_rechunk() {
// Rechunks `input` with `chunk_sizes` and compares against `expected`.
fn assert_rechunked(
input: Vec<RowIdSequence>,
chunk_sizes: Vec<u64>,
expected: Vec<RowIdSequence>,
) {
let chunked = rechunk_sequences(input, chunk_sizes).unwrap();
assert_eq!(chunked, expected);
}
// The same 30 row ids, split into four sequences...
let many_segments = vec![
RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
RowIdSequence::from(10..18),
RowIdSequence::from(18..28),
RowIdSequence::from(28..30),
];
// ...and into two sequences.
let fewer_segments = vec![
RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
RowIdSequence::from(10..30),
];
// Many -> fewer: adjacent ranges are expected to be merged.
assert_rechunked(
many_segments.clone(),
fewer_segments.iter().map(|seq| seq.len()).collect(),
fewer_segments.clone(),
);
// Fewer -> many: a range is expected to be split at chunk boundaries.
assert_rechunked(
fewer_segments,
many_segments.iter().map(|seq| seq.len()).collect(),
many_segments.clone(),
);
// Same chunk sizes as the input: output equals input.
assert_rechunked(
many_segments.clone(),
many_segments.iter().map(|seq| seq.len()).collect(),
many_segments.clone(),
);
// Asking for more rows than the input holds is an error.
let result = rechunk_sequences(many_segments.clone(), vec![100]);
assert!(result.is_err());
// Asking for fewer rows than the input holds is an error as well.
let result = rechunk_sequences(many_segments, vec![5]);
assert!(result.is_err());
}
/// Checks `select_row_ids` for every kind of `ReadBatchParams` against a
/// selection computed by indexing into the flattened sequence.
#[test]
fn test_select_row_ids() {
    let offsets = [
        ReadBatchParams::Indices(vec![1, 3, 9, 5, 7, 6].into()),
        ReadBatchParams::Range(2..8),
        ReadBatchParams::RangeFull,
        ReadBatchParams::RangeTo(..5),
        ReadBatchParams::RangeFrom(5..),
    ];
    let sequences = [
        RowIdSequence(vec![
            U64Segment::Range(0..5),
            U64Segment::RangeWithHoles {
                range: 50..60,
                holes: vec![53, 54].into(),
            },
            U64Segment::SortedArray(vec![7, 9].into()),
        ]),
        RowIdSequence(vec![
            U64Segment::RangeWithBitmap {
                range: 0..5,
                bitmap: [true, false, true, false, true].as_slice().into(),
            },
            U64Segment::Array(vec![30, 20, 10].into()),
            U64Segment::Range(40..50),
        ]),
    ];
    for params in offsets {
        for sequence in &sequences {
            let row_ids = select_row_ids(sequence, &params).unwrap();

            // Reference result: resolve the params to positions and index
            // into the flattened sequence.
            let flat_sequence = sequence.iter().collect::<Vec<_>>();
            let selection: Vec<usize> = match &params {
                ReadBatchParams::RangeFull => (0..flat_sequence.len()).collect(),
                ReadBatchParams::RangeTo(to) => (0..to.end).collect(),
                ReadBatchParams::RangeFrom(from) => (from.start..flat_sequence.len()).collect(),
                ReadBatchParams::Range(range) => range.clone().collect(),
                ReadBatchParams::Indices(indices) => {
                    indices.values().iter().map(|i| *i as usize).collect()
                }
            };
            let expected = selection
                .into_iter()
                .map(|i| flat_sequence[i])
                .collect::<Vec<_>>();
            assert_eq!(
                row_ids, expected,
                "Failed for params {:?} on the sequence {:?}",
                &params, sequence
            );
        }
    }
}
/// Checks that selections reaching past the end of the sequence are rejected
/// with an `InvalidInput` error.
#[test]
fn test_select_row_ids_out_of_bounds() {
    let offsets = [
        ReadBatchParams::Indices(vec![1, 1000, 4].into()),
        ReadBatchParams::Range(2..1000),
        ReadBatchParams::RangeTo(..1000),
    ];
    let sequence = RowIdSequence::from(0..10);
    for params in offsets {
        let result = select_row_ids(&sequence, &params);
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), Error::InvalidInput { .. }));
    }
}
// Converts a sequence containing every segment kind into a `RowIdTreeMap`
// and compares it with a tree map built from the expected ids.
#[test]
fn test_row_id_sequence_to_treemap() {
let sequence = RowIdSequence(vec![
U64Segment::Range(0..5),
U64Segment::RangeWithHoles {
range: 50..60,
holes: vec![53, 54].into(),
},
U64Segment::SortedArray(vec![7, 9].into()),
U64Segment::RangeWithBitmap {
range: 10..15,
bitmap: [true, false, true, false, true].as_slice().into(),
},
U64Segment::Array(vec![35, 39].into()),
U64Segment::Range(40..50),
]);
let tree_map = RowIdTreeMap::from(&sequence);
// Holes (53, 54) and unset bitmap entries (11, 13) must be absent.
let expected = vec![
0, 1, 2, 3, 4, 7, 9, 10, 12, 14, 35, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
51, 52, 55, 56, 57, 58, 59,
]
.into_iter()
.collect::<RowIdTreeMap>();
assert_eq!(tree_map, expected);
}
// Masks one value out of each segment kind and checks the resulting
// segment representations.
#[test]
fn test_row_id_mask() {
let sequence = RowIdSequence(vec![
U64Segment::Range(0..5),
U64Segment::RangeWithHoles {
range: 50..60,
holes: vec![53, 54].into(),
},
U64Segment::SortedArray(vec![7, 9].into()),
U64Segment::RangeWithBitmap {
range: 10..15,
bitmap: [true, false, true, false, true].as_slice().into(),
},
U64Segment::Array(vec![35, 39].into()),
]);
// `mask` takes positions, not values: look up where each value to remove
// sits in the flattened sequence.
let values_to_remove = [4, 55, 7, 12, 39];
let positions_to_remove = sequence
.iter()
.enumerate()
.filter_map(|(i, val)| {
if values_to_remove.contains(&val) {
Some(i as u32)
} else {
None
}
})
.collect::<Vec<_>>();
let mut sequence = sequence;
sequence.mask(positions_to_remove).unwrap();
// Expected segments after each one has lost a value.
let expected = RowIdSequence(vec![
U64Segment::Range(0..4),
U64Segment::RangeWithBitmap {
range: 50..60,
bitmap: [
true, true, true, false, false, false, true, true, true, true,
]
.as_slice()
.into(),
},
U64Segment::Range(9..10),
U64Segment::RangeWithBitmap {
range: 10..15,
bitmap: [true, false, false, false, true].as_slice().into(),
},
U64Segment::Array(vec![35].into()),
]);
assert_eq!(sequence, expected);
}
#[test]
fn test_row_id_mask_everything() {
    // Masking every position must leave no segments behind at all.
    let mut sequence = RowIdSequence(vec![
        U64Segment::Range(0..5),
        U64Segment::SortedArray(vec![7, 9].into()),
    ]);
    let total = sequence.len() as u32;
    sequence.mask(0..total).unwrap();
    assert_eq!(sequence, RowIdSequence(vec![]));
}
}