use lance_core::Error;
use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
use lance_file::version::LanceFileVersion;
use object_store::path::Path;
use serde::{Deserialize, Serialize};
use snafu::{location, Location};
use crate::format::pb;
use lance_core::datatypes::Schema;
use lance_core::error::Result;
/// A reference to one data file belonging to a fragment.
///
/// Serialized both via protobuf (`pb::DataFile`) and via serde (JSON manifest
/// representation).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DataFile {
    /// Path to the file, relative to the dataset's base path.
    pub path: String,
    /// Ids of the schema fields whose data is stored in this file.
    pub fields: Vec<i32>,
    /// For each entry in `fields`, the corresponding column index within the
    /// file. Empty for legacy files; `#[serde(default)]` keeps older JSON
    /// (written before this field existed) deserializable.
    #[serde(default)]
    pub column_indices: Vec<i32>,
    /// Major version of the file format this file was written with.
    /// Defaults to 0 when absent from serialized JSON.
    #[serde(default)]
    pub file_major_version: u32,
    /// Minor version of the file format this file was written with.
    /// Defaults to 0 when absent from serialized JSON.
    #[serde(default)]
    pub file_minor_version: u32,
}
impl DataFile {
    /// Create a fully-specified data file entry.
    pub fn new(
        path: impl Into<String>,
        fields: Vec<i32>,
        column_indices: Vec<i32>,
        file_major_version: u32,
        file_minor_version: u32,
    ) -> Self {
        Self {
            path: path.into(),
            fields,
            column_indices,
            file_major_version,
            file_minor_version,
        }
    }

    /// Create an entry whose field/column layout is not yet known.
    ///
    /// `fields` and `column_indices` start empty and are expected to be
    /// filled in after the file's contents are written.
    pub fn new_unstarted(
        path: impl Into<String>,
        file_major_version: u32,
        file_minor_version: u32,
    ) -> Self {
        Self {
            path: path.into(),
            fields: vec![],
            column_indices: vec![],
            file_major_version,
            file_minor_version,
        }
    }

    /// Create a legacy-format entry from an explicit list of field ids.
    ///
    /// Legacy files carry no column indices and use the crate's
    /// `MAJOR_VERSION`/`MINOR_VERSION` constants.
    pub fn new_legacy_from_fields(path: impl Into<String>, fields: Vec<i32>) -> Self {
        Self::new(
            path,
            fields,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
        )
    }

    /// Create a legacy-format entry covering every field of `schema`.
    ///
    /// Field ids are sorted because [`Self::validate`] requires legacy files
    /// to have strictly increasing field ids.
    pub fn new_legacy(path: impl Into<String>, schema: &Schema) -> Self {
        let mut field_ids = schema.field_ids();
        // Plain integers: stability is irrelevant, so prefer the faster,
        // non-allocating unstable sort.
        field_ids.sort_unstable();
        Self::new(
            path,
            field_ids,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
        )
    }

    /// Project `full_schema` down to just the fields stored in this file.
    pub fn schema(&self, full_schema: &Schema) -> Schema {
        full_schema.project_by_ids(&self.fields, false)
    }

    /// True if this file was written with the legacy (pre-0.3) file format.
    pub fn is_legacy_file(&self) -> bool {
        self.file_major_version == 0 && self.file_minor_version < 3
    }

    /// Check internal invariants, returning `Error::CorruptFile` on failure.
    ///
    /// Legacy files must have strictly increasing (sorted, duplicate-free)
    /// field ids; newer files must pair every field id with a column index.
    /// `base_path` is only used to build the error message.
    pub fn validate(&self, base_path: &Path) -> Result<()> {
        if self.is_legacy_file() {
            if !self.fields.windows(2).all(|w| w[0] < w[1]) {
                return Err(Error::corrupt_file(
                    base_path.child(self.path.clone()),
                    "contained unsorted or duplicate field ids",
                    location!(),
                ));
            }
        } else if self.fields.len() != self.column_indices.len() {
            return Err(Error::corrupt_file(
                base_path.child(self.path.clone()),
                "contained an unequal number of fields / column_indices",
                location!(),
            ));
        }
        Ok(())
    }
}
impl From<&DataFile> for pb::DataFile {
fn from(df: &DataFile) -> Self {
Self {
path: df.path.clone(),
fields: df.fields.clone(),
column_indices: df.column_indices.clone(),
file_major_version: df.file_major_version,
file_minor_version: df.file_minor_version,
}
}
}
impl TryFrom<pb::DataFile> for DataFile {
    type Error = Error;

