solana_accounts_db/accounts_db/
geyser_plugin_utils.rs

1use {
2    crate::{account_storage::meta::StoredAccountMeta, accounts_db::AccountsDb},
3    solana_measure::measure::Measure,
4    solana_metrics::*,
5    solana_sdk::{
6        account::AccountSharedData, clock::Slot, pubkey::Pubkey, transaction::SanitizedTransaction,
7    },
8    std::collections::{HashMap, HashSet},
9};
10
/// Counters and timings accumulated while account data is streamed to the
/// accounts update notifier during a restore from snapshot.
///
/// All fields start at zero (via `Default`) and are only ever added to; the
/// totals are submitted as a single datapoint by `report`.
#[derive(Debug, Default)]
pub struct GeyserPluginNotifyAtSnapshotRestoreStats {
    /// Number of account entries visited while scanning the slots' storages.
    pub total_accounts: usize,
    /// Number of visited entries skipped because their pubkey had already been notified.
    pub skipped_accounts: usize,
    /// Number of accounts actually handed to the notifier.
    pub notified_accounts: usize,
    /// Microseconds spent processing slots (scanning, de-duplicating and the
    /// notifications issued from within the scan).
    pub elapsed_filtering_us: usize,
    /// Microseconds spent inside the notifier callback itself.
    pub total_pure_notify: usize,
    /// Microseconds spent recording notified pubkeys.
    pub total_pure_bookeeping: usize,
    /// Microseconds spent in the notification loop overall (callback plus bookkeeping).
    pub elapsed_notifying_us: usize,
}
21
22impl GeyserPluginNotifyAtSnapshotRestoreStats {
23    pub fn report(&self) {
24        datapoint_info!(
25            "accountsdb_plugin_notify_account_restore_from_snapshot_summary",
26            ("total_accounts", self.total_accounts, i64),
27            ("skipped_accounts", self.skipped_accounts, i64),
28            ("notified_accounts", self.notified_accounts, i64),
29            ("elapsed_filtering_us", self.elapsed_filtering_us, i64),
30            ("elapsed_notifying_us", self.elapsed_notifying_us, i64),
31            ("total_pure_notify_us", self.total_pure_notify, i64),
32            ("total_pure_bookeeping_us", self.total_pure_bookeeping, i64),
33        );
34    }
35}
36
37impl AccountsDb {
38    /// Notify the plugins of of account data when AccountsDb is restored from a snapshot. The data is streamed
39    /// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated
40    /// multiple times only the last write (with highest write_version) is notified.
41    pub fn notify_account_restore_from_snapshot(&self) {
42        if self.accounts_update_notifier.is_none() {
43            return;
44        }
45
46        let mut slots = self.storage.all_slots();
47        let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
48        let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();
49
50        slots.sort_by(|a, b| b.cmp(a));
51        for slot in slots {
52            self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
53        }
54
55        let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap();
56        accounts_update_notifier.notify_end_of_restore_from_snapshot();
57        notify_stats.report();
58    }
59
60    pub fn notify_account_at_accounts_update(
61        &self,
62        slot: Slot,
63        account: &AccountSharedData,
64        txn: &Option<&SanitizedTransaction>,
65        pubkey: &Pubkey,
66        write_version: u64,
67    ) {
68        if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
69            accounts_update_notifier.notify_account_update(
70                slot,
71                account,
72                txn,
73                pubkey,
74                write_version,
75            );
76        }
77    }
78
79    fn notify_accounts_in_slot(
80        &self,
81        slot: Slot,
82        notified_accounts: &mut HashSet<Pubkey>,
83        notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
84    ) {
85        let storage_entry = self.storage.get_slot_storage_entry(slot).unwrap();
86
87        let mut accounts_duplicate: HashMap<Pubkey, usize> = HashMap::default();
88        let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts");
89        let mut account_len = 0;
90        let mut pubkeys = HashSet::new();
91
92        // populate `accounts_duplicate` for any pubkeys that are in this storage twice.
93        // Storages cannot return `StoredAccountMeta<'_>` for more than 1 account at a time, so we have to do 2 passes to make sure
94        // we don't have duplicate pubkeys.
95        let mut i = 0;
96        storage_entry.accounts.scan_pubkeys(|pubkey| {
97            i += 1; // pre-increment to most easily match early returns in next loop
98            if !pubkeys.insert(*pubkey) {
99                accounts_duplicate.insert(*pubkey, i); // remember the highest index entry in this slot
100            }
101        });
102
103        // now, actually notify geyser
104        let mut i = 0;
105        storage_entry.accounts.scan_accounts(|account| {
106            i += 1;
107            account_len += 1;
108            if notified_accounts.contains(account.pubkey()) {
109                notify_stats.skipped_accounts += 1;
110                return;
111            }
112            if let Some(highest_i) = accounts_duplicate.get(account.pubkey()) {
113                if highest_i != &i {
114                    // this pubkey is in this storage twice and the current instance is not the last one, so we skip it.
115                    // We only send unique accounts in this slot to `notify_filtered_accounts`
116                    return;
117                }
118            }
119
120            // later entries in the same slot are more recent and override earlier accounts for the same pubkey
121            // We can pass an incrementing number here for write_version in the future, if the storage does not have a write_version.
122            // As long as all accounts for this slot are in 1 append vec that can be iterated oldest to newest.
123            self.notify_filtered_accounts(
124                slot,
125                notified_accounts,
126                std::iter::once(account),
127                notify_stats,
128            );
129        });
130        notify_stats.total_accounts += account_len;
131        measure_filter.stop();
132        notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;
133    }
134
135    fn notify_filtered_accounts<'a>(
136        &self,
137        slot: Slot,
138        notified_accounts: &mut HashSet<Pubkey>,
139        accounts_to_stream: impl Iterator<Item = StoredAccountMeta<'a>>,
140        notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
141    ) {
142        let notifier = self.accounts_update_notifier.as_ref().unwrap();
143        let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
144        for account in accounts_to_stream {
145            let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
146            notifier.notify_account_restore_from_snapshot(slot, &account);
147            measure_pure_notify.stop();
148
149            notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
150
151            let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
152            notified_accounts.insert(*account.pubkey());
153            measure_bookkeep.stop();
154            notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
155
156            notify_stats.notified_accounts += 1;
157        }
158        measure_notify.stop();
159        notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
160    }
161}
162
#[cfg(test)]
pub mod tests {
    use {
        crate::{
            account_storage::meta::StoredAccountMeta,
            accounts_db::AccountsDb,
            accounts_update_notifier_interface::{
                AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
            },
        },
        dashmap::DashMap,
        solana_sdk::{
            account::{AccountSharedData, ReadableAccount},
            clock::Slot,
            pubkey::Pubkey,
            transaction::SanitizedTransaction,
        },
        std::sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
    };

