solana_runtime/bucket_map_holder.rs
use {
crate::{
accounts_index::{AccountsIndexConfig, IndexLimitMb, IndexValue},
bucket_map_holder_stats::BucketMapHolderStats,
in_mem_accounts_index::InMemAccountsIndex,
waitable_condvar::WaitableCondvar,
},
solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig},
safecoin_measure::measure::Measure,
solana_sdk::{
clock::{Slot, SLOT_MS},
timing::AtomicInterval,
},
std::{
fmt::Debug,
sync::{
atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
Arc,
},
time::Duration,
},
};
pub type Age = u8;
const AGE_MS: u64 = SLOT_MS; // match one age per slot time
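// With the sdk's default tick configuration, SLOT_MS works out to 400 ms, so an age lasts roughly one slot time (~400 ms); this is an observation about the default, not a guarantee of this constant's value.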
// 10 GB limit for the in-mem index. In practice, we don't get this high. This tunes how aggressively we keep items we expect to use soon in memory.
pub const DEFAULT_DISK_INDEX: Option<usize> = Some(10_000);
pub struct BucketMapHolder<T: IndexValue> {
pub disk: Option<BucketMap<(Slot, T)>>,
pub count_buckets_flushed: AtomicUsize,
pub age: AtomicU8,
pub stats: BucketMapHolderStats,
age_timer: AtomicInterval,
// used by bg processing to know when any bucket has become dirty
pub wait_dirty_or_aged: Arc<WaitableCondvar>,
next_bucket_to_flush: AtomicUsize,
bins: usize,
pub threads: usize,
// how many MB are we allowed to keep in the in-mem index?
// The rest goes to disk.
pub mem_budget_mb: Option<usize>,
/// how many ages an item remains in the in-mem cache after the last time it was used
pub ages_to_stay_in_cache: Age,
/// startup is a special time when flushing focuses on moving everything to disk as fast and efficiently as possible,
/// with fewer limits on thread count. LRU and access patterns are not important; freeing memory
/// and writing to disk in parallel are.
/// Note startup is an optimization and is not required for correctness.
startup: AtomicBool,
}
impl<T: IndexValue> Debug for BucketMapHolder<T> {
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}
#[allow(clippy::mutex_atomic)]
impl<T: IndexValue> BucketMapHolder<T> {
/// is the accounts index using disk as a backing store
pub fn is_disk_index_enabled(&self) -> bool {
self.disk.is_some()
}
pub fn increment_age(&self) {
// since we are about to change age, there are now 0 buckets that have been flushed at this age
// this should happen before the age.fetch_add
// Otherwise, as soon as we incremented the age, another thread could detect that the age has moved forward, decide a bucket is eligible for flushing, and flush before we swap this counter out.
let previous = self.count_buckets_flushed.swap(0, Ordering::AcqRel);
// fetch_add is defined to wrap.
// That's what we want. 0..255, then back to 0.
self.age.fetch_add(1, Ordering::Release);
assert!(
previous >= self.bins,
"previous: {}, bins: {}",
previous,
self.bins
); // we should not have increased age before previous age was fully flushed
self.wait_dirty_or_aged.notify_all(); // notify all because we can age scan in parallel
}
pub fn future_age_to_flush(&self) -> Age {
self.current_age().wrapping_add(self.ages_to_stay_in_cache)
}
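// Worked example (illustrative numbers, using the default ages_to_stay_in_cache of 5):
// with current_age() = 253, future_age_to_flush() = 253.wrapping_add(5) = 2, i.e. the
// Age arithmetic wraps at 256 just like increment_age's fetch_add does.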
fn has_age_interval_elapsed(&self) -> bool {
// note that when this returns true, state of age_timer is modified
self.age_timer.should_update(self.age_interval_ms())
}
/// used by bg processes to determine # active threads and how aggressively to flush
pub fn get_startup(&self) -> bool {
self.startup.load(Ordering::Relaxed)
}
/// startup=true causes the in-mem index to flush to disk asap
/// startup=false is 'normal' operation
pub fn set_startup(&self, value: bool) {
if !value {
self.wait_for_idle();
}
self.startup.store(value, Ordering::Relaxed)
}
/// return when the bg threads have reached an 'idle' state
pub(crate) fn wait_for_idle(&self) {
assert!(self.get_startup());
if self.disk.is_none() {
return;
}
// when age has incremented twice, we know that we have made it through scanning all bins since we started waiting,
// so we are then 'idle'
let end_age = self.current_age().wrapping_add(2);
loop {
self.wait_dirty_or_aged
.wait_timeout(Duration::from_millis(self.age_interval_ms()));
if end_age == self.current_age() {
break;
}
}
}
pub fn current_age(&self) -> Age {
self.age.load(Ordering::Acquire)
}
pub fn bucket_flushed_at_current_age(&self, can_advance_age: bool) {
let count_buckets_flushed = 1 + self.count_buckets_flushed.fetch_add(1, Ordering::AcqRel);
if can_advance_age {
self.maybe_advance_age_internal(
self.all_buckets_flushed_at_current_age_internal(count_buckets_flushed),
);
}
}
/// have all buckets been flushed at the current age?
pub fn all_buckets_flushed_at_current_age(&self) -> bool {
self.all_buckets_flushed_at_current_age_internal(self.count_buckets_flushed())
}
/// have all buckets been flushed at the current age?
fn all_buckets_flushed_at_current_age_internal(&self, count_buckets_flushed: usize) -> bool {
count_buckets_flushed >= self.bins
}
pub fn count_buckets_flushed(&self) -> usize {
self.count_buckets_flushed.load(Ordering::Acquire)
}
/// if all buckets are flushed at the current age and time has elapsed, then advance age
pub fn maybe_advance_age(&self) -> bool {
self.maybe_advance_age_internal(self.all_buckets_flushed_at_current_age())
}
/// if all buckets are flushed at the current age and time has elapsed, then advance age
fn maybe_advance_age_internal(&self, all_buckets_flushed_at_current_age: bool) -> bool {
// call has_age_interval_elapsed last since calling it modifies state on success
if all_buckets_flushed_at_current_age && self.has_age_interval_elapsed() {
self.increment_age();
true
} else {
false
}
}
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, threads: usize) -> Self {
const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
let ages_to_stay_in_cache = config
.as_ref()
.and_then(|config| config.ages_to_stay_in_cache)
.unwrap_or(DEFAULT_AGE_TO_STAY_IN_CACHE);
let mut bucket_config = BucketMapConfig::new(bins);
bucket_config.drives = config.as_ref().and_then(|config| config.drives.clone());
let mem_budget_mb = match config
.as_ref()
.map(|config| &config.index_limit_mb)
.unwrap_or(&IndexLimitMb::Unspecified)
{
// the caller said to use the disk index with a specific limit
IndexLimitMb::Limit(mb) => Some(*mb),
// the caller said InMemOnly, so no disk index
IndexLimitMb::InMemOnly => None,
// the caller did not specify whether to use the disk index
IndexLimitMb::Unspecified => {
// check env var if we were not started from a validator
let mut use_default = true;
if !config
.as_ref()
.map(|config| config.started_from_validator)
.unwrap_or_default()
{
if let Ok(_limit) = std::env::var("SAFECOIN_TEST_ACCOUNTS_INDEX_MEMORY_LIMIT_MB")
{
// Note this env var means the opposite of the default. The default now is that the disk index is on.
// So, if the mem budget was not set, we were NOT started from a validator, and this env var IS set, do NOT allocate disk buckets.
// We do not want the env var to have an effect when running the validator (only tests, benches, etc.).
use_default = false;
}
}
if use_default {
// if the validator specifies neither a disk index limit nor in-mem only, then this is the default
DEFAULT_DISK_INDEX
} else {
None
}
}
};
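// A concrete walk-through of the match above (illustrative values):
// - Limit(1_000)  -> mem_budget_mb = Some(1_000): disk buckets are allocated below
// - InMemOnly     -> mem_budget_mb = None: no disk buckets
// - Unspecified, not started from a validator, SAFECOIN_TEST_ACCOUNTS_INDEX_MEMORY_LIMIT_MB set
//                 -> mem_budget_mb = None: no disk buckets
// - Unspecified otherwise -> DEFAULT_DISK_INDEX = Some(10_000): disk buckets are allocated below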
// only allocate if mem_budget_mb is Some
let disk = mem_budget_mb.map(|_| BucketMap::new(bucket_config));
Self {
disk,
ages_to_stay_in_cache,
count_buckets_flushed: AtomicUsize::default(),
age: AtomicU8::default(),
stats: BucketMapHolderStats::new(bins),
wait_dirty_or_aged: Arc::default(),
next_bucket_to_flush: AtomicUsize::new(0),
age_timer: AtomicInterval::default(),
bins,
startup: AtomicBool::default(),
mem_budget_mb,
threads,
}
}
// get the next bucket to flush, with the idea that the previous bucket
// is perhaps being flushed by another thread already.
pub fn next_bucket_to_flush(&self) -> usize {
self.next_bucket_to_flush
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |bucket| {
Some((bucket + 1) % self.bins)
})
.unwrap()
}
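// For example, with bins = 4 the first call returns 0 (fetch_update yields the previous value),
// then 1, 2, 3, 0, ... The sequence is shared across threads, which is what test_next_bucket_to_flush
// below relies on to visit every bin evenly.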
/// prepare for this to be dynamic if necessary
/// For example, maybe startup has a shorter age interval.
fn age_interval_ms(&self) -> u64 {
AGE_MS
}
/// return an amount of ms to sleep
fn throttling_wait_ms_internal(
&self,
interval_ms: u64,
elapsed_ms: u64,
bins_flushed: u64,
) -> Option<u64> {
let target_percent = 90; // aim to finish in 90% of the allocated time
let remaining_ms = (interval_ms * target_percent / 100).saturating_sub(elapsed_ms);
let remaining_bins = (self.bins as u64).saturating_sub(bins_flushed);
if remaining_bins == 0 || remaining_ms == 0 || elapsed_ms == 0 || bins_flushed == 0 {
// any of these conditions result in 'do not wait due to progress'
return None;
}
let ms_per_s = 1_000;
let rate_bins_per_s = bins_flushed * ms_per_s / elapsed_ms;
let expected_bins_processed_in_remaining_time = rate_bins_per_s * remaining_ms / ms_per_s;
if expected_bins_processed_in_remaining_time > remaining_bins {
// wait, because we predict we will finish prior to the target
Some(1)
} else {
// do not wait, because we predict we will finish after the target
None
}
}
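// Worked example (numbers chosen to mirror the throttle test below, assuming a 400 ms interval):
// bins = 128, interval_ms = 400, target is 90% => 360 ms. With elapsed_ms = 40 and bins_flushed = 127,
// remaining_ms = 320, remaining_bins = 1, rate = 127 * 1000 / 40 = 3175 bins/s, and the expected
// 3175 * 320 / 1000 = 1016 bins comfortably exceeds the 1 remaining bin, so we return Some(1) and
// let the caller sleep briefly to spread the work out over the age interval.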
/// Check progress this age.
/// Return ms to wait to get closer to the wait target and spread out work over the entire age interval.
/// Goal is to avoid cpu spikes at beginning of age interval.
fn throttling_wait_ms(&self) -> Option<u64> {
let interval_ms = self.age_interval_ms();
let elapsed_ms = self.age_timer.elapsed_ms();
let bins_flushed = self.count_buckets_flushed() as u64;
self.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed)
}
/// true if this thread can sleep
fn should_thread_sleep(&self) -> bool {
let bins_flushed = self.count_buckets_flushed();
if bins_flushed >= self.bins {
// all bins flushed, so this thread can sleep
true
} else {
// at least 1 thread running for each bin that still needs to be flushed, so this thread can sleep
let active = self.stats.active_threads.load(Ordering::Relaxed);
bins_flushed.saturating_add(active as usize) >= self.bins
}
}
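// e.g. bins = 8, bins_flushed = 5, active threads = 3: 5 + 3 >= 8, so every bin that still needs
// flushing already has a thread working on it and this thread is free to sleep.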
// intended to execute in a bg thread
pub fn background(
&self,
exit: Arc<AtomicBool>,
in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
can_advance_age: bool,
) {
let bins = in_mem.len();
let flush = self.disk.is_some();
let mut throttling_wait_ms = None;
loop {
if !flush {
self.wait_dirty_or_aged.wait_timeout(Duration::from_millis(
self.stats.remaining_until_next_interval(),
));
} else if self.should_thread_sleep() || throttling_wait_ms.is_some() {
let mut wait = std::cmp::min(
self.age_timer
.remaining_until_next_interval(self.age_interval_ms()),
self.stats.remaining_until_next_interval(),
);
if !can_advance_age {
// if this thread cannot advance age, then make sure we don't sleep 0
wait = wait.max(1);
}
if let Some(throttling_wait_ms) = throttling_wait_ms {
self.stats
.bg_throttling_wait_us
.fetch_add(throttling_wait_ms * 1000, Ordering::Relaxed);
wait = std::cmp::min(throttling_wait_ms, wait);
}
let mut m = Measure::start("wait");
self.wait_dirty_or_aged
.wait_timeout(Duration::from_millis(wait));
m.stop();
self.stats
.bg_waiting_us
.fetch_add(m.as_us(), Ordering::Relaxed);
// likely some time has elapsed. May have been waiting for age time interval to elapse.
if can_advance_age {
self.maybe_advance_age();
}
}
throttling_wait_ms = None;
if exit.load(Ordering::Relaxed) {
break;
}
self.stats.active_threads.fetch_add(1, Ordering::Relaxed);
for _ in 0..bins {
if flush {
let index = self.next_bucket_to_flush();
in_mem[index].flush(can_advance_age);
}
self.stats.report_stats(self);
if self.all_buckets_flushed_at_current_age() {
break;
}
throttling_wait_ms = self.throttling_wait_ms();
if throttling_wait_ms.is_some() {
break;
}
}
self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
}
}
}
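// Spawning sketch (hypothetical wiring, shown only for orientation; the real spawn logic lives
// outside this file, in the accounts index storage layer). Each bg thread shares the holder and
// the per-bin in-mem indexes, and only one thread is allowed to advance the age:
//
//     let exit = Arc::new(AtomicBool::new(false));
//     for idx in 0..holder.threads {
//         let holder = Arc::clone(&holder);
//         let in_mem = in_mem.clone();
//         let exit = Arc::clone(&exit);
//         std::thread::spawn(move || holder.background(exit, in_mem, idx == 0));
//     }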
#[cfg(test)]
pub mod tests {
use {super::*, rayon::prelude::*, std::time::Instant};
#[test]
fn test_next_bucket_to_flush() {
solana_logger::setup();
let bins = 4;
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
let visited = (0..bins)
.into_iter()
.map(|_| AtomicUsize::default())
.collect::<Vec<_>>();
let iterations = bins * 30;
let threads = bins * 4;
let expected = threads * iterations / bins;
(0..threads).into_par_iter().for_each(|_| {
(0..iterations).into_iter().for_each(|_| {
let bin = test.next_bucket_to_flush();
visited[bin].fetch_add(1, Ordering::Relaxed);
});
});
visited.iter().enumerate().for_each(|(bin, visited)| {
assert_eq!(visited.load(Ordering::Relaxed), expected, "bin: {}", bin)
});
}
#[test]
fn test_age_increment() {
solana_logger::setup();
let bins = 4;
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
for age in 0..513 {
assert_eq!(test.current_age(), (age % 256) as Age);
// inc all
for _ in 0..bins {
assert!(!test.all_buckets_flushed_at_current_age());
// cannot call this here because, depending on timing, it may advance the age: test.bucket_flushed_at_current_age();
}
// this would normally happen once the age interval elapsed and all buckets had been flushed at the previous age
test.count_buckets_flushed
.fetch_add(bins, Ordering::Release);
test.increment_age();
}
}
#[test]
fn test_throttle() {
solana_logger::setup();
let bins = 128;
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
let bins = test.bins as u64;
let interval_ms = test.age_interval_ms();
// just under 90% of the time elapsed, all but 1 bin flushed; should not wait since we'll end up right on time
let elapsed_ms = interval_ms * 89 / 100;
let bins_flushed = bins - 1;
let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
assert_eq!(result, None);
// 10% of time, all bins but 1, should wait
let elapsed_ms = interval_ms / 10;
let bins_flushed = bins - 1;
let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
assert_eq!(result, Some(1));
// 5% of time, 8% of bins, should wait. target is 90%. These #s roughly work
let elapsed_ms = interval_ms * 5 / 100;
let bins_flushed = bins * 8 / 100;
let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
assert_eq!(result, Some(1));
// 11% of time, 12% of bins, should NOT wait. target is 90%. These #s roughly work
let elapsed_ms = interval_ms * 11 / 100;
let bins_flushed = bins * 12 / 100;
let result = test.throttling_wait_ms_internal(interval_ms, elapsed_ms, bins_flushed);
assert_eq!(result, None);
}
#[test]
fn test_disk_index_enabled() {
let bins = 1;
let config = AccountsIndexConfig {
index_limit_mb: IndexLimitMb::Limit(0),
..AccountsIndexConfig::default()
};
let test = BucketMapHolder::<u64>::new(bins, &Some(config), 1);
assert!(test.is_disk_index_enabled());
}
#[test]
fn test_age_time() {
solana_logger::setup();
let bins = 1;
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
let threads = 2;
let time = AGE_MS * 8 / 3;
let expected = (time / AGE_MS) as Age;
let now = Instant::now();
test.bucket_flushed_at_current_age(true); // done with age 0
(0..threads).into_par_iter().for_each(|_| {
// This test used to be stricter with time, but in a parallel, multi-test environment,
// threads sometimes starve and this test intermittently fails. So, give it more time than it should require.
// This may be aggravated by the strategy of only allowing thread 0 to advance the age.
while now.elapsed().as_millis() < (time as u128) * 100 {
if test.maybe_advance_age() {
test.bucket_flushed_at_current_age(true);
}
if test.current_age() >= expected {
break;
}
}
});
assert!(
test.current_age() >= expected,
"{}, {}",
test.current_age(),
expected
);
}
#[test]
fn test_age_broad() {
solana_logger::setup();
let bins = 4;
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
assert_eq!(test.current_age(), 0);
for _ in 0..bins {
assert!(!test.all_buckets_flushed_at_current_age());
test.bucket_flushed_at_current_age(true);
}
std::thread::sleep(std::time::Duration::from_millis(AGE_MS * 2));
test.maybe_advance_age();
assert_eq!(test.current_age(), 1);
assert!(!test.all_buckets_flushed_at_current_age());
}
}