solana_perf/
recycler.rs

use {
    rand::{thread_rng, Rng},
    std::sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        Arc, Mutex, Weak,
    },
};

// A temporary burst in the workload can cause a large number of allocations,
// after which the objects are recycled and still reside in memory. If the
// number of recycled objects stays above the limit below for long, they are
// deemed redundant since they are not being reused. The recycler will then
// shrink by releasing objects above this threshold. The limit aims to keep a
// cushion against *normal* variations in the workload while bounding the
// number of redundant garbage-collected objects retained after temporary
// bursts.
const RECYCLER_SHRINK_SIZE: usize = 1024;

// Lookback window, in number of allocations, for the exponential moving
// average of the number of garbage-collected objects. The half-life of the
// decay factor implied by the window size below is 11356 allocations: a
// sample of gc.len() taken 11356 allocations ago carries half the weight of
// the most recent sample.
const RECYCLER_SHRINK_WINDOW: usize = 16384;
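// Rough arithmetic behind the half-life quoted above (added for clarity):
// each allocation scales the moving average by (n - 1) / n with
// n = RECYCLER_SHRINK_WINDOW, so the half-life k satisfies
// ((n - 1) / n)^k = 1/2, i.e. k = ln(2) / ln(n / (n - 1)) ≈ n * ln(2)
// ≈ 16384 * 0.6931 ≈ 11356 allocations.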

#[derive(Debug, Default)]
struct RecyclerStats {
    total: AtomicUsize,
    reuse: AtomicUsize,
    freed: AtomicUsize,
    max_gc: AtomicUsize,
}

#[derive(Clone, Default)]
pub struct Recycler<T> {
    recycler: Arc<RecyclerX<T>>,
}
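
// Usage sketch (illustrative, based on the tests in this file): a
// `Recycler<T>` hands out values via `allocate()`; a value that is done being
// used can be handed back with `RecyclerX::recycle()`, after which later
// `allocate()` calls reuse it (running `Reset::reset()` first) instead of
// constructing a new `T::default()`.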

#[derive(Debug)]
pub struct RecyclerX<T> {
    gc: Mutex<Vec<T>>,
    stats: RecyclerStats,
    id: usize,
    // Shrink window times the exponential moving average of gc.len().
    size_factor: AtomicUsize,
}

impl<T: Default> Default for RecyclerX<T> {
    fn default() -> RecyclerX<T> {
        let id = thread_rng().gen_range(0..1000);
        trace!("new recycler..{}", id);
        RecyclerX {
            gc: Mutex::default(),
            stats: RecyclerStats::default(),
            id,
            size_factor: AtomicUsize::default(),
        }
    }
}

#[cfg(feature = "frozen-abi")]
impl solana_frozen_abi::abi_example::AbiExample
    for RecyclerX<crate::cuda_runtime::PinnedVec<solana_packet::Packet>>
{
    fn example() -> Self {
        Self::default()
    }
}

pub trait Reset {
    fn reset(&mut self);
    fn warm(&mut self, size_hint: usize);
    fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>)
    where
        Self: std::marker::Sized;
}
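
// Note on `set_recycler` (illustrative): implementors are handed a `Weak`
// reference back to the owning `RecyclerX` so a value can later find its way
// back into the pool, e.g. by upgrading the handle and calling `recycle()`
// when the value is no longer needed. Holding a `Weak` rather than an `Arc`
// avoids keeping the recycler alive (or forming a reference cycle) once the
// `Recycler` itself is gone; the shrink path below also clears the handle via
// `set_recycler(Weak::default())` when it releases surplus objects.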

lazy_static! {
    static ref WARM_RECYCLERS: AtomicBool = AtomicBool::new(false);
}

pub fn enable_recycler_warming() {
    WARM_RECYCLERS.store(true, Ordering::Relaxed);
}

fn warm_recyclers() -> bool {
    WARM_RECYCLERS.load(Ordering::Relaxed)
}

impl<T: Default + Reset + Sized> Recycler<T> {
    pub fn warmed(num: usize, size_hint: usize) -> Self {
        let new = Self::default();
        if warm_recyclers() {
            let warmed_items: Vec<_> = (0..num)
                .map(|_| {
                    let mut item = new.allocate("warming");
                    item.warm(size_hint);
                    item
                })
                .collect();
            warmed_items
                .into_iter()
                .for_each(|i| new.recycler.recycle(i));
        }
        new
    }

    pub fn allocate(&self, name: &'static str) -> T {
        {
            const RECYCLER_SHRINK_WINDOW_HALF: usize = RECYCLER_SHRINK_WINDOW / 2;
            const RECYCLER_SHRINK_WINDOW_SUB_ONE: usize = RECYCLER_SHRINK_WINDOW - 1;
            let mut gc = self.recycler.gc.lock().unwrap();
            // Update the exponential moving average of gc.len().
            // The update equation is:
            //      a <- a * (n - 1) / n + x / n
            // To avoid floating point math, define b = n * a:
            //      b <- b * (n - 1) / n + x
            // To make the remaining division round (instead of truncate),
            // add n/2 to the numerator.
            // Effectively b (size_factor here) is an exponential moving
            // estimate of the "sum" of x (gc.len()) over the window, as
            // opposed to the "average".
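            // A quick sanity check on the scaling (added for clarity): if
            // gc.len() stays at a constant value x, then b converges to the
            // fixed point b = b * (n - 1) / n + x, i.e. b = n * x. That is
            // why the shrink check in `recycle()` compares size_factor
            // against RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW.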
            self.recycler.size_factor.store(
                self.recycler
                    .size_factor
                    .load(Ordering::Acquire)
                    .saturating_mul(RECYCLER_SHRINK_WINDOW_SUB_ONE)
                    .saturating_add(RECYCLER_SHRINK_WINDOW_HALF)
                    .checked_div(RECYCLER_SHRINK_WINDOW)
                    .unwrap()
                    .saturating_add(gc.len()),
                Ordering::Release,
            );
            if let Some(mut x) = gc.pop() {
                self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
                x.reset();
                return x;
            }
        }
        let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
        trace!(
            "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
            total,
            name,
            self.recycler.id,
            self.recycler.stats.reuse.load(Ordering::Relaxed),
            self.recycler.stats.max_gc.load(Ordering::Relaxed),
        );

        let mut t = T::default();
        t.set_recycler(Arc::downgrade(&self.recycler));
        t
    }
}

impl<T: Default + Reset> RecyclerX<T> {
    pub fn recycle(&self, x: T) {
        let len = {
            let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
            gc.push(x);
            const SIZE_FACTOR_AFTER_SHRINK: usize = RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW;
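            // Shrink only when both the instantaneous and the averaged view
            // agree the pool is oversized: size_factor at or above
            // RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW means the moving
            // average of gc.len() has been at least RECYCLER_SHRINK_SIZE over
            // the lookback window (see the fixed-point note in `allocate`).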
            if gc.len() > RECYCLER_SHRINK_SIZE
                && self.size_factor.load(Ordering::Acquire) >= SIZE_FACTOR_AFTER_SHRINK
            {
                self.stats.freed.fetch_add(
                    gc.len().saturating_sub(RECYCLER_SHRINK_SIZE),
                    Ordering::Relaxed,
                );
                for mut x in gc.drain(RECYCLER_SHRINK_SIZE..) {
                    x.set_recycler(Weak::default());
                }
                self.size_factor
                    .store(SIZE_FACTOR_AFTER_SHRINK, Ordering::Release);
            }
            gc.len()
        };

        let max_gc = self.stats.max_gc.load(Ordering::Relaxed);
        if len > max_gc {
            // This is not completely accurate, but it should be fine for most cases.
            let _ = self.stats.max_gc.compare_exchange(
                max_gc,
                len,
                Ordering::Relaxed,
                Ordering::Relaxed,
            );
        }
        let total = self.stats.total.load(Ordering::Relaxed);
        let reuse = self.stats.reuse.load(Ordering::Relaxed);
        let freed = self.stats.freed.load(Ordering::Relaxed);
        datapoint_debug!(
            "recycler",
            ("gc_len", len as i64, i64),
            ("total", total as i64, i64),
            ("freed", freed as i64, i64),
            ("reuse", reuse as i64, i64),
        );
    }
}

#[cfg(test)]
mod tests {
    use {super::*, crate::packet::PacketBatchRecycler, std::iter::repeat_with};

    impl Reset for u64 {
        fn reset(&mut self) {
            *self = 10;
        }
        fn warm(&mut self, _size_hint: usize) {}
        fn set_recycler(&mut self, _recycler: Weak<RecyclerX<Self>>) {}
    }

    #[test]
    fn test_recycler() {
        let recycler = Recycler::default();
        let mut y: u64 = recycler.allocate("test_recycler1");
        assert_eq!(y, 0);
        y = 20;
        let recycler2 = recycler.clone();
        recycler2.recycler.recycle(y);
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1);
        let z = recycler.allocate("test_recycler2");
        assert_eq!(z, 10);
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
    }

    #[test]
    fn test_recycler_shrink() {
        let mut rng = rand::thread_rng();
        let recycler = PacketBatchRecycler::default();
        // Allocate a burst of packets.
        const NUM_PACKETS: usize = RECYCLER_SHRINK_SIZE * 2;
        {
            let _packets: Vec<_> = repeat_with(|| recycler.allocate(""))
                .take(NUM_PACKETS)
                .collect();
        }
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), NUM_PACKETS);
        // Process a normal load of packets for a while.
        for _ in 0..RECYCLER_SHRINK_WINDOW / 16 {
            let count = rng.gen_range(1..128);
            let _packets: Vec<_> = repeat_with(|| recycler.allocate("")).take(count).collect();
        }
        // Assert that the gc size has shrunk.
        assert_eq!(
            recycler.recycler.gc.lock().unwrap().len(),
            RECYCLER_SHRINK_SIZE
        );
    }
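
    // Sketch of an extra check (assumes no test in this process calls
    // `enable_recycler_warming()`): warming is gated on the WARM_RECYCLERS
    // flag, so with it left disabled `warmed()` should not pre-populate gc.
    #[test]
    fn test_recycler_warmed_disabled() {
        let recycler: Recycler<u64> = Recycler::warmed(10, 0);
        assert!(recycler.recycler.gc.lock().unwrap().is_empty());
    }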
}