    impl AccountsDb {
        /// Replaces the accounts update notifier (test-only helper).
        pub fn set_geyser_plugin_notifer(&mut self, notifier: Option<AccountsUpdateNotifier>) {
            self.accounts_update_notifier = notifier;
        }
    }

    /// Test notifier that records every notification it receives, grouped by pubkey in
    /// arrival order, and remembers whether the end of restore has been signalled.
    #[derive(Debug, Default)]
    struct GeyserTestPlugin {
        pub accounts_notified: DashMap<Pubkey, Vec<(Slot, AccountSharedData)>>,
        pub is_startup_done: AtomicBool,
    }

    impl GeyserTestPlugin {
        /// Appends one `(slot, account)` notification to the list kept for `pubkey`.
        fn record(&self, pubkey: Pubkey, slot: Slot, account: AccountSharedData) {
            self.accounts_notified
                .entry(pubkey)
                .or_default()
                .push((slot, account));
        }
    }

    impl AccountsUpdateNotifierInterface for GeyserTestPlugin {
        /// Notified when an account is updated at runtime, due to transaction activities
        fn notify_account_update(
            &self,
            slot: Slot,
            account: &AccountSharedData,
            _txn: &Option<&SanitizedTransaction>,
            pubkey: &Pubkey,
            _write_version: u64,
        ) {
            self.record(*pubkey, slot, account.clone());
        }

        /// Notified when the AccountsDb is initialized at start when restored
        /// from a snapshot.
        fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) {
            self.record(*account.pubkey(), slot, account.to_account_shared_data());
        }

