1use {
2 crate::{account_storage::meta::StoredAccountMeta, accounts_db::AccountsDb},
3 solana_measure::measure::Measure,
4 solana_metrics::*,
5 solana_pubkey::Pubkey,
6 solana_sdk::{account::AccountSharedData, clock::Slot, transaction::SanitizedTransaction},
7 std::collections::{HashMap, HashSet},
8};
9
10#[derive(Default)]
11pub struct GeyserPluginNotifyAtSnapshotRestoreStats {
12 pub total_accounts: usize,
13 pub skipped_accounts: usize,
14 pub notified_accounts: usize,
15 pub elapsed_filtering_us: usize,
16 pub total_pure_notify: usize,
17 pub total_pure_bookeeping: usize,
18 pub elapsed_notifying_us: usize,
19}
20
21impl GeyserPluginNotifyAtSnapshotRestoreStats {
22 pub fn report(&self) {
23 datapoint_info!(
24 "accountsdb_plugin_notify_account_restore_from_snapshot_summary",
25 ("total_accounts", self.total_accounts, i64),
26 ("skipped_accounts", self.skipped_accounts, i64),
27 ("notified_accounts", self.notified_accounts, i64),
28 ("elapsed_filtering_us", self.elapsed_filtering_us, i64),
29 ("elapsed_notifying_us", self.elapsed_notifying_us, i64),
30 ("total_pure_notify_us", self.total_pure_notify, i64),
31 ("total_pure_bookeeping_us", self.total_pure_bookeeping, i64),
32 );
33 }
34}
35
36impl AccountsDb {
37 pub fn notify_account_restore_from_snapshot(&self) {
41 let Some(accounts_update_notifier) = &self.accounts_update_notifier else {
42 return;
43 };
44
45 let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();
46 if accounts_update_notifier.snapshot_notifications_enabled() {
47 let mut slots = self.storage.all_slots();
48 let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
49
50 slots.sort_by(|a, b| b.cmp(a));
51 for slot in slots {
52 self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
53 }
54 }
55
56 accounts_update_notifier.notify_end_of_restore_from_snapshot();
57 notify_stats.report();
58 }
59
60 pub fn notify_account_at_accounts_update(
61 &self,
62 slot: Slot,
63 account: &AccountSharedData,
64 txn: &Option<&SanitizedTransaction>,
65 pubkey: &Pubkey,
66 write_version: u64,
67 ) {
68 if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
69 accounts_update_notifier.notify_account_update(
70 slot,
71 account,
72 txn,
73 pubkey,
74 write_version,
75 );
76 }
77 }
78
79 fn notify_accounts_in_slot(
80 &self,
81 slot: Slot,
82 notified_accounts: &mut HashSet<Pubkey>,
83 notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
84 ) {
85 let storage_entry = self.storage.get_slot_storage_entry(slot).unwrap();
86
87 let mut accounts_duplicate: HashMap<Pubkey, usize> = HashMap::default();
88 let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts");
89 let mut account_len = 0;
90 let mut pubkeys = HashSet::new();
91
92 let mut i = 0;
96 storage_entry.accounts.scan_pubkeys(|pubkey| {
97 i += 1; if !pubkeys.insert(*pubkey) {
99 accounts_duplicate.insert(*pubkey, i); }
101 });
102
103 let mut i = 0;
105 storage_entry.accounts.scan_accounts(|account| {
106 i += 1;
107 account_len += 1;
108 if notified_accounts.contains(account.pubkey()) {
109 notify_stats.skipped_accounts += 1;
110 return;
111 }
112 if let Some(highest_i) = accounts_duplicate.get(account.pubkey()) {
113 if highest_i != &i {
114 return;
117 }
118 }
119
120 self.notify_filtered_accounts(
124 slot,
125 notified_accounts,
126 std::iter::once(account),
127 notify_stats,
128 );
129 });
130 notify_stats.total_accounts += account_len;
131 measure_filter.stop();
132 notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;
133 }
134
135 fn notify_filtered_accounts<'a>(
136 &self,
137 slot: Slot,
138 notified_accounts: &mut HashSet<Pubkey>,
139 accounts_to_stream: impl Iterator<Item = StoredAccountMeta<'a>>,
140 notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
141 ) {
142 let notifier = self.accounts_update_notifier.as_ref().unwrap();
143 let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
144 for account in accounts_to_stream {
145 let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
146 notifier.notify_account_restore_from_snapshot(slot, &account);
147 measure_pure_notify.stop();
148
149 notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
150
151 let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
152 notified_accounts.insert(*account.pubkey());
153 measure_bookkeep.stop();
154 notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
155
156 notify_stats.notified_accounts += 1;
157 }
158 measure_notify.stop();
159 notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
160 }
161}
162
163#[cfg(test)]
164pub mod tests {
165 use {
166 crate::{
167 account_storage::meta::StoredAccountMeta,
168 accounts_db::AccountsDb,
169 accounts_update_notifier_interface::{
170 AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
171 },
172 },
173 dashmap::DashMap,
174 solana_pubkey::Pubkey,
175 solana_sdk::{
176 account::{AccountSharedData, ReadableAccount},
177 clock::Slot,
178 transaction::SanitizedTransaction,
179 },
180 std::sync::{
181 atomic::{AtomicBool, Ordering},
182 Arc,
183 },
184 };
185
186 impl AccountsDb {
187 pub fn set_geyser_plugin_notifer(&mut self, notifier: Option<AccountsUpdateNotifier>) {
188 self.accounts_update_notifier = notifier;
189 }
190 }
191
192 #[derive(Debug, Default)]
193 struct GeyserTestPlugin {
194 pub accounts_notified: DashMap<Pubkey, Vec<(Slot, AccountSharedData)>>,
195 pub is_startup_done: AtomicBool,
196 }
197
198 impl AccountsUpdateNotifierInterface for GeyserTestPlugin {
199 fn snapshot_notifications_enabled(&self) -> bool {
200 true
201 }
202
203 fn notify_account_update(
205 &self,
206 slot: Slot,
207 account: &AccountSharedData,
208 _txn: &Option<&SanitizedTransaction>,
209 pubkey: &Pubkey,
210 _write_version: u64,
211 ) {
212 self.accounts_notified
213 .entry(*pubkey)
214 .or_default()
215 .push((slot, account.clone()));
216 }
217
218 fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) {
221 self.accounts_notified
222 .entry(*account.pubkey())
223 .or_default()
224 .push((slot, account.to_account_shared_data()));
225 }
226
227 fn notify_end_of_restore_from_snapshot(&self) {
228 self.is_startup_done.store(true, Ordering::Relaxed);
229 }
230 }
231
232 #[test]
233 fn test_notify_account_restore_from_snapshot_once_per_slot() {
234 let mut accounts = AccountsDb::new_single_for_tests();
235 let key1 = solana_pubkey::new_rand();
237 let mut account1_lamports: u64 = 1;
238 let account1 =
239 AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner());
240 let slot0 = 0;
241 accounts.store_uncached(slot0, &[(&key1, &account1)]);
242
243 account1_lamports = 2;
244 let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
245 accounts.store_uncached(slot0, &[(&key1, &account1)]);
246 let notifier = GeyserTestPlugin::default();
247
248 let key2 = solana_pubkey::new_rand();
249 let account2_lamports: u64 = 100;
250 let account2 =
251 AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
252
253 accounts.store_uncached(slot0, &[(&key2, &account2)]);
254
255 let notifier = Arc::new(notifier);
256 accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
257
258 accounts.notify_account_restore_from_snapshot();
259
260 assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 1);
261 assert_eq!(
262 notifier.accounts_notified.get(&key1).unwrap()[0]
263 .1
264 .lamports(),
265 account1_lamports
266 );
267 assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0);
268 assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
269 assert_eq!(
270 notifier.accounts_notified.get(&key2).unwrap()[0]
271 .1
272 .lamports(),
273 account2_lamports
274 );
275 assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
276
277 assert!(notifier.is_startup_done.load(Ordering::Relaxed));
278 }
279
280 #[test]
281 fn test_notify_account_restore_from_snapshot_once_across_slots() {
282 let mut accounts = AccountsDb::new_single_for_tests();
283 let key1 = solana_pubkey::new_rand();
287 let mut account1_lamports: u64 = 1;
288 let account1 =
289 AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner());
290 let slot0 = 0;
291 accounts.store_uncached(slot0, &[(&key1, &account1)]);
292
293 let key2 = solana_pubkey::new_rand();
294 let account2_lamports: u64 = 200;
295 let account2 =
296 AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
297 accounts.store_uncached(slot0, &[(&key2, &account2)]);
298
299 account1_lamports = 2;
300 let slot1 = 1;
301 let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
302 accounts.store_uncached(slot1, &[(&key1, &account1)]);
303 let notifier = GeyserTestPlugin::default();
304
305 let key3 = solana_pubkey::new_rand();
306 let account3_lamports: u64 = 300;
307 let account3 =
308 AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
309 accounts.store_uncached(slot1, &[(&key3, &account3)]);
310
311 let notifier = Arc::new(notifier);
312 accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
313
314 accounts.notify_account_restore_from_snapshot();
315
316 assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 1);
317 assert_eq!(
318 notifier.accounts_notified.get(&key1).unwrap()[0]
319 .1
320 .lamports(),
321 account1_lamports
322 );
323 assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot1);
324 assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
325 assert_eq!(
326 notifier.accounts_notified.get(&key2).unwrap()[0]
327 .1
328 .lamports(),
329 account2_lamports
330 );
331 assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
332 assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 1);
333 assert_eq!(
334 notifier.accounts_notified.get(&key3).unwrap()[0]
335 .1
336 .lamports(),
337 account3_lamports
338 );
339 assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1);
340 assert!(notifier.is_startup_done.load(Ordering::Relaxed));
341 }
342
343 #[test]
344 fn test_notify_account_at_accounts_update() {
345 let mut accounts = AccountsDb::new_single_for_tests();
346
347 let notifier = GeyserTestPlugin::default();
348
349 let notifier = Arc::new(notifier);
350 accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
351
352 let key1 = solana_pubkey::new_rand();
356 let account1_lamports1: u64 = 1;
357 let account1 =
358 AccountSharedData::new(account1_lamports1, 1, AccountSharedData::default().owner());
359 let slot0 = 0;
360 accounts.store_cached((slot0, &[(&key1, &account1)][..]), None);
361
362 let key2 = solana_pubkey::new_rand();
363 let account2_lamports: u64 = 200;
364 let account2 =
365 AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
366 accounts.store_cached((slot0, &[(&key2, &account2)][..]), None);
367
368 let account1_lamports2 = 2;
369 let slot1 = 1;
370 let account1 = AccountSharedData::new(account1_lamports2, 1, account1.owner());
371 accounts.store_cached((slot1, &[(&key1, &account1)][..]), None);
372
373 let key3 = solana_pubkey::new_rand();
374 let account3_lamports: u64 = 300;
375 let account3 =
376 AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
377 accounts.store_cached((slot1, &[(&key3, &account3)][..]), None);
378
379 assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 2);
380 assert_eq!(
381 notifier.accounts_notified.get(&key1).unwrap()[0]
382 .1
383 .lamports(),
384 account1_lamports1
385 );
386 assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0);
387 assert_eq!(
388 notifier.accounts_notified.get(&key1).unwrap()[1]
389 .1
390 .lamports(),
391 account1_lamports2
392 );
393 assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[1].0, slot1);
394
395 assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
396 assert_eq!(
397 notifier.accounts_notified.get(&key2).unwrap()[0]
398 .1
399 .lamports(),
400 account2_lamports
401 );
402 assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
403 assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 1);
404 assert_eq!(
405 notifier.accounts_notified.get(&key3).unwrap()[0]
406 .1
407 .lamports(),
408 account3_lamports
409 );
410 assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1);
411 }
412}