lance_encoding/encodings/physical/
bitmap.rs1use std::{ops::Range, sync::Arc};
5
6use arrow_buffer::BooleanBufferBuilder;
7use bytes::Bytes;
8
9use futures::{future::BoxFuture, FutureExt};
10use lance_core::Result;
11use log::trace;
12
13use crate::{
14 buffer::LanceBuffer,
15 data::{BlockInfo, DataBlock, FixedWidthDataBlock},
16 decoder::{PageScheduler, PrimitivePageDecoder},
17 EncodingsIo,
18};
19
/// Page scheduler for a bitmap stored densely at one bit per row
/// (eight rows per byte).
///
/// Row ranges are translated into byte ranges relative to `buffer_offset`
/// before being handed to the I/O layer.
#[derive(Clone, Copy, Debug)]
pub struct DenseBitmapScheduler {
    /// Byte position that row 0 of the bitmap maps to; added to every byte
    /// position computed from a requested row range.
    buffer_offset: u64,
}
28
29impl DenseBitmapScheduler {
30 pub fn new(buffer_offset: u64) -> Self {
31 Self { buffer_offset }
32 }
33}
34
35impl PageScheduler for DenseBitmapScheduler {
36 fn schedule_ranges(
37 &self,
38 ranges: &[Range<u64>],
39 scheduler: &Arc<dyn EncodingsIo>,
40 top_level_row: u64,
41 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
42 let mut min = u64::MAX;
43 let mut max = 0;
44 let chunk_reqs = ranges
45 .iter()
46 .map(|range| {
47 debug_assert_ne!(range.start, range.end);
48 let start = self.buffer_offset + range.start / 8;
49 let bit_offset = range.start % 8;
50 let end = self.buffer_offset + range.end.div_ceil(8);
51 let byte_range = start..end;
52 min = min.min(start);
53 max = max.max(end);
54 (byte_range, bit_offset, range.end - range.start)
55 })
56 .collect::<Vec<_>>();
57
58 let byte_ranges = chunk_reqs
59 .iter()
60 .map(|(range, _, _)| range.clone())
61 .collect::<Vec<_>>();
62 trace!(
63 "Scheduling I/O for {} ranges across byte range {}..{}",
64 byte_ranges.len(),
65 min,
66 max
67 );
68 let bytes = scheduler.submit_request(byte_ranges, top_level_row);
69
70 async move {
71 let bytes = bytes.await?;
72 let chunks = bytes
73 .into_iter()
74 .zip(chunk_reqs)
75 .map(|(bytes, (_, bit_offset, length))| BitmapData {
76 data: bytes,
77 bit_offset,
78 length,
79 })
80 .collect::<Vec<_>>();
81 Ok(Box::new(BitmapDecoder { chunks }) as Box<dyn PrimitivePageDecoder>)
82 }
83 .boxed()
84 }
85}
86
87struct BitmapData {
88 data: Bytes,
89 bit_offset: u64,
90 length: u64,
91}
92
93struct BitmapDecoder {
94 chunks: Vec<BitmapData>,
95}
96
97impl PrimitivePageDecoder for BitmapDecoder {
98 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
99 let mut rows_to_skip = rows_to_skip;
100 let mut dest_builder = BooleanBufferBuilder::new(num_rows as usize);
101
102 let mut rows_remaining = num_rows;
103 for chunk in &self.chunks {
104 if chunk.length <= rows_to_skip {
105 rows_to_skip -= chunk.length;
106 } else {
107 let start = rows_to_skip + chunk.bit_offset;
108 let num_vals_to_take = rows_remaining.min(chunk.length);
109 let end = start + num_vals_to_take;
110 dest_builder.append_packed_range(start as usize..end as usize, &chunk.data);
111 rows_to_skip = 0;
112 rows_remaining -= num_vals_to_take;
113 }
114 }
115
116 let bool_buffer = dest_builder.finish().into_inner();
117 Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
118 data: LanceBuffer::from(bool_buffer),
119 bits_per_value: 1,
120 num_values: num_rows,
121 block_info: BlockInfo::new(),
122 }))
123 }
124}
125
#[cfg(test)]
mod tests {
    use arrow_schema::{DataType, Field};
    use bytes::Bytes;

    use super::{BitmapData, BitmapDecoder};
    use crate::decoder::PrimitivePageDecoder;
    use crate::testing::check_round_trip_encoding_random;
    use crate::version::LanceFileVersion;

    /// Runs the shared random round-trip check for a non-nullable Boolean
    /// field against file version 2.0.
    #[test_log::test(tokio::test)]
    async fn test_bitmap_boolean() {
        let bool_field = Field::new("", DataType::Boolean, false);
        check_round_trip_encoding_random(bool_field, LanceFileVersion::V2_0).await;
    }

    /// Decoding must succeed when the skip count exceeds the first chunk, so
    /// that the read starts partway into the second chunk, which itself has a
    /// non-zero bit offset.
    #[test]
    fn test_bitmap_decoder_edge_cases() {
        let all_set = BitmapData {
            data: Bytes::from_static(&[0b1111_1111]),
            bit_offset: 4,
            length: 4,
        };
        let all_clear = BitmapData {
            data: Bytes::from_static(&[0b0000_0000]),
            bit_offset: 4,
            length: 4,
        };
        let decoder = BitmapDecoder {
            chunks: vec![all_set, all_clear],
        };

        // Skip all 4 rows of the first chunk plus 1 row of the second, then
        // read a single row.
        assert!(decoder.decode(5, 1).is_ok());
    }
}