lance_file/format/metadata.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::collections::BTreeMap;
5use std::ops::Range;
6
7use crate::datatypes::{Fields, FieldsWithMeta};
8use crate::format::pb;
9use deepsize::DeepSizeOf;
10use lance_core::datatypes::Schema;
11use lance_core::{Error, Result};
12use lance_io::traits::ProtoStruct;
13use snafu::location;
/// Data File Metadata
///
/// Describes how a data file is laid out: where its record batches start and end,
/// and where the page table, manifest and (optional) statistics live in the file.
/// `Metadata::default()` describes a file with no batches.
#[derive(Debug, Default, DeepSizeOf, PartialEq)]
pub struct Metadata {
    /// Offset of each record batch.
    ///
    /// These are cumulative row offsets: a leading `0` followed by the end offset of
    /// each batch (as built by `push_batch_length`). Batch `i` therefore spans rows
    /// `batch_offsets[i]..batch_offsets[i + 1]`, and the last entry is the total
    /// number of rows.
    pub batch_offsets: Vec<i32>,

    /// The file position of the page table in the file.
    pub page_table_position: usize,

    /// The file position of the manifest block in the file.
    ///
    /// When encoded to protobuf, `None` is written as `0`.
    pub manifest_position: Option<usize>,

    /// Metadata about statistics, if the file stores page-level statistics.
    pub stats_metadata: Option<StatisticsMetadata>,
}
29
/// Binds [`Metadata`] to its protobuf message type, `pb::Metadata`.
///
/// The conversions in both directions are provided by the `From<&Metadata>` and
/// `TryFrom<pb::Metadata>` impls.
impl ProtoStruct for Metadata {
    type Proto = pb::Metadata;
}
33
34impl From<&Metadata> for pb::Metadata {
35    fn from(m: &Metadata) -> Self {
36        let statistics = if let Some(stats_meta) = &m.stats_metadata {
37            let fields_with_meta: FieldsWithMeta = (&stats_meta.schema).into();
38            Some(pb::metadata::StatisticsMetadata {
39                schema: fields_with_meta.fields.0,
40                fields: stats_meta.leaf_field_ids.clone(),
41                page_table_position: stats_meta.page_table_position as u64,
42            })
43        } else {
44            None
45        };
46
47        Self {
48            batch_offsets: m.batch_offsets.clone(),
49            page_table_position: m.page_table_position as u64,
50            manifest_position: m.manifest_position.unwrap_or(0) as u64,
51            statistics,
52        }
53    }
54}
55
56impl TryFrom<pb::Metadata> for Metadata {
57    type Error = Error;
58    fn try_from(m: pb::Metadata) -> Result<Self> {
59        Ok(Self {
60            batch_offsets: m.batch_offsets.clone(),
61            page_table_position: m.page_table_position as usize,
62            manifest_position: Some(m.manifest_position as usize),
63            stats_metadata: if let Some(stats_meta) = m.statistics {
64                Some(StatisticsMetadata {
65                    schema: Schema::from(FieldsWithMeta {
66                        fields: Fields(stats_meta.schema),
67                        metadata: Default::default(),
68                    }),
69                    leaf_field_ids: stats_meta.fields,
70                    page_table_position: stats_meta.page_table_position as usize,
71                })
72            } else {
73                None
74            },
75        })
76    }
77}
78
/// Row offsets that fall into a single batch, as produced by
/// `Metadata::group_indices_to_batches`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchOffsets {
    /// Id of the batch the offsets belong to.
    pub batch_id: i32,
    /// Row offsets relative to the start of the batch, in ascending order.
    pub offsets: Vec<u32>,
}
84
85impl Metadata {
86    /// Get the number of batches in this file.
87    pub fn num_batches(&self) -> usize {
88        if self.batch_offsets.is_empty() {
89            0
90        } else {
91            self.batch_offsets.len() - 1
92        }
93    }
94
95    /// Get the number of records in this file
96    pub fn len(&self) -> usize {
97        *self.batch_offsets.last().unwrap_or(&0) as usize
98    }
99
100    pub fn is_empty(&self) -> bool {
101        self.len() == 0
102    }
103
104    /// Push the length of the batch.
105    pub fn push_batch_length(&mut self, batch_len: i32) {
106        if self.batch_offsets.is_empty() {
107            self.batch_offsets.push(0)
108        }
109        self.batch_offsets
110            .push(batch_len + self.batch_offsets.last().unwrap())
111    }
112
113    /// Get the starting offset of the batch.
114    pub fn get_offset(&self, batch_id: i32) -> Option<i32> {
115        self.batch_offsets.get(batch_id as usize).copied()
116    }
117
118    /// Get the length of the batch.
119    pub fn get_batch_length(&self, batch_id: i32) -> Option<i32> {
120        self.get_offset(batch_id + 1)
121            .map(|o| o - self.get_offset(batch_id).unwrap_or_default())
122    }
123
124    /// Group row indices into each batch.
125    ///
126    /// The indices must be sorted.
127    // TODO: pub(crate)
128    pub fn group_indices_to_batches(&self, indices: &[u32]) -> Vec<BatchOffsets> {
129        let mut batch_id: i32 = 0;
130        let num_batches = self.num_batches() as i32;
131        let mut indices_per_batch: BTreeMap<i32, Vec<u32>> = BTreeMap::new();
132
133        let mut indices = Vec::from(indices);
134        // sort unstable is quick sort and is almost always faster than sort
135        indices.sort_unstable();
136
137        for idx in indices.iter() {
138            while batch_id < num_batches && *idx >= self.batch_offsets[batch_id as usize + 1] as u32
139            {
140                batch_id += 1;
141            }
142            indices_per_batch
143                .entry(batch_id)
144                .and_modify(|v| v.push(*idx))
145                .or_insert(vec![*idx]);
146        }
147
148        indices_per_batch
149            .iter()
150            .map(|(batch_id, indices)| {
151                let batch_offset = self.batch_offsets[*batch_id as usize];
152                // Adjust indices to be the in-batch offsets.
153                let in_batch_offsets = indices
154                    .iter()
155                    .map(|i| i - batch_offset as u32)
156                    .collect::<Vec<_>>();
157                BatchOffsets {
158                    batch_id: *batch_id,
159                    offsets: in_batch_offsets,
160                }
161            })
162            .collect()
163    }
164
165    /// Map the range of row indices to the corresponding batches.
166    ///
167    /// It returns a list of (batch_id, in_batch_range) tuples.
168    // TODO: pub(crate)
169    pub fn range_to_batches(&self, range: Range<usize>) -> Result<Vec<(i32, Range<usize>)>> {
170        if range.end > *(self.batch_offsets.last().unwrap()) as usize {
171            return Err(Error::io(
172                format!(
173                    "Range {:?} is out of bounds {}",
174                    range,
175                    self.batch_offsets.last().unwrap()
176                ),
177                location!(),
178            ));
179        }
180        let offsets = self.batch_offsets.as_slice();
181        let mut batch_id = offsets
182            .binary_search(&(range.start as i32))
183            .unwrap_or_else(|x| x - 1);
184        let mut batches = vec![];
185
186        while batch_id < self.num_batches() {
187            let batch_start = offsets[batch_id] as usize;
188            if batch_start >= range.end {
189                break;
190            }
191            let start = std::cmp::max(range.start, batch_start) - batch_start;
192            let end = std::cmp::min(range.end, offsets[batch_id + 1] as usize) - batch_start;
193            batches.push((batch_id as i32, start..end));
194            batch_id += 1;
195        }
196        Ok(batches)
197    }
198}
199
/// Metadata about the statistics
///
/// Stored in [`Metadata::stats_metadata`] and serialized as the protobuf
/// `pb::metadata::StatisticsMetadata` message.
#[derive(Debug, PartialEq, DeepSizeOf)]
pub struct StatisticsMetadata {
    /// Schema of the page-level statistics.
    ///
    /// For a given field with id `i`, the statistics are stored in the field
    /// `i.null_count`, `i.min_value`, and `i.max_value`.
    pub schema: Schema,
    /// Field ids that have statistics; serialized as the protobuf `fields` field.
    /// NOTE(review): the name suggests only leaf (non-nested) fields are listed — confirm.
    pub leaf_field_ids: Vec<i32>,
    /// File position of the page table for the statistics pages.
    /// NOTE(review): presumably separate from `Metadata::page_table_position` — confirm.
    pub page_table_position: usize,
}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds metadata with batches of 5, 10, 15 and 20 rows (offsets `[0, 5, 15, 30, 50]`).
    fn four_batches() -> Metadata {
        let mut metadata = Metadata::default();
        for l in [5, 10, 15, 20] {
            metadata.push_batch_length(l);
        }
        metadata
    }

    #[test]
    fn test_batch_accessors() {
        let empty = Metadata::default();
        assert_eq!(empty.num_batches(), 0);
        assert_eq!(empty.len(), 0);
        assert!(empty.is_empty());
        assert_eq!(empty.get_offset(0), None);
        assert_eq!(empty.get_batch_length(0), None);

        let metadata = four_batches();
        assert_eq!(metadata.batch_offsets, vec![0, 5, 15, 30, 50]);
        assert_eq!(metadata.num_batches(), 4);
        assert_eq!(metadata.len(), 50);
        assert!(!metadata.is_empty());
        assert_eq!(metadata.get_offset(2), Some(15));
        assert_eq!(metadata.get_offset(5), None);
        assert_eq!(metadata.get_batch_length(0), Some(5));
        assert_eq!(metadata.get_batch_length(3), Some(20));
        assert_eq!(metadata.get_batch_length(4), None);
    }

    #[test]
    fn test_group_indices_to_batch() {
        let mut metadata = Metadata::default();
        metadata.push_batch_length(20);
        metadata.push_batch_length(20);

        let batches = metadata.group_indices_to_batches(&[6, 24]);
        assert_eq!(batches.len(), 2);
        assert_eq!(
            batches,
            vec![
                BatchOffsets {
                    batch_id: 0,
                    offsets: vec![6]
                },
                BatchOffsets {
                    batch_id: 1,
                    offsets: vec![4]
                }
            ]
        );
    }

    #[test]
    fn test_group_indices_to_batch_unsorted_and_empty() {
        let mut metadata = Metadata::default();
        metadata.push_batch_length(20);
        metadata.push_batch_length(20);

        // Input order does not matter: the indices are sorted internally.
        assert_eq!(
            metadata.group_indices_to_batches(&[24, 6, 25]),
            vec![
                BatchOffsets {
                    batch_id: 0,
                    offsets: vec![6]
                },
                BatchOffsets {
                    batch_id: 1,
                    offsets: vec![4, 5]
                }
            ]
        );
        assert!(metadata.group_indices_to_batches(&[]).is_empty());
    }

    #[test]
    fn test_range_to_batches() {
        let metadata = four_batches();

        let batches = metadata.range_to_batches(0..10).unwrap();
        assert_eq!(batches, vec![(0, 0..5), (1, 0..5)]);

        let batches = metadata.range_to_batches(2..10).unwrap();
        assert_eq!(batches, vec![(0, 2..5), (1, 0..5)]);

        let batches = metadata.range_to_batches(15..33).unwrap();
        assert_eq!(batches, vec![(2, 0..15), (3, 0..3)]);

        let batches = metadata.range_to_batches(14..33).unwrap();
        assert_eq!(batches, vec![(1, 9..10), (2, 0..15), (3, 0..3)]);
    }

    #[test]
    fn test_range_to_batches_at_file_end() {
        let metadata = four_batches();

        let batches = metadata.range_to_batches(45..50).unwrap();
        assert_eq!(batches, vec![(3, 15..20)]);

        assert!(metadata.range_to_batches(50..50).unwrap().is_empty());
        assert!(metadata.range_to_batches(0..51).is_err());
    }
}