solana_runtime/
accounts_background_service.rs

1//! Service to clean up dead slots in accounts_db
2//!
3//! This can be expensive since we have to walk the append vecs being cleaned up.
4
5mod stats;
6#[cfg(feature = "dev-context-only-utils")]
7use qualifier_attr::qualifiers;
8use {
9    crate::{
10        bank::{Bank, BankSlotDelta, DropCallback},
11        bank_forks::BankForks,
12        snapshot_bank_utils,
13        snapshot_config::SnapshotConfig,
14        snapshot_package::{self, AccountsPackage, AccountsPackageKind, SnapshotKind},
15        snapshot_utils::{self, SnapshotError},
16    },
17    crossbeam_channel::{Receiver, SendError, Sender},
18    log::*,
19    rand::{thread_rng, Rng},
20    rayon::iter::{IntoParallelIterator, ParallelIterator},
21    solana_accounts_db::{
22        accounts_db::CalcAccountsHashDataSource, accounts_hash::CalcAccountsHashConfig,
23    },
24    solana_measure::{measure::Measure, measure_us},
25    solana_sdk::clock::{BankId, Slot},
26    stats::StatsManager,
27    std::{
28        boxed::Box,
29        fmt::{Debug, Formatter},
30        sync::{
31            atomic::{AtomicBool, AtomicU64, Ordering},
32            Arc, RwLock,
33        },
34        thread::{self, sleep, Builder, JoinHandle},
35        time::{Duration, Instant},
36    },
37};
38
/// Time (ms) to sleep between iterations of the background service loop
const INTERVAL_MS: u64 = 100;
/// Clean/shrink accounts when at least this many blocks have elapsed since the
/// last cleanup (plus a small random jitter; see the loop in
/// `AccountsBackgroundService::new`)
const CLEAN_INTERVAL_BLOCKS: u64 = 100;

pub type SnapshotRequestSender = Sender<SnapshotRequest>;
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
/// Channel endpoints carrying the (slot, bank id) of each dropped bank
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;

/// interval (ms) to report bank_drop queue events: 60s
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
/// maximum drop bank signal queue length before reporting
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
51
/// Rate-limited reporter for the length of the bank-drop signal queue
#[derive(Debug, Default)]
struct PrunedBankQueueLenReporter {
    // timestamp (ms) of the last emitted datapoint; used to rate-limit reports
    last_report_time: AtomicU64,
}
56
57impl PrunedBankQueueLenReporter {
58    fn report(&self, q_len: usize) {
59        let now = solana_sdk::timing::timestamp();
60        let last_report_time = self.last_report_time.load(Ordering::Acquire);
61        if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
62            && now.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
63        {
64            datapoint_warn!("excessive_pruned_bank_channel_len", ("len", q_len, i64));
65            self.last_report_time.store(now, Ordering::Release);
66        }
67    }
68}
69
lazy_static! {
    /// Process-wide reporter shared by all `SendDroppedBankCallback` clones
    // NOTE(review): could migrate to `std::sync::LazyLock` if the crate's MSRV
    // permits — confirm before changing.
    static ref BANK_DROP_QUEUE_REPORTER: PrunedBankQueueLenReporter =
        PrunedBankQueueLenReporter::default();
}
74
/// Bank drop callback that signals the dropped bank's (slot, bank id) over a
/// channel so the background service can purge the slot from accounts-db
#[derive(Clone)]
pub struct SendDroppedBankCallback {
    // channel into the pruned-banks receiver handled by PrunedBanksRequestHandler
    sender: DroppedSlotsSender,
}
79
80impl DropCallback for SendDroppedBankCallback {
81    fn callback(&self, bank: &Bank) {
82        BANK_DROP_QUEUE_REPORTER.report(self.sender.len());
83        if let Err(SendError(_)) = self.sender.send((bank.slot(), bank.bank_id())) {
84            info!("bank DropCallback signal queue disconnected.");
85        }
86    }
87
88    fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
89        Box::new(self.clone())
90    }
91}
92
93impl Debug for SendDroppedBankCallback {
94    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
95        write!(f, "SendDroppedBankCallback({self:p})")
96    }
97}
98
99impl SendDroppedBankCallback {
100    pub fn new(sender: DroppedSlotsSender) -> Self {
101        Self { sender }
102    }
103}
104
/// A request for the background service to take a snapshot (or compute an
/// epoch accounts hash) of a rooted bank
pub struct SnapshotRequest {
    /// The rooted bank to process; its slot/block height drive prioritization
    pub snapshot_root_bank: Arc<Bank>,
    /// Status cache slot deltas bundled into the resulting accounts package
    pub status_cache_slot_deltas: Vec<BankSlotDelta>,
    /// Whether this is a snapshot or an epoch-accounts-hash request
    pub request_kind: SnapshotRequestKind,

