use crate::Result;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{Duration, TimeDelta};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::{
Error as OSError, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as OSResult,
};
use std::collections::HashMap;
use std::fmt::Debug;
use std::future;
use std::ops::Range;
use std::sync::{Arc, Mutex, MutexGuard};
pub trait PolicyFnT: Fn(&str, &Path) -> Result<()> + Send + Sync {}
impl<F> PolicyFnT for F where F: Fn(&str, &Path) -> Result<()> + Send + Sync {}
impl Debug for dyn PolicyFnT {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PolicyFn")
}
}
type PolicyFn = Arc<dyn PolicyFnT>;
pub trait ObjectMetaPolicyFnT: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
impl<F> ObjectMetaPolicyFnT for F where F: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
impl Debug for dyn ObjectMetaPolicyFnT {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PolicyFn")
}
}
type ObjectMetaPolicyFn = Arc<dyn ObjectMetaPolicyFnT>;
#[derive(Debug, Default)]
pub struct ProxyObjectStorePolicy {
before_policies: HashMap<String, PolicyFn>,
object_meta_policies: HashMap<String, ObjectMetaPolicyFn>,
}
impl ProxyObjectStorePolicy {
pub fn new() -> Self {
Default::default()
}
pub fn set_before_policy(&mut self, name: &str, policy: PolicyFn) {
self.before_policies.insert(name.to_string(), policy);
}
pub fn clear_before_policy(&mut self, name: &str) {
self.before_policies.remove(name);
}
pub fn set_obj_meta_policy(&mut self, name: &str, policy: ObjectMetaPolicyFn) {
self.object_meta_policies.insert(name.to_string(), policy);
}
}
#[derive(Debug)]
pub struct ProxyObjectStore {
target: Arc<dyn ObjectStore>,
policy: Arc<Mutex<ProxyObjectStorePolicy>>,
}
impl ProxyObjectStore {
pub fn new(target: Arc<dyn ObjectStore>, policy: Arc<Mutex<ProxyObjectStorePolicy>>) -> Self {
Self { target, policy }
}
fn before_method(&self, method: &str, location: &Path) -> OSResult<()> {
let policy = self.policy.lock().unwrap();
for policy in policy.before_policies.values() {
policy(method, location).map_err(OSError::from)?;
}
Ok(())
}
fn transform_meta(&self, method: &str, meta: ObjectMeta) -> OSResult<ObjectMeta> {
let policy = self.policy.lock().unwrap();
let mut meta = meta;
for policy in policy.object_meta_policies.values() {
meta = policy(method, meta).map_err(OSError::from)?;
}
Ok(meta)
}
}
impl std::fmt::Display for ProxyObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ProxyObjectStore({})", self.target)
}
}
#[async_trait]
impl ObjectStore for ProxyObjectStore {
async fn put_opts(
&self,
location: &Path,
bytes: PutPayload,
opts: PutOptions,
) -> OSResult<PutResult> {
self.before_method("put", location)?;
self.target.put_opts(location, bytes, opts).await
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOpts,
) -> OSResult<Box<dyn MultipartUpload>> {
self.before_method("put_multipart", location)?;
self.target.put_multipart_opts(location, opts).await
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
self.before_method("get_opts", location)?;
self.target.get_opts(location, options).await
}
async fn get_range(&self, location: &Path, range: Range<usize>) -> OSResult<Bytes> {
self.before_method("get_range", location)?;
self.target.get_range(location, range).await
}
async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> OSResult<Vec<Bytes>> {
self.before_method("get_ranges", location)?;
self.target.get_ranges(location, ranges).await
}
async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
self.before_method("head", location)?;
let meta = self.target.head(location).await?;
self.transform_meta("head", meta)
}
async fn delete(&self, location: &Path) -> OSResult<()> {
self.before_method("delete", location)?;
self.target.delete(location).await
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, OSResult<ObjectMeta>> {
self.target
.list(prefix)
.and_then(|meta| future::ready(self.transform_meta("list", meta)))
.boxed()
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
self.target.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
self.before_method("copy", from)?;
self.target.copy(from, to).await
}
async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
self.before_method("rename", from)?;
self.target.rename(from, to).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
self.before_method("copy_if_not_exists", from)?;
self.target.copy_if_not_exists(from, to).await
}
}
static CLOCK_MUTEX: Mutex<()> = Mutex::new(());
pub struct MockClock<'a> {
_guard: MutexGuard<'a, ()>,
}
impl Default for MockClock<'_> {
fn default() -> Self {
Self {
_guard: CLOCK_MUTEX.lock().unwrap(),
}
}
}
impl MockClock<'_> {
pub fn new() -> Self {
Default::default()
}
pub fn set_system_time(&self, time: Duration) {
mock_instant::MockClock::set_system_time(time.to_std().unwrap());
}
}
impl Drop for MockClock<'_> {
fn drop(&mut self) {
mock_instant::MockClock::set_system_time(TimeDelta::try_days(0).unwrap().to_std().unwrap());
}
}