    /// Convert from the protobuf representation.
    ///
    /// Infallible in practice; `TryFrom` keeps the signature consistent with
    /// the other protobuf conversions in this module.
    fn try_from(proto: pb::DataFile) -> Result<Self> {
        let pb::DataFile {
            path,
            fields,
            column_indices,
            file_major_version,
            file_minor_version,
            ..
        } = proto;
        Ok(Self {
            path,
            fields,
            column_indices,
            file_major_version,
            file_minor_version,
        })
    }
}
/// The on-disk encoding used by a deletion file.
///
/// Serialized to JSON in lowercase ("array" / "bitmap").
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    /// Deleted rows stored as an array, in an Arrow (".arrow") file.
    Array,
    /// Deleted rows stored as a bitmap, in a ".bin" file.
    /// NOTE(review): presumably a serialized roaring bitmap — confirm against
    /// the deletion-file writer.
    Bitmap,
}
impl DeletionFileType {
    /// File extension (without the dot) used for this deletion file type.
    pub fn suffix(&self) -> &str {
        if let Self::Bitmap = self {
            "bin"
        } else {
            "arrow"
        }
    }
}
/// Metadata describing a file that records which rows of a fragment are
/// deleted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeletionFile {
    /// Dataset version that was read when the deletions were computed.
    pub read_version: u64,
    /// Id distinguishing this deletion file from others for the fragment.
    pub id: u64,
    /// Encoding of the file (array or bitmap).
    pub file_type: DeletionFileType,
    /// Number of rows deleted, if known. `None` when the count is unknown
    /// (stored as 0 in the protobuf form).
    pub num_deleted_rows: Option<usize>,
}
impl TryFrom<pb::DeletionFile> for DeletionFile {
    type Error = Error;

    /// Convert from the protobuf representation.
    ///
    /// Fails with `Error::NotSupported` on an unrecognized file-type tag
    /// (e.g. written by a newer version).
    fn try_from(value: pb::DeletionFile) -> Result<Self> {
        // Protobuf enum tags: 0 = arrow array, 1 = bitmap.
        let file_type = if value.file_type == 0 {
            DeletionFileType::Array
        } else if value.file_type == 1 {
            DeletionFileType::Bitmap
        } else {
            return Err(Error::NotSupported {
                source: "Unknown deletion file type".into(),
                location: location!(),
            });
        };
        // Zero is the protobuf default and means "count unknown".
        let num_deleted_rows =
            (value.num_deleted_rows > 0).then(|| value.num_deleted_rows as usize);
        Ok(Self {
            read_version: value.read_version,
            id: value.id,
            file_type,
            num_deleted_rows,
        })
    }
}
/// A byte range within an external file, used to reference row-id sequences
/// stored outside the manifest.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExternalFile {
    /// Path of the external file.
    pub path: String,
    /// Byte offset where the referenced data begins.
    pub offset: u64,
    /// Length in bytes of the referenced data.
    pub size: u64,
}
/// Where a fragment's row-id sequence is stored.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RowIdMeta {
    /// Serialized row-id sequence stored inline in the manifest.
    Inline(Vec<u8>),
    /// Row-id sequence stored in (a slice of) an external file.
    External(ExternalFile),
}
impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
    type Error = Error;

    /// Convert the protobuf row-id sequence into its in-memory form.
    ///
    /// Infallible in practice; `TryFrom` keeps the signature consistent with
    /// the other protobuf conversions in this module.
    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
        match value {
            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
                // `file` is owned by this match arm, so move its fields
                // rather than cloning them.
                Ok(Self::External(ExternalFile {
                    path: file.path,
                    offset: file.offset,
                    size: file.size,
                }))
            }
        }
    }
}
/// Metadata for one fragment of a dataset: its data files, optional deletion
/// file, and optional row-id sequence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Fragment {
    /// Unique id of the fragment within its dataset.
    pub id: u64,
    /// Data files; together they hold the fragment's columns.
    pub files: Vec<DataFile>,
    /// File recording which rows are deleted, if any. Omitted from JSON when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,
    /// Location of the fragment's row-id sequence, if one exists. Omitted
    /// from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,
    /// Total number of rows physically stored (before deletions), if known.
    pub physical_rows: Option<usize>,
}
impl Fragment {
    /// Create an empty fragment with the given id.
    pub fn new(id: u64) -> Self {
        Self {
            id,
            files: vec![],
            deletion_file: None,
            row_id_meta: None,
            physical_rows: None,
        }
    }

