use {
    rand::{thread_rng, Rng},
    std::sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        Arc, Mutex, Weak,
    },
};

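// A temporary burst in the workload can create many objects which are then
// recycled and sit idle in the pool. When more than this many objects are
// retained and the moving average of the pool size stays at or above this
// threshold, recycle() drops the excess and shrinks the pool back down.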
const RECYCLER_SHRINK_SIZE: usize = 1024;

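// Lookback window, in number of allocate() calls, for the exponential moving
// average of gc.len() tracked in RecyclerX::size_factor.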
const RECYCLER_SHRINK_WINDOW: usize = 16384;

#[derive(Debug, Default)]
struct RecyclerStats {
    total: AtomicUsize,
    reuse: AtomicUsize,
    freed: AtomicUsize,
    max_gc: AtomicUsize,
}

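/// A thread-safe object pool. `allocate` pops and resets a recycled object
/// when one is available, and otherwise creates a `T::default()` that holds a
/// `Weak` handle back to the shared pool so it can later be returned via
/// `RecyclerX::recycle`. A minimal usage sketch, mirroring `test_recycler`
/// below (the `recycler` field is private, so this only compiles inside this
/// module):
/// ```ignore
/// let recycler: Recycler<u64> = Recycler::default();
/// let item = recycler.allocate("example"); // reuse or create
/// recycler.recycler.recycle(item); // return to the pool
/// ```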
#[derive(Clone, Default)]
pub struct Recycler<T> {
    recycler: Arc<RecyclerX<T>>,
}

#[derive(Debug)]
pub struct RecyclerX<T> {
    gc: Mutex<Vec<T>>,
    stats: RecyclerStats,
    id: usize,
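    // RECYCLER_SHRINK_WINDOW times the exponential moving average of
    // gc.len(); see allocate() for the update rule.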
    size_factor: AtomicUsize,
}

impl<T: Default> Default for RecyclerX<T> {
    fn default() -> RecyclerX<T> {
        let id = thread_rng().gen_range(0..1000);
        trace!("new recycler..{}", id);
        RecyclerX {
            gc: Mutex::default(),
            stats: RecyclerStats::default(),
            id,
            size_factor: AtomicUsize::default(),
        }
    }
}

#[cfg(feature = "frozen-abi")]
impl solana_frozen_abi::abi_example::AbiExample
    for RecyclerX<crate::cuda_runtime::PinnedVec<solana_packet::Packet>>
{
    fn example() -> Self {
        Self::default()
    }
}

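/// Implemented by pooled objects. `reset` restores a recycled object to a
/// clean state before reuse, `warm` pre-allocates backing storage, and
/// `set_recycler` stores a weak handle to the pool that owns the object.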
pub trait Reset {
    fn reset(&mut self);
    fn warm(&mut self, size_hint: usize);
    fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>)
    where
        Self: std::marker::Sized;
}

lazy_static! {
    static ref WARM_RECYCLERS: AtomicBool = AtomicBool::new(false);
}

pub fn enable_recycler_warming() {
    WARM_RECYCLERS.store(true, Ordering::Relaxed);
}

fn warm_recyclers() -> bool {
    WARM_RECYCLERS.load(Ordering::Relaxed)
}

impl<T: Default + Reset + Sized> Recycler<T> {
    pub fn warmed(num: usize, size_hint: usize) -> Self {
        let new = Self::default();
        if warm_recyclers() {
            let warmed_items: Vec<_> = (0..num)
                .map(|_| {
                    let mut item = new.allocate("warming");
                    item.warm(size_hint);
                    item
                })
                .collect();
            warmed_items
                .into_iter()
                .for_each(|i| new.recycler.recycle(i));
        }
        new
    }

    pub fn allocate(&self, name: &'static str) -> T {
        {
            const RECYCLER_SHRINK_WINDOW_HALF: usize = RECYCLER_SHRINK_WINDOW / 2;
            const RECYCLER_SHRINK_WINDOW_SUB_ONE: usize = RECYCLER_SHRINK_WINDOW - 1;
            let mut gc = self.recycler.gc.lock().unwrap();
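            // Update the exponential moving average of gc.len():
            //     a <- a * (n - 1) / n + x / n
            // where n is RECYCLER_SHRINK_WINDOW and x is gc.len(). To stick to
            // integer math, size_factor stores b = n * a, so the update is:
            //     b <- b * (n - 1) / n + x
            // with n / 2 added to the numerator so that the division rounds
            // rather than truncates.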
            self.recycler.size_factor.store(
                self.recycler
                    .size_factor
                    .load(Ordering::Acquire)
                    .saturating_mul(RECYCLER_SHRINK_WINDOW_SUB_ONE)
                    .saturating_add(RECYCLER_SHRINK_WINDOW_HALF)
                    .checked_div(RECYCLER_SHRINK_WINDOW)
                    .unwrap()
                    .saturating_add(gc.len()),
                Ordering::Release,
            );
            if let Some(mut x) = gc.pop() {
                self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
                x.reset();
                return x;
            }
        }
        let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
        trace!(
            "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
            total,
            name,
            self.recycler.id,
            self.recycler.stats.reuse.load(Ordering::Relaxed),
            self.recycler.stats.max_gc.load(Ordering::Relaxed),
        );

        let mut t = T::default();
        t.set_recycler(Arc::downgrade(&self.recycler));
        t
    }
}

impl<T: Default + Reset> RecyclerX<T> {
    pub fn recycle(&self, x: T) {
        let len = {
            let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
            gc.push(x);
            const SIZE_FACTOR_AFTER_SHRINK: usize = RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW;
            if gc.len() > RECYCLER_SHRINK_SIZE
                && self.size_factor.load(Ordering::Acquire) >= SIZE_FACTOR_AFTER_SHRINK
            {
                self.stats.freed.fetch_add(
                    gc.len().saturating_sub(RECYCLER_SHRINK_SIZE),
                    Ordering::Relaxed,
                );
                for mut x in gc.drain(RECYCLER_SHRINK_SIZE..) {
                    x.set_recycler(Weak::default());
                }
                self.size_factor
                    .store(SIZE_FACTOR_AFTER_SHRINK, Ordering::Release);
            }
            gc.len()
        };

        let max_gc = self.stats.max_gc.load(Ordering::Relaxed);
        if len > max_gc {
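            // Best-effort maximum: if another thread changed max_gc in the
            // meantime, the compare_exchange fails and this sample is dropped,
            // which is acceptable for a statistic.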
            let _ = self.stats.max_gc.compare_exchange(
                max_gc,
                len,
                Ordering::Relaxed,
                Ordering::Relaxed,
            );
        }
        let total = self.stats.total.load(Ordering::Relaxed);
        let reuse = self.stats.reuse.load(Ordering::Relaxed);
        let freed = self.stats.freed.load(Ordering::Relaxed);
        datapoint_debug!(
            "recycler",
            ("gc_len", len as i64, i64),
            ("total", total as i64, i64),
            ("freed", freed as i64, i64),
            ("reuse", reuse as i64, i64),
        );
    }
}

#[cfg(test)]
mod tests {
    use {super::*, crate::packet::PacketBatchRecycler, std::iter::repeat_with};

    impl Reset for u64 {
        fn reset(&mut self) {
            *self = 10;
        }
        fn warm(&mut self, _size_hint: usize) {}
        fn set_recycler(&mut self, _recycler: Weak<RecyclerX<Self>>) {}
    }

    #[test]
    fn test_recycler() {
        let recycler = Recycler::default();
        let mut y: u64 = recycler.allocate("test_recycler1");
        assert_eq!(y, 0);
        y = 20;
        let recycler2 = recycler.clone();
        recycler2.recycler.recycle(y);
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1);
        let z = recycler.allocate("test_recycler2");
        assert_eq!(z, 10);
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
    }

    #[test]
    fn test_recycler_shrink() {
        let mut rng = rand::thread_rng();
        let recycler = PacketBatchRecycler::default();
        const NUM_PACKETS: usize = RECYCLER_SHRINK_SIZE * 2;
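        // Allocate a burst of packets; dropping them returns every one to the
        // pool, filling the gc up to NUM_PACKETS.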
        {
            let _packets: Vec<_> = repeat_with(|| recycler.allocate(""))
                .take(NUM_PACKETS)
                .collect();
        }
        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), NUM_PACKETS);
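        // Process a more typical, smaller load for a while; the moving average
        // of the idle pool size climbs past the shrink threshold.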
        for _ in 0..RECYCLER_SHRINK_WINDOW / 16 {
            let count = rng.gen_range(1..128);
            let _packets: Vec<_> = repeat_with(|| recycler.allocate("")).take(count).collect();
        }
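        // Assert that the gc has been shrunk down to RECYCLER_SHRINK_SIZE.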
        assert_eq!(
            recycler.recycler.gc.lock().unwrap().len(),
            RECYCLER_SHRINK_SIZE
        );
    }
}
249}