    /// The instant this request was sent to the queue.
    /// Used to track how long requests wait before processing.
    pub enqueued: Instant,
}
114
115impl Debug for SnapshotRequest {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        f.debug_struct("SnapshotRequest")
118            .field("request kind", &self.request_kind)
119            .field("bank slot", &self.snapshot_root_bank.slot())
120            .field("block height", &self.snapshot_root_bank.block_height())
121            .finish_non_exhaustive()
122    }
123}
124
/// What kind of request is this?
///
/// The snapshot request has been expanded to support more than just snapshots.  This is
/// confusing, but can be resolved by renaming this type; or better, by creating an enum with
/// variants that wrap the fields-of-interest for each request.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SnapshotRequestKind {
    /// A request to take a snapshot of the bank
    Snapshot,
    /// A request to compute the epoch accounts hash
    EpochAccountsHash,
}
135
/// Handles snapshot requests coming off the snapshot request channel
pub struct SnapshotRequestHandler {
    /// Config controlling full/incremental snapshot intervals
    pub snapshot_config: SnapshotConfig,
    /// Sender side of the request channel; used to re-enqueue requests with
    /// slots greater than the one being handled
    pub snapshot_request_sender: SnapshotRequestSender,
    /// Receiver side of the request channel
    pub snapshot_request_receiver: SnapshotRequestReceiver,
    /// Channel over which finished accounts packages are sent downstream
    pub accounts_package_sender: Sender<AccountsPackage>,
}
142
impl SnapshotRequestHandler {
    /// Handles the next snapshot request in the channel, if any.
    ///
    /// Returns `None` when the channel is empty; otherwise returns the result
    /// of handling the highest-priority request (the handled bank's block
    /// height on success).
    #[allow(clippy::type_complexity)]
    pub fn handle_snapshot_requests(
        &self,
        test_hash_calculation: bool,
        non_snapshot_time_us: u128,
        exit: &AtomicBool,
    ) -> Option<Result<u64, SnapshotError>> {
        let (
            snapshot_request,
            accounts_package_kind,
            num_outstanding_requests,
            num_re_enqueued_requests,
        ) = self.get_next_snapshot_request()?;

        datapoint_info!(
            "handle_snapshot_requests",
            ("num_outstanding_requests", num_outstanding_requests, i64),
            ("num_re_enqueued_requests", num_re_enqueued_requests, i64),
            (
                "enqueued_time_us",
                snapshot_request.enqueued.elapsed().as_micros(),
                i64
            ),
        );

        Some(self.handle_snapshot_request(
            test_hash_calculation,
            non_snapshot_time_us,
            snapshot_request,
            accounts_package_kind,
            exit,
        ))
    }

