use std::{collections::HashMap, sync::Arc, time::Duration};
use lance_file::datatypes::populate_schema_dictionary;
use lance_io::object_store::{
ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptions,
DEFAULT_CLOUD_IO_PARALLELISM,
};
use lance_table::{
format::Manifest,
io::commit::{commit_handler_from_url, CommitHandler},
};
use object_store::{aws::AwsCredentialProvider, path::Path, DynObjectStore};
use prost::Message;
use snafu::{location, Location};
use tracing::instrument;
use url::Url;
use super::refs::{Ref, Tags};
use super::{ReadParams, WriteParams, DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE};
use crate::{
error::{Error, Result},
session::Session,
Dataset,
};
/// Builder for opening a [`Dataset`] at a table URI.
///
/// Configure caching, storage options, the commit handler and the version
/// or tag to check out, then call [`DatasetBuilder::load`].
#[derive(Debug, Clone)]
pub struct DatasetBuilder {
/// Capacity of the index cache used when `load` creates a new [`Session`].
index_cache_size: usize,
/// Capacity of the metadata cache used when `load` creates a new [`Session`].
metadata_cache_size: usize,
/// Pre-decoded manifest to use instead of loading one from storage
/// (set via `with_serialized_manifest`).
manifest: Option<Manifest>,
/// Shared session; when `None`, `load` builds one from the cache sizes above.
session: Option<Arc<Session>>,
/// Explicit commit handler; when `None`, one is derived from the table URI.
commit_handler: Option<Arc<dyn CommitHandler>>,
/// Object-store configuration (block size, credentials, storage options, ...).
options: ObjectStoreParams,
/// Version number or tag to check out; `None` means the latest version.
version: Option<Ref>,
/// URI of the table to open.
table_uri: String,
/// Registry used to resolve an object store from `table_uri`.
object_store_registry: Arc<ObjectStoreRegistry>,
}
impl DatasetBuilder {
    /// Create a builder for the dataset at `table_uri`.
    ///
    /// Every other setting starts at its default and can be overridden with
    /// the `with_*` methods before calling [`Self::load`].
    pub fn from_uri<T: AsRef<str>>(table_uri: T) -> Self {
        Self {
            table_uri: table_uri.as_ref().to_string(),
            index_cache_size: DEFAULT_INDEX_CACHE_SIZE,
            metadata_cache_size: DEFAULT_METADATA_CACHE_SIZE,
            options: ObjectStoreParams::default(),
            session: None,
            commit_handler: None,
            version: None,
            manifest: None,
            object_store_registry: Arc::new(ObjectStoreRegistry::default()),
        }
    }
}
impl DatasetBuilder {
pub fn with_index_cache_size(mut self, cache_size: usize) -> Self {
self.index_cache_size = cache_size;
self
}
pub fn with_metadata_cache_size(mut self, cache_size: usize) -> Self {
self.metadata_cache_size = cache_size;
self
}
/// Override the object store's block size.
pub fn with_block_size(self, block_size: usize) -> Self {
    let mut builder = self;
    builder.options.block_size = Some(block_size);
    builder
}
pub fn with_version(mut self, version: u64) -> Self {
self.version = Some(Ref::from(version));
self
}
pub fn with_tag(mut self, tag: &str) -> Self {
self.version = Some(Ref::from(tag));
self
}
pub fn with_commit_handler(mut self, commit_handler: Arc<dyn CommitHandler>) -> Self {
self.commit_handler = Some(commit_handler);
self
}
/// Set the refresh offset applied to S3 credentials.
pub fn with_s3_credentials_refresh_offset(self, offset: Duration) -> Self {
    let mut builder = self;
    builder.options.s3_credentials_refresh_offset = offset;
    builder
}
/// Supply an explicit AWS credential provider for the object store.
pub fn with_aws_credentials_provider(self, credentials: AwsCredentialProvider) -> Self {
    let mut builder = self;
    builder.options.aws_credentials = Some(credentials);
    builder
}
/// Inject a pre-built object store rooted at `location`, together with the
/// commit handler to use for it.
pub fn with_object_store(
    self,
    object_store: Arc<DynObjectStore>,
    location: Url,
    commit_handler: Arc<dyn CommitHandler>,
) -> Self {
    let mut builder = self;
    builder.options.object_store = Some((object_store, location));
    builder.commit_handler = Some(commit_handler);
    builder
}
pub fn with_serialized_manifest(mut self, manifest: &[u8]) -> Result<Self> {
let manifest = Manifest::try_from(lance_table::format::pb::Manifest::decode(manifest)?)?;
self.manifest = Some(manifest);
Ok(self)
}
/// Replace the whole set of storage options.
pub fn with_storage_options(self, storage_options: HashMap<String, String>) -> Self {
    let mut builder = self;
    builder.options.storage_options = Some(storage_options);
    builder
}
pub fn with_storage_option(mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> Self {
let mut storage_options = self.options.storage_options.unwrap_or_default();
storage_options.insert(key.as_ref().to_string(), value.as_ref().to_string());
self.options.storage_options = Some(storage_options);
self
}
/// Apply the settings carried by [`ReadParams`].
///
/// Cache sizes are always taken from `read_params`; store options, session
/// and commit handler only override the builder's values when present in
/// `read_params`. The object store registry is always replaced.
pub fn with_read_params(mut self, read_params: ReadParams) -> Self {
    self = self
        .with_index_cache_size(read_params.index_cache_size)
        .with_metadata_cache_size(read_params.metadata_cache_size);
    if let Some(options) = read_params.store_options {
        self.options = options;
    }
    if let Some(session) = read_params.session {
        self.session = Some(session);
    }
    if let Some(commit_handler) = read_params.commit_handler {
        self.commit_handler = Some(commit_handler);
    }
    // `read_params` is owned here, so move the registry out instead of
    // cloning the `Arc` (clippy::redundant_clone).
    self.object_store_registry = read_params.object_store_registry;
    self
}
/// Apply the settings carried by [`WriteParams`].
///
/// Store params and commit handler only override the builder's values when
/// present in `write_params`. The object store registry is always replaced.
pub fn with_write_params(mut self, write_params: WriteParams) -> Self {
    if let Some(options) = write_params.store_params {
        self.options = options;
    }
    if let Some(commit_handler) = write_params.commit_handler {
        self.commit_handler = Some(commit_handler);
    }
    // `write_params` is owned here, so move the registry out instead of
    // cloning the `Arc` (clippy::redundant_clone).
    self.object_store_registry = write_params.object_store_registry;
    self
}
pub fn with_session(mut self, session: Arc<Session>) -> Self {
self.session = Some(session);
self
}
pub fn with_object_store_registry(mut self, registry: Arc<ObjectStoreRegistry>) -> Self {
self.object_store_registry = registry;
self
}
/// Build the [`ObjectStore`], table base [`Path`] and commit handler
/// described by this builder's configuration, consuming the builder.
///
/// If an object store was injected via [`Self::with_object_store`] it is
/// wrapped directly (with its URL as the table location); otherwise the
/// store is resolved from `table_uri` through the registry.
pub async fn build_object_store(self) -> Result<(ObjectStore, Path, Arc<dyn CommitHandler>)> {
// An explicitly configured commit handler wins; otherwise derive one
// from the table URI and storage options.
let commit_handler = match self.commit_handler {
Some(commit_handler) => Ok(commit_handler),
None => commit_handler_from_url(&self.table_uri, &Some(self.options.clone())).await,
}?;
let storage_options = self
.options
.storage_options
.clone()
.map(StorageOptions::new)
.unwrap_or_default();
let download_retry_count = storage_options.download_retry_count();
match &self.options.object_store {
Some(store) => Ok((
ObjectStore::new(
store.0.clone(),
store.1.clone(),
self.options.block_size,
self.options.object_store_wrapper,
self.options.use_constant_size_upload_parts,
// Non-`file://` schemes get cloud IO parallelism below;
// presumably this flag marks the store as remote — confirm
// against `ObjectStore::new`.
store.1.scheme() != "file",
DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
),
Path::from(store.1.path()),
commit_handler,
)),
None => {
let (store, path) = ObjectStore::from_uri_and_params(
self.object_store_registry.clone(),
&self.table_uri,
&self.options,
)
.await?;
Ok((store, path, commit_handler))
}
}
}
/// Open the configured dataset.
///
/// Resolves the requested version (tags are first translated to numeric
/// versions), locates and loads the manifest — or uses the one supplied
/// via [`Self::with_serialized_manifest`] — and checks it out.
///
/// # Errors
/// Returns [`Error::DatasetNotFound`] when the latest manifest cannot be
/// resolved at the table location; other resolution and IO errors are
/// propagated unchanged.
#[instrument(skip_all)]
pub async fn load(mut self) -> Result<Dataset> {
// Reuse the caller-provided session, or create one sized by the
// configured cache capacities.
let session = match self.session.take() {
Some(session) => session,
None => Arc::new(Session::new(
self.index_cache_size,
self.metadata_cache_size,
)),
};
let mut version: Option<u64> = None;
// Capture what we still need before `self` is consumed by
// `build_object_store` below.
let cloned_ref = self.version.clone();
let table_uri = self.table_uri.clone();
let manifest = self.manifest.take();
let (object_store, base_path, commit_handler) = self.build_object_store().await?;
// A tag reference must be resolved to its numeric version via the
// tags stored alongside the table.
if let Some(r) = cloned_ref {
version = match r {
Ref::Version(v) => Some(v),
Ref::Tag(t) => {
let tags = Tags::new(
Arc::new(object_store.clone()),
commit_handler.clone(),
base_path.clone(),
);
Some(tags.get_version(t.as_str()).await?)
}
}
}
// Either use the pre-supplied manifest (populating dictionary-encoded
// schema fields from storage if the schema needs them), or resolve the
// manifest location for the requested — or latest — version and load it.
let (manifest, manifest_naming_scheme) = if let Some(mut manifest) = manifest {
let location = commit_handler
.resolve_version_location(&base_path, manifest.version, &object_store.inner)
.await?;
if manifest.schema.has_dictionary_types() {
let reader = object_store.open(&location.path).await?;
populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
}
(manifest, location.naming_scheme)
} else {
let manifest_location = match version {
Some(version) => {
commit_handler
.resolve_version_location(&base_path, version, &object_store.inner)
.await?
}
// A missing latest manifest means the dataset does not exist
// at this path; surface that as `DatasetNotFound`.
None => commit_handler
.resolve_latest_location(&base_path, &object_store)
.await
.map_err(|e| Error::DatasetNotFound {
source: Box::new(e),
path: base_path.to_string(),
location: location!(),
})?,
};
let manifest = Dataset::load_manifest(&object_store, &manifest_location).await?;
(manifest, manifest_location.naming_scheme)
};
Dataset::checkout_manifest(
Arc::new(object_store),
base_path,
table_uri,
manifest,
session,
commit_handler,
manifest_naming_scheme,
)
.await
}
}