solana_accounts_db/
accounts_index_storage.rs

1use {
2    crate::{
3        accounts_index::{
4            self, in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue,
5            IndexValue,
6        },
7        bucket_map_holder::BucketMapHolder,
8        waitable_condvar::WaitableCondvar,
9    },
10    std::{
11        fmt::Debug,
12        num::NonZeroUsize,
13        sync::{
14            atomic::{AtomicBool, Ordering},
15            Arc, Mutex,
16        },
17        thread::{Builder, JoinHandle},
18    },
19};
20
/// Manages the lifetime of the background processing threads.
///
/// Also owns the shared [`BucketMapHolder`] and the per-bin in-memory indexes that
/// those threads operate on.
pub struct AccountsIndexStorage<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
    /// bg threads created in `new()`; when this field is dropped the threads are
    /// told to exit and are joined (see `Drop for BgThreads`)
    _bg_threads: BgThreads,

    /// holder shared with every bg thread and every entry of `in_mem`
    pub storage: Arc<BucketMapHolder<T, U>>,
    /// one in-mem index per bin, created in `new()` for bins `0..bins`
    pub in_mem: Vec<Arc<InMemAccountsIndex<T, U>>>,
    /// exit flag supplied by the caller of `new()`; passed to every batch of bg
    /// threads in addition to that batch's own local exit flag
    exit: Arc<AtomicBool>,

    /// `set_startup(Startup::StartupWithExtraThreads)` creates extra bg threads which
    /// are kept alive until `set_startup(Startup::Normal)` resets this to `None`
    startup_worker_threads: Mutex<Option<BgThreads>>,
}
32
33impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Debug for AccountsIndexStorage<T, U> {
34    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        Ok(())
36    }
37}
38
/// low-level managing the bg threads
///
/// One value represents one batch of spawned threads. Dropping it sets `exit`,
/// calls `notify_all` on `wait`, and joins every handle in `handles`.
struct BgThreads {
    /// stop signal for this batch of threads only
    exit: Arc<AtomicBool>,
    /// join handles of the spawned threads; taken (leaving `None`) on drop
    handles: Option<Vec<JoinHandle<()>>>,
    /// condvar notified on drop; `new()` sets it to the storage's `wait_dirty_or_aged`
    wait: Arc<WaitableCondvar>,
}
45
46impl Drop for BgThreads {
47    fn drop(&mut self) {
48        self.exit.store(true, Ordering::Relaxed);
49        self.wait.notify_all();
50        if let Some(handles) = self.handles.take() {
51            handles
52                .into_iter()
53                .for_each(|handle| handle.join().unwrap());
54        }
55    }
56}
57
58impl BgThreads {
59    fn new<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>(
60        storage: &Arc<BucketMapHolder<T, U>>,
61        in_mem: &[Arc<InMemAccountsIndex<T, U>>],
62        threads: NonZeroUsize,
63        can_advance_age: bool,
64        exit: Arc<AtomicBool>,
65    ) -> Self {
66        // stop signal used for THIS batch of bg threads
67        let local_exit = Arc::new(AtomicBool::default());
68        let handles = Some(
69            (0..threads.get())
70                .map(|idx| {
71                    // the first thread we start is special
72                    let can_advance_age = can_advance_age && idx == 0;
73                    let storage_ = Arc::clone(storage);
74                    let local_exit = local_exit.clone();
75                    let system_exit = exit.clone();
76                    let in_mem_ = in_mem.to_vec();
77
78                    // note that using rayon here causes us to exhaust # rayon threads and many tests running in parallel deadlock
79                    Builder::new()
80                        .name(format!("solIdxFlusher{idx:02}"))
81                        .spawn(move || {
82                            storage_.background(
83                                vec![local_exit, system_exit],
84                                in_mem_,
85                                can_advance_age,
86                            );
87                        })
88                        .unwrap()
89                })
90                .collect(),
91        );
92
93        BgThreads {
94            exit: local_exit,
95            handles,
96            wait: Arc::clone(&storage.wait_dirty_or_aged),
97        }
98    }
99}
100
/// modes the system can be in
///
/// Passed to `AccountsIndexStorage::set_startup`. Every variant other than
/// `Normal` puts the storage into startup mode.
pub enum Startup {
    /// not startup, but steady state execution
    /// Selecting this also shuts down any extra startup threads.
    Normal,
    /// startup (not steady state execution)
    /// requesting 'startup'-like behavior where in-mem acct idx items are flushed asap
    Startup,
    /// startup (not steady state execution)
    /// but also requesting additional threads to be running to flush the acct idx to disk asap
    /// The idea is that the best perf to ssds will be with multiple threads,
    ///  but during steady state, we can't allocate as many threads because we'd starve the rest of the system.
    StartupWithExtraThreads,
}
114
115impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexStorage<T, U> {
116    /// startup=true causes:
117    ///      in mem to act in a way that flushes to disk asap
118    ///      also creates some additional bg threads to facilitate flushing to disk asap
119    /// startup=false is 'normal' operation
120    pub fn set_startup(&self, startup: Startup) {
121        let value = !matches!(startup, Startup::Normal);
122        if matches!(startup, Startup::StartupWithExtraThreads) {
123            // create some additional bg threads to help get things to the disk index asap
124            *self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
125                &self.storage,
126                &self.in_mem,
127                accounts_index::default_num_flush_threads(),
128                false, // cannot advance age from any of these threads
129                self.exit.clone(),
130            ));
131        }
132        self.storage.set_startup(value);
133        if !value {
134            // transitioning from startup to !startup (ie. steady state)
135            // shutdown the bg threads
136            *self.startup_worker_threads.lock().unwrap() = None;
137            // maybe shrink hashmaps
138            self.shrink_to_fit();
139        }
140    }
141
142    /// estimate how many items are still needing to be flushed to the disk cache.
143    pub fn get_startup_remaining_items_to_flush_estimate(&self) -> usize {
144        self.storage
145            .disk
146            .as_ref()
147            .map(|_| self.storage.stats.get_remaining_items_to_flush_estimate())
148            .unwrap_or_default()
149    }
150
151    fn shrink_to_fit(&self) {
152        self.in_mem.iter().for_each(|mem| mem.shrink_to_fit())
153    }
154
155    /// allocate BucketMapHolder and InMemAccountsIndex[]
156    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, exit: Arc<AtomicBool>) -> Self {
157        let num_flush_threads = config
158            .as_ref()
159            .and_then(|config| config.num_flush_threads)
160            .unwrap_or_else(accounts_index::default_num_flush_threads);
161
162        let storage = Arc::new(BucketMapHolder::new(bins, config, num_flush_threads.get()));
163
164        let in_mem = (0..bins)
165            .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
166            .collect::<Vec<_>>();
167
168        Self {
169            _bg_threads: BgThreads::new(&storage, &in_mem, num_flush_threads, true, exit.clone()),
170            storage,
171            in_mem,
172            startup_worker_threads: Mutex::default(),
173            exit,
174        }
175    }
176}