use {
crate::{
accounts_index::{AccountsIndexConfig, DiskIndexValue, IndexLimitMb, IndexValue},
bucket_map_holder_stats::BucketMapHolderStats,
in_mem_accounts_index::{InMemAccountsIndex, StartupStats},
waitable_condvar::WaitableCondvar,
},
solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig},
solana_measure::measure::Measure,
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
timing::AtomicInterval,
},
std::{
fmt::Debug,
marker::PhantomData,
sync::{
atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
Arc,
},
time::Duration,
},
};
/// Rolling age of the in-memory index. It is `u8`-sized, and increments are done with
/// atomic `fetch_add`, which wraps from `Age::MAX` back to 0.
pub type Age = u8;

/// Atomic storage for an [`Age`].
pub type AtomicAge = AtomicU8;

// Compile-time guard: the atomic and the plain age type must have identical sizes.
const _: () = assert!(std::mem::size_of::<AtomicAge>() == std::mem::size_of::<Age>());

/// Length of one age in milliseconds: one age per default slot duration.
const AGE_MS: u64 = DEFAULT_MS_PER_SLOT;

/// Value used as `mem_budget_mb` when the index limit is left unspecified.
/// Being `Some(_)`, it causes the disk-backed index to be allocated.
pub const DEFAULT_DISK_INDEX: Option<usize> = Some(10_000);
/// Shared state for the accounts-index buckets. It holds:
/// - the optional disk-backed store;
/// - the rolling age clock used to decide when entries become eligible for flushing;
/// - the coordination primitives used by the background flush threads.
pub struct BucketMapHolder<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
    /// Disk-backed bucket map. `None` when the disk index is disabled, i.e. when
    /// `mem_budget_mb` is `None` (only then is no `BucketMap` allocated in `new`).
    pub disk: Option<BucketMap<(Slot, U)>>,
    /// Number of bucket flushes recorded at the current age.
    /// Reset to 0 by `increment_age` and incremented by `bucket_flushed_at_current_age`.
    /// It is not capped at `bins`; every comparison against `bins` uses `>=`.
    pub count_buckets_flushed: AtomicUsize,
    /// Rolling current age. Starts at 0 and is incremented (wrapping) by `increment_age`.
    pub age: AtomicAge,
    /// Precomputed `age + ages_to_stay_in_cache` (wrapping).
    /// Starts at `ages_to_stay_in_cache` and advances together with `age`.
    pub future_age_to_flush: AtomicAge,
    /// Precomputed `age - 1` (wrapping). Starts at `Age::MAX` and advances together with `age`.
    pub future_age_to_flush_cached: AtomicAge,
    /// Runtime statistics, e.g. background wait times and the count of active flush threads.
    pub stats: BucketMapHolderStats,
    /// Interval timer that gates how often the age may advance (period: `age_interval_ms`).
    age_timer: AtomicInterval,
    /// Condition variable that `background` and `wait_for_idle` sleep on.
    /// Notified by `increment_age`. NOTE(review): the name suggests it is also signalled
    /// elsewhere when buckets become dirty — confirm against the rest of the crate.
    pub wait_dirty_or_aged: Arc<WaitableCondvar>,
    /// Round-robin cursor shared by all flush threads (see `next_bucket_to_flush`).
    next_bucket_to_flush: AtomicUsize,
    /// Number of bins (buckets) in the index.
    bins: usize,
    /// Thread count passed to `new`; it is stored but not read by this type's own methods.
    pub threads: usize,
    /// Budget chosen from `IndexLimitMb` in `new`. `None` means no disk index is allocated.
    pub mem_budget_mb: Option<usize>,
    /// Offset added to the current age to form `future_age_to_flush`.
    /// Defaults to 5 in `new`.
    pub ages_to_stay_in_cache: Age,
    /// Startup-mode flag. Leaving startup via `set_startup(false)` first waits for idle.
    startup: AtomicBool,
    _phantom: PhantomData<T>,
    /// Startup statistics, shared by `Arc`. Only default-constructed here.
    pub(crate) startup_stats: Arc<StartupStats>,
}
/// Hand-written `Debug`. A derive would require `T`, `U` and every field to implement `Debug`.
///
/// Only the type name is printed (`BucketMapHolder { .. }`), so containing types that
/// derive `Debug` show a recognizable placeholder instead of an empty string. The atomics,
/// the disk map and the statistics are intentionally omitted.
impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Debug for BucketMapHolder<T, U> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BucketMapHolder").finish_non_exhaustive()
    }
}
impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> BucketMapHolder<T, U> {
    /// Returns true when a disk-backed `BucketMap` was allocated (see [`Self::new`]).
    pub fn is_disk_index_enabled(&self) -> bool {
        self.disk.is_some()
    }

    /// Advances the age and both precomputed future ages by one, then wakes every waiter.
    ///
    /// Increments wrap at `Age::MAX`. All threads waiting on `wait_dirty_or_aged` are woken.
    ///
    /// # Panics
    /// Panics if fewer than `bins` bucket flushes were recorded at the age being left.
    pub fn increment_age(&self) {
        // Reset the per-age flush counter *before* publishing the new age.
        // `bucket_flushed_at_current_age` bumps this counter concurrently. A flush counted
        // after the new age is visible must not be wiped out by this swap.
        let previous = self.count_buckets_flushed.swap(0, Ordering::AcqRel);
        // Atomic `fetch_add` wraps on overflow, which gives the intended 0..=255 rollover.
        self.age.fetch_add(1, Ordering::Release);
        self.future_age_to_flush.fetch_add(1, Ordering::Release);
        self.future_age_to_flush_cached
            .fetch_add(1, Ordering::Release);
        // The age must not advance before every bin was flushed at the previous age.
        assert!(
            previous >= self.bins,
            "previous: {}, bins: {}",
            previous,
            self.bins
        );
        // Wake every waiter (background threads and `wait_for_idle`), not just one.
        self.wait_dirty_or_aged.notify_all();
    }

    /// Returns the precomputed flush age for an item.
    ///
    /// When `is_cached` is true this is `future_age_to_flush_cached` (`age - 1`).
    /// Otherwise it is `future_age_to_flush` (`age + ages_to_stay_in_cache`).
    pub fn future_age_to_flush(&self, is_cached: bool) -> Age {
        if is_cached {
            &self.future_age_to_flush_cached
        } else {
            &self.future_age_to_flush
        }
        .load(Ordering::Acquire)
    }

    /// Returns true if at least one age interval has elapsed on `age_timer`.
    ///
    /// NOTE(review): `AtomicInterval::should_update` presumably records a new interval start
    /// when it returns true. That is why `maybe_advance_age_internal` evaluates it last.
    fn has_age_interval_elapsed(&self) -> bool {
        self.age_timer.should_update(self.age_interval_ms())
    }

    /// Returns whether the index is currently in startup mode.
    pub fn get_startup(&self) -> bool {
        self.startup.load(Ordering::Relaxed)
    }

    /// Enters (`true`) or leaves (`false`) startup mode.
    ///
    /// Leaving startup first blocks in [`Self::wait_for_idle`]. It therefore panics if
    /// startup mode was not active.
    pub fn set_startup(&self, value: bool) {
        if !value {
            self.wait_for_idle();
        }
        self.startup.store(value, Ordering::Relaxed)
    }

    /// Blocks until the age has advanced at least twice since this call started.
    ///
    /// `increment_age` only succeeds once every bin was flushed at the previous age. Two
    /// increments therefore guarantee at least one complete pass over all bins that started
    /// after this call. Returns immediately when there is no disk index.
    ///
    /// # Panics
    /// Panics if not in startup mode.
    pub fn wait_for_idle(&self) {
        /// Number of age increments that must be observed before returning.
        const AGES_TO_WAIT: Age = 2;

        assert!(self.get_startup());
        if self.disk.is_none() {
            return;
        }

        let start_age = self.current_age();
        loop {
            self.wait_dirty_or_aged
                .wait_timeout(Duration::from_millis(self.age_interval_ms()));
            // Compare the wrapping *distance* rather than testing equality with
            // `start_age + 2`. If this thread oversleeps while the age advances more than
            // twice, an equality test would miss its target and spin until the u8 age
            // wrapped all the way around.
            if self.current_age().wrapping_sub(start_age) >= AGES_TO_WAIT {
                break;
            }
        }
    }

    /// Returns the current rolling age.
    pub fn current_age(&self) -> Age {
        self.age.load(Ordering::Acquire)
    }

    /// Records that one bucket was flushed at the current age.
    ///
    /// When `can_advance_age` is true, it also tries to advance the age. The counter is not
    /// capped at `bins`, and all comparisons against `bins` use `>=`.
    pub fn bucket_flushed_at_current_age(&self, can_advance_age: bool) {
        let count_buckets_flushed = 1 + self.count_buckets_flushed.fetch_add(1, Ordering::AcqRel);
        if can_advance_age {
            self.maybe_advance_age_internal(
                self.all_buckets_flushed_at_current_age_internal(count_buckets_flushed),
            );
        }
    }

    /// Have at least `bins` bucket flushes been recorded at the current age?
    pub fn all_buckets_flushed_at_current_age(&self) -> bool {
        self.all_buckets_flushed_at_current_age_internal(self.count_buckets_flushed())
    }

    /// Is `count_buckets_flushed` enough to cover every bin?
    fn all_buckets_flushed_at_current_age_internal(&self, count_buckets_flushed: usize) -> bool {
        count_buckets_flushed >= self.bins
    }

    /// Number of bucket flushes recorded at the current age.
    pub fn count_buckets_flushed(&self) -> usize {
        self.count_buckets_flushed.load(Ordering::Acquire)
    }

    /// Advances the age if all buckets are flushed and the age interval elapsed.
    ///
    /// Returns true if the age was advanced.
    pub fn maybe_advance_age(&self) -> bool {
        self.maybe_advance_age_internal(self.all_buckets_flushed_at_current_age())
    }

    /// Advances the age when `all_buckets_flushed_at_current_age` is true and the age
    /// interval has elapsed. Returns true if the age was advanced.
    fn maybe_advance_age_internal(&self, all_buckets_flushed_at_current_age: bool) -> bool {
        // Short-circuit so the (state-touching) timer check only runs when needed.
        if all_buckets_flushed_at_current_age && self.has_age_interval_elapsed() {
            self.increment_age();
            true
        } else {
            false
        }
    }

    /// Builds the holder for `bins` bins from an optional index `config`.
    ///
    /// `mem_budget_mb` is resolved from `config.index_limit_mb` as follows:
    /// - `Limit(mb)`: `Some(mb)`, so the disk index is enabled.
    /// - `InMemOnly`: `None`, so there is no disk index.
    /// - `Unspecified`: `DEFAULT_DISK_INDEX`, unless the index was *not* started from a
    ///   validator and the `SOLANA_TEST_ACCOUNTS_INDEX_MEMORY_LIMIT_MB` environment variable
    ///   is set. In that case it is `None`.
    ///
    /// A disk `BucketMap` is allocated only when `mem_budget_mb` is `Some`.
    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, threads: usize) -> Self {
        const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
        let ages_to_stay_in_cache = config
            .as_ref()
            .and_then(|config| config.ages_to_stay_in_cache)
            .unwrap_or(DEFAULT_AGE_TO_STAY_IN_CACHE);

        let mut bucket_config = BucketMapConfig::new(bins);
        if let Some(index_config) = config {
            // The restart file lives on the first configured drive, if there is one.
            bucket_config.restart_config_file =
                index_config.drives.as_ref().and_then(|drives| {
                    drives
                        .first()
                        .map(|drive| drive.join("accounts_index_restart"))
                });
            bucket_config.drives = index_config.drives.clone();
        } else {
            bucket_config.drives = None;
        }

        let mem_budget_mb = match config
            .as_ref()
            .map(|config| &config.index_limit_mb)
            .unwrap_or(&IndexLimitMb::Unspecified)
        {
            IndexLimitMb::Limit(mb) => Some(*mb),
            IndexLimitMb::InMemOnly => None,
            IndexLimitMb::Unspecified => {
                let started_from_validator = config
                    .as_ref()
                    .map(|config| config.started_from_validator)
                    .unwrap_or_default();
                // The env var only has an effect outside the validator (tests, benches, ...).
                // When set, it turns the disk index *off*. `&&` keeps the env lookup lazy.
                let disk_index_disabled_by_env = !started_from_validator
                    && std::env::var("SOLANA_TEST_ACCOUNTS_INDEX_MEMORY_LIMIT_MB").is_ok();
                if disk_index_disabled_by_env {
                    None
                } else {
                    DEFAULT_DISK_INDEX
                }
            }
        };

        let disk = mem_budget_mb.map(|_| BucketMap::new(bucket_config));
        Self {
            disk,
            ages_to_stay_in_cache,
            count_buckets_flushed: AtomicUsize::default(),
            // age starts at 0 ...
            age: AtomicAge::default(),
            // ... so the future flush age starts at 0 + ages_to_stay_in_cache ...
            future_age_to_flush: AtomicAge::new(ages_to_stay_in_cache),
            // ... and the cached flush age starts at 0 - 1 (wrapping), the oldest possible age.
            future_age_to_flush_cached: AtomicAge::new(Age::MAX),
            stats: BucketMapHolderStats::new(bins),
            wait_dirty_or_aged: Arc::default(),
            next_bucket_to_flush: AtomicUsize::new(0),
            age_timer: AtomicInterval::default(),
            bins,
            startup: AtomicBool::default(),
            mem_budget_mb,
            threads,
            _phantom: PhantomData,
            startup_stats: Arc::default(),
        }
    }

    /// Atomically claims the next bin index in round-robin order and returns it.
    ///
    /// The shared cursor spreads concurrent flush threads across different bins.
    pub fn next_bucket_to_flush(&self) -> usize {
        self.next_bucket_to_flush
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |bucket| {
                Some((bucket + 1) % self.bins)
            })
            .expect("fetch_update closure always returns Some")
    }

    /// Length of one age in milliseconds. Kept as a method so it could become dynamic.
    fn age_interval_ms(&self) -> u64 {
        AGE_MS
    }

    /// Decides whether flushing is far enough ahead of schedule to pause briefly.
    ///
    /// The current rate (`bins_flushed` in `elapsed_ms`) is extrapolated over the time left
    /// until 90% of `interval_ms`. If that rate would finish the remaining bins early, it
    /// returns `Some(1)` (wait 1 ms). Otherwise, or when any input makes the estimate
    /// meaningless (nothing left, no time left, nothing measured yet), it returns `None`.
    fn throttling_wait_ms_internal(
        &self,
        interval_ms: u64,
        elapsed_ms: u64,
        bins_flushed: u64,
    ) -> Option<u64> {
        // Aim to finish flushing all bins within 90% of the age interval.
        let target_percent = 90;
        let remaining_ms = (interval_ms * target_percent / 100).saturating_sub(elapsed_ms);
        let remaining_bins = (self.bins as u64).saturating_sub(bins_flushed);
        if remaining_bins == 0 || remaining_ms == 0 || elapsed_ms == 0 || bins_flushed == 0 {
            return None;
        }
        let ms_per_s = 1_000;
        let rate_bins_per_s = bins_flushed * ms_per_s / elapsed_ms;
        let expected_bins_processed_in_remaining_time = rate_bins_per_s * remaining_ms / ms_per_s;
        if expected_bins_processed_in_remaining_time > remaining_bins {
            // Ahead of schedule: pause to spread the work over the interval.
            Some(1)
        } else {
            None
        }
    }

    /// Applies [`Self::throttling_wait_ms_internal`] to the live timer and flush counter.
    fn throttling_wait_ms(&self) -> Option<u64> {
        let interval_ms = self.age_interval_ms();
        let elapsed_ms = self.age_timer.elapsed_ms();
        let bins_flushed = self.count_buckets_flushed() as u64;
        self.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed)
    }

    /// Returns true if the calling background thread can sleep instead of flushing.
    ///
    /// That is the case when every bin has been flushed at this age, or when the flushed
    /// bins plus the currently active threads already cover every bin.
    fn should_thread_sleep(&self) -> bool {
        let bins_flushed = self.count_buckets_flushed();
        if bins_flushed >= self.bins {
            true
        } else {
            let active = self.stats.active_threads.load(Ordering::Relaxed);
            bins_flushed.saturating_add(active as usize) >= self.bins
        }
    }

    /// Background loop; runs until any flag in `exit` is set.
    ///
    /// Each pass first waits:
    /// - without a disk index, it waits for the stats interval only;
    /// - with a disk index, it waits when the thread may sleep or was throttled. The wait
    ///   lasts until the next age or stats interval, or for the throttle delay.
    ///
    /// Each pass then flushes up to `in_mem.len()` buckets in round-robin order. It stops
    /// early once all buckets are flushed at the current age, or when throttling applies.
    /// Only threads with `can_advance_age` try to advance the age after waiting.
    pub fn background(
        &self,
        exit: Vec<Arc<AtomicBool>>,
        in_mem: Vec<Arc<InMemAccountsIndex<T, U>>>,
        can_advance_age: bool,
    ) {
        let bins = in_mem.len();
        let flush = self.disk.is_some();
        let mut throttling_wait_ms = None;
        loop {
            if !flush {
                self.wait_dirty_or_aged.wait_timeout(Duration::from_millis(
                    self.stats.remaining_until_next_interval(),
                ));
            } else if self.should_thread_sleep() || throttling_wait_ms.is_some() {
                let mut wait = std::cmp::min(
                    self.age_timer
                        .remaining_until_next_interval(self.age_interval_ms()),
                    self.stats.remaining_until_next_interval(),
                );
                if !can_advance_age {
                    // This thread cannot advance the age itself, so never wait 0 ms.
                    wait = wait.max(1);
                }
                if let Some(throttling_wait_ms) = throttling_wait_ms {
                    self.stats
                        .bg_throttling_wait_us
                        .fetch_add(throttling_wait_ms * 1000, Ordering::Relaxed);
                    wait = std::cmp::min(throttling_wait_ms, wait);
                }

                let mut m = Measure::start("wait");
                self.wait_dirty_or_aged
                    .wait_timeout(Duration::from_millis(wait));
                m.stop();
                self.stats
                    .bg_waiting_us
                    .fetch_add(m.as_us(), Ordering::Relaxed);
                // Time has likely passed while waiting, so the age interval may have elapsed.
                if can_advance_age {
                    self.maybe_advance_age();
                }
            }
            throttling_wait_ms = None;

            if exit.iter().any(|exit| exit.load(Ordering::Relaxed)) {
                break;
            }

            self.stats.active_threads.fetch_add(1, Ordering::Relaxed);
            for _ in 0..bins {
                if flush {
                    let index = self.next_bucket_to_flush();
                    in_mem[index].flush(can_advance_age);
                }
                self.stats.report_stats(self);
                if self.all_buckets_flushed_at_current_age() {
                    break;
                }
                throttling_wait_ms = self.throttling_wait_ms();
                if throttling_wait_ms.is_some() {
                    break;
                }
            }
            self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
        }
    }
}
#[cfg(test)]
pub mod tests {
    use {super::*, rayon::prelude::*, std::time::Instant};

