// wasmer_wasix/runtime/module_cache/fallback.rs

use wasmer::{Engine, Module};

use crate::runtime::module_cache::{CacheError, ModuleCache, ModuleHash};

/// [`FallbackCache`] is a [`ModuleCache`] combinator that layers two caching
/// strategies, and is typically created via [`ModuleCache::with_fallback()`].
///
/// Loads try the primary cache first and only consult the fallback cache when
/// the primary fails; saves are sent to both caches. By chaining different
/// caches together with [`FallbackCache`], you can build a caching hierarchy
/// tailored to your application's needs, balancing performance, resource
/// usage, and persistence.
///
/// A key assumption of [`FallbackCache`] is that **all operations on the
/// fallback implementation will be significantly slower than the primary one**.
///
/// ## Cache Promotion
///
/// Whenever a load misses the primary cache but the fallback is able to
/// provide the module, that module is automatically saved to the primary
/// cache so future lookups are faster. A failed promotion is only logged; the
/// module is still returned to the caller.
///
/// This "cache promotion" strategy helps keep frequently accessed modules in
/// the faster primary cache.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FallbackCache<Primary, Fallback> {
    /// The cache that every load consults first.
    primary: Primary,
    /// The cache consulted only after the primary fails to load.
    fallback: Fallback,
}

impl<Primary, Fallback> FallbackCache<Primary, Fallback> {
    /// Pairs `primary` with `fallback` to form a two-level cache.
    pub(crate) fn new(primary: Primary, fallback: Fallback) -> Self {
        Self { primary, fallback }
    }

    /// Returns a shared reference to the primary cache.
    pub fn primary(&self) -> &Primary {
        &self.primary
    }

    /// Returns an exclusive reference to the primary cache.
    pub fn primary_mut(&mut self) -> &mut Primary {
        &mut self.primary
    }

    /// Returns a shared reference to the fallback cache.
    pub fn fallback(&self) -> &Fallback {
        &self.fallback
    }

    /// Returns an exclusive reference to the fallback cache.
    pub fn fallback_mut(&mut self) -> &mut Fallback {
        &mut self.fallback
    }

    /// Consumes the combinator and hands back its parts as
    /// `(primary, fallback)`.
    pub fn into_inner(self) -> (Primary, Fallback) {
        (self.primary, self.fallback)
    }
}