    /// Number of rows visible in this fragment: physical rows minus deleted
    /// rows.
    ///
    /// Returns `None` when the physical row count is unknown, or when a
    /// deletion file exists but its deleted-row count is unknown.
    pub fn num_rows(&self) -> Option<usize> {
        match (self.physical_rows, &self.deletion_file) {
            (Some(len), None) => Some(len),
            (
                Some(len),
                Some(DeletionFile {
                    num_deleted_rows: Some(num_deleted_rows),
                    ..
                }),
            ) => {
                // Corrupt metadata could claim more deletions than rows;
                // saturate to 0 rather than panicking on underflow.
                Some(len.saturating_sub(*num_deleted_rows))
            }
            _ => None,
        }
    }

    /// Deserialize a fragment from its JSON representation.
    pub fn from_json(json: &str) -> Result<Self> {
        let fragment: Self = serde_json::from_str(json)?;
        Ok(fragment)
    }

    /// Create a fragment backed by a single legacy-format data file covering
    /// every field of `schema`.
    pub fn with_file_legacy(
        id: u64,
        path: &str,
        schema: &Schema,
        physical_rows: Option<usize>,
    ) -> Self {
        Self {
            id,
            files: vec![DataFile::new_legacy(path, schema)],
            deletion_file: None,
            physical_rows,
            row_id_meta: None,
        }
    }

    /// Append a data file written with the given file-format `version`.
    pub fn add_file(
        &mut self,
        path: impl Into<String>,
        field_ids: Vec<i32>,
        column_indices: Vec<i32>,
        version: &LanceFileVersion,
    ) {
        let (major, minor) = version.to_numbers();
        self.files
            .push(DataFile::new(path, field_ids, column_indices, major, minor));
    }

    /// Append a legacy-format data file covering every field of `schema`.
    pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
        self.files.push(DataFile::new_legacy(path, schema));
    }

    /// True if this fragment's data files use the legacy (pre-v2) format.
    ///
    /// Only the first file is inspected, on the assumption that all files in
    /// a fragment share one format version. Returns `false` for a fragment
    /// with no files (previously this indexed `files[0]` and panicked).
    pub fn has_legacy_files(&self) -> bool {
        self.files.first().map_or(false, |f| f.is_legacy_file())
    }

    /// Infer the single file version shared by all data files in `fragments`.
    ///
    /// Returns `Ok(None)` if no fragment has any data files, and an
    /// `Error::InvalidInput` if two files disagree on the version.
    pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
        // Use the first data file found as the reference version.
        let Some(sample_file) = fragments
            .iter()
            .find(|f| !f.files.is_empty())
            .map(|f| &f.files[0])
        else {
            return Ok(None);
        };
        let file_version = LanceFileVersion::try_from_major_minor(
            sample_file.file_major_version,
            sample_file.file_minor_version,
        )?;
        // Every other file must agree with the reference.
        for frag in fragments {
            for file in &frag.files {
                let this_file_version = LanceFileVersion::try_from_major_minor(
                    file.file_major_version,
                    file.file_minor_version,
                )?;
                if file_version != this_file_version {
                    return Err(Error::invalid_input(
                        format!(
                            "All data files must have the same version. Detected both {} and {}",
                            file_version, this_file_version
                        ),
                        location!(),
                    ));
                }
            }
        }
        Ok(Some(file_version))
    }
}
impl TryFrom<pb::DataFragment> for Fragment {
    type Error = Error;