        fn notify_end_of_restore_from_snapshot(&self) {
            self.is_startup_done.store(true, Ordering::Relaxed);
        }
    }

    /// Builds an account holding `lamports`, with the size argument (`1`) and the
    /// default owner used throughout these tests.
    fn account_with_lamports(lamports: u64) -> AccountSharedData {
        AccountSharedData::new(lamports, 1, AccountSharedData::default().owner())
    }

    /// Creates a fresh `GeyserTestPlugin`, installs it on `accounts` and returns a handle
    /// to it so the test can inspect what was recorded.
    fn install_notifier(accounts: &mut AccountsDb) -> Arc<GeyserTestPlugin> {
        let notifier = Arc::new(GeyserTestPlugin::default());
        // `.clone()` (rather than `Arc::clone(&..)`) lets the result coerce to the trait object.
        accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
        notifier
    }

    /// Asserts that the notifications recorded for `pubkey` are exactly `expected`,
    /// given as `(slot, lamports)` pairs in arrival order.
    fn assert_notifications(
        notifier: &GeyserTestPlugin,
        pubkey: &Pubkey,
        expected: &[(Slot, u64)],
    ) {
        let recorded = notifier.accounts_notified.get(pubkey).unwrap();
        assert_eq!(recorded.len(), expected.len());
        for ((slot, account), (expected_slot, expected_lamports)) in recorded.iter().zip(expected)
        {
            assert_eq!(account.lamports(), *expected_lamports);
            assert_eq!(slot, expected_slot);
        }
    }

    #[test]
    fn test_notify_account_restore_from_snapshot_once_per_slot() {
        let mut accounts = AccountsDb::new_single_for_tests();
        let slot0 = 0;

        // Account with key1 is updated twice in the store -- should only get notified once.
        let key1 = solana_sdk::pubkey::new_rand();
        let account1_initial = account_with_lamports(1);
        accounts.store_uncached(slot0, &[(&key1, &account1_initial)]);

        let account1_lamports: u64 = 2;
        let account1_updated = account_with_lamports(account1_lamports);
        accounts.store_uncached(slot0, &[(&key1, &account1_updated)]);

        let key2 = solana_sdk::pubkey::new_rand();
        let account2_lamports: u64 = 100;
        let account2 = account_with_lamports(account2_lamports);
        accounts.store_uncached(slot0, &[(&key2, &account2)]);

        // The notifier is installed only after all stores, so everything it sees comes
        // from the restore notification below.
        let notifier = install_notifier(&mut accounts);
        accounts.notify_account_restore_from_snapshot();

        assert_notifications(&notifier, &key1, &[(slot0, account1_lamports)]);
        assert_notifications(&notifier, &key2, &[(slot0, account2_lamports)]);

        assert!(notifier.is_startup_done.load(Ordering::Relaxed));
    }

    #[test]
    fn test_notify_account_restore_from_snapshot_once_across_slots() {
        let mut accounts = AccountsDb::new_single_for_tests();
        let slot0 = 0;
        let slot1 = 1;

        // Account with key1 is updated twice in two different slots -- should only get notified once.
        // Account with key2 is updated slot0, should get notified once
        // Account with key3 is updated in slot1, should get notified once
        let key1 = solana_sdk::pubkey::new_rand();
        let account1_in_slot0 = account_with_lamports(1);
        accounts.store_uncached(slot0, &[(&key1, &account1_in_slot0)]);

        let key2 = solana_sdk::pubkey::new_rand();
        let account2_lamports: u64 = 200;
        let account2 = account_with_lamports(account2_lamports);
        accounts.store_uncached(slot0, &[(&key2, &account2)]);

        let account1_lamports: u64 = 2;
        let account1_in_slot1 = account_with_lamports(account1_lamports);
        accounts.store_uncached(slot1, &[(&key1, &account1_in_slot1)]);

        let key3 = solana_sdk::pubkey::new_rand();
        let account3_lamports: u64 = 300;
        let account3 = account_with_lamports(account3_lamports);
        accounts.store_uncached(slot1, &[(&key3, &account3)]);

        // Installed after all stores, as in the single-slot test.
        let notifier = install_notifier(&mut accounts);
        accounts.notify_account_restore_from_snapshot();

        assert_notifications(&notifier, &key1, &[(slot1, account1_lamports)]);
        assert_notifications(&notifier, &key2, &[(slot0, account2_lamports)]);
        assert_notifications(&notifier, &key3, &[(slot1, account3_lamports)]);
        assert!(notifier.is_startup_done.load(Ordering::Relaxed));
    }

    #[test]
    fn test_notify_account_at_accounts_update() {
        let mut accounts = AccountsDb::new_single_for_tests();

        // Installed before any store so that each store is observed as it happens.
        let notifier = install_notifier(&mut accounts);

        let slot0 = 0;
        let slot1 = 1;

        // Account with key1 is updated in two different slots -- should get notified twice.
        // Account with key2 is updated slot0, should get notified once
        // Account with key3 is updated in slot1, should get notified once
        let key1 = solana_sdk::pubkey::new_rand();
        let account1_lamports1: u64 = 1;
        let account1_in_slot0 = account_with_lamports(account1_lamports1);
        accounts.store_cached((slot0, &[(&key1, &account1_in_slot0)][..]), None);

        let key2 = solana_sdk::pubkey::new_rand();
        let account2_lamports: u64 = 200;
        let account2 = account_with_lamports(account2_lamports);
        accounts.store_cached((slot0, &[(&key2, &account2)][..]), None);

        let account1_lamports2: u64 = 2;
        let account1_in_slot1 = account_with_lamports(account1_lamports2);
        accounts.store_cached((slot1, &[(&key1, &account1_in_slot1)][..]), None);

        let key3 = solana_sdk::pubkey::new_rand();
        let account3_lamports: u64 = 300;
        let account3 = account_with_lamports(account3_lamports);
        accounts.store_cached((slot1, &[(&key3, &account3)][..]), None);

        assert_notifications(
            &notifier,
            &key1,
            &[(slot0, account1_lamports1), (slot1, account1_lamports2)],
        );
        assert_notifications(&notifier, &key2, &[(slot0, account2_lamports)]);
        assert_notifications(&notifier, &key3, &[(slot1, account3_lamports)]);
    }
}