solana_accounts_db/accounts_db/
geyser_plugin_utils.rs

1use {
2    crate::{account_storage::meta::StoredAccountMeta, accounts_db::AccountsDb},
3    solana_measure::measure::Measure,
4    solana_metrics::*,
5    solana_pubkey::Pubkey,
6    solana_sdk::{account::AccountSharedData, clock::Slot, transaction::SanitizedTransaction},
7    std::collections::{HashMap, HashSet},
8};
9
10#[derive(Default)]
11pub struct GeyserPluginNotifyAtSnapshotRestoreStats {
12    pub total_accounts: usize,
13    pub skipped_accounts: usize,
14    pub notified_accounts: usize,
15    pub elapsed_filtering_us: usize,
16    pub total_pure_notify: usize,
17    pub total_pure_bookeeping: usize,
18    pub elapsed_notifying_us: usize,
19}
20
21impl GeyserPluginNotifyAtSnapshotRestoreStats {
22    pub fn report(&self) {
23        datapoint_info!(
24            "accountsdb_plugin_notify_account_restore_from_snapshot_summary",
25            ("total_accounts", self.total_accounts, i64),
26            ("skipped_accounts", self.skipped_accounts, i64),
27            ("notified_accounts", self.notified_accounts, i64),
28            ("elapsed_filtering_us", self.elapsed_filtering_us, i64),
29            ("elapsed_notifying_us", self.elapsed_notifying_us, i64),
30            ("total_pure_notify_us", self.total_pure_notify, i64),
31            ("total_pure_bookeeping_us", self.total_pure_bookeeping, i64),
32        );
33    }
34}
35
36impl AccountsDb {
37    /// Notify the plugins of account data when AccountsDb is restored from a snapshot. The data is streamed
38    /// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated
39    /// multiple times only the last write (with highest write_version) is notified.
40    pub fn notify_account_restore_from_snapshot(&self) {
41        let Some(accounts_update_notifier) = &self.accounts_update_notifier else {
42            return;
43        };
44
45        let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();
46        if accounts_update_notifier.snapshot_notifications_enabled() {
47            let mut slots = self.storage.all_slots();
48            let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
49
50            slots.sort_by(|a, b| b.cmp(a));
51            for slot in slots {
52                self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
53            }
54        }
55
56        accounts_update_notifier.notify_end_of_restore_from_snapshot();
57        notify_stats.report();
58    }
59
60    pub fn notify_account_at_accounts_update(
61        &self,
62        slot: Slot,
63        account: &AccountSharedData,
64        txn: &Option<&SanitizedTransaction>,
65        pubkey: &Pubkey,
66        write_version: u64,
67    ) {
68        if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
69            accounts_update_notifier.notify_account_update(
70                slot,
71                account,
72                txn,
73                pubkey,
74                write_version,
75            );
76        }
77    }
78
79    fn notify_accounts_in_slot(
80        &self,
81        slot: Slot,
82        notified_accounts: &mut HashSet<Pubkey>,
83        notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
84    ) {
85        let storage_entry = self.storage.get_slot_storage_entry(slot).unwrap();
86
87        let mut accounts_duplicate: HashMap<Pubkey, usize> = HashMap::default();
88        let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts");
89        let mut account_len = 0;
90        let mut pubkeys = HashSet::new();
91
92        // populate `accounts_duplicate` for any pubkeys that are in this storage twice.
93        // Storages cannot return `StoredAccountMeta<'_>` for more than 1 account at a time, so we have to do 2 passes to make sure
94        // we don't have duplicate pubkeys.
95        let mut i = 0;
96        storage_entry.accounts.scan_pubkeys(|pubkey| {
97            i += 1; // pre-increment to most easily match early returns in next loop
98            if !pubkeys.insert(*pubkey) {
99                accounts_duplicate.insert(*pubkey, i); // remember the highest index entry in this slot
100            }
101        });
102
103        // now, actually notify geyser
104        let mut i = 0;
105        storage_entry.accounts.scan_accounts(|account| {
106            i += 1;
107            account_len += 1;
108            if notified_accounts.contains(account.pubkey()) {
109                notify_stats.skipped_accounts += 1;
110                return;
111            }
112            if let Some(highest_i) = accounts_duplicate.get(account.pubkey()) {
113                if highest_i != &i {
114                    // this pubkey is in this storage twice and the current instance is not the last one, so we skip it.
115                    // We only send unique accounts in this slot to `notify_filtered_accounts`
116                    return;
117                }
118            }
119
120            // later entries in the same slot are more recent and override earlier accounts for the same pubkey
121            // We can pass an incrementing number here for write_version in the future, if the storage does not have a write_version.
122            // As long as all accounts for this slot are in 1 append vec that can be iterated oldest to newest.
123            self.notify_filtered_accounts(
124                slot,
125                notified_accounts,
126                std::iter::once(account),
127                notify_stats,
128            );
129        });
130        notify_stats.total_accounts += account_len;
131        measure_filter.stop();
132        notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;
133    }
134
135    fn notify_filtered_accounts<'a>(
136        &self,
137        slot: Slot,
138        notified_accounts: &mut HashSet<Pubkey>,
139        accounts_to_stream: impl Iterator<Item = StoredAccountMeta<'a>>,
140        notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
141    ) {
142        let notifier = self.accounts_update_notifier.as_ref().unwrap();
143        let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
144        for account in accounts_to_stream {
145            let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
146            notifier.notify_account_restore_from_snapshot(slot, &account);
147            measure_pure_notify.stop();
148
149            notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
150
151            let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
152            notified_accounts.insert(*account.pubkey());
153            measure_bookkeep.stop();
154            notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
155
156            notify_stats.notified_accounts += 1;
157        }
158        measure_notify.stop();
159        notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
160    }
161}
162
163#[cfg(test)]
164pub mod tests {
165    use {
166        crate::{
167            account_storage::meta::StoredAccountMeta,
168            accounts_db::AccountsDb,
169            accounts_update_notifier_interface::{
170                AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
171            },
172        },
173        dashmap::DashMap,
174        solana_pubkey::Pubkey,
175        solana_sdk::{
176            account::{AccountSharedData, ReadableAccount},
177            clock::Slot,
178            transaction::SanitizedTransaction,
179        },
180        std::sync::{
181            atomic::{AtomicBool, Ordering},
182            Arc,
183        },
184    };
185
186    impl AccountsDb {
187        pub fn set_geyser_plugin_notifer(&mut self, notifier: Option<AccountsUpdateNotifier>) {
188            self.accounts_update_notifier = notifier;
189        }
190    }
191
192    #[derive(Debug, Default)]
193    struct GeyserTestPlugin {
194        pub accounts_notified: DashMap<Pubkey, Vec<(Slot, AccountSharedData)>>,
195        pub is_startup_done: AtomicBool,
196    }
197
198    impl AccountsUpdateNotifierInterface for GeyserTestPlugin {
199        fn snapshot_notifications_enabled(&self) -> bool {
200            true
201        }
202
203        /// Notified when an account is updated at runtime, due to transaction activities
204        fn notify_account_update(
205            &self,
206            slot: Slot,
207            account: &AccountSharedData,
208            _txn: &Option<&SanitizedTransaction>,
209            pubkey: &Pubkey,
210            _write_version: u64,
211        ) {
212            self.accounts_notified
213                .entry(*pubkey)
214                .or_default()
215                .push((slot, account.clone()));
216        }
217
218        /// Notified when the AccountsDb is initialized at start when restored
219        /// from a snapshot.
220        fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) {
221            self.accounts_notified
222                .entry(*account.pubkey())
223                .or_default()
224                .push((slot, account.to_account_shared_data()));
225        }
226
227        fn notify_end_of_restore_from_snapshot(&self) {
228            self.is_startup_done.store(true, Ordering::Relaxed);
229        }
230    }
231
232    #[test]
233    fn test_notify_account_restore_from_snapshot_once_per_slot() {
234        let mut accounts = AccountsDb::new_single_for_tests();
235        // Account with key1 is updated twice in the store -- should only get notified once.
236        let key1 = solana_pubkey::new_rand();
237        let mut account1_lamports: u64 = 1;
238        let account1 =
239            AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner());
240        let slot0 = 0;
241        accounts.store_uncached(slot0, &[(&key1, &account1)]);
242
243        account1_lamports = 2;
244        let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
245        accounts.store_uncached(slot0, &[(&key1, &account1)]);
246        let notifier = GeyserTestPlugin::default();
247
248        let key2 = solana_pubkey::new_rand();
249        let account2_lamports: u64 = 100;
250        let account2 =
251            AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
252
253        accounts.store_uncached(slot0, &[(&key2, &account2)]);
254
255        let notifier = Arc::new(notifier);
256        accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
257
258        accounts.notify_account_restore_from_snapshot();
259
260        assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 1);
261        assert_eq!(
262            notifier.accounts_notified.get(&key1).unwrap()[0]
263                .1
264                .lamports(),
265            account1_lamports
266        );
267        assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0);
268        assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
269        assert_eq!(
270            notifier.accounts_notified.get(&key2).unwrap()[0]
271                .1
272                .lamports(),
273            account2_lamports
274        );
275        assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
276
277        assert!(notifier.is_startup_done.load(Ordering::Relaxed));
278    }
279
280    #[test]
281    fn test_notify_account_restore_from_snapshot_once_across_slots() {
282        let mut accounts = AccountsDb::new_single_for_tests();
283        // Account with key1 is updated twice in two different slots -- should only get notified once.
284        // Account with key2 is updated slot0, should get notified once
285        // Account with key3 is updated in slot1, should get notified once
286        let key1 = solana_pubkey::new_rand();
287        let mut account1_lamports: u64 = 1;
288        let account1 =
289            AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner());
290        let slot0 = 0;
291        accounts.store_uncached(slot0, &[(&key1, &account1)]);
292
293        let key2 = solana_pubkey::new_rand();
294        let account2_lamports: u64 = 200;
295        let account2 =
296            AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
297        accounts.store_uncached(slot0, &[(&key2, &account2)]);
298
299        account1_lamports = 2;
300        let slot1 = 1;
301        let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
302        accounts.store_uncached(slot1, &[(&key1, &account1)]);
303        let notifier = GeyserTestPlugin::default();
304
305        let key3 = solana_pubkey::new_rand();
306        let account3_lamports: u64 = 300;
307        let account3 =
308            AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
309        accounts.store_uncached(slot1, &[(&key3, &account3)]);
310
311        let notifier = Arc::new(notifier);
312        accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
313
314        accounts.notify_account_restore_from_snapshot();
315
316        assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 1);
317        assert_eq!(
318            notifier.accounts_notified.get(&key1).unwrap()[0]
319                .1
320                .lamports(),
321            account1_lamports
322        );
323        assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot1);
324        assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
325        assert_eq!(
326            notifier.accounts_notified.get(&key2).unwrap()[0]
327                .1
328                .lamports(),
329            account2_lamports
330        );
331        assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
332        assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 1);
333        assert_eq!(
334            notifier.accounts_notified.get(&key3).unwrap()[0]
335                .1
336                .lamports(),
337            account3_lamports
338        );
339        assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1);
340        assert!(notifier.is_startup_done.load(Ordering::Relaxed));
341    }
342
343    #[test]
344    fn test_notify_account_at_accounts_update() {
345        let mut accounts = AccountsDb::new_single_for_tests();
346
347        let notifier = GeyserTestPlugin::default();
348
349        let notifier = Arc::new(notifier);
350        accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
351
352        // Account with key1 is updated twice in two different slots -- should only get notified twice.
353        // Account with key2 is updated slot0, should get notified once
354        // Account with key3 is updated in slot1, should get notified once
355        let key1 = solana_pubkey::new_rand();
356        let account1_lamports1: u64 = 1;
357        let account1 =
358            AccountSharedData::new(account1_lamports1, 1, AccountSharedData::default().owner());
359        let slot0 = 0;
360        accounts.store_cached((slot0, &[(&key1, &account1)][..]), None);
361
362        let key2 = solana_pubkey::new_rand();
363        let account2_lamports: u64 = 200;
364        let account2 =
365            AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
366        accounts.store_cached((slot0, &[(&key2, &account2)][..]), None);
367
368        let account1_lamports2 = 2;
369        let slot1 = 1;
370        let account1 = AccountSharedData::new(account1_lamports2, 1, account1.owner());
371        accounts.store_cached((slot1, &[(&key1, &account1)][..]), None);
372
373        let key3 = solana_pubkey::new_rand();
374        let account3_lamports: u64 = 300;
375        let account3 =
376            AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
377        accounts.store_cached((slot1, &[(&key3, &account3)][..]), None);
378
379        assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 2);
380        assert_eq!(
381            notifier.accounts_notified.get(&key1).unwrap()[0]
382                .1
383                .lamports(),
384            account1_lamports1
385        );
386        assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0);
387        assert_eq!(
388            notifier.accounts_notified.get(&key1).unwrap()[1]
389                .1
390                .lamports(),
391            account1_lamports2
392        );
393        assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[1].0, slot1);
394
395        assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
396        assert_eq!(
397            notifier.accounts_notified.get(&key2).unwrap()[0]
398                .1
399                .lamports(),
400            account2_lamports
401        );
402        assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
403        assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 1);
404        assert_eq!(
405            notifier.accounts_notified.get(&key3).unwrap()[0]
406                .1
407                .lamports(),
408            account3_lamports
409        );
410        assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1);
411    }
412}