    /// Get the next snapshot request to handle
    ///
    /// Look through the snapshot request channel to find the highest priority one to handle next.
    /// If there are no snapshot requests in the channel, return None.  Otherwise return the
    /// highest priority one.  Unhandled snapshot requests with slots GREATER-THAN the handled one
    /// will be re-enqueued.  The remaining will be dropped.
    ///
    /// Also return the number of snapshot requests initially in the channel, and the number of
    /// ones re-enqueued.
    fn get_next_snapshot_request(
        &self,
    ) -> Option<(
        SnapshotRequest,
        AccountsPackageKind,
        /*num outstanding snapshot requests*/ usize,
        /*num re-enqueued snapshot requests*/ usize,
    )> {
        // Drain the channel, pairing each request with the kind of accounts
        // package it will produce (full/incremental snapshot, AHV, or EAH)
        let mut requests: Vec<_> = self
            .snapshot_request_receiver
            .try_iter()
            .map(|request| {
                let accounts_package_kind =
                    new_accounts_package_kind(&request, &self.snapshot_config);
                (request, accounts_package_kind)
            })
            .collect();
        let requests_len = requests.len();
        debug!("outstanding snapshot requests ({requests_len}): {requests:?}");

        // NOTE: This code to select the next request is mirrored in AccountsHashVerifier.
        // Please ensure they stay in sync.
        match requests_len {
            0 => None,
            1 => {
                // SAFETY: We know the len is 1, so `pop` will return `Some`
                let (snapshot_request, accounts_package_kind) = requests.pop().unwrap();
                Some((snapshot_request, accounts_package_kind, 1, 0))
            }
            _ => {
                let num_eah_requests = requests
                    .iter()
                    .filter(|(_, account_package_kind)| {
                        *account_package_kind == AccountsPackageKind::EpochAccountsHash
                    })
                    .count();
                assert!(
                    num_eah_requests <= 1,
                    "Only a single EAH request is allowed at a time! count: {num_eah_requests}"
                );

                // Get the two highest priority requests, `y` and `z`.
                // By asking for the second-to-last element to be in its final sorted position, we
                // also ensure that the last element is also sorted.
                let (_, y, z) =
                    requests.select_nth_unstable_by(requests_len - 2, cmp_requests_by_priority);
                assert_eq!(z.len(), 1);
                let z = z.first().unwrap();
                let y: &_ = y; // reborrow to remove `mut`

                // If the highest priority request (`z`) is EpochAccountsHash, we need to check if
                // there's a FullSnapshot request with a lower slot in `y` that is about to be
                // dropped.  We do not want to drop a FullSnapshot request in this case because it
                // will cause subsequent IncrementalSnapshot requests to fail.
                //
                // So, if `z` is an EpochAccountsHash request, check `y`.  We know there can only
                // be at most one EpochAccountsHash request, so `y` is the only other request we
                // need to check.  If `y` is a FullSnapshot request *with a lower slot* than `z`,
                // then handle `y` first.
                let (snapshot_request, accounts_package_kind) = if z.1
                    == AccountsPackageKind::EpochAccountsHash
                    && y.1 == AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot)
                    && y.0.snapshot_root_bank.slot() < z.0.snapshot_root_bank.slot()
                {
                    // SAFETY: We know the len is > 1, so both `pop`s will return `Some`
                    let z = requests.pop().unwrap();
                    let y = requests.pop().unwrap();
                    // keep the EAH request (`z`) in the queue so it is handled next iteration
                    requests.push(z);
                    y
                } else {
                    // SAFETY: We know the len is > 1, so `pop` will return `Some`
                    requests.pop().unwrap()
                };

                let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
                // re-enqueue any remaining requests for slots GREATER-THAN the one that will be handled
                let num_re_enqueued_requests = requests
                    .into_iter()
                    .filter(|(snapshot_request, _)| {
                        snapshot_request.snapshot_root_bank.slot() > handled_request_slot
                    })
                    .map(|(snapshot_request, _)| {
                        self.snapshot_request_sender
                            .try_send(snapshot_request)
                            .expect("re-enqueue snapshot request");
                    })
                    .count();

                Some((
                    snapshot_request,
                    accounts_package_kind,
                    requests_len,
                    num_re_enqueued_requests,
                ))
            }
        }
    }

    /// Handles a single snapshot request: flushes/cleans/shrinks accounts-db,
    /// builds the accounts package, and sends it downstream.
    ///
    /// Returns the handled bank's block height on success.
    fn handle_snapshot_request(
        &self,
        test_hash_calculation: bool,
        non_snapshot_time_us: u128,
        snapshot_request: SnapshotRequest,
        accounts_package_kind: AccountsPackageKind,
        exit: &AtomicBool,
    ) -> Result<u64, SnapshotError> {
        info!("handling snapshot request: {snapshot_request:?}, {accounts_package_kind:?}");
        let mut total_time = Measure::start("snapshot_request_receiver_total_time");
        let SnapshotRequest {
            snapshot_root_bank,
            status_cache_slot_deltas,
            request_kind,
            enqueued: _,
        } = snapshot_request;

        // we should not rely on the state of this validator until startup verification is complete
        assert!(snapshot_root_bank.is_startup_verification_complete());

        if accounts_package_kind == AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot) {
            // The latest full snapshot slot is what accounts-db uses to properly handle zero lamport
            // accounts.  We are handling a full snapshot request here, and since taking a snapshot
            // is not allowed to fail, we can update accounts-db now.
            snapshot_root_bank
                .rc
                .accounts
                .accounts_db
                .set_latest_full_snapshot_slot(snapshot_root_bank.slot());
        }

        let previous_accounts_hash = test_hash_calculation.then(|| {
            // We have to use the index version here.
            // We cannot calculate the non-index way because cache has not been flushed and stores don't match reality.
            snapshot_root_bank.update_accounts_hash(
                CalcAccountsHashDataSource::IndexForTests,
                false,
                false,
            )
        });

        let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
        // Forced cache flushing MUST flush all roots <= snapshot_root_bank.slot().
        // That's because `snapshot_root_bank.slot()` must be root at this point,
        // and contains relevant updates because each bank has at least 1 account update due
        // to sysvar maintenance. Otherwise, this would cause missing storages in the snapshot
        snapshot_root_bank.force_flush_accounts_cache();
        // Ensure all roots <= `self.slot()` have been flushed.
        // Note `max_flush_root` could be larger than self.slot() if there are
        // `> MAX_CACHE_SLOT` cached and rooted slots which triggered earlier flushes.
        assert!(
            snapshot_root_bank.slot()
                <= snapshot_root_bank
                    .rc
                    .accounts
                    .accounts_db
                    .accounts_cache
                    .fetch_max_flush_root()
        );
        flush_accounts_cache_time.stop();

        // When testing hash calculation, recompute the hash from storages (now
        // that the cache is flushed) and assert it matches the index-based one
        let accounts_hash_for_testing = previous_accounts_hash.map(|previous_accounts_hash| {
            let (this_accounts_hash, capitalization) = snapshot_root_bank
                .accounts()
                .accounts_db
                .calculate_accounts_hash_from(
                    CalcAccountsHashDataSource::Storages,
                    snapshot_root_bank.slot(),
                    &CalcAccountsHashConfig {
                        use_bg_thread_pool: true,
                        ancestors: None,
                        epoch_schedule: snapshot_root_bank.epoch_schedule(),
                        rent_collector: snapshot_root_bank.rent_collector(),
                        store_detailed_debug_info_on_failure: false,
                    },
                );
            assert_eq!(previous_accounts_hash, this_accounts_hash);
            assert_eq!(capitalization, snapshot_root_bank.capitalization());
            this_accounts_hash
        });

        let mut clean_time = Measure::start("clean_time");
        snapshot_root_bank.clean_accounts();
        clean_time.stop();

        let (_, shrink_ancient_time_us) = measure_us!(snapshot_root_bank.shrink_ancient_slots());

        let mut shrink_time = Measure::start("shrink_time");
        snapshot_root_bank.shrink_candidate_slots();
        shrink_time.stop();

        // Snapshot the bank and send over an accounts package
        let mut snapshot_time = Measure::start("snapshot_time");
        let snapshot_storages = snapshot_bank_utils::get_snapshot_storages(&snapshot_root_bank);
        let accounts_package = match request_kind {
            SnapshotRequestKind::Snapshot => match &accounts_package_kind {
                AccountsPackageKind::Snapshot(_) => {
                    AccountsPackage::new_for_snapshot(
                        accounts_package_kind,
                        &snapshot_root_bank,
                        snapshot_storages,
                        status_cache_slot_deltas,
                        accounts_hash_for_testing,
                    )
                }
                AccountsPackageKind::AccountsHashVerifier => {
                    AccountsPackage::new_for_accounts_hash_verifier(
                        accounts_package_kind,
                        &snapshot_root_bank,
                        snapshot_storages,
                        accounts_hash_for_testing,
                    )
                }
                AccountsPackageKind::EpochAccountsHash => panic!("Illegal account package type: EpochAccountsHash packages must be from an EpochAccountsHash request!"),
            },
            SnapshotRequestKind::EpochAccountsHash => {
                AccountsPackage::new_for_epoch_accounts_hash(
                    accounts_package_kind,
                    &snapshot_root_bank,
                    snapshot_storages,
                    accounts_hash_for_testing,
                )
            }
        };
        let send_result = self.accounts_package_sender.send(accounts_package);
        if let Err(err) = send_result {
            // Sending the accounts package should never fail *unless* we're shutting down.
            let accounts_package = &err.0;
            assert!(
                exit.load(Ordering::Relaxed),
                "Failed to send accounts package: {err}, {accounts_package:?}"
            );
        }
        snapshot_time.stop();
        info!(
            "Handled snapshot request. accounts package kind: {:?}, slot: {}, bank hash: {}",
            accounts_package_kind,
            snapshot_root_bank.slot(),
            snapshot_root_bank.hash(),
        );

        total_time.stop();

        datapoint_info!(
            "handle_snapshot_requests-timing",
            (
                "flush_accounts_cache_time",
                flush_accounts_cache_time.as_us(),
                i64
            ),
            ("shrink_time", shrink_time.as_us(), i64),
            ("clean_time", clean_time.as_us(), i64),
            ("snapshot_time", snapshot_time.as_us(), i64),
            ("total_us", total_time.as_us(), i64),
            ("non_snapshot_time_us", non_snapshot_time_us, i64),
            ("shrink_ancient_time_us", shrink_ancient_time_us, i64),
        );
        Ok(snapshot_root_bank.block_height())
    }
}
446
/// Sends snapshot requests to the AccountsBackgroundService
///
/// When the inner sender is `None` (the `Default`), snapshot creation is
/// disabled and `send_snapshot_request` is a silent no-op.
#[derive(Default, Clone)]
pub struct AbsRequestSender {
    snapshot_request_sender: Option<SnapshotRequestSender>,
}
451
452impl AbsRequestSender {
453    pub fn new(snapshot_request_sender: SnapshotRequestSender) -> Self {
454        Self {
455            snapshot_request_sender: Some(snapshot_request_sender),
456        }
457    }
458
459    pub fn is_snapshot_creation_enabled(&self) -> bool {
460        self.snapshot_request_sender.is_some()
461    }
462
463    pub fn send_snapshot_request(
464        &self,
465        snapshot_request: SnapshotRequest,
466    ) -> Result<(), SendError<SnapshotRequest>> {
467        if let Some(ref snapshot_request_sender) = self.snapshot_request_sender {
468            snapshot_request_sender.send(snapshot_request)
469        } else {
470            Ok(())
471        }
472    }
473}
474
/// Handles signals from dropped (pruned) banks so their dead slots can be
/// purged from accounts-db
#[derive(Debug)]
pub struct PrunedBanksRequestHandler {
    /// Receives the (slot, bank id) of each dropped bank
    pub pruned_banks_receiver: DroppedSlotsReceiver,
}
479
impl PrunedBanksRequestHandler {
    /// Drains the pruned-banks channel and purges each signaled slot from
    /// accounts-db, in parallel across distinct slots.
    ///
    /// Returns the number of banks purged.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    fn handle_request(&self, bank: &Bank) -> usize {
        let mut banks_to_purge: Vec<_> = self.pruned_banks_receiver.try_iter().collect();
        // We need a stable sort to ensure we purge banks—with the same slot—in the same order
        // they were sent into the channel.
        banks_to_purge.sort_by_key(|(slot, _id)| *slot);
        let num_banks_to_purge = banks_to_purge.len();