#[async_trait::async_trait]
impl<Primary, Fallback> ModuleCache for FallbackCache<Primary, Fallback>
where
    Primary: ModuleCache + Send + Sync,
    Fallback: ModuleCache + Send + Sync,
{
    /// Loads the module stored under `key`, preferring the primary cache.
    ///
    /// When the primary cannot supply the module, the fallback is tried. A
    /// module found in the fallback is also saved into the primary; if that
    /// save fails, a warning is logged and the module is still returned.
    ///
    /// # Errors
    ///
    /// If both caches fail to load, the error from the **primary** cache is
    /// returned and the fallback's error is discarded.
    async fn load(&self, key: ModuleHash, engine: &Engine) -> Result<Module, CacheError> {
        let primary_error = match self.primary.load(key, engine).await {
            Ok(module) => return Ok(module),
            Err(error) => error,
        };

        let module = match self.fallback.load(key, engine).await {
            Ok(module) => module,
            // Neither cache could help; report what the primary told us.
            Err(_) => return Err(primary_error),
        };

        // The fallback had the module, so promote it into the primary cache.
        // Promotion is best-effort: the caller gets the module either way.
        if let Err(save_error) = self.primary.save(key, engine, &module).await {
            tracing::warn!(
                %key,
                error = &save_error as &dyn std::error::Error,
                "Unable to promote a module to the primary cache",
            );
        }

        Ok(module)
    }

    /// Saves `module` under `key` in both the primary and the fallback cache.
    ///
    /// The two saves are combined with [`futures::try_join!`].
    ///
    /// # Errors
    ///
    /// Returns an error if either cache fails to save the module.
    async fn save(
        &self,
        key: ModuleHash,
        engine: &Engine,
        module: &Module,
    ) -> Result<(), CacheError> {
        let primary_save = self.primary.save(key, engine, module);
        let fallback_save = self.fallback.save(key, engine, module);

        futures::try_join!(primary_save, fallback_save).map(|_| ())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;
    use crate::runtime::module_cache::SharedCache;

    /// WebAssembly text for a small module that exports an `add` function
    /// taking two `i64` parameters and returning their `i64` sum. Used as the
    /// module fixture for the tests in this module.
    const ADD_WAT: &[u8] = br#"(
        module
            (func
                (export "add")
                (param $x i64)
                (param $y i64)
                (result i64)
                (i64.add (local.get $x) (local.get $y)))
        )"#;

    /// Test helper that wraps a value and keeps two counters: one for
    /// operations that succeeded and one for operations that failed.
    #[derive(Debug)]
    struct Spy<I> {
        /// The wrapped value that operations are forwarded to.
        inner: I,
        /// Number of operations that completed successfully.
        success: AtomicUsize,
        /// Number of operations that returned an error.
        failures: AtomicUsize,
    }

    impl<I> Spy<I> {
        /// Wraps `inner` with both counters starting at zero.
        fn new(inner: I) -> Self {
            Self {
                inner,
                success: AtomicUsize::default(),
                failures: AtomicUsize::default(),
            }
        }

        /// Current value of the success counter.
        fn success(&self) -> usize {
            self.success.load(Ordering::SeqCst)
        }

        /// Current value of the failure counter.
        fn failures(&self) -> usize {
            self.failures.load(Ordering::SeqCst)
        }
    }

    #[async_trait::async_trait]
    impl<I: ModuleCache + Send + Sync> ModuleCache for Spy<I> {
        /// Forwards the load to the inner cache, bumping the success or
        /// failure counter according to the outcome, then returns that outcome
        /// unchanged.
        async fn load(&self, key: ModuleHash, engine: &Engine) -> Result<Module, CacheError> {
            let outcome = self.inner.load(key, engine).await;

            let counter = if outcome.is_ok() {
                &self.success
            } else {
                &self.failures
            };
            counter.fetch_add(1, Ordering::SeqCst);

            outcome
        }

        /// Forwards the save to the inner cache, bumping the success or
        /// failure counter according to the outcome, then returns that outcome
        /// unchanged.
        async fn save(
            &self,
            key: ModuleHash,
            engine: &Engine,
            module: &Module,
        ) -> Result<(), CacheError> {
            let outcome = self.inner.save(key, engine, module).await;

            let counter = if outcome.is_ok() {
                &self.success
            } else {
                &self.failures
            };
            counter.fetch_add(1, Ordering::SeqCst);

            outcome
        }
    }

    /// A module already stored in the primary cache is served from there and
    /// the fallback cache is never consulted.
    #[tokio::test]
    async fn load_from_primary() {
        let engine = Engine::default();
        let key = ModuleHash::xxhash_from_bytes([0; 8]);
        let module = Module::new(&engine, ADD_WAT).unwrap();

        // Seed the primary before wrapping it in a spy so that the setup
        // itself doesn't show up in the counters.
        let seeded = SharedCache::default();
        seeded.save(key, &engine, &module).await.unwrap();
        let primary = Spy::new(seeded);
        let fallback = Spy::new(SharedCache::default());
        let cache = FallbackCache::new(&primary, &fallback);

        let loaded = cache.load(key, &engine).await.unwrap();

        // We should have received the same module
        assert_eq!(module, loaded);
        // via exactly one successful operation on the primary
        assert_eq!((primary.success(), primary.failures()), (1, 0));
        // while the fallback wasn't touched at all
        assert_eq!((fallback.success(), fallback.failures()), (0, 0));
        // And the fallback still doesn't have our module
        assert!(fallback.load(key, &engine).await.is_err());
    }

    /// A module that only exists in the fallback cache is returned to the
    /// caller and copied into the primary cache along the way.
    #[tokio::test]
    async fn loading_from_fallback_also_populates_primary() {
        let engine = Engine::default();
        let key = ModuleHash::xxhash_from_bytes([0; 8]);
        let module = Module::new(&engine, ADD_WAT).unwrap();

        // Seed the fallback before wrapping it in a spy so that the setup
        // itself doesn't show up in the counters.
        let seeded = SharedCache::default();
        seeded.save(key, &engine, &module).await.unwrap();
        let primary = Spy::new(SharedCache::default());
        let fallback = Spy::new(seeded);
        let cache = FallbackCache::new(&primary, &fallback);

        let loaded = cache.load(key, &engine).await.unwrap();

        // We should have received the same module
        assert_eq!(module, loaded);
        // We got a single hit on the fallback
        assert_eq!((fallback.success(), fallback.failures()), (1, 0));
        // The load() on our primary failed, but afterwards the module was
        // saved into it, which counts as one success.
        assert_eq!((primary.failures(), primary.success()), (1, 1));
        // So the primary can now serve the module by itself
        assert_eq!(primary.load(key, &engine).await.unwrap(), module);
    }

    #[tokio::test]
    async fn saving_will_update_both() {
        let engine = Engine::default();
        let module = Module::new(&engine, ADD_WAT).unwrap();
        let key = ModuleHash::xxhash_from_bytes([0; 8]);
        let primary = SharedCache::default();
        let fallback = SharedCache::default();
        let cache = FallbackCache::new(&primary, &fallback);

        cache.save(key, &engine, &module).await.unwrap();

        assert_eq!(primary.load(key, &engine).await.unwrap(), module);
        assert_eq!(fallback.load(key, &engine).await.unwrap(), module);
    }
}