solana_runtime/
accounts_index_storage.rs

1use {
2    crate::{
3        accounts_index::{AccountsIndexConfig, IndexValue},
4        bucket_map_holder::BucketMapHolder,
5        in_mem_accounts_index::InMemAccountsIndex,
6        waitable_condvar::WaitableCondvar,
7    },
8    std::{
9        fmt::Debug,
10        sync::{
11            atomic::{AtomicBool, Ordering},
12            Arc, Mutex,
13        },
14        thread::{Builder, JoinHandle},
15    },
16};
17
/// Manages the lifetime of the background processing threads.
///
/// Rust drops struct fields in declaration order, so `_bg_threads` is dropped first
/// (its `Drop` impl signals and joins the threads) before `storage` and `in_mem`.
pub struct AccountsIndexStorage<T: IndexValue> {
    /// bg threads spawned in `new()`; never read, held only so they are stopped and joined on drop
    _bg_threads: BgThreads,

    /// holder shared by every entry of `in_mem` and by the bg threads
    pub storage: Arc<BucketMapHolder<T>>,
    /// one in-mem index per bin, created in `new()`
    pub in_mem: Vec<Arc<InMemAccountsIndex<T>>>,

    /// `set_startup(Startup::StartupWithExtraThreads)` creates bg threads which are kept alive
    /// until `set_startup(Startup::Normal)` clears them
    startup_worker_threads: Mutex<Option<BgThreads>>,
}
28
29impl<T: IndexValue> Debug for AccountsIndexStorage<T> {
30    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        Ok(())
32    }
33}
34
/// low-level managing the bg threads
///
/// Dropping a `BgThreads` sets `exit`, notifies `wait`, and joins every handle.
struct BgThreads {
    /// stop flag shared with every thread of this batch
    exit: Arc<AtomicBool>,
    /// join handles of the spawned threads; wrapped in `Option` so `drop` can `take()` them
    handles: Option<Vec<JoinHandle<()>>>,
    /// clone of `storage.wait_dirty_or_aged`; notified on drop
    /// (presumably so waiting threads wake up and observe `exit` - confirm in `WaitableCondvar`)
    wait: Arc<WaitableCondvar>,
}
41
42impl Drop for BgThreads {
43    fn drop(&mut self) {
44        self.exit.store(true, Ordering::Relaxed);
45        self.wait.notify_all();
46        if let Some(handles) = self.handles.take() {
47            handles
48                .into_iter()
49                .for_each(|handle| handle.join().unwrap());
50        }
51    }
52}
53
54impl BgThreads {
55    fn new<T: IndexValue>(
56        storage: &Arc<BucketMapHolder<T>>,
57        in_mem: &[Arc<InMemAccountsIndex<T>>],
58        threads: usize,
59        can_advance_age: bool,
60    ) -> Self {
61        // stop signal used for THIS batch of bg threads
62        let exit = Arc::new(AtomicBool::default());
63        let handles = Some(
64            (0..threads)
65                .into_iter()
66                .map(|idx| {
67                    // the first thread we start is special
68                    let can_advance_age = can_advance_age && idx == 0;
69                    let storage_ = Arc::clone(storage);
70                    let exit_ = Arc::clone(&exit);
71                    let in_mem_ = in_mem.to_vec();
72
73                    // note that using rayon here causes us to exhaust # rayon threads and many tests running in parallel deadlock
74                    Builder::new()
75                        .name(format!("solIdxFlusher{:02}", idx))
76                        .spawn(move || {
77                            storage_.background(exit_, in_mem_, can_advance_age);
78                        })
79                        .unwrap()
80                })
81                .collect(),
82        );
83
84        BgThreads {
85            exit,
86            handles,
87            wait: Arc::clone(&storage.wait_dirty_or_aged),
88        }
89    }
90}
91
/// modes the system can be in
///
/// Passed to `AccountsIndexStorage::set_startup` to select the mode.
pub enum Startup {
    /// not startup, but steady state execution
    /// `set_startup` drops any extra startup threads and shrinks the in-mem hashmaps for this mode
    Normal,
    /// startup (not steady state execution)
    /// requesting 'startup'-like behavior where in-mem acct idx items are flushed asap
    /// `set_startup` does not create or stop any threads for this mode
    Startup,
    /// startup (not steady state execution)
    /// but also requesting additional threads to be running to flush the acct idx to disk asap
    /// The idea is that the best perf to ssds will be with multiple threads,
    ///  but during steady state, we can't allocate as many threads because we'd starve the rest of the system.
    StartupWithExtraThreads,
}
105
106impl<T: IndexValue> AccountsIndexStorage<T> {
107    /// startup=true causes:
108    ///      in mem to act in a way that flushes to disk asap
109    ///      also creates some additional bg threads to facilitate flushing to disk asap
110    /// startup=false is 'normal' operation
111    pub fn set_startup(&self, startup: Startup) {
112        let value = !matches!(startup, Startup::Normal);
113        if matches!(startup, Startup::StartupWithExtraThreads) {
114            // create some additional bg threads to help get things to the disk index asap
115            *self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
116                &self.storage,
117                &self.in_mem,
118                Self::num_threads(),
119                false, // cannot advance age from any of these threads
120            ));
121        }
122        self.storage.set_startup(value);
123        if !value {
124            // transitioning from startup to !startup (ie. steady state)
125            // shutdown the bg threads
126            *self.startup_worker_threads.lock().unwrap() = None;
127            // maybe shrink hashmaps
128            self.shrink_to_fit();
129        }
130    }
131
132    /// estimate how many items are still needing to be flushed to the disk cache.
133    pub fn get_startup_remaining_items_to_flush_estimate(&self) -> usize {
134        self.storage
135            .disk
136            .as_ref()
137            .map(|_| self.storage.stats.get_remaining_items_to_flush_estimate())
138            .unwrap_or_default()
139    }
140
141    fn shrink_to_fit(&self) {
142        self.in_mem.iter().for_each(|mem| mem.shrink_to_fit())
143    }
144
145    fn num_threads() -> usize {
146        std::cmp::max(2, num_cpus::get() / 4)
147    }
148
149    /// allocate BucketMapHolder and InMemAccountsIndex[]
150    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
151        let threads = config
152            .as_ref()
153            .and_then(|config| config.flush_threads)
154            .unwrap_or_else(Self::num_threads);
155
156        let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
157
158        let in_mem = (0..bins)
159            .into_iter()
160            .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
161            .collect::<Vec<_>>();
162
163        Self {
164            _bg_threads: BgThreads::new(&storage, &in_mem, threads, true),
165            storage,
166            in_mem,
167            startup_worker_threads: Mutex::default(),
168        }
169    }
170}