        // Group the banks into slices with the same slot
        let grouped_banks_to_purge: Vec<_> = banks_to_purge.chunk_by(|a, b| a.0 == b.0).collect();

        // Log whenever we need to handle banks with the same slot.  Purposely do this *before* we
        // call `purge_slot()` to ensure we get the datapoint (in case there's an assert/panic).
        let num_banks_with_same_slot =
            num_banks_to_purge.saturating_sub(grouped_banks_to_purge.len());
        if num_banks_with_same_slot > 0 {
            datapoint_info!(
                "pruned_banks_request_handler",
                ("num_pruned_banks", num_banks_to_purge, i64),
                ("num_banks_with_same_slot", num_banks_with_same_slot, i64),
            );
        }

        // Purge all the slots in parallel
        // Banks for the same slot are purged sequentially
        let accounts_db = bank.rc.accounts.accounts_db.as_ref();
        accounts_db.thread_pool_clean.install(|| {
            grouped_banks_to_purge.into_par_iter().for_each(|group| {
                group.iter().for_each(|(slot, bank_id)| {
                    accounts_db.purge_slot(*slot, *bank_id, true);
                })
            });
        });

        num_banks_to_purge
    }

    /// Purges dead slots and accumulates removal counts/timings, submitting a
    /// datapoint (and resetting the accumulators) every 100 removed slots.
    fn remove_dead_slots(
        &self,
        bank: &Bank,
        removed_slots_count: &mut usize,
        total_remove_slots_time: &mut u64,
    ) {
        let mut remove_slots_time = Measure::start("remove_slots_time");
        *removed_slots_count += self.handle_request(bank);
        remove_slots_time.stop();
        *total_remove_slots_time += remove_slots_time.as_us();

        // Report in batches so the datapoint isn't emitted every iteration
        if *removed_slots_count >= 100 {
            datapoint_info!(
                "remove_slots_timing",
                ("remove_slots_time", *total_remove_slots_time, i64),
                ("removed_slots_count", *removed_slots_count, i64),
            );
            *total_remove_slots_time = 0;
            *removed_slots_count = 0;
        }
    }
}
540
/// Bundles the request handlers used by the AccountsBackgroundService loop
pub struct AbsRequestHandlers {
    /// Handles snapshot/EAH requests
    pub snapshot_request_handler: SnapshotRequestHandler,
    /// Handles dropped-bank (dead slot) purge requests
    pub pruned_banks_request_handler: PrunedBanksRequestHandler,
}
545
546impl AbsRequestHandlers {
547    // Returns the latest requested snapshot block height, if one exists
548    #[allow(clippy::type_complexity)]
549    pub fn handle_snapshot_requests(
550        &self,
551        test_hash_calculation: bool,
552        non_snapshot_time_us: u128,
553        exit: &AtomicBool,
554    ) -> Option<Result<u64, SnapshotError>> {
555        self.snapshot_request_handler.handle_snapshot_requests(
556            test_hash_calculation,
557            non_snapshot_time_us,
558            exit,
559        )
560    }
561}
562
/// Service that periodically cleans/shrinks accounts-db and handles snapshot
/// requests on a dedicated background thread
pub struct AccountsBackgroundService {
    // handle to the spawned "solBgAccounts" thread
    t_background: JoinHandle<()>,
}
566
567impl AccountsBackgroundService {
568    pub fn new(
569        bank_forks: Arc<RwLock<BankForks>>,
570        exit: Arc<AtomicBool>,
571        request_handlers: AbsRequestHandlers,
572        test_hash_calculation: bool,
573    ) -> Self {
574        let mut last_cleaned_block_height = 0;
575        let mut removed_slots_count = 0;
576        let mut total_remove_slots_time = 0;
577        let t_background = Builder::new()
578            .name("solBgAccounts".to_string())
579            .spawn(move || {
580                info!("AccountsBackgroundService has started");
581                let mut stats = StatsManager::new();
582                let mut last_snapshot_end_time = None;
583
584                loop {
585                    if exit.load(Ordering::Relaxed) {
586                        break;
587                    }
588                    let start_time = Instant::now();
589
590                    // Grab the current root bank
591                    let bank = bank_forks.read().unwrap().root_bank();
592
593                    // Purge accounts of any dead slots
594                    request_handlers
595                        .pruned_banks_request_handler
596                        .remove_dead_slots(
597                            &bank,
598                            &mut removed_slots_count,
599                            &mut total_remove_slots_time,
600                        );
601
602                    let non_snapshot_time = last_snapshot_end_time
603                        .map(|last_snapshot_end_time: Instant| {
604                            last_snapshot_end_time.elapsed().as_micros()
605                        })
606                        .unwrap_or_default();
607
608                    // Check to see if there were any requests for snapshotting banks
609                    // < the current root bank `bank` above.
610
611                    // Claim: Any snapshot request for slot `N` found here implies that the last cleanup
612                    // slot `M` satisfies `M < N`
613                    //
614                    // Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
615                    // but cleanup has already happened on some slot `M >= N`. Because the call to
616                    // `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`,
617                    // then that means in some *previous* iteration of this loop, we must have gotten a root
618                    // bank for slot some slot `R` where `R > N`, but did not see the snapshot for `N` in the
619                    // snapshot request channel.
620                    //
621                    // However, this is impossible because BankForks.set_root() will always flush the snapshot
622                    // request for `N` to the snapshot request channel before setting a root `R > N`, and
623                    // snapshot_request_handler.handle_requests() will always look for the latest
624                    // available snapshot in the channel.
625                    //
626                    // NOTE: We must wait for startup verification to complete before handling
627                    // snapshot requests.  This is because startup verification and snapshot
628                    // request handling can both kick off accounts hash calculations in background
629                    // threads, and these must not happen concurrently.
630                    let snapshot_handle_result = bank
631                        .is_startup_verification_complete()
632                        .then(|| {
633                            request_handlers.handle_snapshot_requests(
634                                test_hash_calculation,
635                                non_snapshot_time,
636                                &exit,
637                            )
638                        })
639                        .flatten();
640                    if snapshot_handle_result.is_some() {
641                        last_snapshot_end_time = Some(Instant::now());
642                    }
643
644                    // Note that the flush will do an internal clean of the
645                    // cache up to bank.slot(), so should be safe as long
646                    // as any later snapshots that are taken are of
647                    // slots >= bank.slot()
648                    bank.flush_accounts_cache_if_needed();
649
650                    if let Some(snapshot_handle_result) = snapshot_handle_result {
651                        // Safe, see proof above
652
653                        match snapshot_handle_result {
654                            Ok(snapshot_block_height) => {
655                                assert!(last_cleaned_block_height <= snapshot_block_height);
656                                last_cleaned_block_height = snapshot_block_height;
657                            }
658                            Err(err) => {
659                                error!("Stopping AccountsBackgroundService! Fatal error while handling snapshot requests: {err}");
660                                exit.store(true, Ordering::Relaxed);
661                                break;
662                            }
663                        }
664                    } else {
665                        if bank.block_height() - last_cleaned_block_height
666                            > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0..10))
667                        {
668                            // Note that the flush will do an internal clean of the
669                            // cache up to bank.slot(), so should be safe as long
670                            // as any later snapshots that are taken are of
671                            // slots >= bank.slot()
672                            bank.force_flush_accounts_cache();
673                            bank.clean_accounts();
674                            last_cleaned_block_height = bank.block_height();
675                            // See justification below for why we skip 'shrink' here.
676                            if bank.is_startup_verification_complete() {
677                                bank.shrink_ancient_slots();
678                            }
679                        }
680                        // Do not 'shrink' until *after* the startup verification is complete.
681                        // This is because startup verification needs to get the snapshot
682                        // storages *as they existed at startup* (to calculate the accounts hash).
683                        // If 'shrink' were to run, then it is possible startup verification
684                        // (1) could race with 'shrink', and fail to assert that shrinking is not in
685                        // progress, or (2) could get snapshot storages that were newer than what
686                        // was in the snapshot itself.
687                        if bank.is_startup_verification_complete() {
688                            bank.shrink_candidate_slots();
689                        }
690                    }
691                    stats.record_and_maybe_submit(start_time.elapsed());
692                    sleep(Duration::from_millis(INTERVAL_MS));
693                }
694                info!("AccountsBackgroundService has stopped");
695            })
696            .unwrap();
697
698        Self { t_background }
699    }
700
701    /// Should be called immediately after bank_fork_utils::load_bank_forks(), and as such, there
702    /// should only be one bank, the root bank, in `bank_forks`
703    /// All banks added to `bank_forks` will be descended from the root bank, and thus will inherit
704    /// the bank drop callback.
705    pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
706        assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
707
708        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
709        {
710            let root_bank = bank_forks.read().unwrap().root_bank();
711
712            root_bank
713                .rc
714                .accounts
715                .accounts_db
716                .enable_bank_drop_callback();
717            root_bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
718                pruned_banks_sender,
719            ))));
720        }
721        pruned_banks_receiver
722    }
723
    /// Waits for the background service thread to exit.
    ///
    /// Returns the thread's result; `Err` indicates the background thread panicked.
    pub fn join(self) -> thread::Result<()> {
        self.t_background.join()
    }
