solana_runtime/
accounts_index_storage.rs1use {
2 crate::{
3 accounts_index::{AccountsIndexConfig, IndexValue},
4 bucket_map_holder::BucketMapHolder,
5 in_mem_accounts_index::InMemAccountsIndex,
6 waitable_condvar::WaitableCondvar,
7 },
8 std::{
9 fmt::Debug,
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc, Mutex,
13 },
14 thread::{Builder, JoinHandle},
15 },
16};
17
18pub struct AccountsIndexStorage<T: IndexValue> {
20 _bg_threads: BgThreads,
21
22 pub storage: Arc<BucketMapHolder<T>>,
23 pub in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
24
25 startup_worker_threads: Mutex<Option<BgThreads>>,
27}
28
29impl<T: IndexValue> Debug for AccountsIndexStorage<T> {
30 fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 Ok(())
32 }
33}
34
35struct BgThreads {
37 exit: Arc<AtomicBool>,
38 handles: Option<Vec<JoinHandle<()>>>,
39 wait: Arc<WaitableCondvar>,
40}
41
42impl Drop for BgThreads {
43 fn drop(&mut self) {
44 self.exit.store(true, Ordering::Relaxed);
45 self.wait.notify_all();
46 if let Some(handles) = self.handles.take() {
47 handles
48 .into_iter()
49 .for_each(|handle| handle.join().unwrap());
50 }
51 }
52}
53
54impl BgThreads {
55 fn new<T: IndexValue>(
56 storage: &Arc<BucketMapHolder<T>>,
57 in_mem: &[Arc<InMemAccountsIndex<T>>],
58 threads: usize,
59 can_advance_age: bool,
60 ) -> Self {
61 let exit = Arc::new(AtomicBool::default());
63 let handles = Some(
64 (0..threads)
65 .into_iter()
66 .map(|idx| {
67 let can_advance_age = can_advance_age && idx == 0;
69 let storage_ = Arc::clone(storage);
70 let exit_ = Arc::clone(&exit);
71 let in_mem_ = in_mem.to_vec();
72
73 Builder::new()
75 .name(format!("solIdxFlusher{:02}", idx))
76 .spawn(move || {
77 storage_.background(exit_, in_mem_, can_advance_age);
78 })
79 .unwrap()
80 })
81 .collect(),
82 );
83
84 BgThreads {
85 exit,
86 handles,
87 wait: Arc::clone(&storage.wait_dirty_or_aged),
88 }
89 }
90}
91
92pub enum Startup {
94 Normal,
96 Startup,
99 StartupWithExtraThreads,
104}
105
106impl<T: IndexValue> AccountsIndexStorage<T> {
107 pub fn set_startup(&self, startup: Startup) {
112 let value = !matches!(startup, Startup::Normal);
113 if matches!(startup, Startup::StartupWithExtraThreads) {
114 *self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
116 &self.storage,
117 &self.in_mem,
118 Self::num_threads(),
119 false, ));
121 }
122 self.storage.set_startup(value);
123 if !value {
124 *self.startup_worker_threads.lock().unwrap() = None;
127 self.shrink_to_fit();
129 }
130 }
131
132 pub fn get_startup_remaining_items_to_flush_estimate(&self) -> usize {
134 self.storage
135 .disk
136 .as_ref()
137 .map(|_| self.storage.stats.get_remaining_items_to_flush_estimate())
138 .unwrap_or_default()
139 }
140
141 fn shrink_to_fit(&self) {
142 self.in_mem.iter().for_each(|mem| mem.shrink_to_fit())
143 }
144
145 fn num_threads() -> usize {
146 std::cmp::max(2, num_cpus::get() / 4)
147 }
148
149 pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
151 let threads = config
152 .as_ref()
153 .and_then(|config| config.flush_threads)
154 .unwrap_or_else(Self::num_threads);
155
156 let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
157
158 let in_mem = (0..bins)
159 .into_iter()
160 .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
161 .collect::<Vec<_>>();
162
163 Self {
164 _bg_threads: BgThreads::new(&storage, &in_mem, threads, true),
165 storage,
166 in_mem,
167 startup_worker_threads: Mutex::default(),
168 }
169 }
170}