    // Many threads share the round-robin cursor of `next_bucket_to_flush`.
    // Every bin must be handed out exactly `threads * iterations / bins` times in total.
    #[test]
    fn test_next_bucket_to_flush() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64, u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let visited = (0..bins)
            .map(|_| AtomicUsize::default())
            .collect::<Vec<_>>();
        let iterations = bins * 30;
        let threads = bins * 4;
        let expected = threads * iterations / bins;
        (0..threads).into_par_iter().for_each(|_| {
            (0..iterations).for_each(|_| {
                let bin = test.next_bucket_to_flush();
                visited[bin].fetch_add(1, Ordering::Relaxed);
            });
        });
        visited.iter().enumerate().for_each(|(bin, visited)| {
            assert_eq!(visited.load(Ordering::Relaxed), expected, "bin: {bin}")
        });
    }

    // Checks the initial values of the three ages, and that one `increment_age` moves all of
    // them by one. The cached flush age wraps from `Age::MAX` to 0.
    #[test]
    fn test_ages() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64, u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        assert_eq!(0, test.current_age());
        assert_eq!(test.ages_to_stay_in_cache, test.future_age_to_flush(false));
        assert_eq!(Age::MAX, test.future_age_to_flush(true));
        // `false`: record the flushes without trying to advance the age, so timing cannot
        // interfere. `increment_age` asserts that every bin was flushed.
        (0..bins).for_each(|_| {
            test.bucket_flushed_at_current_age(false);
        });
        test.increment_age();
        assert_eq!(1, test.current_age());
        assert_eq!(
            test.ages_to_stay_in_cache + 1,
            test.future_age_to_flush(false)
        );
        assert_eq!(0, test.future_age_to_flush(true));
    }

    // Runs the age past `u8` wrap-around twice (513 increments). It checks that `current_age`
    // follows `age % 256`, and that the flush counter is reset on every increment.
    #[test]
    fn test_age_increment() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64, u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        for age in 0..513 {
            assert_eq!(test.current_age(), (age % 256) as Age);
            // Only the predicate is checked here. `bucket_flushed_at_current_age` is avoided
            // because it may advance the age depending on timer state.
            for _ in 0..bins {
                assert!(!test.all_buckets_flushed_at_current_age());
            }
            // Mark every bin as flushed directly, so that `increment_age`'s assert holds.
            test.count_buckets_flushed
                .fetch_add(bins, Ordering::Release);
            test.increment_age();
        }
    }

    // Exercises `throttling_wait_ms_internal` with 128 bins and one age interval.
    #[test]
    fn test_throttle() {
        solana_logger::setup();
        let bins = 128;
        let test = BucketMapHolder::<u64, u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let bins = test.bins as u64;
        let interval_ms = test.age_interval_ms();
        // 89% of the interval used, 1 bin left: not predicted to finish early, so no wait.
        let elapsed_ms = interval_ms * 89 / 100;
        let bins_flushed = bins - 1;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, None);
        // 10% of the interval used, 1 bin left: far ahead of schedule, so throttle.
        let elapsed_ms = interval_ms / 10;
        let bins_flushed = bins - 1;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, Some(1));
        // 5% of the time for 8% of the bins: the extrapolated rate finishes early, so throttle.
        let elapsed_ms = interval_ms * 5 / 100;
        let bins_flushed = bins * 8 / 100;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, Some(1));
        // 11% of the time for 12% of the bins: the extrapolated rate would not finish early,
        // so no wait.
        let elapsed_ms = interval_ms * 11 / 100;
        let bins_flushed = bins * 12 / 100;
        let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
        assert_eq!(result, None);
    }

    // An explicit `IndexLimitMb::Limit` (even 0) allocates the disk index.
    #[test]
    fn test_disk_index_enabled() {
        let bins = 1;
        let config = AccountsIndexConfig {
            index_limit_mb: IndexLimitMb::Limit(0),
            ..AccountsIndexConfig::default()
        };
        let test = BucketMapHolder::<u64, u64>::new(bins, &Some(config), 1);
        assert!(test.is_disk_index_enabled());
    }

    // With the single bin re-flushed after every advance, the age should keep advancing.
    // Within 8/3 age intervals it should reach at least 2. The loop allows 100x that wall time,
    // so that starved threads on busy test machines do not make it flaky.
    #[test]
    fn test_age_time() {
        solana_logger::setup();
        let bins = 1;
        let test = BucketMapHolder::<u64, u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        let threads = 2;
        let time = AGE_MS * 8 / 3;
        let expected = (time / AGE_MS) as Age;
        let now = Instant::now();
        // Mark the only bin as flushed at age 0 before the workers start.
        test.bucket_flushed_at_current_age(true);
        (0..threads).into_par_iter().for_each(|_| {
            while now.elapsed().as_millis() < (time as u128) * 100 {
                if test.maybe_advance_age() {
                    test.bucket_flushed_at_current_age(true);
                }
                if test.current_age() >= expected {
                    break;
                }
            }
        });
        assert!(
            test.current_age() >= expected,
            "{}, {}",
            test.current_age(),
            expected
        );
    }

    // Flushes every bin at age 0 and sleeps two age intervals. One `maybe_advance_age` should
    // then move the age to 1. The new age starts with no buckets flushed.
    #[test]
    fn test_age_broad() {
        solana_logger::setup();
        let bins = 4;
        let test = BucketMapHolder::<u64, u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
        assert_eq!(test.current_age(), 0);
        for _ in 0..bins {
            assert!(!test.all_buckets_flushed_at_current_age());
            test.bucket_flushed_at_current_age(true);
        }
        std::thread::sleep(std::time::Duration::from_millis(AGE_MS * 2));
        test.maybe_advance_age();
        assert_eq!(test.current_age(), 1);
        assert!(!test.all_buckets_flushed_at_current_age());
    }
}