1mod stats;
6#[cfg(feature = "dev-context-only-utils")]
7use qualifier_attr::qualifiers;
8use {
9 crate::{
10 bank::{Bank, BankSlotDelta, DropCallback},
11 bank_forks::BankForks,
12 snapshot_bank_utils,
13 snapshot_config::SnapshotConfig,
14 snapshot_package::{self, AccountsPackage, AccountsPackageKind, SnapshotKind},
15 snapshot_utils::{self, SnapshotError},
16 },
17 crossbeam_channel::{Receiver, SendError, Sender},
18 log::*,
19 rand::{thread_rng, Rng},
20 rayon::iter::{IntoParallelIterator, ParallelIterator},
21 solana_accounts_db::{
22 accounts_db::CalcAccountsHashDataSource, accounts_hash::CalcAccountsHashConfig,
23 },
24 solana_measure::{measure::Measure, measure_us},
25 solana_sdk::clock::{BankId, Slot},
26 stats::StatsManager,
27 std::{
28 boxed::Box,
29 fmt::{Debug, Formatter},
30 sync::{
31 atomic::{AtomicBool, AtomicU64, Ordering},
32 Arc, RwLock,
33 },
34 thread::{self, sleep, Builder, JoinHandle},
35 time::{Duration, Instant},
36 },
37};
38
39const INTERVAL_MS: u64 = 100;
40const CLEAN_INTERVAL_BLOCKS: u64 = 100;
41
42pub type SnapshotRequestSender = Sender<SnapshotRequest>;
43pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
44pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
45pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
46
47const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
49const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
51
52#[derive(Debug, Default)]
53struct PrunedBankQueueLenReporter {
54 last_report_time: AtomicU64,
55}
56
57impl PrunedBankQueueLenReporter {
58 fn report(&self, q_len: usize) {
59 let now = solana_sdk::timing::timestamp();
60 let last_report_time = self.last_report_time.load(Ordering::Acquire);
61 if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
62 && now.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
63 {
64 datapoint_warn!("excessive_pruned_bank_channel_len", ("len", q_len, i64));
65 self.last_report_time.store(now, Ordering::Release);
66 }
67 }
68}
69
70lazy_static! {
71 static ref BANK_DROP_QUEUE_REPORTER: PrunedBankQueueLenReporter =
72 PrunedBankQueueLenReporter::default();
73}
74
75#[derive(Clone)]
76pub struct SendDroppedBankCallback {
77 sender: DroppedSlotsSender,
78}
79
80impl DropCallback for SendDroppedBankCallback {
81 fn callback(&self, bank: &Bank) {
82 BANK_DROP_QUEUE_REPORTER.report(self.sender.len());
83 if let Err(SendError(_)) = self.sender.send((bank.slot(), bank.bank_id())) {
84 info!("bank DropCallback signal queue disconnected.");
85 }
86 }
87
88 fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
89 Box::new(self.clone())
90 }
91}
92
93impl Debug for SendDroppedBankCallback {
94 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
95 write!(f, "SendDroppedBankCallback({self:p})")
96 }
97}
98
99impl SendDroppedBankCallback {
100 pub fn new(sender: DroppedSlotsSender) -> Self {
101 Self { sender }
102 }
103}
104
105pub struct SnapshotRequest {
106 pub snapshot_root_bank: Arc<Bank>,
107 pub status_cache_slot_deltas: Vec<BankSlotDelta>,
108 pub request_kind: SnapshotRequestKind,
109
110 pub enqueued: Instant,
113}
114
115impl Debug for SnapshotRequest {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 f.debug_struct("SnapshotRequest")
118 .field("request kind", &self.request_kind)
119 .field("bank slot", &self.snapshot_root_bank.slot())
120 .field("block height", &self.snapshot_root_bank.block_height())
121 .finish_non_exhaustive()
122 }
123}
124
125#[derive(Debug, Copy, Clone, Eq, PartialEq)]
131pub enum SnapshotRequestKind {
132 Snapshot,
133 EpochAccountsHash,
134}
135
136pub struct SnapshotRequestHandler {
137 pub snapshot_config: SnapshotConfig,
138 pub snapshot_request_sender: SnapshotRequestSender,
139 pub snapshot_request_receiver: SnapshotRequestReceiver,
140 pub accounts_package_sender: Sender<AccountsPackage>,
141}
142
143impl SnapshotRequestHandler {
144 #[allow(clippy::type_complexity)]
146 pub fn handle_snapshot_requests(
147 &self,
148 test_hash_calculation: bool,
149 non_snapshot_time_us: u128,
150 exit: &AtomicBool,
151 ) -> Option<Result<u64, SnapshotError>> {
152 let (
153 snapshot_request,
154 accounts_package_kind,
155 num_outstanding_requests,
156 num_re_enqueued_requests,
157 ) = self.get_next_snapshot_request()?;
158
159 datapoint_info!(
160 "handle_snapshot_requests",
161 ("num_outstanding_requests", num_outstanding_requests, i64),
162 ("num_re_enqueued_requests", num_re_enqueued_requests, i64),
163 (
164 "enqueued_time_us",
165 snapshot_request.enqueued.elapsed().as_micros(),
166 i64
167 ),
168 );
169
170 Some(self.handle_snapshot_request(
171 test_hash_calculation,
172 non_snapshot_time_us,
173 snapshot_request,
174 accounts_package_kind,
175 exit,
176 ))
177 }
178
179 fn get_next_snapshot_request(
189 &self,
190 ) -> Option<(
191 SnapshotRequest,
192 AccountsPackageKind,
193 usize,
194 usize,
195 )> {
196 let mut requests: Vec<_> = self
197 .snapshot_request_receiver
198 .try_iter()
199 .map(|request| {
200 let accounts_package_kind =
201 new_accounts_package_kind(&request, &self.snapshot_config);
202 (request, accounts_package_kind)
203 })
204 .collect();
205 let requests_len = requests.len();
206 debug!("outstanding snapshot requests ({requests_len}): {requests:?}");
207
208 match requests_len {
211 0 => None,
212 1 => {
213 let (snapshot_request, accounts_package_kind) = requests.pop().unwrap();
215 Some((snapshot_request, accounts_package_kind, 1, 0))
216 }
217 _ => {
218 let num_eah_requests = requests
219 .iter()
220 .filter(|(_, account_package_kind)| {
221 *account_package_kind == AccountsPackageKind::EpochAccountsHash
222 })
223 .count();
224 assert!(
225 num_eah_requests <= 1,
226 "Only a single EAH request is allowed at a time! count: {num_eah_requests}"
227 );
228
229 let (_, y, z) =
233 requests.select_nth_unstable_by(requests_len - 2, cmp_requests_by_priority);
234 assert_eq!(z.len(), 1);
235 let z = z.first().unwrap();
236 let y: &_ = y; let (snapshot_request, accounts_package_kind) = if z.1
248 == AccountsPackageKind::EpochAccountsHash
249 && y.1 == AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot)
250 && y.0.snapshot_root_bank.slot() < z.0.snapshot_root_bank.slot()
251 {
252 let z = requests.pop().unwrap();
254 let y = requests.pop().unwrap();
255 requests.push(z);
256 y
257 } else {
258 requests.pop().unwrap()
260 };
261
262 let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
263 let num_re_enqueued_requests = requests
265 .into_iter()
266 .filter(|(snapshot_request, _)| {
267 snapshot_request.snapshot_root_bank.slot() > handled_request_slot
268 })
269 .map(|(snapshot_request, _)| {
270 self.snapshot_request_sender
271 .try_send(snapshot_request)
272 .expect("re-enqueue snapshot request");
273 })
274 .count();
275
276 Some((
277 snapshot_request,
278 accounts_package_kind,
279 requests_len,
280 num_re_enqueued_requests,
281 ))
282 }
283 }
284 }
285
286 fn handle_snapshot_request(
287 &self,
288 test_hash_calculation: bool,
289 non_snapshot_time_us: u128,
290 snapshot_request: SnapshotRequest,
291 accounts_package_kind: AccountsPackageKind,
292 exit: &AtomicBool,
293 ) -> Result<u64, SnapshotError> {
294 info!("handling snapshot request: {snapshot_request:?}, {accounts_package_kind:?}");
295 let mut total_time = Measure::start("snapshot_request_receiver_total_time");
296 let SnapshotRequest {
297 snapshot_root_bank,
298 status_cache_slot_deltas,
299 request_kind,
300 enqueued: _,
301 } = snapshot_request;
302
303 assert!(snapshot_root_bank.is_startup_verification_complete());
305
306 if accounts_package_kind == AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot) {
307 snapshot_root_bank
311 .rc
312 .accounts
313 .accounts_db
314 .set_latest_full_snapshot_slot(snapshot_root_bank.slot());
315 }
316
317 let previous_accounts_hash = test_hash_calculation.then(|| {
318 snapshot_root_bank.update_accounts_hash(
321 CalcAccountsHashDataSource::IndexForTests,
322 false,
323 false,
324 )
325 });
326
327 let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
328 snapshot_root_bank.force_flush_accounts_cache();
333 assert!(
337 snapshot_root_bank.slot()
338 <= snapshot_root_bank
339 .rc
340 .accounts
341 .accounts_db
342 .accounts_cache
343 .fetch_max_flush_root()
344 );
345 flush_accounts_cache_time.stop();
346
347 let accounts_hash_for_testing = previous_accounts_hash.map(|previous_accounts_hash| {
348 let (this_accounts_hash, capitalization) = snapshot_root_bank
349 .accounts()
350 .accounts_db
351 .calculate_accounts_hash_from(
352 CalcAccountsHashDataSource::Storages,
353 snapshot_root_bank.slot(),
354 &CalcAccountsHashConfig {
355 use_bg_thread_pool: true,
356 ancestors: None,
357 epoch_schedule: snapshot_root_bank.epoch_schedule(),
358 rent_collector: snapshot_root_bank.rent_collector(),
359 store_detailed_debug_info_on_failure: false,
360 },
361 );
362 assert_eq!(previous_accounts_hash, this_accounts_hash);
363 assert_eq!(capitalization, snapshot_root_bank.capitalization());
364 this_accounts_hash
365 });
366
367 let mut clean_time = Measure::start("clean_time");
368 snapshot_root_bank.clean_accounts();
369 clean_time.stop();
370
371 let (_, shrink_ancient_time_us) = measure_us!(snapshot_root_bank.shrink_ancient_slots());
372
373 let mut shrink_time = Measure::start("shrink_time");
374 snapshot_root_bank.shrink_candidate_slots();
375 shrink_time.stop();
376
377 let mut snapshot_time = Measure::start("snapshot_time");
379 let snapshot_storages = snapshot_bank_utils::get_snapshot_storages(&snapshot_root_bank);
380 let accounts_package = match request_kind {
381 SnapshotRequestKind::Snapshot => match &accounts_package_kind {
382 AccountsPackageKind::Snapshot(_) => {
383 AccountsPackage::new_for_snapshot(
384 accounts_package_kind,
385 &snapshot_root_bank,
386 snapshot_storages,
387 status_cache_slot_deltas,
388 accounts_hash_for_testing,
389 )
390 }
391 AccountsPackageKind::AccountsHashVerifier => {
392 AccountsPackage::new_for_accounts_hash_verifier(
393 accounts_package_kind,
394 &snapshot_root_bank,
395 snapshot_storages,
396 accounts_hash_for_testing,
397 )
398 }
399 AccountsPackageKind::EpochAccountsHash => panic!("Illegal account package type: EpochAccountsHash packages must be from an EpochAccountsHash request!"),
400 },
401 SnapshotRequestKind::EpochAccountsHash => {
402 AccountsPackage::new_for_epoch_accounts_hash(
403 accounts_package_kind,
404 &snapshot_root_bank,
405 snapshot_storages,
406 accounts_hash_for_testing,
407 )
408 }
409 };
410 let send_result = self.accounts_package_sender.send(accounts_package);
411 if let Err(err) = send_result {
412 let accounts_package = &err.0;
414 assert!(
415 exit.load(Ordering::Relaxed),
416 "Failed to send accounts package: {err}, {accounts_package:?}"
417 );
418 }
419 snapshot_time.stop();
420 info!(
421 "Handled snapshot request. accounts package kind: {:?}, slot: {}, bank hash: {}",
422 accounts_package_kind,
423 snapshot_root_bank.slot(),
424 snapshot_root_bank.hash(),
425 );
426
427 total_time.stop();
428
429 datapoint_info!(
430 "handle_snapshot_requests-timing",
431 (
432 "flush_accounts_cache_time",
433 flush_accounts_cache_time.as_us(),
434 i64
435 ),
436 ("shrink_time", shrink_time.as_us(), i64),
437 ("clean_time", clean_time.as_us(), i64),
438 ("snapshot_time", snapshot_time.as_us(), i64),
439 ("total_us", total_time.as_us(), i64),
440 ("non_snapshot_time_us", non_snapshot_time_us, i64),
441 ("shrink_ancient_time_us", shrink_ancient_time_us, i64),
442 );
443 Ok(snapshot_root_bank.block_height())
444 }
445}
446
447#[derive(Default, Clone)]
448pub struct AbsRequestSender {
449 snapshot_request_sender: Option<SnapshotRequestSender>,
450}
451
452impl AbsRequestSender {
453 pub fn new(snapshot_request_sender: SnapshotRequestSender) -> Self {
454 Self {
455 snapshot_request_sender: Some(snapshot_request_sender),
456 }
457 }
458
459 pub fn is_snapshot_creation_enabled(&self) -> bool {
460 self.snapshot_request_sender.is_some()
461 }
462
463 pub fn send_snapshot_request(
464 &self,
465 snapshot_request: SnapshotRequest,
466 ) -> Result<(), SendError<SnapshotRequest>> {
467 if let Some(ref snapshot_request_sender) = self.snapshot_request_sender {
468 snapshot_request_sender.send(snapshot_request)
469 } else {
470 Ok(())
471 }
472 }
473}
474
475#[derive(Debug)]
476pub struct PrunedBanksRequestHandler {
477 pub pruned_banks_receiver: DroppedSlotsReceiver,
478}
479
480impl PrunedBanksRequestHandler {
481 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
482 fn handle_request(&self, bank: &Bank) -> usize {
483 let mut banks_to_purge: Vec<_> = self.pruned_banks_receiver.try_iter().collect();
484 banks_to_purge.sort_by_key(|(slot, _id)| *slot);
487 let num_banks_to_purge = banks_to_purge.len();
488
489 let grouped_banks_to_purge: Vec<_> = banks_to_purge.chunk_by(|a, b| a.0 == b.0).collect();
491
492 let num_banks_with_same_slot =
495 num_banks_to_purge.saturating_sub(grouped_banks_to_purge.len());
496 if num_banks_with_same_slot > 0 {
497 datapoint_info!(
498 "pruned_banks_request_handler",
499 ("num_pruned_banks", num_banks_to_purge, i64),
500 ("num_banks_with_same_slot", num_banks_with_same_slot, i64),
501 );
502 }
503
504 let accounts_db = bank.rc.accounts.accounts_db.as_ref();
507 accounts_db.thread_pool_clean.install(|| {
508 grouped_banks_to_purge.into_par_iter().for_each(|group| {
509 group.iter().for_each(|(slot, bank_id)| {
510 accounts_db.purge_slot(*slot, *bank_id, true);
511 })
512 });
513 });
514
515 num_banks_to_purge
516 }
517
518 fn remove_dead_slots(
519 &self,
520 bank: &Bank,
521 removed_slots_count: &mut usize,
522 total_remove_slots_time: &mut u64,
523 ) {
524 let mut remove_slots_time = Measure::start("remove_slots_time");
525 *removed_slots_count += self.handle_request(bank);
526 remove_slots_time.stop();
527 *total_remove_slots_time += remove_slots_time.as_us();
528
529 if *removed_slots_count >= 100 {
530 datapoint_info!(
531 "remove_slots_timing",
532 ("remove_slots_time", *total_remove_slots_time, i64),
533 ("removed_slots_count", *removed_slots_count, i64),
534 );
535 *total_remove_slots_time = 0;
536 *removed_slots_count = 0;
537 }
538 }
539}
540
541pub struct AbsRequestHandlers {
542 pub snapshot_request_handler: SnapshotRequestHandler,
543 pub pruned_banks_request_handler: PrunedBanksRequestHandler,
544}
545
546impl AbsRequestHandlers {
547 #[allow(clippy::type_complexity)]
549 pub fn handle_snapshot_requests(
550 &self,
551 test_hash_calculation: bool,
552 non_snapshot_time_us: u128,
553 exit: &AtomicBool,
554 ) -> Option<Result<u64, SnapshotError>> {
555 self.snapshot_request_handler.handle_snapshot_requests(
556 test_hash_calculation,
557 non_snapshot_time_us,
558 exit,
559 )
560 }
561}
562
563pub struct AccountsBackgroundService {
564 t_background: JoinHandle<()>,
565}
566
567impl AccountsBackgroundService {
568 pub fn new(
569 bank_forks: Arc<RwLock<BankForks>>,
570 exit: Arc<AtomicBool>,
571 request_handlers: AbsRequestHandlers,
572 test_hash_calculation: bool,
573 ) -> Self {
574 let mut last_cleaned_block_height = 0;
575 let mut removed_slots_count = 0;
576 let mut total_remove_slots_time = 0;
577 let t_background = Builder::new()
578 .name("solBgAccounts".to_string())
579 .spawn(move || {
580 info!("AccountsBackgroundService has started");
581 let mut stats = StatsManager::new();
582 let mut last_snapshot_end_time = None;
583
584 loop {
585 if exit.load(Ordering::Relaxed) {
586 break;
587 }
588 let start_time = Instant::now();
589
590 let bank = bank_forks.read().unwrap().root_bank();
592
593 request_handlers
595 .pruned_banks_request_handler
596 .remove_dead_slots(
597 &bank,
598 &mut removed_slots_count,
599 &mut total_remove_slots_time,
600 );
601
602 let non_snapshot_time = last_snapshot_end_time
603 .map(|last_snapshot_end_time: Instant| {
604 last_snapshot_end_time.elapsed().as_micros()
605 })
606 .unwrap_or_default();
607
608 let snapshot_handle_result = bank
631 .is_startup_verification_complete()
632 .then(|| {
633 request_handlers.handle_snapshot_requests(
634 test_hash_calculation,
635 non_snapshot_time,
636 &exit,
637 )
638 })
639 .flatten();
640 if snapshot_handle_result.is_some() {
641 last_snapshot_end_time = Some(Instant::now());
642 }
643
644 bank.flush_accounts_cache_if_needed();
649
650 if let Some(snapshot_handle_result) = snapshot_handle_result {
651 match snapshot_handle_result {
654 Ok(snapshot_block_height) => {
655 assert!(last_cleaned_block_height <= snapshot_block_height);
656 last_cleaned_block_height = snapshot_block_height;
657 }
658 Err(err) => {
659 error!("Stopping AccountsBackgroundService! Fatal error while handling snapshot requests: {err}");
660 exit.store(true, Ordering::Relaxed);
661 break;
662 }
663 }
664 } else {
665 if bank.block_height() - last_cleaned_block_height
666 > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0..10))
667 {
668 bank.force_flush_accounts_cache();
673 bank.clean_accounts();
674 last_cleaned_block_height = bank.block_height();
675 if bank.is_startup_verification_complete() {
677 bank.shrink_ancient_slots();
678 }
679 }
680 if bank.is_startup_verification_complete() {
688 bank.shrink_candidate_slots();
689 }
690 }
691 stats.record_and_maybe_submit(start_time.elapsed());
692 sleep(Duration::from_millis(INTERVAL_MS));
693 }
694 info!("AccountsBackgroundService has stopped");
695 })
696 .unwrap();
697
698 Self { t_background }
699 }
700
701 pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
706 assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
707
708 let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
709 {
710 let root_bank = bank_forks.read().unwrap().root_bank();
711
712 root_bank
713 .rc
714 .accounts
715 .accounts_db
716 .enable_bank_drop_callback();
717 root_bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
718 pruned_banks_sender,
719 ))));
720 }
721 pruned_banks_receiver
722 }
723
724 pub fn join(self) -> thread::Result<()> {
725 self.t_background.join()
726 }
727}
728
729#[must_use]
731fn new_accounts_package_kind(
732 snapshot_request: &SnapshotRequest,
733 snapshot_config: &SnapshotConfig,
734) -> AccountsPackageKind {
735 let block_height = snapshot_request.snapshot_root_bank.block_height();
736 let latest_full_snapshot_slot = snapshot_request
737 .snapshot_root_bank
738 .rc
739 .accounts
740 .accounts_db
741 .latest_full_snapshot_slot();
742 match snapshot_request.request_kind {
743 SnapshotRequestKind::EpochAccountsHash => AccountsPackageKind::EpochAccountsHash,
744 SnapshotRequestKind::Snapshot => {
745 if snapshot_utils::should_take_full_snapshot(
746 block_height,
747 snapshot_config.full_snapshot_archive_interval_slots,
748 ) {
749 AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot)
750 } else if snapshot_utils::should_take_incremental_snapshot(
751 block_height,
752 snapshot_config.incremental_snapshot_archive_interval_slots,
753 latest_full_snapshot_slot,
754 ) {
755 AccountsPackageKind::Snapshot(SnapshotKind::IncrementalSnapshot(
756 latest_full_snapshot_slot.unwrap(),
757 ))
758 } else {
759 AccountsPackageKind::AccountsHashVerifier
760 }
761 }
762 }
763}
764
765#[must_use]
775fn cmp_requests_by_priority(
776 a: &(SnapshotRequest, AccountsPackageKind),
777 b: &(SnapshotRequest, AccountsPackageKind),
778) -> std::cmp::Ordering {
779 let (snapshot_request_a, accounts_package_kind_a) = a;
780 let (snapshot_request_b, accounts_package_kind_b) = b;
781 let slot_a = snapshot_request_a.snapshot_root_bank.slot();
782 let slot_b = snapshot_request_b.snapshot_root_bank.slot();
783 snapshot_package::cmp_accounts_package_kinds_by_priority(
784 accounts_package_kind_a,
785 accounts_package_kind_b,
786 )
787 .then(slot_a.cmp(&slot_b))
788}
789
790#[cfg(test)]
791mod test {
792 use {
793 super::*,
794 crate::{bank::epoch_accounts_hash_utils, genesis_utils::create_genesis_config},
795 crossbeam_channel::unbounded,
796 solana_accounts_db::epoch_accounts_hash::EpochAccountsHash,
797 solana_sdk::{
798 account::AccountSharedData, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey,
799 },
800 };
801
802 #[test]
803 fn test_accounts_background_service_remove_dead_slots() {
804 let genesis = create_genesis_config(10);
805 let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
806 let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
807 let pruned_banks_request_handler = PrunedBanksRequestHandler {
808 pruned_banks_receiver,
809 };
810
811 let account_key = Pubkey::new_unique();
813 bank0.store_account(
814 &account_key,
815 &AccountSharedData::new(264, 0, &Pubkey::default()),
816 );
817 assert!(bank0.get_account(&account_key).is_some());
818 pruned_banks_sender.send((0, 0)).unwrap();
819
820 assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
821
822 pruned_banks_request_handler.remove_dead_slots(&bank0, &mut 0, &mut 0);
823
824 assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
825 }
826
827 #[test]
836 fn test_get_next_snapshot_request() {
837 const SLOTS_PER_EPOCH: Slot = 400;
842 const FULL_SNAPSHOT_INTERVAL: Slot = 80;
843 const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 30;
844
845 let snapshot_config = SnapshotConfig {
846 full_snapshot_archive_interval_slots: FULL_SNAPSHOT_INTERVAL,
847 incremental_snapshot_archive_interval_slots: INCREMENTAL_SNAPSHOT_INTERVAL,
848 ..SnapshotConfig::default()
849 };
850
851 let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded();
852 let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
853 let snapshot_request_handler = SnapshotRequestHandler {
854 snapshot_config,
855 snapshot_request_sender: snapshot_request_sender.clone(),
856 snapshot_request_receiver,
857 accounts_package_sender,
858 };
859
860 let send_snapshot_request = |snapshot_root_bank, request_kind| {
861 let snapshot_request = SnapshotRequest {
862 snapshot_root_bank,
863 status_cache_slot_deltas: Vec::default(),
864 request_kind,
865 enqueued: Instant::now(),
866 };
867 snapshot_request_sender.send(snapshot_request).unwrap();
868 };
869
870 let mut genesis_config_info = create_genesis_config(10);
871 genesis_config_info.genesis_config.epoch_schedule =
872 EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
873 let mut bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
874 bank.set_startup_verification_complete();
875 bank.rc
878 .accounts
879 .accounts_db
880 .epoch_accounts_hash_manager
881 .set_valid(EpochAccountsHash::new(Hash::new_unique()), 0);
882
883 let bank0 = bank.clone();
887 fn latest_full_snapshot_slot(bank: &Bank) -> Option<Slot> {
888 bank.rc.accounts.accounts_db.latest_full_snapshot_slot()
889 }
890 fn set_latest_full_snapshot_slot(bank: &Bank, slot: Slot) {
891 bank.rc
892 .accounts
893 .accounts_db
894 .set_latest_full_snapshot_slot(slot);
895 }
896
897 let mut make_banks = |num_banks| {
920 for _ in 0..num_banks {
921 let slot = bank.slot() + 1;
922 bank = Arc::new(Bank::new_from_parent(
923 bank.clone(),
924 &Pubkey::new_unique(),
925 slot,
926 ));
927
928 if bank.slot() == epoch_accounts_hash_utils::calculation_start(&bank) {
931 send_snapshot_request(
932 Arc::clone(&bank),
933 SnapshotRequestKind::EpochAccountsHash,
934 );
935 } else {
936 send_snapshot_request(Arc::clone(&bank), SnapshotRequestKind::Snapshot);
937 }
938 }
939 };
940 make_banks(303);
941
942 assert_eq!(latest_full_snapshot_slot(&bank0), None,);
944 let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
945 .get_next_snapshot_request()
946 .unwrap();
947 assert_eq!(
948 accounts_package_kind,
949 AccountsPackageKind::EpochAccountsHash
950 );
951 assert_eq!(snapshot_request.snapshot_root_bank.slot(), 100);
952
953 assert_eq!(latest_full_snapshot_slot(&bank0), None,);
956 let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
957 .get_next_snapshot_request()
958 .unwrap();
959 assert_eq!(
960 accounts_package_kind,
961 AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot)
962 );
963 assert_eq!(snapshot_request.snapshot_root_bank.slot(), 240);
964 set_latest_full_snapshot_slot(&bank0, 240);
965
966 assert_eq!(latest_full_snapshot_slot(&bank0), Some(240),);
969 let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
970 .get_next_snapshot_request()
971 .unwrap();
972 assert_eq!(
973 accounts_package_kind,
974 AccountsPackageKind::Snapshot(SnapshotKind::IncrementalSnapshot(240))
975 );
976 assert_eq!(snapshot_request.snapshot_root_bank.slot(), 300);
977
978 assert_eq!(latest_full_snapshot_slot(&bank0), Some(240),);
981 let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
982 .get_next_snapshot_request()
983 .unwrap();
984 assert_eq!(
985 accounts_package_kind,
986 AccountsPackageKind::AccountsHashVerifier
987 );
988 assert_eq!(snapshot_request.snapshot_root_bank.slot(), 303);
989
990 assert_eq!(latest_full_snapshot_slot(&bank0), Some(240),);
992 assert!(snapshot_request_handler
993 .get_next_snapshot_request()
994 .is_none());
995
996 make_banks(240);
1010
1011 assert_eq!(latest_full_snapshot_slot(&bank0), Some(240),);
1013 let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
1014 .get_next_snapshot_request()
1015 .unwrap();
1016 assert_eq!(
1017 accounts_package_kind,
1018 AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot)
1019 );
1020 assert_eq!(snapshot_request.snapshot_root_bank.slot(), 480);
1021 set_latest_full_snapshot_slot(&bank0, 480);
1022
1023 assert_eq!(latest_full_snapshot_slot(&bank0), Some(480),);
1025 let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
1026 .get_next_snapshot_request()
1027 .unwrap();
1028 assert_eq!(
1029 accounts_package_kind,
1030 AccountsPackageKind::EpochAccountsHash
1031 );
1032 assert_eq!(snapshot_request.snapshot_root_bank.slot(), 500);
1033
1034 assert_eq!(latest_full_snapshot_slot(&bank0), Some(480),);
1036 let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
1037 .get_next_snapshot_request()
1038 .unwrap();
1039 assert_eq!(
1040 accounts_package_kind,
1041 AccountsPackageKind::Snapshot(SnapshotKind::IncrementalSnapshot(480))
1042 );
1043 assert_eq!(snapshot_request.snapshot_root_bank.slot(), 540);
1044
1045 assert_eq!(latest_full_snapshot_slot(&bank0), Some(480),);
1047 let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
1048 .get_next_snapshot_request()
1049 .unwrap();
1050 assert_eq!(
1051 accounts_package_kind,
1052 AccountsPackageKind::AccountsHashVerifier
1053 );
1054 assert_eq!(snapshot_request.snapshot_root_bank.slot(), 543);
1055
1056 assert_eq!(latest_full_snapshot_slot(&bank0), Some(480),);
1058 assert!(snapshot_request_handler
1059 .get_next_snapshot_request()
1060 .is_none());
1061 }
1062
1063 #[test]
1065 fn test_pruned_banks_request_handler_handle_request() {
1066 let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
1067 let pruned_banks_request_handler = PrunedBanksRequestHandler {
1068 pruned_banks_receiver,
1069 };
1070 let genesis_config_info = create_genesis_config(10);
1071 let bank = Bank::new_for_tests(&genesis_config_info.genesis_config);
1072 bank.set_startup_verification_complete();
1073 bank.rc.accounts.accounts_db.enable_bank_drop_callback();
1074 bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
1075 pruned_banks_sender,
1076 ))));
1077
1078 let fork0_bank0 = Arc::new(bank);
1079 let fork0_bank1 = Arc::new(Bank::new_from_parent(
1080 fork0_bank0.clone(),
1081 &Pubkey::new_unique(),
1082 fork0_bank0.slot() + 1,
1083 ));
1084 let fork1_bank1 = Arc::new(Bank::new_from_parent(
1085 fork0_bank0.clone(),
1086 &Pubkey::new_unique(),
1087 fork0_bank0.slot() + 1,
1088 ));
1089 let fork2_bank1 = Arc::new(Bank::new_from_parent(
1090 fork0_bank0.clone(),
1091 &Pubkey::new_unique(),
1092 fork0_bank0.slot() + 1,
1093 ));
1094 let fork0_bank2 = Arc::new(Bank::new_from_parent(
1095 fork0_bank1.clone(),
1096 &Pubkey::new_unique(),
1097 fork0_bank1.slot() + 1,
1098 ));
1099 let fork1_bank2 = Arc::new(Bank::new_from_parent(
1100 fork1_bank1.clone(),
1101 &Pubkey::new_unique(),
1102 fork1_bank1.slot() + 1,
1103 ));
1104 let fork0_bank3 = Arc::new(Bank::new_from_parent(
1105 fork0_bank2.clone(),
1106 &Pubkey::new_unique(),
1107 fork0_bank2.slot() + 1,
1108 ));
1109 let fork3_bank3 = Arc::new(Bank::new_from_parent(
1110 fork0_bank2.clone(),
1111 &Pubkey::new_unique(),
1112 fork0_bank2.slot() + 1,
1113 ));
1114 fork0_bank3.squash();
1115
1116 drop(fork3_bank3);
1117 drop(fork1_bank2);
1118 drop(fork0_bank2);
1119 drop(fork1_bank1);
1120 drop(fork2_bank1);
1121 drop(fork0_bank1);
1122 drop(fork0_bank0);
1123 let num_banks_purged = pruned_banks_request_handler.handle_request(&fork0_bank3);
1124 assert_eq!(num_banks_purged, 7);
1125 }
1126}