1use std::{collections::HashSet, ops::Range, sync::Arc};
5
6use arrow_array::BooleanArray;
7use deepsize::{Context, DeepSizeOf};
8use roaring::RoaringBitmap;
9
10const BITMAP_THRESDHOLD: usize = 5_000;
12#[derive(Debug, Clone)]
16pub enum DeletionVector {
17 NoDeletions,
18 Set(HashSet<u32>),
19 Bitmap(RoaringBitmap),
20}
21
22impl DeepSizeOf for DeletionVector {
23 fn deep_size_of_children(&self, context: &mut Context) -> usize {
24 match self {
25 Self::NoDeletions => 0,
26 Self::Set(set) => set.deep_size_of_children(context),
27 Self::Bitmap(bitmap) => bitmap.serialized_size(),
29 }
30 }
31}
32
33impl DeletionVector {
34 #[allow(dead_code)] pub fn len(&self) -> usize {
36 match self {
37 Self::NoDeletions => 0,
38 Self::Set(set) => set.len(),
39 Self::Bitmap(bitmap) => bitmap.len() as usize,
40 }
41 }
42
43 pub fn is_empty(&self) -> bool {
44 self.len() == 0
45 }
46
47 pub fn contains(&self, i: u32) -> bool {
48 match self {
49 Self::NoDeletions => false,
50 Self::Set(set) => set.contains(&i),
51 Self::Bitmap(bitmap) => bitmap.contains(i),
52 }
53 }
54
55 pub fn contains_range(&self, mut range: Range<u32>) -> bool {
56 match self {
57 Self::NoDeletions => range.is_empty(),
58 Self::Set(set) => range.all(|i| set.contains(&i)),
59 Self::Bitmap(bitmap) => bitmap.contains_range(range),
60 }
61 }
62
63 fn range_cardinality(&self, range: Range<u32>) -> u64 {
64 match self {
65 Self::NoDeletions => 0,
66 Self::Set(set) => range.fold(0, |acc, i| acc + set.contains(&i) as u64),
67 Self::Bitmap(bitmap) => bitmap.range_cardinality(range),
68 }
69 }
70
71 pub fn iter(&self) -> Box<dyn Iterator<Item = u32> + Send + '_> {
72 match self {
73 Self::NoDeletions => Box::new(std::iter::empty()),
74 Self::Set(set) => Box::new(set.iter().copied()),
75 Self::Bitmap(bitmap) => Box::new(bitmap.iter()),
76 }
77 }
78
79 pub fn into_sorted_iter(self) -> Box<dyn Iterator<Item = u32> + Send + 'static> {
80 match self {
81 Self::NoDeletions => Box::new(std::iter::empty()),
82 Self::Set(set) => {
83 let mut values = Vec::from_iter(set);
86 values.sort();
87 Box::new(values.into_iter())
88 }
89 Self::Bitmap(bitmap) => Box::new(bitmap.into_iter()),
91 }
92 }
93
94 pub fn to_sorted_iter<'a>(&'a self) -> Box<dyn Iterator<Item = u32> + Send + 'a> {
96 match self {
97 Self::NoDeletions => Box::new(std::iter::empty()),
98 Self::Set(_) => self.clone().into_sorted_iter(),
101 Self::Bitmap(bitmap) => Box::new(bitmap.iter()),
102 }
103 }
104
105 pub fn build_predicate(&self, row_ids: std::slice::Iter<u64>) -> Option<BooleanArray> {
109 match self {
110 Self::Bitmap(bitmap) => Some(
111 row_ids
112 .map(|&id| !bitmap.contains(id as u32))
113 .collect::<Vec<_>>(),
114 ),
115 Self::Set(set) => Some(
116 row_ids
117 .map(|&id| !set.contains(&(id as u32)))
118 .collect::<Vec<_>>(),
119 ),
120 Self::NoDeletions => None,
121 }
122 .map(BooleanArray::from)
123 }
124}
125
126pub struct OffsetMapper {
141 dv: Arc<DeletionVector>,
142 left: u32,
143 last_diff: u32,
144}
145
146impl OffsetMapper {
147 pub fn new(dv: Arc<DeletionVector>) -> Self {
148 Self {
149 dv,
150 left: 0,
151 last_diff: 0,
152 }
153 }
154
155 pub fn map_offset(&mut self, offset: u32) -> u32 {
156 let mut mid = offset + self.last_diff;
160 let mut right = offset + self.dv.len() as u32;
161 loop {
162 let deleted_in_range = self.dv.range_cardinality(0..(mid + 1)) as u32;
163 match mid.cmp(&(offset + deleted_in_range)) {
164 std::cmp::Ordering::Equal if !self.dv.contains(mid) => {
165 self.last_diff = mid - offset;
166 return mid;
167 }
168 std::cmp::Ordering::Less => {
169 assert_ne!(self.left, mid + 1);
170 self.left = mid + 1;
171 mid = self.left + (right - self.left) / 2;
172 }
173 std::cmp::Ordering::Greater | std::cmp::Ordering::Equal => {
176 right = mid;
177 mid = self.left + (right - self.left) / 2;
178 }
179 }
180 }
181 }
182}
183
184impl Default for DeletionVector {
185 fn default() -> Self {
186 Self::NoDeletions
187 }
188}
189
190impl From<&DeletionVector> for RoaringBitmap {
191 fn from(value: &DeletionVector) -> Self {
192 match value {
193 DeletionVector::Bitmap(bitmap) => bitmap.clone(),
194 DeletionVector::Set(set) => Self::from_iter(set.iter()),
195 DeletionVector::NoDeletions => Self::new(),
196 }
197 }
198}
199
200impl PartialEq for DeletionVector {
201 fn eq(&self, other: &Self) -> bool {
202 match (self, other) {
203 (Self::NoDeletions, Self::NoDeletions) => true,
204 (Self::Set(set1), Self::Set(set2)) => set1 == set2,
205 (Self::Bitmap(bitmap1), Self::Bitmap(bitmap2)) => bitmap1 == bitmap2,
206 (Self::Set(set), Self::Bitmap(bitmap)) | (Self::Bitmap(bitmap), Self::Set(set)) => {
207 let set = set.iter().copied().collect::<RoaringBitmap>();
208 set == *bitmap
209 }
210 _ => false,
211 }
212 }
213}
214
215impl Extend<u32> for DeletionVector {
216 fn extend<T: IntoIterator<Item = u32>>(&mut self, iter: T) {
217 let iter = iter.into_iter();
218 *self = match (std::mem::take(self), iter.size_hint()) {
221 (Self::NoDeletions, (_, Some(0))) => Self::NoDeletions,
222 (Self::NoDeletions, (lower, _)) if lower >= BITMAP_THRESDHOLD => {
223 let bitmap = iter.collect::<RoaringBitmap>();
224 Self::Bitmap(bitmap)
225 }
226 (Self::NoDeletions, (_, Some(upper))) if upper < BITMAP_THRESDHOLD => {
227 let set = iter.collect::<HashSet<_>>();
228 Self::Set(set)
229 }
230 (Self::NoDeletions, _) => {
231 let set = iter.collect::<HashSet<_>>();
234 if set.len() > BITMAP_THRESDHOLD {
235 let bitmap = set.into_iter().collect::<RoaringBitmap>();
236 Self::Bitmap(bitmap)
237 } else {
238 Self::Set(set)
239 }
240 }
241 (Self::Set(mut set), _) => {
242 set.extend(iter);
243 if set.len() > BITMAP_THRESDHOLD {
244 let bitmap = set.drain().collect::<RoaringBitmap>();
245 Self::Bitmap(bitmap)
246 } else {
247 Self::Set(set)
248 }
249 }
250 (Self::Bitmap(mut bitmap), _) => {
251 bitmap.extend(iter);
252 Self::Bitmap(bitmap)
253 }
254 };
255 }
256}
257
258impl IntoIterator for DeletionVector {
264 type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
265 type Item = u32;
266
267 fn into_iter(self) -> Self::IntoIter {
268 match self {
269 Self::NoDeletions => Box::new(std::iter::empty()),
270 Self::Set(set) => {
271 let mut sorted = set.into_iter().collect::<Vec<_>>();
274 sorted.sort();
275 Box::new(sorted.into_iter())
276 }
277 Self::Bitmap(bitmap) => Box::new(bitmap.into_iter()),
278 }
279 }
280}
281
282impl FromIterator<u32> for DeletionVector {
283 fn from_iter<T: IntoIterator<Item = u32>>(iter: T) -> Self {
284 let mut deletion_vector = Self::default();
285 deletion_vector.extend(iter);
286 deletion_vector
287 }
288}
289
290#[cfg(test)]
291mod test {
292 use super::*;
293
294 #[test]
295 fn test_deletion_vector() {
296 let set = HashSet::from_iter(0..100);
297 let bitmap = RoaringBitmap::from_iter(0..100);
298
299 let set_dv = DeletionVector::Set(set);
300 let bitmap_dv = DeletionVector::Bitmap(bitmap);
301
302 assert_eq!(set_dv, bitmap_dv);
303 }
304
305 #[test]
306 fn test_threshold() {
307 let dv = DeletionVector::from_iter(0..(BITMAP_THRESDHOLD as u32));
308 assert!(matches!(dv, DeletionVector::Bitmap(_)));
309 }
310
311 #[test]
312 fn test_map_offsets() {
313 let dv = DeletionVector::from_iter(vec![3, 5]);
314 let mut mapper = OffsetMapper::new(Arc::new(dv));
315
316 let offsets = [0, 1, 2, 3, 4, 5, 6];
317 let mut output = Vec::new();
318 for offset in offsets.iter() {
319 output.push(mapper.map_offset(*offset));
320 }
321 assert_eq!(output, vec![0, 1, 2, 4, 6, 7, 8]);
322
323 let dv = DeletionVector::from_iter(vec![0, 1, 2]);
324 let mut mapper = OffsetMapper::new(Arc::new(dv));
325
326 let offsets = [0, 1, 2, 3, 4, 5, 6];
327
328 let mut output = Vec::new();
329 for offset in offsets.iter() {
330 output.push(mapper.map_offset(*offset));
331 }
332 assert_eq!(output, vec![3, 4, 5, 6, 7, 8, 9]);
333 }
334}