727}
728
729/// Get the AccountsPackageKind from a given SnapshotRequest
730#[must_use]
731fn new_accounts_package_kind(
732    snapshot_request: &SnapshotRequest,
733    snapshot_config: &SnapshotConfig,
734) -> AccountsPackageKind {
735    let block_height = snapshot_request.snapshot_root_bank.block_height();
736    let latest_full_snapshot_slot = snapshot_request
737        .snapshot_root_bank
738        .rc
739        .accounts
740        .accounts_db
741        .latest_full_snapshot_slot();
742    match snapshot_request.request_kind {
743        SnapshotRequestKind::EpochAccountsHash => AccountsPackageKind::EpochAccountsHash,
744        SnapshotRequestKind::Snapshot => {
745            if snapshot_utils::should_take_full_snapshot(
746                block_height,
747                snapshot_config.full_snapshot_archive_interval_slots,
748            ) {
749                AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot)
750            } else if snapshot_utils::should_take_incremental_snapshot(
751                block_height,
752                snapshot_config.incremental_snapshot_archive_interval_slots,
753                latest_full_snapshot_slot,
754            ) {
755                AccountsPackageKind::Snapshot(SnapshotKind::IncrementalSnapshot(
756                    latest_full_snapshot_slot.unwrap(),
757                ))
758            } else {
759                AccountsPackageKind::AccountsHashVerifier
760            }
761        }
762    }
763}
764
765/// Compare snapshot requests; used to pick the highest priority request to handle.
766///
767/// Priority, from highest to lowest:
768/// - Epoch Accounts Hash
769/// - Full Snapshot
770/// - Incremental Snapshot
771/// - Accounts Hash Verifier
772///
773/// If two requests of the same kind are being compared, their bank slots are the tiebreaker.
774#[must_use]
775fn cmp_requests_by_priority(
776    a: &(SnapshotRequest, AccountsPackageKind),
777    b: &(SnapshotRequest, AccountsPackageKind),
778) -> std::cmp::Ordering {
779    let (snapshot_request_a, accounts_package_kind_a) = a;
780    let (snapshot_request_b, accounts_package_kind_b) = b;
781    let slot_a = snapshot_request_a.snapshot_root_bank.slot();
782    let slot_b = snapshot_request_b.snapshot_root_bank.slot();
783    snapshot_package::cmp_accounts_package_kinds_by_priority(
784        accounts_package_kind_a,
785        accounts_package_kind_b,
786    )
787    .then(slot_a.cmp(&slot_b))
788}
789
790#[cfg(test)]
791mod test {
792    use {
793        super::*,
794        crate::{bank::epoch_accounts_hash_utils, genesis_utils::create_genesis_config},
795        crossbeam_channel::unbounded,
796        solana_accounts_db::epoch_accounts_hash::EpochAccountsHash,
797        solana_sdk::{
798            account::AccountSharedData, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey,
799        },
800    };
801
802    #[test]
803    fn test_accounts_background_service_remove_dead_slots() {
804        let genesis = create_genesis_config(10);
805        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
806        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
807        let pruned_banks_request_handler = PrunedBanksRequestHandler {
808            pruned_banks_receiver,
809        };
810
811        // Store an account in slot 0
812        let account_key = Pubkey::new_unique();
813        bank0.store_account(
814            &account_key,
815            &AccountSharedData::new(264, 0, &Pubkey::default()),
816        );
817        assert!(bank0.get_account(&account_key).is_some());
818        pruned_banks_sender.send((0, 0)).unwrap();
819
820        assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
821
822        pruned_banks_request_handler.remove_dead_slots(&bank0, &mut 0, &mut 0);
823
824        assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
825    }
826
    /// Ensure that unhandled snapshot requests are properly re-enqueued or dropped
    ///
    /// The snapshot request handler should be flexible and handle re-queueing unhandled snapshot
    /// requests, if those unhandled requests are for slots GREATER-THAN the last request handled.
    /// This is needed if, for example, an Epoch Accounts Hash for slot X and a Full Snapshot for
    /// slot X+1 are both in the request channel.  The EAH needs to be handled first, but the full
    /// snapshot should also be handled afterwards, since future incremental snapshots will depend
    /// on it.
    #[test]
    fn test_get_next_snapshot_request() {
        // These constants were picked to ensure the desired snapshot requests were sent to the
        // channel.  With 400 slots per Epoch, the EAH start will be at slot 100.  Ensure there are
        // other requests before this slot, and then 2+ requests of each type afterwards (to
        // further test the prioritization logic).
        const SLOTS_PER_EPOCH: Slot = 400;
        const FULL_SNAPSHOT_INTERVAL: Slot = 80;
        const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 30;

        let snapshot_config = SnapshotConfig {
            full_snapshot_archive_interval_slots: FULL_SNAPSHOT_INTERVAL,
            incremental_snapshot_archive_interval_slots: INCREMENTAL_SNAPSHOT_INTERVAL,
            ..SnapshotConfig::default()
        };

        let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded();
        let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_config,
            snapshot_request_sender: snapshot_request_sender.clone(),
            snapshot_request_receiver,
            accounts_package_sender,
        };

        // Helper to enqueue a snapshot request for the given bank and kind.
        let send_snapshot_request = |snapshot_root_bank, request_kind| {
            let snapshot_request = SnapshotRequest {
                snapshot_root_bank,
                status_cache_slot_deltas: Vec::default(),
                request_kind,
                enqueued: Instant::now(),
            };
            snapshot_request_sender.send(snapshot_request).unwrap();
        };

        let mut genesis_config_info = create_genesis_config(10);
        genesis_config_info.genesis_config.epoch_schedule =
            EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
        let mut bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
        bank.set_startup_verification_complete();
        // Need to set the EAH to Valid so that `Bank::new_from_parent()` doesn't panic during
        // freeze when parent is in the EAH calculation window.
        bank.rc
            .accounts
            .accounts_db
            .epoch_accounts_hash_manager
            .set_valid(EpochAccountsHash::new(Hash::new_unique()), 0);

        // We need to get and set accounts-db's latest full snapshot slot to test
        // get_next_snapshot_request().  To workaround potential borrowing issues
        // caused by make_banks() below, Arc::clone bank0 and add helper functions.
        let bank0 = bank.clone();
        fn latest_full_snapshot_slot(bank: &Bank) -> Option<Slot> {
            bank.rc.accounts.accounts_db.latest_full_snapshot_slot()
        }
        fn set_latest_full_snapshot_slot(bank: &Bank, slot: Slot) {
            bank.rc
                .accounts
                .accounts_db
                .set_latest_full_snapshot_slot(slot);
        }

        // Create new banks and send snapshot requests so that the following requests will be in
        // the channel before handling the requests:
        //
        // fss  80
        // iss  90
        // eah 100 <-- handled 1st
        // iss 120
        // iss 150
        // fss 160
        // iss 180
        // iss 210
        // fss 240 <-- handled 2nd
        // iss 270
        // iss 300 <-- handled 3rd
        // ahv 301
        // ahv 302
        // ahv 303 <-- handled 4th
        //
        // (slots not called out will all be AHV)
        // Also, incremental snapshots before slot 240 (the first full snapshot handled), will
        // actually be AHV since the latest full snapshot slot will be `None`.  This is expected and
        // fine; but maybe unexpected for a reader/debugger without this additional context.
        //
        // NOTE: `make_banks` mutates the captured `bank`, advancing it one slot per iteration
        // and sending one request per new bank (EAH at the calculation-start slot, Snapshot
        // otherwise).
        let mut make_banks = |num_banks| {
            for _ in 0..num_banks {
                let slot = bank.slot() + 1;
                bank = Arc::new(Bank::new_from_parent(
                    bank.clone(),
                    &Pubkey::new_unique(),
                    slot,
                ));

                // Since we're not using `BankForks::set_root()`, we have to handle sending the
                // correct snapshot requests ourself.
                if bank.slot() == epoch_accounts_hash_utils::calculation_start(&bank) {
                    send_snapshot_request(
                        Arc::clone(&bank),
                        SnapshotRequestKind::EpochAccountsHash,
                    );
                } else {
                    send_snapshot_request(Arc::clone(&bank), SnapshotRequestKind::Snapshot);
                }
            }
        };
        // Enqueue requests for slots 1..=303 (see the table above for which get handled).
        make_banks(303);

        // Ensure the EAH is handled 1st
        assert_eq!(latest_full_snapshot_slot(&bank0), None,);
        let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            accounts_package_kind,
            AccountsPackageKind::EpochAccountsHash
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 100);

        // Ensure the full snapshot from slot 240 is handled 2nd
        // (the older full snapshots are skipped and dropped)
        assert_eq!(latest_full_snapshot_slot(&bank0), None,);
        let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            accounts_package_kind,
            AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot)
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 240);
        // Simulate actually taking the full snapshot at slot 240 so that subsequent
        // incremental snapshot requests have a base to be relative to.
        set_latest_full_snapshot_slot(&bank0, 240);

        // Ensure the incremental snapshot from slot 300 is handled 3rd
        // (the older incremental snapshots are skipped and dropped)
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240),);
        let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            accounts_package_kind,
            AccountsPackageKind::Snapshot(SnapshotKind::IncrementalSnapshot(240))
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 300);

        // Ensure the accounts hash verifier from slot 303 is handled 4th
        // (the older accounts hash verifiers are skipped and dropped)
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240),);
        let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            accounts_package_kind,
            AccountsPackageKind::AccountsHashVerifier
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 303);

        // And now ensure the snapshot request channel is empty!
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240),);
        assert!(snapshot_request_handler
            .get_next_snapshot_request()
            .is_none());

        // Create more banks and send snapshot requests so that the following requests will be in
        // the channel before handling the requests:
        //
        // fss 480 <-- handled 1st
        // eah 500 <-- handled 2nd
        // iss 510
        // iss 540 <-- handled 3rd
        // ahv 541
        // ahv 542
        // ahv 543 <-- handled 4th
        //
        // This test differs from the one above by having an older full snapshot request that must
        // be handled before the new epoch accounts hash request.
        make_banks(240);

        // Ensure the full snapshot is handled 1st
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240),);
        let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            accounts_package_kind,
            AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot)
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 480);
        // As above, record the full snapshot so later incremental requests are valid.
        set_latest_full_snapshot_slot(&bank0, 480);

        // Ensure the EAH is handled 2nd
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(480),);
        let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            accounts_package_kind,
            AccountsPackageKind::EpochAccountsHash
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 500);

        // Ensure the incremental snapshot is handled 3rd
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(480),);
        let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            accounts_package_kind,
            AccountsPackageKind::Snapshot(SnapshotKind::IncrementalSnapshot(480))
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 540);

        // Ensure the accounts hash verifier is handled 4th
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(480),);
        let (snapshot_request, accounts_package_kind, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            accounts_package_kind,
            AccountsPackageKind::AccountsHashVerifier
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 543);

        // And now ensure the snapshot request channel is empty!
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(480),);
        assert!(snapshot_request_handler
            .get_next_snapshot_request()
            .is_none());
    }
