1use {
2 dashmap::DashMap,
3 solana_sdk::{
4 account::{AccountSharedData, ReadableAccount},
5 clock::Slot,
6 hash::Hash,
7 pubkey::Pubkey,
8 },
9 std::{
10 borrow::Borrow,
11 collections::BTreeSet,
12 ops::Deref,
13 sync::{
14 atomic::{AtomicBool, AtomicU64, Ordering},
15 Arc, RwLock,
16 },
17 },
18};
19
/// Shared, reference-counted handle to the cached accounts of a single slot.
pub type SlotCache = Arc<SlotCacheInner>;
21
/// Accounts written in one slot, together with write statistics for that slot.
#[derive(Debug)]
pub struct SlotCacheInner {
    /// Most recently inserted account for each pubkey.
    cache: DashMap<Pubkey, CachedAccount>,
    /// Number of inserts that replaced an already-cached entry for the same pubkey.
    same_account_writes: AtomicU64,
    /// Sum of the data lengths of the inserts counted in `same_account_writes`.
    same_account_writes_size: AtomicU64,
    /// Sum of the data lengths of inserts for pubkeys not previously cached in this slot.
    unique_account_writes_size: AtomicU64,
    /// Running byte count maintained by `insert`; subtracted from `total_size` on drop.
    size: AtomicU64,
    /// Byte counter shared with the owning `AccountsCache` (cloned from it in `new_inner`).
    total_size: Arc<AtomicU64>,
    /// Set by `mark_slot_frozen`, read by `is_frozen`.
    is_frozen: AtomicBool,
}
32
33impl Drop for SlotCacheInner {
34 fn drop(&mut self) {
35 self.total_size
37 .fetch_sub(self.size.load(Ordering::Relaxed), Ordering::Relaxed);
38 }
39}
40
41impl SlotCacheInner {
42 pub fn report_slot_store_metrics(&self) {
43 datapoint_info!(
44 "slot_repeated_writes",
45 (
46 "same_account_writes",
47 self.same_account_writes.load(Ordering::Relaxed),
48 i64
49 ),
50 (
51 "same_account_writes_size",
52 self.same_account_writes_size.load(Ordering::Relaxed),
53 i64
54 ),
55 (
56 "unique_account_writes_size",
57 self.unique_account_writes_size.load(Ordering::Relaxed),
58 i64
59 ),
60 ("size", self.size.load(Ordering::Relaxed), i64)
61 );
62 }
63
64 pub fn get_all_pubkeys(&self) -> Vec<Pubkey> {
65 self.cache.iter().map(|item| *item.key()).collect()
66 }
67
68 pub fn insert(
69 &self,
70 pubkey: &Pubkey,
71 account: AccountSharedData,
72 hash: Option<impl Borrow<Hash>>,
73 slot: Slot,
74 ) -> CachedAccount {
75 let data_len = account.data().len() as u64;
76 let item = Arc::new(CachedAccountInner {
77 account,
78 hash: RwLock::new(hash.map(|h| *h.borrow())),
79 slot,
80 pubkey: *pubkey,
81 });
82 if let Some(old) = self.cache.insert(*pubkey, item.clone()) {
83 self.same_account_writes.fetch_add(1, Ordering::Relaxed);
84 self.same_account_writes_size
85 .fetch_add(data_len, Ordering::Relaxed);
86
87 let old_len = old.account.data().len() as u64;
88 let grow = old_len.saturating_sub(data_len);
89 if grow > 0 {
90 self.size.fetch_add(grow, Ordering::Relaxed);
91 self.total_size.fetch_add(grow, Ordering::Relaxed);
92 } else {
93 let shrink = data_len.saturating_sub(old_len);
94 if shrink > 0 {
95 self.size.fetch_add(shrink, Ordering::Relaxed);
96 self.total_size.fetch_sub(shrink, Ordering::Relaxed);
97 }
98 }
99 } else {
100 self.size.fetch_add(data_len, Ordering::Relaxed);
101 self.total_size.fetch_add(data_len, Ordering::Relaxed);
102 self.unique_account_writes_size
103 .fetch_add(data_len, Ordering::Relaxed);
104 }
105 item
106 }
107
108 pub fn get_cloned(&self, pubkey: &Pubkey) -> Option<CachedAccount> {
109 self.cache
110 .get(pubkey)
111 .map(|account_ref| account_ref.value().clone())
115 }
116
117 pub fn mark_slot_frozen(&self) {
118 self.is_frozen.store(true, Ordering::SeqCst);
119 }
120
121 pub fn is_frozen(&self) -> bool {
122 self.is_frozen.load(Ordering::SeqCst)
123 }
124
125 pub fn total_bytes(&self) -> u64 {
126 self.unique_account_writes_size.load(Ordering::Relaxed)
127 + self.same_account_writes_size.load(Ordering::Relaxed)
128 }
129}
130
131impl Deref for SlotCacheInner {
132 type Target = DashMap<Pubkey, CachedAccount>;
133 fn deref(&self) -> &Self::Target {
134 &self.cache
135 }
136}
137
/// Shared, reference-counted handle to a single cached account.
pub type CachedAccount = Arc<CachedAccountInner>;
139
/// One cached account together with the inputs needed to compute its hash on demand.
#[derive(Debug)]
pub struct CachedAccountInner {
    /// The cached account contents.
    pub account: AccountSharedData,
    /// Account hash; `None` until supplied at insert time or computed by `hash()`.
    hash: RwLock<Option<Hash>>,
    /// Slot passed to the hash computation in `hash()`.
    slot: Slot,
    /// Address this account was stored under.
    pubkey: Pubkey,
}
147
148impl CachedAccountInner {
149 pub fn hash(&self) -> Hash {
150 let hash = self.hash.read().unwrap();
151 match *hash {
152 Some(hash) => hash,
153 None => {
154 drop(hash);
155 let hash = crate::accounts_db::AccountsDb::hash_account(
156 self.slot,
157 &self.account,
158 &self.pubkey,
159 );
160 *self.hash.write().unwrap() = Some(hash);
161 hash
162 }
163 }
164 }
165 pub fn pubkey(&self) -> &Pubkey {
166 &self.pubkey
167 }
168}
169
/// Cache of accounts keyed by slot, with bookkeeping of roots and total cached size.
#[derive(Debug, Default)]
pub struct AccountsCache {
    /// Per-slot account caches.
    cache: DashMap<Slot, SlotCache>,
    /// Roots recorded by `add_root` and handed back (and removed) by `clear_roots`.
    maybe_unflushed_roots: RwLock<BTreeSet<Slot>>,
    /// Highest root passed to `set_max_flush_root` so far (starts at 0).
    max_flushed_root: AtomicU64,
    /// Byte counter shared with every `SlotCacheInner` created by `new_inner`.
    total_size: Arc<AtomicU64>,
}
179
180impl AccountsCache {
181 pub fn new_inner(&self) -> SlotCache {
182 Arc::new(SlotCacheInner {
183 cache: DashMap::default(),
184 same_account_writes: AtomicU64::default(),
185 same_account_writes_size: AtomicU64::default(),
186 unique_account_writes_size: AtomicU64::default(),
187 size: AtomicU64::default(),
188 total_size: Arc::clone(&self.total_size),
189 is_frozen: AtomicBool::default(),
190 })
191 }
192 fn unique_account_writes_size(&self) -> u64 {
193 self.cache
194 .iter()
195 .map(|item| {
196 let slot_cache = item.value();
197 slot_cache
198 .unique_account_writes_size
199 .load(Ordering::Relaxed)
200 })
201 .sum()
202 }
203 pub fn size(&self) -> u64 {
204 self.total_size.load(Ordering::Relaxed)
205 }
206 pub fn report_size(&self) {
207 datapoint_info!(
208 "accounts_cache_size",
209 (
210 "num_roots",
211 self.maybe_unflushed_roots.read().unwrap().len(),
212 i64
213 ),
214 ("num_slots", self.cache.len(), i64),
215 (
216 "total_unique_writes_size",
217 self.unique_account_writes_size(),
218 i64
219 ),
220 ("total_size", self.size(), i64),
221 );
222 }
223
224 pub fn store(
225 &self,
226 slot: Slot,
227 pubkey: &Pubkey,
228 account: AccountSharedData,
229 hash: Option<impl Borrow<Hash>>,
230 ) -> CachedAccount {
231 let slot_cache = self.slot_cache(slot).unwrap_or_else(||
232 self
237 .cache
238 .entry(slot)
239 .or_insert(self.new_inner())
240 .clone());
241
242 slot_cache.insert(pubkey, account, hash, slot)
243 }
244
245 pub fn load(&self, slot: Slot, pubkey: &Pubkey) -> Option<CachedAccount> {
246 self.slot_cache(slot)
247 .and_then(|slot_cache| slot_cache.get_cloned(pubkey))
248 }
249
250 pub fn remove_slot(&self, slot: Slot) -> Option<SlotCache> {
251 self.cache.remove(&slot).map(|(_, slot_cache)| slot_cache)
252 }
253
254 pub fn slot_cache(&self, slot: Slot) -> Option<SlotCache> {
255 self.cache.get(&slot).map(|result| result.value().clone())
256 }
257
258 pub fn add_root(&self, root: Slot) {
259 let max_flushed_root = self.fetch_max_flush_root();
260 if root > max_flushed_root || (root == max_flushed_root && root == 0) {
261 self.maybe_unflushed_roots.write().unwrap().insert(root);
262 }
263 }
264
265 pub fn clear_roots(&self, max_root: Option<Slot>) -> BTreeSet<Slot> {
266 let mut w_maybe_unflushed_roots = self.maybe_unflushed_roots.write().unwrap();
267 if let Some(max_root) = max_root {
268 let greater_than_max_root = w_maybe_unflushed_roots.split_off(&(max_root + 1));
272 std::mem::replace(&mut w_maybe_unflushed_roots, greater_than_max_root)
275 } else {
276 std::mem::take(&mut *w_maybe_unflushed_roots)
277 }
278 }
279
280 pub fn remove_slots_le(&self, max_root: Slot) -> Vec<(Slot, SlotCache)> {
283 let mut removed_slots = vec![];
284 self.cache.retain(|slot, slot_cache| {
285 let should_remove = *slot <= max_root;
286 if should_remove {
287 removed_slots.push((*slot, slot_cache.clone()))
288 }
289 !should_remove
290 });
291 removed_slots
292 }
293
294 pub fn cached_frozen_slots(&self) -> Vec<Slot> {
295 let mut slots: Vec<_> = self
296 .cache
297 .iter()
298 .filter_map(|item| {
299 let (slot, slot_cache) = item.pair();
300 if slot_cache.is_frozen() {
301 Some(*slot)
302 } else {
303 None
304 }
305 })
306 .collect();
307 slots.sort_unstable();
308 slots
309 }
310
311 pub fn num_slots(&self) -> usize {
312 self.cache.len()
313 }
314
315 pub fn fetch_max_flush_root(&self) -> Slot {
316 self.max_flushed_root.load(Ordering::Relaxed)
317 }
318
319 pub fn set_max_flush_root(&self, root: Slot) {
320 self.max_flushed_root.fetch_max(root, Ordering::Relaxed);
321 }
322}
323
#[cfg(test)]
pub mod tests {
    use super::*;

