fuel_core/service/
vm_pool.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
use core::{
    fmt,
    mem,
};
use fuel_core_types::fuel_vm::interpreter::MemoryInstance;
use std::sync::{
    Arc,
    Mutex,
};
use tokio::sync::OwnedSemaphorePermit;

/// Memory instance originating from a pool.
/// Will be recycled back into the pool when dropped.
pub struct MemoryFromPool {
    pool: MemoryPool,
    memory: MemoryInstance,
    _permit: OwnedSemaphorePermit,
}

impl Drop for MemoryFromPool {
    fn drop(&mut self) {
        self.pool.recycle_raw(mem::take(&mut self.memory));
    }
}

impl fmt::Debug for MemoryFromPool {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("MemoryFromPool")
            .field("pool", &"..")
            .field("memory", &self.memory)
            .finish()
    }
}

impl AsRef<MemoryInstance> for MemoryFromPool {
    fn as_ref(&self) -> &MemoryInstance {
        self.memory.as_ref()
    }
}

impl AsMut<MemoryInstance> for MemoryFromPool {
    fn as_mut(&mut self) -> &mut MemoryInstance {
        self.memory.as_mut()
    }
}

/// Pool of VM memory instances for reuse.
#[derive(Clone)]
pub struct MemoryPool {
    semaphore: Arc<tokio::sync::Semaphore>,
    pool: Arc<Mutex<Vec<MemoryInstance>>>,
}
impl MemoryPool {
    pub fn new(number_of_instances: usize) -> Self {
        Self {
            semaphore: Arc::new(tokio::sync::Semaphore::new(number_of_instances)),
            pool: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Gets a new raw VM memory instance from the pool.
    pub async fn take_raw(&self) -> MemoryFromPool {
        let _permit = self
            .semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("Semaphore is not closed");
        let mut pool = self.pool.lock().expect("poisoned");
        let memory = pool.pop().unwrap_or_default();

        MemoryFromPool {
            pool: self.clone(),
            memory,
            _permit,
        }
    }

    /// Adds a new memory instance to the pool.
    fn recycle_raw(&self, mut mem: MemoryInstance) {
        mem.reset();
        let mut pool = self.pool.lock().expect("poisoned");
        pool.push(mem);
    }
}

impl fmt::Debug for MemoryPool {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.pool.lock() {
            Ok(pool) => {
                write!(f, "SharedVmMemoryPool {{ pool: [{} items] }}", pool.len())
            }
            Err(_) => write!(f, "SharedVmMemoryPool {{ pool: [poisoned] }}"),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn memory_pool_recycling_works() {
        // Given
        let pool = MemoryPool::new(1);

        // When
        let mut mem_guard = pool.take_raw().await;
        let mem = mem_guard.as_mut();
        mem.grow_stack(1024).expect("Unable to grow stack");
        mem.write_bytes_noownerchecks(0, [1, 2, 3, 4])
            .expect("Unable to write stack");
        let ptr1 = mem.stack_raw() as *const _ as *const u8 as usize;
        drop(mem_guard);

        // Then
        // Make sure we get the same memory allocation back
        let mem = pool.take_raw().await;
        let ptr2 = mem.as_ref().stack_raw() as *const _ as *const u8 as usize;
        assert_eq!(ptr1, ptr2);
    }

    #[tokio::test]
    async fn memory_pool_locking_works() {
        // Given
        const POOL_SIZE: usize = 4;
        let pool = MemoryPool::new(POOL_SIZE);
        let mut _drop = vec![];
        for _ in 0..POOL_SIZE {
            _drop.push(pool.take_raw().await);
        }

        // When
        let mem = tokio::time::timeout(Duration::from_secs(1), pool.take_raw()).await;

        // Then
        assert!(mem.is_err());
    }

    #[tokio::test]
    async fn memory_pool_freeing_works() {
        // Given
        const POOL_SIZE: usize = 4;
        let pool = MemoryPool::new(POOL_SIZE);
        let mut _drop = vec![];
        for _ in 0..POOL_SIZE {
            _drop.push(pool.take_raw().await);
        }
        drop(_drop);

        // When
        let mem = tokio::time::timeout(Duration::from_secs(1), pool.take_raw()).await;

        // Then
        assert!(mem.is_ok());
    }
}