1use {
2 crate::{account_storage::meta::StoredAccountMeta, accounts_db::AccountsDb},
3 solana_measure::measure::Measure,
4 solana_metrics::*,
5 solana_sdk::{
6 account::AccountSharedData, clock::Slot, pubkey::Pubkey, transaction::SanitizedTransaction,
7 },
8 std::collections::{HashMap, HashSet},
9};
10
/// Counters and timings gathered while accounts restored from a snapshot are replayed to
/// the accounts update notifier. All values start at zero and are accumulated across slots.
#[derive(Debug, Default)]
pub struct GeyserPluginNotifyAtSnapshotRestoreStats {
    /// Number of stored account entries visited while scanning slots.
    pub total_accounts: usize,
    /// Entries not sent because their pubkey had already been notified earlier.
    /// Entries dropped as in-slot duplicates are not counted here.
    pub skipped_accounts: usize,
    /// Number of account notifications actually sent.
    pub notified_accounts: usize,
    /// Accumulated time, in microseconds, measured around the whole per-slot scan
    /// (this span also contains the notification calls made during the scan).
    pub elapsed_filtering_us: usize,
    /// Accumulated time, in microseconds, spent inside the notifier callback itself.
    pub total_pure_notify: usize,
    /// Accumulated time, in microseconds, spent recording which pubkeys were notified.
    pub total_pure_bookeeping: usize,
    /// Accumulated time, in microseconds, measured around the notification loop.
    pub elapsed_notifying_us: usize,
}
21
22impl GeyserPluginNotifyAtSnapshotRestoreStats {
23 pub fn report(&self) {
24 datapoint_info!(
25 "accountsdb_plugin_notify_account_restore_from_snapshot_summary",
26 ("total_accounts", self.total_accounts, i64),
27 ("skipped_accounts", self.skipped_accounts, i64),
28 ("notified_accounts", self.notified_accounts, i64),
29 ("elapsed_filtering_us", self.elapsed_filtering_us, i64),
30 ("elapsed_notifying_us", self.elapsed_notifying_us, i64),
31 ("total_pure_notify_us", self.total_pure_notify, i64),
32 ("total_pure_bookeeping_us", self.total_pure_bookeeping, i64),
33 );
34 }
35}
36
37impl AccountsDb {
38 pub fn notify_account_restore_from_snapshot(&self) {
42 if self.accounts_update_notifier.is_none() {
43 return;
44 }
45
46 let mut slots = self.storage.all_slots();
47 let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
48 let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();
49
50 slots.sort_by(|a, b| b.cmp(a));
51 for slot in slots {
52 self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
53 }
54
55 let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap();
56 accounts_update_notifier.notify_end_of_restore_from_snapshot();
57 notify_stats.report();
58 }
59
60 pub fn notify_account_at_accounts_update(
61 &self,
62 slot: Slot,
63 account: &AccountSharedData,
64 txn: &Option<&SanitizedTransaction>,
65 pubkey: &Pubkey,
66 write_version: u64,
67 ) {
68 if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
69 accounts_update_notifier.notify_account_update(
70 slot,
71 account,
72 txn,
73 pubkey,
74 write_version,
75 );
76 }
77 }
78
79 fn notify_accounts_in_slot(
80 &self,
81 slot: Slot,
82 notified_accounts: &mut HashSet<Pubkey>,
83 notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
84 ) {
85 let storage_entry = self.storage.get_slot_storage_entry(slot).unwrap();
86
87 let mut accounts_duplicate: HashMap<Pubkey, usize> = HashMap::default();
88 let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts");
89 let mut account_len = 0;
90 let mut pubkeys = HashSet::new();
91
92 let mut i = 0;
96 storage_entry.accounts.scan_pubkeys(|pubkey| {
97 i += 1; if !pubkeys.insert(*pubkey) {
99 accounts_duplicate.insert(*pubkey, i); }
101 });
102
103 let mut i = 0;
105 storage_entry.accounts.scan_accounts(|account| {
106 i += 1;
107 account_len += 1;
108 if notified_accounts.contains(account.pubkey()) {
109 notify_stats.skipped_accounts += 1;
110 return;
111 }
112 if let Some(highest_i) = accounts_duplicate.get(account.pubkey()) {
113 if highest_i != &i {
114 return;
117 }
118 }
119
120 self.notify_filtered_accounts(
124 slot,
125 notified_accounts,
126 std::iter::once(account),
127 notify_stats,
128 );
129 });
130 notify_stats.total_accounts += account_len;
131 measure_filter.stop();
132 notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;
133 }
134
135 fn notify_filtered_accounts<'a>(
136 &self,
137 slot: Slot,
138 notified_accounts: &mut HashSet<Pubkey>,
139 accounts_to_stream: impl Iterator<Item = StoredAccountMeta<'a>>,
140 notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
141 ) {
142 let notifier = self.accounts_update_notifier.as_ref().unwrap();
143 let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
144 for account in accounts_to_stream {
145 let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
146 notifier.notify_account_restore_from_snapshot(slot, &account);
147 measure_pure_notify.stop();
148
149 notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
150
151 let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
152 notified_accounts.insert(*account.pubkey());
153 measure_bookkeep.stop();
154 notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
155
156 notify_stats.notified_accounts += 1;
157 }
158 measure_notify.stop();
159 notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
160 }
161}
162
#[cfg(test)]
pub mod tests {
    use {
        crate::{
            account_storage::meta::StoredAccountMeta,
            accounts_db::AccountsDb,
            accounts_update_notifier_interface::{
                AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
            },
        },
        dashmap::DashMap,
        solana_sdk::{
            account::{AccountSharedData, ReadableAccount},
            clock::Slot,
            pubkey::Pubkey,
            transaction::SanitizedTransaction,
        },
        std::sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
    };