    /// Stores a single placeholder account under a fresh pubkey in `slot`.
    fn store_placeholder_account(cache: &AccountsCache, slot: Slot) {
        cache.store(
            slot,
            &Pubkey::new_unique(),
            AccountSharedData::new(1, 0, &Pubkey::default()),
            Some(&Hash::default()),
        );
    }

    #[test]
    fn test_remove_slots_le() {
        let cache = AccountsCache::default();
        // Nothing cached yet, so nothing can be removed.
        assert!(cache.remove_slots_le(1).is_empty());

        let inserted_slot = 0;
        store_placeholder_account(&cache, inserted_slot);

        // The bound is inclusive: removing slots <= 0 must return slot 0.
        let removed = cache.remove_slots_le(0);
        assert_eq!(removed.len(), 1);
        assert_eq!(removed[0].0, inserted_slot);
    }

    #[test]
    fn test_cached_frozen_slots() {
        let cache = AccountsCache::default();
        // An empty cache has no frozen slots.
        assert!(cache.cached_frozen_slots().is_empty());

        let inserted_slot = 0;
        store_placeholder_account(&cache, inserted_slot);

        // A slot that has data but was never frozen is not reported.
        assert!(cache.cached_frozen_slots().is_empty());

        // Once frozen, the slot shows up.
        cache.slot_cache(inserted_slot).unwrap().mark_slot_frozen();
        assert_eq!(cache.cached_frozen_slots(), vec![inserted_slot]);
    }
}