solana_runtime/
accounts_cache.rs

1use {
2    dashmap::DashMap,
3    solana_sdk::{
4        account::{AccountSharedData, ReadableAccount},
5        clock::Slot,
6        hash::Hash,
7        pubkey::Pubkey,
8    },
9    std::{
10        borrow::Borrow,
11        collections::BTreeSet,
12        ops::Deref,
13        sync::{
14            atomic::{AtomicBool, AtomicU64, Ordering},
15            Arc, RwLock,
16        },
17    },
18};
19
/// Shared, reference-counted handle to the cache of a single slot.
pub type SlotCache = Arc<SlotCacheInner>;

/// Accounts cached for one slot, together with write statistics for that slot.
#[derive(Debug)]
pub struct SlotCacheInner {
    /// Most recently stored version of each account in this slot, keyed by pubkey.
    cache: DashMap<Pubkey, CachedAccount>,
    /// Number of stores that overwrote an account already present in this slot.
    same_account_writes: AtomicU64,
    /// Sum of the data lengths of all overwriting stores.
    same_account_writes_size: AtomicU64,
    /// Sum of the data lengths of stores that were the first write of their pubkey in this slot.
    unique_account_writes_size: AtomicU64,
    /// Bytes of account data this slot contributes to `total_size`; given back on drop.
    size: AtomicU64,
    /// Byte counter shared with the owning `AccountsCache` (cloned from it in `new_inner`).
    total_size: Arc<AtomicU64>,
    /// Set by `mark_slot_frozen`, read by `is_frozen`.
    is_frozen: AtomicBool,
}
32
33impl Drop for SlotCacheInner {
34    fn drop(&mut self) {
35        // broader cache no longer holds our size in memory
36        self.total_size
37            .fetch_sub(self.size.load(Ordering::Relaxed), Ordering::Relaxed);
38    }
39}
40
41impl SlotCacheInner {
42    pub fn report_slot_store_metrics(&self) {
43        datapoint_info!(
44            "slot_repeated_writes",
45            (
46                "same_account_writes",
47                self.same_account_writes.load(Ordering::Relaxed),
48                i64
49            ),
50            (
51                "same_account_writes_size",
52                self.same_account_writes_size.load(Ordering::Relaxed),
53                i64
54            ),
55            (
56                "unique_account_writes_size",
57                self.unique_account_writes_size.load(Ordering::Relaxed),
58                i64
59            ),
60            ("size", self.size.load(Ordering::Relaxed), i64)
61        );
62    }
63
64    pub fn get_all_pubkeys(&self) -> Vec<Pubkey> {
65        self.cache.iter().map(|item| *item.key()).collect()
66    }
67
68    pub fn insert(
69        &self,
70        pubkey: &Pubkey,
71        account: AccountSharedData,
72        hash: Option<impl Borrow<Hash>>,
73        slot: Slot,
74    ) -> CachedAccount {
75        let data_len = account.data().len() as u64;
76        let item = Arc::new(CachedAccountInner {
77            account,
78            hash: RwLock::new(hash.map(|h| *h.borrow())),
79            slot,
80            pubkey: *pubkey,
81        });
82        if let Some(old) = self.cache.insert(*pubkey, item.clone()) {
83            self.same_account_writes.fetch_add(1, Ordering::Relaxed);
84            self.same_account_writes_size
85                .fetch_add(data_len, Ordering::Relaxed);
86
87            let old_len = old.account.data().len() as u64;
88            let grow = old_len.saturating_sub(data_len);
89            if grow > 0 {
90                self.size.fetch_add(grow, Ordering::Relaxed);
91                self.total_size.fetch_add(grow, Ordering::Relaxed);
92            } else {
93                let shrink = data_len.saturating_sub(old_len);
94                if shrink > 0 {
95                    self.size.fetch_add(shrink, Ordering::Relaxed);
96                    self.total_size.fetch_sub(shrink, Ordering::Relaxed);
97                }
98            }
99        } else {
100            self.size.fetch_add(data_len, Ordering::Relaxed);
101            self.total_size.fetch_add(data_len, Ordering::Relaxed);
102            self.unique_account_writes_size
103                .fetch_add(data_len, Ordering::Relaxed);
104        }
105        item
106    }
107
108    pub fn get_cloned(&self, pubkey: &Pubkey) -> Option<CachedAccount> {
109        self.cache
110            .get(pubkey)
111            // 1) Maybe can eventually use a Cow to avoid a clone on every read
112            // 2) Popping is only safe if it's guaranteed that only
113            //    replay/banking threads are reading from the AccountsDb
114            .map(|account_ref| account_ref.value().clone())
115    }
116
117    pub fn mark_slot_frozen(&self) {
118        self.is_frozen.store(true, Ordering::SeqCst);
119    }
120
121    pub fn is_frozen(&self) -> bool {
122        self.is_frozen.load(Ordering::SeqCst)
123    }
124
125    pub fn total_bytes(&self) -> u64 {
126        self.unique_account_writes_size.load(Ordering::Relaxed)
127            + self.same_account_writes_size.load(Ordering::Relaxed)
128    }
129}
130
131impl Deref for SlotCacheInner {
132    type Target = DashMap<Pubkey, CachedAccount>;
133    fn deref(&self) -> &Self::Target {
134        &self.cache
135    }
136}
137
/// Shared, reference-counted handle to one cached account version.
pub type CachedAccount = Arc<CachedAccountInner>;

