lance_core/utils/
testing.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Testing utilities
5
6use crate::Result;
7use async_trait::async_trait;
8use bytes::Bytes;
9use chrono::{Duration, TimeDelta};
10use futures::stream::BoxStream;
11use futures::{StreamExt, TryStreamExt};
12use object_store::path::Path;
13use object_store::{
14    Error as OSError, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
15    PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as OSResult,
16};
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::future;
20use std::ops::Range;
21use std::sync::{Arc, Mutex, MutexGuard};
22
23// A policy function takes in the name of the operation (e.g. "put") and the location
24// that is being accessed / modified and returns an optional error.
25pub trait PolicyFnT: Fn(&str, &Path) -> Result<()> + Send + Sync {}
26impl<F> PolicyFnT for F where F: Fn(&str, &Path) -> Result<()> + Send + Sync {}
27impl Debug for dyn PolicyFnT {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        write!(f, "PolicyFn")
30    }
31}
32type PolicyFn = Arc<dyn PolicyFnT>;
33
34// These policy functions receive (and optionally transform) an ObjectMeta
35// They apply to functions that list file info
36pub trait ObjectMetaPolicyFnT: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
37impl<F> ObjectMetaPolicyFnT for F where F: Fn(&str, ObjectMeta) -> Result<ObjectMeta> + Send + Sync {}
38impl Debug for dyn ObjectMetaPolicyFnT {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(f, "PolicyFn")
41    }
42}
43type ObjectMetaPolicyFn = Arc<dyn ObjectMetaPolicyFnT>;
44
45/// A policy container, meant to be shared between test code and the proxy object store.
46///
47/// This container allows you to configure policies that should apply to the proxied calls.
48///
49/// Typically, you would use this to simulate I/O errors or mock out data.
50///
51/// Currently, for simplicity, we only proxy calls that involve some kind of path.  Calls
52/// to copy functions, which have a src and dst, will provide the source to the policy
53#[derive(Debug, Default)]
54pub struct ProxyObjectStorePolicy {
55    /// Policies which run before a method is invoked.  If the policy returns
56    /// an error then the target method will not be invoked and the error will
57    /// be returned instead.
58    before_policies: HashMap<String, PolicyFn>,
59    /// Policies which run after calls that return ObjectMeta.  The policy can
60    /// transform the returned ObjectMeta to mock out file listing results.
61    object_meta_policies: HashMap<String, ObjectMetaPolicyFn>,
62}
63
64impl ProxyObjectStorePolicy {
65    pub fn new() -> Self {
66        Default::default()
67    }
68
69    /// Set a new policy with the given name
70    ///
71    /// The name can be used to later remove this policy
72    pub fn set_before_policy(&mut self, name: &str, policy: PolicyFn) {
73        self.before_policies.insert(name.to_string(), policy);
74    }
75
76    pub fn clear_before_policy(&mut self, name: &str) {
77        self.before_policies.remove(name);
78    }
79
80    pub fn set_obj_meta_policy(&mut self, name: &str, policy: ObjectMetaPolicyFn) {
81        self.object_meta_policies.insert(name.to_string(), policy);
82    }
83}
84
85/// A proxy object store
86///
87/// This store wraps another object store and applies the given policy to all calls
88/// made to the underlying store.  This can be used to simulate failures or, perhaps
89/// in the future, to mock out results or provide other fine-grained control.
90#[derive(Debug)]
91pub struct ProxyObjectStore {
92    target: Arc<dyn ObjectStore>,
93    policy: Arc<Mutex<ProxyObjectStorePolicy>>,
94}
95
96impl ProxyObjectStore {
97    pub fn new(target: Arc<dyn ObjectStore>, policy: Arc<Mutex<ProxyObjectStorePolicy>>) -> Self {
98        Self { target, policy }
99    }
100
101    fn before_method(&self, method: &str, location: &Path) -> OSResult<()> {
102        let policy = self.policy.lock().unwrap();
103        for policy in policy.before_policies.values() {
104            policy(method, location).map_err(OSError::from)?;
105        }
106        Ok(())
107    }
108
109    fn transform_meta(&self, method: &str, meta: ObjectMeta) -> OSResult<ObjectMeta> {
110        let policy = self.policy.lock().unwrap();
111        let mut meta = meta;
112        for policy in policy.object_meta_policies.values() {
113            meta = policy(method, meta).map_err(OSError::from)?;
114        }
115        Ok(meta)
116    }
117}
118
119impl std::fmt::Display for ProxyObjectStore {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        write!(f, "ProxyObjectStore({})", self.target)
122    }
123}
124
125#[async_trait]
126impl ObjectStore for ProxyObjectStore {
127    async fn put_opts(
128        &self,
129        location: &Path,
130        bytes: PutPayload,
131        opts: PutOptions,
132    ) -> OSResult<PutResult> {
133        self.before_method("put", location)?;
134        self.target.put_opts(location, bytes, opts).await
135    }
136
137    async fn put_multipart_opts(
138        &self,
139        location: &Path,
140        opts: PutMultipartOpts,
141    ) -> OSResult<Box<dyn MultipartUpload>> {
142        self.before_method("put_multipart", location)?;
143        self.target.put_multipart_opts(location, opts).await
144    }
145
146    async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
147        self.before_method("get_opts", location)?;
148        self.target.get_opts(location, options).await
149    }
150
151    async fn get_range(&self, location: &Path, range: Range<usize>) -> OSResult<Bytes> {
152        self.before_method("get_range", location)?;
153        self.target.get_range(location, range).await
154    }
155
156    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> OSResult<Vec<Bytes>> {
157        self.before_method("get_ranges", location)?;
158        self.target.get_ranges(location, ranges).await
159    }
160
161    async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
162        self.before_method("head", location)?;
163        let meta = self.target.head(location).await?;
164        self.transform_meta("head", meta)
165    }
166
167    async fn delete(&self, location: &Path) -> OSResult<()> {
168        self.before_method("delete", location)?;
169        self.target.delete(location).await
170    }
171
172    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, OSResult<ObjectMeta>> {
173        self.target
174            .list(prefix)
175            .and_then(|meta| future::ready(self.transform_meta("list", meta)))
176            .boxed()
177    }
178
179    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
180        self.target.list_with_delimiter(prefix).await
181    }
182
183    async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
184        self.before_method("copy", from)?;
185        self.target.copy(from, to).await
186    }
187
188    async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
189        self.before_method("rename", from)?;
190        self.target.rename(from, to).await
191    }
192
193    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
194        self.before_method("copy_if_not_exists", from)?;
195        self.target.copy_if_not_exists(from, to).await
196    }
197}
198
199// Regrettably, the system clock is a process-wide global. That means that tests running
200// in parallel can interfere with each other if they both want to adjust the system clock.
201//
202// By using MockClock below (which wraps mock_instant::MockClock), we can prevent this from
203// happening, though there is a test time cost as this will prevent some potential test
204// parallelism in a rather negative way (blocking).
205//
206// It also means that if one clock-dependent test fails then all future clock-dependent
207// tests will fail because of mutex poisoning.
208static CLOCK_MUTEX: Mutex<()> = Mutex::new(());
209pub struct MockClock<'a> {
210    _guard: MutexGuard<'a, ()>,
211}
212
213impl Default for MockClock<'_> {
214    fn default() -> Self {
215        Self {
216            _guard: CLOCK_MUTEX.lock().unwrap(),
217        }
218    }
219}
220
221impl MockClock<'_> {
222    pub fn new() -> Self {
223        Default::default()
224    }
225
226    pub fn set_system_time(&self, time: Duration) {
227        mock_instant::MockClock::set_system_time(time.to_std().unwrap());
228    }
229}
230
231impl Drop for MockClock<'_> {
232    fn drop(&mut self) {
233        // Reset the clock to the epoch
234        mock_instant::MockClock::set_system_time(TimeDelta::try_days(0).unwrap().to_std().unwrap());
235    }
236}