1use crate::Result;
7use async_trait::async_trait;
8use bytes::Bytes;
9use chrono::{Duration, TimeDelta};
10use futures::stream::BoxStream;
11use futures::{StreamExt, TryStreamExt};
12use object_store::path::Path;
13use object_store::{
14 Error as OSError, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
15 PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as OSResult,
16};
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::future;
20use std::ops::Range;
21use std::sync::{Arc, Mutex, MutexGuard};
22
23pub trait PolicyFnT: Fn(&str, &Path) -> Result<()> + Send + Sync {}
26impl<F> PolicyFnT for F where F: Fn(&str, &Path) -> Result<()> + Send + Sync {}
27impl Debug for dyn PolicyFnT {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 write!(f, "PolicyFn")
30 }
31}
32type PolicyFn = Arc<dyn PolicyFnT>;
33
34pub trait ObjectMetaPolicyFnT: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
37impl<F> ObjectMetaPolicyFnT for F where F: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
38impl Debug for dyn ObjectMetaPolicyFnT {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(f, "PolicyFn")
41 }
42}
43type ObjectMetaPolicyFn = Arc<dyn ObjectMetaPolicyFnT>;
44
45#[derive(Debug, Default)]
54pub struct ProxyObjectStorePolicy {
55 before_policies: HashMap<String, PolicyFn>,
59 object_meta_policies: HashMap<String, ObjectMetaPolicyFn>,
62}
63
64impl ProxyObjectStorePolicy {
65 pub fn new() -> Self {
66 Default::default()
67 }
68
69 pub fn set_before_policy(&mut self, name: &str, policy: PolicyFn) {
73 self.before_policies.insert(name.to_string(), policy);
74 }
75
76 pub fn clear_before_policy(&mut self, name: &str) {
77 self.before_policies.remove(name);
78 }
79
80 pub fn set_obj_meta_policy(&mut self, name: &str, policy: ObjectMetaPolicyFn) {
81 self.object_meta_policies.insert(name.to_string(), policy);
82 }
83}
84
85#[derive(Debug)]
91pub struct ProxyObjectStore {
92 target: Arc<dyn ObjectStore>,
93 policy: Arc<Mutex<ProxyObjectStorePolicy>>,
94}
95
96impl ProxyObjectStore {
97 pub fn new(target: Arc<dyn ObjectStore>, policy: Arc<Mutex<ProxyObjectStorePolicy>>) -> Self {
98 Self { target, policy }
99 }
100
101 fn before_method(&self, method: &str, location: &Path) -> OSResult<()> {
102 let policy = self.policy.lock().unwrap();
103 for policy in policy.before_policies.values() {
104 policy(method, location).map_err(OSError::from)?;
105 }
106 Ok(())
107 }
108
109 fn transform_meta(&self, method: &str, meta: ObjectMeta) -> OSResult<ObjectMeta> {
110 let policy = self.policy.lock().unwrap();
111 let mut meta = meta;
112 for policy in policy.object_meta_policies.values() {
113 meta = policy(method, meta).map_err(OSError::from)?;
114 }
115 Ok(meta)
116 }
117}
118
119impl std::fmt::Display for ProxyObjectStore {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 write!(f, "ProxyObjectStore({})", self.target)
122 }
123}
124
125#[async_trait]
126impl ObjectStore for ProxyObjectStore {
127 async fn put_opts(
128 &self,
129 location: &Path,
130 bytes: PutPayload,
131 opts: PutOptions,
132 ) -> OSResult<PutResult> {
133 self.before_method("put", location)?;
134 self.target.put_opts(location, bytes, opts).await
135 }
136
137 async fn put_multipart_opts(
138 &self,
139 location: &Path,
140 opts: PutMultipartOpts,
141 ) -> OSResult<Box<dyn MultipartUpload>> {
142 self.before_method("put_multipart", location)?;
143 self.target.put_multipart_opts(location, opts).await
144 }
145
146 async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
147 self.before_method("get_opts", location)?;
148 self.target.get_opts(location, options).await
149 }
150
151 async fn get_range(&self, location: &Path, range: Range<usize>) -> OSResult<Bytes> {
152 self.before_method("get_range", location)?;
153 self.target.get_range(location, range).await
154 }
155
156 async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> OSResult<Vec<Bytes>> {
157 self.before_method("get_ranges", location)?;
158 self.target.get_ranges(location, ranges).await
159 }
160
161 async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
162 self.before_method("head", location)?;
163 let meta = self.target.head(location).await?;
164 self.transform_meta("head", meta)
165 }
166
167 async fn delete(&self, location: &Path) -> OSResult<()> {
168 self.before_method("delete", location)?;
169 self.target.delete(location).await
170 }
171
172 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, OSResult<ObjectMeta>> {
173 self.target
174 .list(prefix)
175 .and_then(|meta| future::ready(self.transform_meta("list", meta)))
176 .boxed()
177 }
178
179 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
180 self.target.list_with_delimiter(prefix).await
181 }
182
183 async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
184 self.before_method("copy", from)?;
185 self.target.copy(from, to).await
186 }
187
188 async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
189 self.before_method("rename", from)?;
190 self.target.rename(from, to).await
191 }
192
193 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
194 self.before_method("copy_if_not_exists", from)?;
195 self.target.copy_if_not_exists(from, to).await
196 }
197}
198
199static CLOCK_MUTEX: Mutex<()> = Mutex::new(());
209pub struct MockClock<'a> {
210 _guard: MutexGuard<'a, ()>,
211}
212
213impl Default for MockClock<'_> {
214 fn default() -> Self {
215 Self {
216 _guard: CLOCK_MUTEX.lock().unwrap(),
217 }
218 }
219}
220
221impl MockClock<'_> {
222 pub fn new() -> Self {
223 Default::default()
224 }
225
226 pub fn set_system_time(&self, time: Duration) {
227 mock_instant::MockClock::set_system_time(time.to_std().unwrap());
228 }
229}
230
231impl Drop for MockClock<'_> {
232 fn drop(&mut self) {
233 mock_instant::MockClock::set_system_time(TimeDelta::try_days(0).unwrap().to_std().unwrap());
235 }
236}