/// One account version held in the cache, with an optionally precomputed hash.
#[derive(Debug)]
pub struct CachedAccountInner {
    /// The cached account contents.
    pub account: AccountSharedData,
    /// Account hash; `None` until supplied at insert time or computed by `hash()`.
    hash: RwLock<Option<Hash>>,
    /// Slot passed to `insert`; used as an input when the hash is computed.
    slot: Slot,
    /// Address of the account.
    pubkey: Pubkey,
}
147
148impl CachedAccountInner {
149    pub fn hash(&self) -> Hash {
150        let hash = self.hash.read().unwrap();
151        match *hash {
152            Some(hash) => hash,
153            None => {
154                drop(hash);
155                let hash = crate::accounts_db::AccountsDb::hash_account(
156                    self.slot,
157                    &self.account,
158                    &self.pubkey,
159                );
160                *self.hash.write().unwrap() = Some(hash);
161                hash
162            }
163        }
164    }
165    pub fn pubkey(&self) -> &Pubkey {
166        &self.pubkey
167    }
168}
169
/// Cache of recently written accounts, organized as one `SlotCache` per slot.
#[derive(Debug, Default)]
pub struct AccountsCache {
    /// Per-slot caches, keyed by slot.
    cache: DashMap<Slot, SlotCache>,
    // Queue of potentially unflushed roots. Random eviction + cache too large
    // could have triggered a flush of this slot already
    maybe_unflushed_roots: RwLock<BTreeSet<Slot>>,
    /// Highest root passed to `set_max_flush_root` so far (updated with `fetch_max`).
    max_flushed_root: AtomicU64,
    /// Byte counter shared with every `SlotCacheInner` created by `new_inner`.
    total_size: Arc<AtomicU64>,
}
179
180impl AccountsCache {
181    pub fn new_inner(&self) -> SlotCache {
182        Arc::new(SlotCacheInner {
183            cache: DashMap::default(),
184            same_account_writes: AtomicU64::default(),
185            same_account_writes_size: AtomicU64::default(),
186            unique_account_writes_size: AtomicU64::default(),
187            size: AtomicU64::default(),
188            total_size: Arc::clone(&self.total_size),
189            is_frozen: AtomicBool::default(),
190        })
191    }
192    fn unique_account_writes_size(&self) -> u64 {
193        self.cache
194            .iter()
195            .map(|item| {
196                let slot_cache = item.value();
197                slot_cache
198                    .unique_account_writes_size
199                    .load(Ordering::Relaxed)
200            })
201            .sum()
202    }
203    pub fn size(&self) -> u64 {
204        self.total_size.load(Ordering::Relaxed)
205    }
206    pub fn report_size(&self) {
207        datapoint_info!(
208            "accounts_cache_size",
209            (
210                "num_roots",
211                self.maybe_unflushed_roots.read().unwrap().len(),
212                i64
213            ),
214            ("num_slots", self.cache.len(), i64),
215            (
216                "total_unique_writes_size",
217                self.unique_account_writes_size(),
218                i64
219            ),
220            ("total_size", self.size(), i64),
221        );
222    }
223
224    pub fn store(
225        &self,
226        slot: Slot,
227        pubkey: &Pubkey,
228        account: AccountSharedData,
229        hash: Option<impl Borrow<Hash>>,
230    ) -> CachedAccount {
231        let slot_cache = self.slot_cache(slot).unwrap_or_else(||
232            // DashMap entry.or_insert() returns a RefMut, essentially a write lock,
233            // which is dropped after this block ends, minimizing time held by the lock.
234            // However, we still want to persist the reference to the `SlotStores` behind
235            // the lock, hence we clone it out, (`SlotStores` is an Arc so is cheap to clone).
236            self
237                .cache
238                .entry(slot)
239                .or_insert(self.new_inner())
240                .clone());
241
242        slot_cache.insert(pubkey, account, hash, slot)
243    }
244
245    pub fn load(&self, slot: Slot, pubkey: &Pubkey) -> Option<CachedAccount> {
246        self.slot_cache(slot)
247            .and_then(|slot_cache| slot_cache.get_cloned(pubkey))
248    }
249
250    pub fn remove_slot(&self, slot: Slot) -> Option<SlotCache> {
251        self.cache.remove(&slot).map(|(_, slot_cache)| slot_cache)
252    }
253
254    pub fn slot_cache(&self, slot: Slot) -> Option<SlotCache> {
255        self.cache.get(&slot).map(|result| result.value().clone())
256    }
257
258    pub fn add_root(&self, root: Slot) {
259        let max_flushed_root = self.fetch_max_flush_root();
260        if root > max_flushed_root || (root == max_flushed_root && root == 0) {
261            self.maybe_unflushed_roots.write().unwrap().insert(root);
262        }
263    }
264
265    pub fn clear_roots(&self, max_root: Option<Slot>) -> BTreeSet<Slot> {
266        let mut w_maybe_unflushed_roots = self.maybe_unflushed_roots.write().unwrap();
267        if let Some(max_root) = max_root {
268            // `greater_than_max_root` contains all slots >= `max_root + 1`, or alternatively,
269            // all slots > `max_root`. Meanwhile, `w_maybe_unflushed_roots` is left with all slots
270            // <= `max_root`.
271            let greater_than_max_root = w_maybe_unflushed_roots.split_off(&(max_root + 1));
272            // After the replace, `w_maybe_unflushed_roots` contains slots > `max_root`, and
273            // we return all slots <= `max_root`
274            std::mem::replace(&mut w_maybe_unflushed_roots, greater_than_max_root)
275        } else {
276            std::mem::take(&mut *w_maybe_unflushed_roots)
277        }
278    }
279
280    // Removes slots less than or equal to `max_root`. Only safe to pass in a rooted slot,
281    // otherwise the slot removed could still be undergoing replay!
282    pub fn remove_slots_le(&self, max_root: Slot) -> Vec<(Slot, SlotCache)> {
283        let mut removed_slots = vec![];
284        self.cache.retain(|slot, slot_cache| {
285            let should_remove = *slot <= max_root;
286            if should_remove {
287                removed_slots.push((*slot, slot_cache.clone()))
288            }
289            !should_remove
290        });
291        removed_slots
292    }
293
294    pub fn cached_frozen_slots(&self) -> Vec<Slot> {
295        let mut slots: Vec<_> = self
296            .cache
297            .iter()
298            .filter_map(|item| {
299                let (slot, slot_cache) = item.pair();
300                if slot_cache.is_frozen() {
301                    Some(*slot)
302                } else {
303                    None
304                }
305            })
306            .collect();
307        slots.sort_unstable();
308        slots
309    }
310
311    pub fn num_slots(&self) -> usize {
312        self.cache.len()
313    }
314
315    pub fn fetch_max_flush_root(&self) -> Slot {
316        self.max_flushed_root.load(Ordering::Relaxed)
317    }
318
319    pub fn set_max_flush_root(&self, root: Slot) {
320        self.max_flushed_root.fetch_max(root, Ordering::Relaxed);
321    }
322}
323
#[cfg(test)]
pub mod tests {
    use super::*;