1062
1063    /// Ensure that we can prune banks with the same slot (if they were on different forks)
1064    #[test]
1065    fn test_pruned_banks_request_handler_handle_request() {
1066        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
1067        let pruned_banks_request_handler = PrunedBanksRequestHandler {
1068            pruned_banks_receiver,
1069        };
1070        let genesis_config_info = create_genesis_config(10);
1071        let bank = Bank::new_for_tests(&genesis_config_info.genesis_config);
1072        bank.set_startup_verification_complete();
1073        bank.rc.accounts.accounts_db.enable_bank_drop_callback();
1074        bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
1075            pruned_banks_sender,
1076        ))));
1077
1078        let fork0_bank0 = Arc::new(bank);
1079        let fork0_bank1 = Arc::new(Bank::new_from_parent(
1080            fork0_bank0.clone(),
1081            &Pubkey::new_unique(),
1082            fork0_bank0.slot() + 1,
1083        ));
1084        let fork1_bank1 = Arc::new(Bank::new_from_parent(
1085            fork0_bank0.clone(),
1086            &Pubkey::new_unique(),
1087            fork0_bank0.slot() + 1,
1088        ));
1089        let fork2_bank1 = Arc::new(Bank::new_from_parent(
1090            fork0_bank0.clone(),
1091            &Pubkey::new_unique(),
1092            fork0_bank0.slot() + 1,
1093        ));
1094        let fork0_bank2 = Arc::new(Bank::new_from_parent(
1095            fork0_bank1.clone(),
1096            &Pubkey::new_unique(),
1097            fork0_bank1.slot() + 1,
1098        ));
1099        let fork1_bank2 = Arc::new(Bank::new_from_parent(
1100            fork1_bank1.clone(),
1101            &Pubkey::new_unique(),
1102            fork1_bank1.slot() + 1,
1103        ));
1104        let fork0_bank3 = Arc::new(Bank::new_from_parent(
1105            fork0_bank2.clone(),
1106            &Pubkey::new_unique(),
1107            fork0_bank2.slot() + 1,
1108        ));
1109        let fork3_bank3 = Arc::new(Bank::new_from_parent(
1110            fork0_bank2.clone(),
1111            &Pubkey::new_unique(),
1112            fork0_bank2.slot() + 1,
1113        ));
1114        fork0_bank3.squash();
1115
1116        drop(fork3_bank3);
1117        drop(fork1_bank2);
1118        drop(fork0_bank2);
1119        drop(fork1_bank1);
1120        drop(fork2_bank1);
1121        drop(fork0_bank1);
1122        drop(fork0_bank0);
1123        let num_banks_purged = pruned_banks_request_handler.handle_request(&fork0_bank3);
1124        assert_eq!(num_banks_purged, 7);
1125    }
1126}