solana_accounts_db/
secondary_index.rs

1use {
2    dashmap::{mapref::entry::Entry::Occupied, DashMap},
3    log::*,
4    solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
5    std::{
6        collections::HashSet,
7        fmt::Debug,
8        sync::{
9            atomic::{AtomicU64, Ordering},
10            RwLock,
11        },
12    },
13};
14
/// Outer keys that a single inner key is currently stored under.
///
/// The only case where an inner key should map to more than one outer key is
/// when the key had different account data for the indexed key across different
/// slots. As this is rare, it should be ok to use a `Vec` here over a `HashSet`,
/// even though we are running some key existence checks.
pub type SecondaryReverseIndexEntry = RwLock<Vec<Pubkey>>;
20
/// Set of inner keys stored under one outer key of a `SecondaryIndex`.
///
/// Every method takes `&self`, so implementations must supply their own
/// interior mutability.
pub trait SecondaryIndexEntry: Debug {
    /// Adds `key` to the set if it is not already present. The implementations
    /// in this file increment `inner_keys_count` by one only when `key` was
    /// newly added.
    fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64);
    // Removes a value from the set. Returns whether the value was present in the set.
    fn remove_inner_key(&self, key: &Pubkey) -> bool;
    /// Returns `true` when the set holds no keys.
    fn is_empty(&self) -> bool;
    /// Returns a copy of the keys currently in the set.
    fn keys(&self) -> Vec<Pubkey>;
    /// Returns the number of keys currently in the set.
    fn len(&self) -> usize;
}
29
/// Bookkeeping that `SecondaryIndex` uses when reporting metrics.
#[derive(Debug, Default)]
pub struct SecondaryIndexStats {
    // Gates metric reporting: `SecondaryIndex::insert()` only emits a datapoint
    // when `should_update(1000)` on this interval returns true.
    // NOTE(review): the argument is presumably milliseconds; confirm against
    // `AtomicInterval`.
    last_report: AtomicInterval,
    // Incremented once per newly inserted inner key and decremented in
    // `SecondaryIndex::remove_by_inner_key()`.
    num_inner_keys: AtomicU64,
}
35
36#[derive(Debug, Default)]
37pub struct DashMapSecondaryIndexEntry {
38    account_keys: DashMap<Pubkey, ()>,
39}
40
41impl SecondaryIndexEntry for DashMapSecondaryIndexEntry {
42    fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64) {
43        if self.account_keys.get(key).is_none() {
44            self.account_keys.entry(*key).or_insert_with(|| {
45                inner_keys_count.fetch_add(1, Ordering::Relaxed);
46            });
47        }
48    }
49
50    fn remove_inner_key(&self, key: &Pubkey) -> bool {
51        self.account_keys.remove(key).is_some()
52    }
53
54    fn is_empty(&self) -> bool {
55        self.account_keys.is_empty()
56    }
57
58    fn keys(&self) -> Vec<Pubkey> {
59        self.account_keys
60            .iter()
61            .map(|entry_ref| *entry_ref.key())
62            .collect()
63    }
64
65    fn len(&self) -> usize {
66        self.account_keys.len()
67    }
68}
69
70#[derive(Debug, Default)]
71pub struct RwLockSecondaryIndexEntry {
72    account_keys: RwLock<HashSet<Pubkey>>,
73}
74
75impl SecondaryIndexEntry for RwLockSecondaryIndexEntry {
76    fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64) {
77        if self.account_keys.read().unwrap().contains(key) {
78            // the key already exists, so nothing to do here
79            return;
80        }
81
82        let was_newly_inserted = self.account_keys.write().unwrap().insert(*key);
83        if was_newly_inserted {
84            inner_keys_count.fetch_add(1, Ordering::Relaxed);
85        }
86    }
87
88    fn remove_inner_key(&self, key: &Pubkey) -> bool {
89        self.account_keys.write().unwrap().remove(key)
90    }
91
92    fn is_empty(&self) -> bool {
93        self.account_keys.read().unwrap().is_empty()
94    }
95
96    fn keys(&self) -> Vec<Pubkey> {
97        self.account_keys.read().unwrap().iter().cloned().collect()
98    }
99
100    fn len(&self) -> usize {
101        self.account_keys.read().unwrap().len()
102    }
103}
104
/// Concurrent two-way index: `index` maps an outer key to its set of inner
/// keys, and `reverse_index` maps an inner key back to the outer keys it was
/// inserted under.
#[derive(Debug, Default)]
pub struct SecondaryIndex<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send> {
    // Name passed to `datapoint_info!` when `insert()` reports metrics
    metrics_name: &'static str,
    // Map from index keys to index values
    pub index: DashMap<Pubkey, SecondaryIndexEntryType>,
    // Map from each inner key to the outer keys it has been inserted under
    pub reverse_index: DashMap<Pubkey, SecondaryReverseIndexEntry>,
    // Report interval and inner-key counter used for metrics
    stats: SecondaryIndexStats,
}
113
114impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
115    SecondaryIndex<SecondaryIndexEntryType>
116{
117    pub fn new(metrics_name: &'static str) -> Self {
118        Self {
119            metrics_name,
120            ..Self::default()
121        }
122    }
123
124    pub fn insert(&self, key: &Pubkey, inner_key: &Pubkey) {
125        {
126            let pubkeys_map = self
127                .index
128                .get(key)
129                .unwrap_or_else(|| self.index.entry(*key).or_default().downgrade());
130
131            pubkeys_map.insert_if_not_exists(inner_key, &self.stats.num_inner_keys);
132        }
133
134        {
135            let outer_keys = self.reverse_index.get(inner_key).unwrap_or_else(|| {
136                self.reverse_index
137                    .entry(*inner_key)
138                    .or_insert(RwLock::new(Vec::with_capacity(1)))
139                    .downgrade()
140            });
141
142            let should_insert = !outer_keys.read().unwrap().contains(key);
143            if should_insert {
144                let mut w_outer_keys = outer_keys.write().unwrap();
145                if !w_outer_keys.contains(key) {
146                    w_outer_keys.push(*key);
147                }
148            }
149        }
150
151        if self.stats.last_report.should_update(1000) {
152            datapoint_info!(
153                self.metrics_name,
154                ("num_secondary_keys", self.index.len() as i64, i64),
155                (
156                    "num_inner_keys",
157                    self.stats.num_inner_keys.load(Ordering::Relaxed) as i64,
158                    i64
159                ),
160                (
161                    "num_reverse_index_keys",
162                    self.reverse_index.len() as i64,
163                    i64
164                ),
165            );
166        }
167    }
168
169    // Only safe to call from `remove_by_inner_key()` due to asserts
170    fn remove_index_entries(&self, outer_key: &Pubkey, removed_inner_key: &Pubkey) {
171        let is_outer_key_empty = {
172            let inner_key_map = self
173                .index
174                .get_mut(outer_key)
175                .expect("If we're removing a key, then it must have an entry in the map");
176            // If we deleted a pubkey from the reverse_index, then the corresponding entry
177            // better exist in this index as well or the two indexes are out of sync!
178            assert!(inner_key_map.value().remove_inner_key(removed_inner_key));
179            inner_key_map.is_empty()
180        };
181
182        // Delete the `key` if the set of inner keys is empty
183        if is_outer_key_empty {
184            // Other threads may have interleaved writes to this `key`,
185            // so double-check again for its emptiness
186            if let Occupied(key_entry) = self.index.entry(*outer_key) {
187                if key_entry.get().is_empty() {
188                    key_entry.remove();
189                }
190            }
191        }
192    }
193
194    pub fn remove_by_inner_key(&self, inner_key: &Pubkey) {
195        // Save off which keys in `self.index` had slots removed so we can remove them
196        // after we purge the reverse index
197        let mut removed_outer_keys: HashSet<Pubkey> = HashSet::new();
198
199        // Check if the entry for `inner_key` in the reverse index is empty
200        // and can be removed
201        if let Some((_, outer_keys_set)) = self.reverse_index.remove(inner_key) {
202            for removed_outer_key in outer_keys_set.into_inner().unwrap().into_iter() {
203                removed_outer_keys.insert(removed_outer_key);
204            }
205        }
206
207        // Remove this value from those keys
208        for outer_key in &removed_outer_keys {
209            self.remove_index_entries(outer_key, inner_key);
210        }
211
212        // Safe to `fetch_sub()` here because a dead key cannot be removed more than once,
213        // and the `num_inner_keys` must have been incremented by exactly removed_outer_keys.len()
214        // in previous unique insertions of `inner_key` into `self.index` for each key
215        // in `removed_outer_keys`
216        self.stats
217            .num_inner_keys
218            .fetch_sub(removed_outer_keys.len() as u64, Ordering::Relaxed);
219    }
220
221    pub fn get(&self, key: &Pubkey) -> Vec<Pubkey> {
222        if let Some(inner_keys_map) = self.index.get(key) {
223            inner_keys_map.keys()
224        } else {
225            vec![]
226        }
227    }
228
229    /// log top 20 (owner, # accounts) in descending order of # accounts
230    pub fn log_contents(&self) {
231        let mut entries = self
232            .index
233            .iter()
234            .map(|entry| (entry.value().len(), *entry.key()))
235            .collect::<Vec<_>>();
236        entries.sort_unstable();
237        entries
238            .iter()
239            .rev()
240            .take(20)
241            .for_each(|(v, k)| info!("owner: {}, accounts: {}", k, v));
242    }
243}