    impl AccountsDb {
        /// Test-only setter that installs (or clears) the accounts update notifier.
        pub fn set_geyser_plugin_notifer(&mut self, notifier: Option<AccountsUpdateNotifier>) {
            self.accounts_update_notifier = notifier;
        }
    }

    /// Notifier double that records every notification it receives, keyed by pubkey,
    /// and remembers whether the end-of-restore signal was delivered.
    #[derive(Debug, Default)]
    struct GeyserTestPlugin {
        pub accounts_notified: DashMap<Pubkey, Vec<(Slot, AccountSharedData)>>,
        pub is_startup_done: AtomicBool,
    }

    impl GeyserTestPlugin {
        /// Appends one `(slot, account)` record to the list kept for `pubkey`.
        fn record(&self, pubkey: Pubkey, slot: Slot, account: AccountSharedData) {
            self.accounts_notified
                .entry(pubkey)
                .or_default()
                .push((slot, account));
        }
    }

    impl AccountsUpdateNotifierInterface for GeyserTestPlugin {
        /// Records the updated account under its pubkey.
        fn notify_account_update(
            &self,
            slot: Slot,
            account: &AccountSharedData,
            _txn: &Option<&SanitizedTransaction>,
            pubkey: &Pubkey,
            _write_version: u64,
        ) {
            self.record(*pubkey, slot, account.clone());
        }

        /// Records the restored account under its pubkey.
        fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) {
            self.record(*account.pubkey(), slot, account.to_account_shared_data());
        }

