// lance_file/format/metadata.rs

use std::collections::BTreeMap;
use std::ops::Range;

use crate::datatypes::{Fields, FieldsWithMeta};
use crate::format::pb;
use deepsize::DeepSizeOf;
use lance_core::datatypes::Schema;
use lance_core::{Error, Result};
use lance_io::traits::ProtoStruct;
use snafu::location;
/// Metadata of a Lance data file.
#[derive(Debug, Default, DeepSizeOf, PartialEq)]
pub struct Metadata {
    /// Cumulative row offsets of the record batches: `batch_offsets[i]` is the
    /// row at which batch `i` starts, and the last entry is the total number
    /// of rows in the file.
    pub batch_offsets: Vec<i32>,

    /// File position of the page table.
    pub page_table_position: usize,

    /// File position of the manifest, if the file contains one.
    pub manifest_position: Option<usize>,

    /// Metadata about the page-level statistics, if present.
    pub stats_metadata: Option<StatisticsMetadata>,
}

impl ProtoStruct for Metadata {
    type Proto = pb::Metadata;
}

impl From<&Metadata> for pb::Metadata {
    fn from(m: &Metadata) -> Self {
        let statistics = if let Some(stats_meta) = &m.stats_metadata {
            let fields_with_meta: FieldsWithMeta = (&stats_meta.schema).into();
            Some(pb::metadata::StatisticsMetadata {
                schema: fields_with_meta.fields.0,
                fields: stats_meta.leaf_field_ids.clone(),
                page_table_position: stats_meta.page_table_position as u64,
            })
        } else {
            None
        };

        Self {
            batch_offsets: m.batch_offsets.clone(),
            page_table_position: m.page_table_position as u64,
            manifest_position: m.manifest_position.unwrap_or(0) as u64,
            statistics,
        }
    }
}

impl TryFrom<pb::Metadata> for Metadata {
    type Error = Error;
    fn try_from(m: pb::Metadata) -> Result<Self> {
        Ok(Self {
            batch_offsets: m.batch_offsets.clone(),
            page_table_position: m.page_table_position as usize,
            // Note: `None` is encoded as 0 on the write side, so it decodes
            // back here as `Some(0)` rather than `None`.
            manifest_position: Some(m.manifest_position as usize),
            stats_metadata: if let Some(stats_meta) = m.statistics {
                Some(StatisticsMetadata {
                    schema: Schema::from(FieldsWithMeta {
                        fields: Fields(stats_meta.schema),
                        metadata: Default::default(),
                    }),
                    leaf_field_ids: stats_meta.fields,
                    page_table_position: stats_meta.page_table_position as usize,
                })
            } else {
                None
            },
        })
    }
}

/// Row offsets of selected rows within a single batch.
#[derive(Debug, PartialEq)]
pub struct BatchOffsets {
    pub batch_id: i32,
    pub offsets: Vec<u32>,
}

impl Metadata {
    /// Number of batches in the file.
    pub fn num_batches(&self) -> usize {
        if self.batch_offsets.is_empty() {
            0
        } else {
            self.batch_offsets.len() - 1
        }
    }

    /// Number of rows in the file.
    pub fn len(&self) -> usize {
        *self.batch_offsets.last().unwrap_or(&0) as usize
    }

    /// Returns true if the file contains no rows.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Append a batch of `batch_len` rows, extending the offsets table.
    pub fn push_batch_length(&mut self, batch_len: i32) {
        if self.batch_offsets.is_empty() {
            self.batch_offsets.push(0)
        }
        self.batch_offsets
            .push(batch_len + self.batch_offsets.last().unwrap())
    }

    /// Get the starting row offset of a batch, or `None` if out of range.
    pub fn get_offset(&self, batch_id: i32) -> Option<i32> {
        self.batch_offsets.get(batch_id as usize).copied()
    }

    /// Get the number of rows in a batch, or `None` if out of range.
    pub fn get_batch_length(&self, batch_id: i32) -> Option<i32> {
        self.get_offset(batch_id + 1)
            .map(|o| o - self.get_offset(batch_id).unwrap_or_default())
    }

    /// Group row indices into the batches that contain them.
    ///
    /// Returns one [`BatchOffsets`] per batch that holds at least one of the
    /// given indices, with each index rebased to the start of its batch.
    pub fn group_indices_to_batches(&self, indices: &[u32]) -> Vec<BatchOffsets> {
        let mut batch_id: i32 = 0;
        let num_batches = self.num_batches() as i32;
        let mut indices_per_batch: BTreeMap<i32, Vec<u32>> = BTreeMap::new();

        let mut indices = Vec::from(indices);
        // Sort the indices so that each batch is visited at most once.
        indices.sort_unstable();

        for idx in indices.iter() {
            // Advance to the batch that contains this index.
            while batch_id < num_batches && *idx >= self.batch_offsets[batch_id as usize + 1] as u32
            {
                batch_id += 1;
            }
            indices_per_batch.entry(batch_id).or_default().push(*idx);
        }

        indices_per_batch
            .iter()
            .map(|(batch_id, indices)| {
                let batch_offset = self.batch_offsets[*batch_id as usize];
                // Rebase each index to the start of its batch.
                let in_batch_offsets = indices
                    .iter()
                    .map(|i| i - batch_offset as u32)
                    .collect::<Vec<_>>();
                BatchOffsets {
                    batch_id: *batch_id,
                    offsets: in_batch_offsets,
                }
            })
            .collect()
    }

    /// Map a row range to `(batch_id, in-batch range)` pairs.
    ///
    /// The returned ranges are relative to the start of each batch. Returns an
    /// error if `range` extends past the last row of the file.
    pub fn range_to_batches(&self, range: Range<usize>) -> Result<Vec<(i32, Range<usize>)>> {
        if range.end > *(self.batch_offsets.last().unwrap()) as usize {
            return Err(Error::io(
                format!(
                    "Range {:?} is out of bounds {}",
                    range,
                    self.batch_offsets.last().unwrap()
                ),
                location!(),
            ));
        }
        let offsets = self.batch_offsets.as_slice();
        // Find the first batch that overlaps the range.
        let mut batch_id = offsets
            .binary_search(&(range.start as i32))
            .unwrap_or_else(|x| x - 1);
        let mut batches = vec![];

        while batch_id < self.num_batches() {
            let batch_start = offsets[batch_id] as usize;
            if batch_start >= range.end {
                break;
            }
            let start = std::cmp::max(range.start, batch_start) - batch_start;
            let end = std::cmp::min(range.end, offsets[batch_id + 1] as usize) - batch_start;
            batches.push((batch_id as i32, start..end));
            batch_id += 1;
        }
        Ok(batches)
    }
}

/// Metadata describing the page-level statistics stored in the file.
#[derive(Debug, PartialEq, DeepSizeOf)]
pub struct StatisticsMetadata {
    /// Schema of the statistics columns.
    pub schema: Schema,
    /// Ids of the leaf fields that the statistics cover.
    pub leaf_field_ids: Vec<i32>,
    /// File position of the statistics page table.
    pub page_table_position: usize,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_group_indices_to_batch() {
        let mut metadata = Metadata::default();
        metadata.push_batch_length(20);
        metadata.push_batch_length(20);

        let batches = metadata.group_indices_to_batches(&[6, 24]);
        assert_eq!(batches.len(), 2);
        assert_eq!(
            batches,
            vec![
                BatchOffsets {
                    batch_id: 0,
                    offsets: vec![6]
                },
                BatchOffsets {
                    batch_id: 1,
                    offsets: vec![4]
                }
            ]
        );
    }
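
    // A minimal sketch (not part of the original tests) exercising the offset
    // accessors above; the expected values follow directly from the
    // cumulative-offset layout of `batch_offsets`.
    #[test]
    fn test_offset_accessors() {
        let mut metadata = Metadata::default();
        metadata.push_batch_length(5);
        metadata.push_batch_length(7);

        // Offsets are cumulative: [0, 5, 12].
        assert_eq!(metadata.num_batches(), 2);
        assert_eq!(metadata.len(), 12);
        assert_eq!(metadata.get_offset(1), Some(5));
        assert_eq!(metadata.get_batch_length(1), Some(7));
        // Out-of-range batch ids yield None.
        assert_eq!(metadata.get_batch_length(2), None);
    }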

    #[test]
    fn test_range_to_batches() {
        let mut metadata = Metadata::default();
        for l in [5, 10, 15, 20] {
            metadata.push_batch_length(l);
        }

        let batches = metadata.range_to_batches(0..10).unwrap();
        assert_eq!(batches, vec![(0, 0..5), (1, 0..5)]);

        let batches = metadata.range_to_batches(2..10).unwrap();
        assert_eq!(batches, vec![(0, 2..5), (1, 0..5)]);

        let batches = metadata.range_to_batches(15..33).unwrap();
        assert_eq!(batches, vec![(2, 0..15), (3, 0..3)]);

        let batches = metadata.range_to_batches(14..33).unwrap();
        assert_eq!(batches, vec![(1, 9..10), (2, 0..15), (3, 0..3)]);
    }
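
    // A hedged sketch (not in the original tests): round-trips `Metadata`
    // through its protobuf form and checks the error path of
    // `range_to_batches`. It uses `Some(..)` for `manifest_position` because a
    // `None` is encoded as 0 and decodes back as `Some(0)`.
    #[test]
    fn test_pb_roundtrip_and_range_bounds() {
        let metadata = Metadata {
            batch_offsets: vec![0, 10, 20],
            page_table_position: 100,
            manifest_position: Some(200),
            stats_metadata: None,
        };

        let proto = pb::Metadata::from(&metadata);
        let decoded = Metadata::try_from(proto).unwrap();
        assert_eq!(decoded, metadata);

        // Ranges past the last offset (20 rows here) are rejected.
        assert!(metadata.range_to_batches(15..25).is_err());
    }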
}