    #[test]
    fn test_remove_slots_le() {
        let cache = AccountsCache::default();
        // Nothing has been stored yet, so there is nothing to remove
        assert!(cache.remove_slots_le(1).is_empty());

        let inserted_slot = 0;
        let pubkey = Pubkey::new_unique();
        let account = AccountSharedData::new(1, 0, &Pubkey::default());
        cache.store(inserted_slot, &pubkey, account, Some(&Hash::default()));

        // Removing everything <= the inserted slot must hand back exactly that slot
        let removed = cache.remove_slots_le(0);
        let removed_slots: Vec<Slot> = removed.iter().map(|(slot, _)| *slot).collect();
        assert_eq!(removed_slots, vec![inserted_slot]);
    }

    #[test]
    fn test_cached_frozen_slots() {
        let cache = AccountsCache::default();
        // Nothing has been stored yet, so there are no frozen slots
        assert!(cache.cached_frozen_slots().is_empty());

        let inserted_slot = 0;
        let pubkey = Pubkey::new_unique();
        let account = AccountSharedData::new(1, 0, &Pubkey::default());
        cache.store(inserted_slot, &pubkey, account, Some(&Hash::default()));

        // The slot exists but has not been frozen, so it must not be reported
        assert!(cache.cached_frozen_slots().is_empty());

        let slot_cache = cache.slot_cache(inserted_slot).unwrap();
        slot_cache.mark_slot_frozen();
        // Once frozen, the slot is the only one reported
        assert_eq!(cache.cached_frozen_slots(), vec![inserted_slot]);
    }
}
366}