Crate object_store


§object_store

This crate provides a uniform API for interacting with object storage services and local files via the ObjectStore trait.

Using this crate, the same binary and code can run in multiple clouds and local test environments, via a simple runtime configuration change.
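For example, a minimal sketch of selecting a backend at runtime (the USE_LOCAL_STORE environment variable is purely illustrative; real applications would more likely use the configuration system described below):

use std::sync::Arc;
use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};

// Pick a concrete implementation at runtime, then use it solely through the trait
let store: Arc<dyn ObjectStore> = match std::env::var("USE_LOCAL_STORE") {
    Ok(_) => Arc::new(LocalFileSystem::new()),
    Err(_) => Arc::new(InMemory::new()),
};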

§Highlights

  1. A high-performance async API focused on providing a consistent interface mirroring that of object stores such as S3

  2. Production quality, leading this crate to be used in large scale production systems, such as crates.io and InfluxDB IOx

  3. Support for advanced functionality, including atomic, conditional reads and writes, vectored IO, bulk deletion, and more…

  4. Stable and predictable governance via the Apache Arrow project

  5. Small dependency footprint, depending on only a small number of common crates

Originally developed by InfluxData and subsequently donated to Apache Arrow.

§Available ObjectStore Implementations

By default, this crate provides the following implementations:

  • Memory: InMemory
  • Local filesystem: LocalFileSystem

Feature flags are used to enable support for other implementations:

  • aws: AmazonS3, an implementation for Amazon S3
  • azure: MicrosoftAzure, an implementation for Azure Blob Storage
  • gcp: GoogleCloudStorage, an implementation for Google Cloud Storage
  • http: HttpStore, an implementation for generic HTTP servers

§Why not a Filesystem Interface?

The ObjectStore interface is designed to mirror the APIs of object stores and not filesystems, and thus has stateless APIs instead of cursor based interfaces such as Read or Seek available in filesystems.

This design provides the following advantages:

  • All operations are atomic, and readers cannot observe partial and/or failed writes
  • Methods map directly to object store APIs, providing both efficiency and predictability
  • Abstracts away filesystem and operating system specific quirks, ensuring portability
  • Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads

This crate does provide BufReader and BufWriter adapters, which offer a more filesystem-like API for working with the ObjectStore trait; however, they should be used with care.
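For instance, a minimal sketch using the buffered adapters, assuming a tokio runtime and reusing the get_object_store helper from the examples below; the "data/example" path is illustrative:

use std::sync::Arc;
use object_store::buffered::{BufReader, BufWriter};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/example");

// Write through a tokio `AsyncWrite` interface; `shutdown` completes the upload
let mut writer = BufWriter::new(Arc::clone(&object_store), path.clone());
writer.write_all(b"hello world").await.unwrap();
writer.shutdown().await.unwrap();

// Read it back through an `AsyncRead` + `AsyncSeek` interface
let meta = object_store.head(&path).await.unwrap();
let mut reader = BufReader::new(Arc::clone(&object_store), &meta);
let mut contents = Vec::new();
reader.read_to_end(&mut contents).await.unwrap();
assert_eq!(contents, b"hello world");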

§Adapters

ObjectStore instances can be composed with various adapters which add additional functionality, for example:

  • ThrottledStore: a throttling wrapper, useful for testing (see the throttle module)
  • LimitStore: limits the maximum concurrency of the wrapped implementation (see the limit module)
  • PrefixStore: applies a constant path prefix to all operations (see the prefix module)
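A minimal sketch of composing such adapters (the concurrency limit of 16 and the "tenant-a" prefix are arbitrary illustrative values):

use std::sync::Arc;
use object_store::{limit::LimitStore, memory::InMemory, prefix::PrefixStore, ObjectStore};

// Cap the inner store at 16 concurrent requests
let limited = LimitStore::new(InMemory::new(), 16);

// Additionally scope every operation under a constant path prefix
let store: Arc<dyn ObjectStore> = Arc::new(PrefixStore::new(limited, "tenant-a"));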

§Configuration System

This crate provides a configuration system inspired by the APIs exposed by fsspec, PyArrow FileSystem, and Hadoop FileSystem, allowing creating a DynObjectStore from a URL and an optional list of key value pairs. This provides a flexible interface to support a wide variety of user-defined store configurations, with minimal additional application complexity.

// `Url` comes from the `url` crate
use url::Url;

// Can manually create a specific store variant using the appropriate builder
let store: AmazonS3 = AmazonS3Builder::from_env()
    .with_bucket_name("my-bucket").build().unwrap();

// Alternatively can create an ObjectStore from an S3 URL
let url = Url::parse("s3://bucket/path").unwrap();
let (store, path) = parse_url(&url).unwrap();
assert_eq!(path.as_ref(), "path");

// Potentially with additional options
let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();

// Or with URLs that encode the bucket name in the URL path
let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
let (store, path) = parse_url(&url).unwrap();
assert_eq!(path.as_ref(), "path");

§List objects

Use the ObjectStore::list method to iterate over objects in remote storage or files in the local filesystem:

// `next` on the returned stream requires `futures::StreamExt`
use futures::StreamExt;

// create an ObjectStore
let object_store: Arc<dyn ObjectStore> = get_object_store();

// Recursively list all files below the 'data' path.
// 1. On AWS S3 this would be the 'data/' prefix
// 2. On a local filesystem, this would be the 'data' directory
let prefix = Path::from("data");

// Get an `async` stream of Metadata objects:
let mut list_stream = object_store.list(Some(&prefix));

// Print a line about each object
while let Some(meta) = list_stream.next().await.transpose().unwrap() {
    println!("Name: {}, size: {}", meta.location, meta.size);
}

This will print out something like the following:

Name: data/file01.parquet, size: 112832
Name: data/file02.parquet, size: 143119
Name: data/child/file03.parquet, size: 100
...

§Fetch objects

Use the ObjectStore::get method to fetch the data bytes from remote storage or files in the local filesystem as a stream.

// `Bytes` comes from the `bytes` crate; `try_fold` requires `futures::TryStreamExt`
use bytes::Bytes;
use futures::TryStreamExt;

// Create an ObjectStore
let object_store: Arc<dyn ObjectStore> = get_object_store();

// Retrieve a specific file
let path = Path::from("data/file01.parquet");

// Fetch just the file metadata
let meta = object_store.head(&path).await.unwrap();
println!("{meta:?}");

// Fetch the object including metadata
let result: GetResult = object_store.get(&path).await.unwrap();
assert_eq!(result.meta, meta);

// Buffer the entire object in memory
let object: Bytes = result.bytes().await.unwrap();
assert_eq!(object.len() as u64, meta.size);

// Alternatively stream the bytes from object storage
let stream = object_store.get(&path).await.unwrap().into_stream();

// Count the '0's using `try_fold` from `TryStreamExt` trait
let num_zeros = stream
    .try_fold(0, |acc, bytes| async move {
        Ok(acc + bytes.iter().filter(|b| **b == 0).count())
    }).await.unwrap();

println!("Num zeros in {} is {}", path, num_zeros);

§Put Object

Use the ObjectStore::put method to atomically write data.

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/file1");
let payload = PutPayload::from_static(b"hello");
object_store.put(&path, payload).await.unwrap();

§Multipart Upload

Use the ObjectStore::put_multipart method to atomically write a large amount of data

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let upload = object_store.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new(upload);
write.write(b"hello");
write.finish().await.unwrap();

§Vectored Read

A common pattern, especially when reading structured datasets, is needing to fetch multiple, potentially non-contiguous, ranges of a particular object.

ObjectStore::get_ranges provides an efficient way to perform such vectored IO, and will automatically coalesce adjacent ranges into an appropriate number of parallel requests.

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
assert_eq!(ranges.len(), 3);
assert_eq!(ranges[0].len(), 10);

§Vectored Write

When writing data it is often the case that the size of the output is not known ahead of time.

A common approach to handling this is to bump-allocate a Vec, whereby the underlying allocation is repeatedly reallocated, each time doubling the capacity. The performance of this is suboptimal as reallocating memory will often involve copying it to a new location.

Fortunately, as PutPayload does not require memory regions to be contiguous, it is possible to instead allocate memory in chunks and avoid bump allocating. PutPayloadMut encapsulates this approach.

let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let mut buffer = PutPayloadMut::new().with_block_size(8192);
for _ in 0..22 {
    buffer.extend_from_slice(&[0; 1024]);
}
let payload = buffer.freeze();

// Payload consists of 3 separate 8KB allocations
assert_eq!(payload.as_ref().len(), 3);
assert_eq!(payload.as_ref()[0].len(), 8192);
assert_eq!(payload.as_ref()[1].len(), 8192);
assert_eq!(payload.as_ref()[2].len(), 6144);

object_store.put(&path, payload).await.unwrap();

§Conditional Fetch

More complex object retrieval can be supported by ObjectStore::get_opts.

For example, efficiently refreshing a cache without re-fetching the entire object data if the object hasn’t been modified.

// std and bytes imports used by this example
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::Bytes;

struct CacheEntry {
    /// Data returned by last request
    data: Bytes,
    /// ETag identifying the object returned by the server
    e_tag: String,
    /// Instant of last refresh
    refreshed_at: Instant,
}

/// Example cache that checks entries after 10 seconds for a new version
struct Cache {
    entries: HashMap<Path, CacheEntry>,
    store: Arc<dyn ObjectStore>,
}

impl Cache {
    pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
        Ok(match self.entries.get_mut(path) {
            Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
                true => e.data.clone(), // Return cached data
                false => { // Check if remote version has changed
                    let opts = GetOptions {
                        if_none_match: Some(e.e_tag.clone()),
                        ..GetOptions::default()
                    };
                    match self.store.get_opts(&path, opts).await {
                        Ok(d) => e.data = d.bytes().await?,
                        Err(Error::NotModified { .. }) => {} // Data has not changed
                        Err(e) => return Err(e),
                    };
                    e.refreshed_at = Instant::now();
                    e.data.clone()
                }
            },
            None => { // Not cached, fetch data
                let get = self.store.get(&path).await?;
                let e_tag = get.meta.e_tag.clone();
                let data = get.bytes().await?;
                if let Some(e_tag) = e_tag {
                    let entry = CacheEntry {
                        e_tag,
                        data: data.clone(),
                        refreshed_at: Instant::now(),
                    };
                    self.entries.insert(path.clone(), entry);
                }
                data
            }
        })
    }
}

§Conditional Put

The default behaviour when writing data is to upsert any existing object at the given path, overwriting any previous value. More complex behaviours can be achieved using PutMode, and can be used to build Optimistic Concurrency Control based transactions. This facilitates building metadata catalogs, such as Apache Iceberg or Delta Lake, directly on top of object storage, without relying on a separate DBMS.

let store = get_object_store();
let path = Path::from("test");

// Perform a conditional update on path
loop {
    // Perform get request
    let r = store.get(&path).await.unwrap();

    // Save version information fetched
    let version = UpdateVersion {
        e_tag: r.meta.e_tag.clone(),
        version: r.meta.version.clone(),
    };

    // Compute new version of object contents
    let new = do_update(r.bytes().await.unwrap());

    // Attempt to commit transaction
    match store.put_opts(&path, new.into(), PutMode::Update(version).into()).await {
        Ok(_) => break, // Successfully committed
        Err(Error::Precondition { .. }) => continue, // Object has changed, try again
        Err(e) => panic!("{e}")
    }
}

§TLS Certificates

Stores that use HTTPS/TLS (this is true for most cloud stores) can choose the source of their CA certificates. By default the system-bundled certificates are used (see rustls-native-certs). The tls-webpki-roots feature switch can be used to also bundle Mozilla’s root certificates with the library/application (see webpki-roots).

Re-exports§

pub use client::ClientConfigKey;
pub use client::ClientOptions;
pub use client::CredentialProvider;
pub use client::StaticCredentialProvider;
pub use client::Certificate;

Modules§

aws
An object store implementation for S3
azure
An object store implementation for Azure blob storage
buffered
Utilities for performing tokio-style buffered IO
chunked
A ChunkedStore that can be used to test streaming behaviour
client
Generic utilities for reqwest-based ObjectStore implementations
delimited
Utility for streaming newline delimited files from object storage
gcp
An object store implementation for Google Cloud Storage
http
An object store implementation for generic HTTP servers
integration
Integration tests for custom object store implementations
limit
An object store that limits the maximum concurrency of the wrapped implementation
local
An object store implementation for a local filesystem
memory
An in-memory object store implementation
multipart
Cloud Multipart Upload
path
Path abstraction for Object Storage
prefix
An object store wrapper handling a constant path prefix
signer
Abstraction of signed URL generation for those object store implementations that support it
throttle
A throttling object store wrapper

Structs§

AttributeValue
The value of an Attribute
Attributes
Additional attributes of an object
AttributesIter
Iterator over Attributes
BackoffConfig
Exponential backoff with decorrelated jitter algorithm
GetOptions
Options for a get request, such as range
GetResult
Result for a get request
ListResult
Result of a list call that includes objects, prefixes (directories) and a token for the next set of results. Individual result sets may be limited to 1,000 objects based on the underlying object storage’s limitations.
ObjectMeta
The metadata that describes an object.
PutMultipartOpts
Options for ObjectStore::put_multipart_opts
PutOptions
Options for a put request
PutPayload
A cheaply cloneable, ordered collection of Bytes
PutPayloadIntoIter
An owning iterator of PutPayload
PutPayloadIter
An iterator over PutPayload
PutPayloadMut
A builder for PutPayload that avoids reallocating memory
PutResult
Result for a put request
RetryConfig
The configuration for how to respond to request errors
TagSet
A collection of key value pairs used to annotate objects
UpdateVersion
Uniquely identifies a version of an object to update
WriteMultipart
A synchronous write API for uploading data in parallel in fixed size chunks

Enums§

Attribute
Additional object attribute types
Error
A specialized Error for object store-related errors
GetRange
Request only a portion of an object’s bytes
GetResultPayload
The kind of a GetResult
ObjectStoreScheme
Recognizes various URL formats, identifying the relevant ObjectStore
PutMode
Configure preconditions for the put operation

Constants§

OBJECT_STORE_COALESCE_DEFAULT
Range requests with a gap less than or equal to this will be coalesced into a single request by coalesce_ranges

Traits§

MultipartUpload
A trait allowing writing an object in fixed size chunks
ObjectStore
Universal API to multiple object store services.

Functions§

coalesce_ranges
Takes a function fetch that can fetch a range of bytes and uses this to fetch the provided byte ranges
collect_bytes
Collect a stream into Bytes avoiding copying in the event of a single chunk
parse_url
Create an ObjectStore based on the provided url
parse_url_opts
Create an ObjectStore based on the provided url and options

Type Aliases§

DynObjectStore
An alias for a dynamically dispatched object store implementation.
MultipartId
Id type for multipart uploads.
Result
A specialized Result for object store-related errors
UploadPart
An upload part request