    /// Convert from the protobuf representation, propagating any error from
    /// the per-file / deletion-file / row-id conversions.
    fn try_from(p: pb::DataFragment) -> Result<Self> {
        // Protobuf encodes "unknown" as zero; map that back to `None`.
        let physical_rows = (p.physical_rows > 0).then(|| p.physical_rows as usize);
        let files = p
            .files
            .into_iter()
            .map(DataFile::try_from)
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            id: p.id,
            files,
            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
            physical_rows,
        })
    }
}
impl From<&Fragment> for pb::DataFragment {
    /// Convert to the protobuf representation. `None` counts become zero,
    /// the protobuf default.
    fn from(f: &Fragment) -> Self {
        // Renamed from `f` to avoid shadowing the outer parameter.
        let deletion_file = f.deletion_file.as_ref().map(|del| {
            let file_type = match del.file_type {
                DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
                DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
            };
            pb::DeletionFile {
                read_version: del.read_version,
                id: del.id,
                file_type: file_type.into(),
                num_deleted_rows: del.num_deleted_rows.unwrap_or_default() as u64,
            }
        });
        let row_id_sequence = f.row_id_meta.as_ref().map(|meta| match meta {
            RowIdMeta::Inline(data) => {
                pb::data_fragment::RowIdSequence::InlineRowIds(data.clone())
            }
            RowIdMeta::External(ext) => {
                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
                    path: ext.path.clone(),
                    offset: ext.offset,
                    size: ext.size,
                })
            }
        });
        Self {
            id: f.id,
            files: f.files.iter().map(pb::DataFile::from).collect(),
            deletion_file,
            row_id_sequence,
            physical_rows: f.physical_rows.unwrap_or_default() as u64,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use serde_json::{json, Value};

    /// Single-column Float16 schema shared by the roundtrip and JSON tests.
    fn float16_schema() -> Schema {
        let arrow = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        Schema::try_from(&arrow).unwrap()
    }

    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";
        // A nested struct plus a top-level column: four leaf/parent fields
        // total, so field ids 0..=3 are expected.
        let struct_fields = ArrowFields::from(vec![
            ArrowField::new("si", DataType::Int32, false),
            ArrowField::new("sb", DataType::Binary, true),
        ]);
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("s", DataType::Struct(struct_fields), true),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let frag = Fragment::with_file_legacy(123, path, &schema, Some(10));
        assert_eq!(frag.id, 123);
        assert_eq!(
            frag.files,
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
            )]
        )
    }

    #[test]
    fn test_roundtrip_fragment() {
        let mut frag = Fragment::new(123);
        frag.add_file_legacy("foobar.lance", &float16_schema());
        frag.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
        });
        // Roundtrip with a deletion file present...
        let decoded = Fragment::try_from(pb::DataFragment::from(&frag)).unwrap();
        assert_eq!(frag, decoded);
        // ...and without one.
        frag.deletion_file = None;
        let decoded = Fragment::try_from(pb::DataFragment::from(&frag)).unwrap();
        assert_eq!(frag, decoded);
    }

    #[test]
    fn test_to_json() {
        let mut frag = Fragment::new(123);
        frag.add_file_legacy("foobar.lance", &float16_schema());
        frag.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
        });
        let serialized = serde_json::to_string(&frag).unwrap();
        let value: Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files":[
                    {"path": "foobar.lance", "fields": [0], "column_indices": [], "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION}],
                "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
                    "num_deleted_rows": 10},
                "physical_rows": None::<usize>}),
        );
        // JSON must deserialize back to an equal fragment.
        let decoded = Fragment::from_json(&serialized).unwrap();
        assert_eq!(frag, decoded);
    }
}