        /// Marks startup as finished.
        fn notify_end_of_restore_from_snapshot(&self) {
            self.is_startup_done.store(true, Ordering::Relaxed);
        }
    }

    /// Builds a test account holding `lamports`, using the constructor arguments shared
    /// by every account in these tests.
    fn new_account(lamports: u64) -> AccountSharedData {
        AccountSharedData::new(lamports, 1, AccountSharedData::default().owner())
    }

    /// Asserts that `plugin` recorded exactly `expected.len()` notifications for `pubkey`
    /// and that they match the given `(slot, lamports)` pairs in order.
    fn assert_notifications(plugin: &GeyserTestPlugin, pubkey: &Pubkey, expected: &[(Slot, u64)]) {
        let recorded = plugin.accounts_notified.get(pubkey).unwrap();
        assert_eq!(recorded.len(), expected.len());
        for (index, (expected_slot, expected_lamports)) in expected.iter().enumerate() {
            assert_eq!(recorded[index].1.lamports(), *expected_lamports);
            assert_eq!(recorded[index].0, *expected_slot);
        }
    }

    /// Two writes of the same pubkey in one slot must produce a single notification
    /// carrying the later write.
    #[test]
    fn test_notify_account_restore_from_snapshot_once_per_slot() {
        let mut accounts = AccountsDb::new_single_for_tests();
        let slot0 = 0;

        let key1 = solana_sdk::pubkey::new_rand();
        let account1_initial_lamports: u64 = 1;
        let account1_initial = new_account(account1_initial_lamports);
        accounts.store_uncached(slot0, &[(&key1, &account1_initial)]);

        let account1_final_lamports: u64 = 2;
        let account1_final = new_account(account1_final_lamports);
        accounts.store_uncached(slot0, &[(&key1, &account1_final)]);

        let key2 = solana_sdk::pubkey::new_rand();
        let account2_lamports: u64 = 100;
        let account2 = new_account(account2_lamports);
        accounts.store_uncached(slot0, &[(&key2, &account2)]);

        let notifier = Arc::new(GeyserTestPlugin::default());
        accounts.set_geyser_plugin_notifer(Some(notifier.clone()));

        accounts.notify_account_restore_from_snapshot();

        assert_notifications(&notifier, &key1, &[(slot0, account1_final_lamports)]);
        assert_notifications(&notifier, &key2, &[(slot0, account2_lamports)]);

        assert!(notifier.is_startup_done.load(Ordering::Relaxed));
    }

    /// A pubkey written in two slots must be notified once, from the higher slot.
    #[test]
    fn test_notify_account_restore_from_snapshot_once_across_slots() {
        let mut accounts = AccountsDb::new_single_for_tests();
        let slot0 = 0;
        let slot1 = 1;

        let key1 = solana_sdk::pubkey::new_rand();
        let account1_initial_lamports: u64 = 1;
        let account1_initial = new_account(account1_initial_lamports);
        accounts.store_uncached(slot0, &[(&key1, &account1_initial)]);

        let key2 = solana_sdk::pubkey::new_rand();
        let account2_lamports: u64 = 200;
        let account2 = new_account(account2_lamports);
        accounts.store_uncached(slot0, &[(&key2, &account2)]);

        let account1_final_lamports: u64 = 2;
        let account1_final = new_account(account1_final_lamports);
        accounts.store_uncached(slot1, &[(&key1, &account1_final)]);

        let key3 = solana_sdk::pubkey::new_rand();
        let account3_lamports: u64 = 300;
        let account3 = new_account(account3_lamports);
        accounts.store_uncached(slot1, &[(&key3, &account3)]);

        let notifier = Arc::new(GeyserTestPlugin::default());
        accounts.set_geyser_plugin_notifer(Some(notifier.clone()));

        accounts.notify_account_restore_from_snapshot();

        assert_notifications(&notifier, &key1, &[(slot1, account1_final_lamports)]);
        assert_notifications(&notifier, &key2, &[(slot0, account2_lamports)]);
        assert_notifications(&notifier, &key3, &[(slot1, account3_lamports)]);
        assert!(notifier.is_startup_done.load(Ordering::Relaxed));
    }

    /// Every cached store must be forwarded to the notifier, in store order.
    #[test]
    fn test_notify_account_at_accounts_update() {
        let mut accounts = AccountsDb::new_single_for_tests();

        // The notifier has to be installed before any store so each one is observed.
        let notifier = Arc::new(GeyserTestPlugin::default());
        accounts.set_geyser_plugin_notifer(Some(notifier.clone()));

        let slot0 = 0;
        let slot1 = 1;

        let key1 = solana_sdk::pubkey::new_rand();
        let account1_lamports1: u64 = 1;
        let account1_first = new_account(account1_lamports1);
        accounts.store_cached((slot0, &[(&key1, &account1_first)][..]), None);

        let key2 = solana_sdk::pubkey::new_rand();
        let account2_lamports: u64 = 200;
        let account2 = new_account(account2_lamports);
        accounts.store_cached((slot0, &[(&key2, &account2)][..]), None);

        let account1_lamports2 = 2;
        let account1_second = new_account(account1_lamports2);
        accounts.store_cached((slot1, &[(&key1, &account1_second)][..]), None);

        let key3 = solana_sdk::pubkey::new_rand();
        let account3_lamports: u64 = 300;
        let account3 = new_account(account3_lamports);
        accounts.store_cached((slot1, &[(&key3, &account3)][..]), None);

        assert_notifications(
            &notifier,
            &key1,
            &[(slot0, account1_lamports1), (slot1, account1_lamports2)],
        );
        assert_notifications(&notifier, &key2, &[(slot0, account2_lamports)]);
        assert_notifications(&notifier, &key3, &[(slot1, account3_lamports)]);
    }
}