solana_accounts_db/
accounts_index_storage.rs1use {
2 crate::{
3 accounts_index::{
4 self, in_mem_accounts_index::InMemAccountsIndex, AccountsIndexConfig, DiskIndexValue,
5 IndexValue,
6 },
7 bucket_map_holder::BucketMapHolder,
8 waitable_condvar::WaitableCondvar,
9 },
10 std::{
11 fmt::Debug,
12 num::NonZeroUsize,
13 sync::{
14 atomic::{AtomicBool, Ordering},
15 Arc, Mutex,
16 },
17 thread::{Builder, JoinHandle},
18 },
19};
20
21pub struct AccountsIndexStorage<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
23 _bg_threads: BgThreads,
24
25 pub storage: Arc<BucketMapHolder<T, U>>,
26 pub in_mem: Vec<Arc<InMemAccountsIndex<T, U>>>,
27 exit: Arc<AtomicBool>,
28
29 startup_worker_threads: Mutex<Option<BgThreads>>,
31}
32
33impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Debug for AccountsIndexStorage<T, U> {
34 fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 Ok(())
36 }
37}
38
39struct BgThreads {
41 exit: Arc<AtomicBool>,
42 handles: Option<Vec<JoinHandle<()>>>,
43 wait: Arc<WaitableCondvar>,
44}
45
46impl Drop for BgThreads {
47 fn drop(&mut self) {
48 self.exit.store(true, Ordering::Relaxed);
49 self.wait.notify_all();
50 if let Some(handles) = self.handles.take() {
51 handles
52 .into_iter()
53 .for_each(|handle| handle.join().unwrap());
54 }
55 }
56}
57
58impl BgThreads {
59 fn new<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>(
60 storage: &Arc<BucketMapHolder<T, U>>,
61 in_mem: &[Arc<InMemAccountsIndex<T, U>>],
62 threads: NonZeroUsize,
63 can_advance_age: bool,
64 exit: Arc<AtomicBool>,
65 ) -> Self {
66 let local_exit = Arc::new(AtomicBool::default());
68 let handles = Some(
69 (0..threads.get())
70 .map(|idx| {
71 let can_advance_age = can_advance_age && idx == 0;
73 let storage_ = Arc::clone(storage);
74 let local_exit = local_exit.clone();
75 let system_exit = exit.clone();
76 let in_mem_ = in_mem.to_vec();
77
78 Builder::new()
80 .name(format!("solIdxFlusher{idx:02}"))
81 .spawn(move || {
82 storage_.background(
83 vec![local_exit, system_exit],
84 in_mem_,
85 can_advance_age,
86 );
87 })
88 .unwrap()
89 })
90 .collect(),
91 );
92
93 BgThreads {
94 exit: local_exit,
95 handles,
96 wait: Arc::clone(&storage.wait_dirty_or_aged),
97 }
98 }
99}
100
/// Operating mode requested through `AccountsIndexStorage::set_startup`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Startup {
    /// Not in startup. `set_startup` passes `false` to the storage holder, drops any
    /// extra startup worker threads, and shrinks the in-memory bins.
    Normal,
    /// In startup. `set_startup` passes `true` to the storage holder and does nothing
    /// else.
    Startup,
    /// In startup, and additionally spawn an extra batch of background threads
    /// (`accounts_index::default_num_flush_threads()` of them, none allowed to advance
    /// age). They are kept until `set_startup(Startup::Normal)` is called.
    StartupWithExtraThreads,
}
114
115impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndexStorage<T, U> {
116 pub fn set_startup(&self, startup: Startup) {
121 let value = !matches!(startup, Startup::Normal);
122 if matches!(startup, Startup::StartupWithExtraThreads) {
123 *self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
125 &self.storage,
126 &self.in_mem,
127 accounts_index::default_num_flush_threads(),
128 false, self.exit.clone(),
130 ));
131 }
132 self.storage.set_startup(value);
133 if !value {
134 *self.startup_worker_threads.lock().unwrap() = None;
137 self.shrink_to_fit();
139 }
140 }
141
142 pub fn get_startup_remaining_items_to_flush_estimate(&self) -> usize {
144 self.storage
145 .disk
146 .as_ref()
147 .map(|_| self.storage.stats.get_remaining_items_to_flush_estimate())
148 .unwrap_or_default()
149 }
150
151 fn shrink_to_fit(&self) {
152 self.in_mem.iter().for_each(|mem| mem.shrink_to_fit())
153 }
154
155 pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, exit: Arc<AtomicBool>) -> Self {
157 let num_flush_threads = config
158 .as_ref()
159 .and_then(|config| config.num_flush_threads)
160 .unwrap_or_else(accounts_index::default_num_flush_threads);
161
162 let storage = Arc::new(BucketMapHolder::new(bins, config, num_flush_threads.get()));
163
164 let in_mem = (0..bins)
165 .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
166 .collect::<Vec<_>>();
167
168 Self {
169 _bg_threads: BgThreads::new(&storage, &in_mem, num_flush_threads, true, exit.clone()),
170 storage,
171 in_mem,
172 startup_worker_threads: Mutex::default(),
173 exit,
174 }
175 }
176}