use super::{CacheSize, NodeCached};
use hash_db::Hasher;
use nohash_hasher::BuildNoHashHasher;
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use schnellru::LruMap;
use std::{
collections::{hash_map::Entry as SetEntry, HashMap},
hash::{BuildHasher, Hasher as _},
sync::Arc,
};
use trie_db::{node::NodeOwned, CachedValue};
lazy_static::lazy_static! {
    /// Hasher state shared by everything in this module that hashes storage keys.
    ///
    /// It is seeded once, on first use, with four values drawn from the thread-local RNG.
    static ref RANDOM_STATE: ahash::RandomState = {
        use rand::Rng;

        let mut rng = rand::thread_rng();
        let seed_a = rng.gen();
        let seed_b = rng.gen();
        let seed_c = rng.gen();
        let seed_d = rng.gen();
        ahash::RandomState::generate_with(seed_a, seed_b, seed_c, seed_d)
    };
}
/// The limiter plugged into the shared node cache's LRU map.
///
/// It keeps a running total of the heap bytes attributed to the cached nodes and counts how
/// many items were removed since the counter was last reset.
pub struct SharedNodeCacheLimiter {
/// Largest inline memory usage the map may grow to; `on_grow` refuses anything above it.
max_inline_size: usize,
/// Upper bound for `heap_size`; a single node whose heap size exceeds it is never inserted.
max_heap_size: usize,
/// Running total of the heap bytes attributed to the nodes currently held in the map.
heap_size: usize,
/// Number of items removed from the map since this counter was last set to zero.
items_evicted: usize,
/// Once `items_evicted` exceeds this value the limiter no longer reports being over the limit.
max_items_evicted: usize,
}
impl<H> schnellru::Limiter<H, NodeOwned<H>> for SharedNodeCacheLimiter
where
    H: AsRef<[u8]>,
{
    type KeyToInsert<'a> = H;
    type LinkType = u32;

    /// Reports whether the map should evict: the tracked heap usage is above its maximum and
    /// the eviction allowance has not been exceeded yet.
    #[inline]
    fn is_over_the_limit(&self, _length: usize) -> bool {
        // Once more items than allowed were evicted we stop asking for further evictions;
        // `SharedNodeCache::update` checks the same counters and stops inserting.
        if self.items_evicted > self.max_items_evicted {
            return false
        }
        self.heap_size > self.max_heap_size
    }

    /// Accounts for the heap usage of a node about to be inserted.
    ///
    /// Returns `None` (rejecting the insert) when the node alone is larger than the heap limit.
    #[inline]
    fn on_insert(
        &mut self,
        _length: usize,
        key: Self::KeyToInsert<'_>,
        node: NodeOwned<H>,
    ) -> Option<(H, NodeOwned<H>)> {
        // Only the part beyond the inline `NodeOwned` value counts towards the heap size.
        let inline_size = std::mem::size_of::<NodeOwned<H>>();
        let added_heap_size = node.size_in_bytes() - inline_size;
        if added_heap_size > self.max_heap_size {
            return None
        }

        self.heap_size += added_heap_size;
        Some((key, node))
    }

    /// Swaps the heap accounting of the old node for that of the new one.
    ///
    /// Returns `false` (rejecting the replacement) when the new node alone is larger than the
    /// heap limit.
    #[inline]
    fn on_replace(
        &mut self,
        _length: usize,
        old_key: &mut H,
        new_key: H,
        old_node: &mut NodeOwned<H>,
        new_node: &mut NodeOwned<H>,
    ) -> bool {
        debug_assert_eq!(old_key.as_ref(), new_key.as_ref());

        let inline_size = std::mem::size_of::<NodeOwned<H>>();
        let incoming_heap_size = new_node.size_in_bytes() - inline_size;
        if incoming_heap_size > self.max_heap_size {
            return false
        }

        let outgoing_heap_size = old_node.size_in_bytes() - inline_size;
        self.heap_size = self.heap_size - outgoing_heap_size + incoming_heap_size;
        true
    }

    /// Resets the heap accounting after the whole map was cleared.
    #[inline]
    fn on_cleared(&mut self) {
        self.heap_size = 0;
    }

    /// Releases the removed node's heap usage and counts the removal as an eviction.
    #[inline]
    fn on_removed(&mut self, _: &mut H, node: &mut NodeOwned<H>) {
        let inline_size = std::mem::size_of::<NodeOwned<H>>();
        self.heap_size -= node.size_in_bytes() - inline_size;
        self.items_evicted += 1;
    }

    /// Allows the map to grow only while its inline memory usage stays within the limit.
    #[inline]
    fn on_grow(&mut self, new_memory_usage: usize) -> bool {
        new_memory_usage <= self.max_inline_size
    }
}
/// The limiter plugged into the shared value cache's LRU map.
///
/// Besides the size limits it memoizes the storage keys in use, so that cache entries with an
/// identical storage key share one `Arc<[u8]>` allocation.
pub struct SharedValueCacheLimiter {
/// Largest inline memory usage the map may grow to; `on_grow` refuses anything above it.
max_inline_size: usize,
/// Upper bound for `heap_size`; a storage key longer than this is never inserted.
max_heap_size: usize,
/// Sum of the lengths of the storage keys currently held in `known_storage_keys`.
heap_size: usize,
/// The set of storage keys referenced by cached entries (a map with unit values).
known_storage_keys: HashMap<Arc<[u8]>, (), ahash::RandomState>,
/// Number of items removed from the map since this counter was last set to zero.
items_evicted: usize,
/// Once `items_evicted` exceeds this value the limiter no longer reports being over the limit.
max_items_evicted: usize,
}
impl<H> schnellru::Limiter<ValueCacheKey<H>, CachedValue<H>> for SharedValueCacheLimiter
where
    H: AsRef<[u8]>,
{
    type KeyToInsert<'a> = ValueCacheKey<H>;
    type LinkType = u32;

    /// Reports whether the map should evict: the tracked heap usage is above its maximum and
    /// the eviction allowance has not been exceeded yet.
    #[inline]
    fn is_over_the_limit(&self, _length: usize) -> bool {
        let may_still_evict = self.items_evicted <= self.max_items_evicted;
        may_still_evict && self.heap_size > self.max_heap_size
    }

    /// Deduplicates the storage key of the entry about to be inserted.
    ///
    /// A storage key seen for the first time is memoized and its length added to the heap
    /// accounting; a known one is swapped for the memoized `Arc`. Returns `None` (rejecting the
    /// insert) when a new storage key alone is longer than the heap limit.
    #[inline]
    fn on_insert(
        &mut self,
        _length: usize,
        mut key: Self::KeyToInsert<'_>,
        value: CachedValue<H>,
    ) -> Option<(ValueCacheKey<H>, CachedValue<H>)> {
        match self.known_storage_keys.entry(key.storage_key.clone()) {
            SetEntry::Occupied(known) => {
                // Point the new entry at the allocation we already hold.
                key.storage_key = known.key().clone();
            },
            SetEntry::Vacant(slot) => {
                let key_len = key.storage_key.len();
                if key_len > self.max_heap_size {
                    return None
                }

                self.heap_size += key_len;
                slot.insert(());
            },
        }

        Some((key, value))
    }

    /// Always accepts a replacement; both keys are expected to carry the same storage key.
    #[inline]
    fn on_replace(
        &mut self,
        _length: usize,
        _old_key: &mut ValueCacheKey<H>,
        _new_key: ValueCacheKey<H>,
        _old_value: &mut CachedValue<H>,
        _new_value: &mut CachedValue<H>,
    ) -> bool {
        debug_assert_eq!(_new_key.storage_key, _old_key.storage_key);
        true
    }

    /// Counts the removal as an eviction and forgets the storage key once nothing else uses it.
    #[inline]
    fn on_removed(&mut self, key: &mut ValueCacheKey<H>, _: &mut CachedValue<H>) {
        let storage_key = &key.storage_key;
        // A count of two means the only holders are `known_storage_keys` and the entry that is
        // being removed, so the key can be dropped from the set as well.
        if Arc::strong_count(storage_key) == 2 {
            self.heap_size -= storage_key.len();
            self.known_storage_keys.remove(storage_key);
        }
        self.items_evicted += 1;
    }

    /// Resets the heap accounting and the memoized keys after the whole map was cleared.
    #[inline]
    fn on_cleared(&mut self) {
        self.heap_size = 0;
        self.known_storage_keys.clear();
    }

    /// Allows the map to grow only while its inline memory usage stays within the limit.
    #[inline]
    fn on_grow(&mut self, new_memory_usage: usize) -> bool {
        new_memory_usage <= self.max_inline_size
    }
}
/// The LRU map backing the shared node cache: keys of type `H`, owned trie nodes as values,
/// bounded by a [`SharedNodeCacheLimiter`].
type SharedNodeCacheMap<H> =
LruMap<H, NodeOwned<H>, SharedNodeCacheLimiter, schnellru::RandomState>;
/// The shared node cache: a thin wrapper around the size-limited LRU map of trie nodes.
pub(super) struct SharedNodeCache<H>
where
H: AsRef<[u8]>,
{
/// The cached nodes, ordered by recency of use.
pub(super) lru: SharedNodeCacheMap<H>,
}
impl<H: AsRef<[u8]> + Eq + std::hash::Hash> SharedNodeCache<H> {
    /// Creates an empty node cache with the given inline and heap size limits.
    fn new(max_inline_size: usize, max_heap_size: usize) -> Self {
        let limiter = SharedNodeCacheLimiter {
            max_inline_size,
            max_heap_size,
            heap_size: 0,
            items_evicted: 0,
            max_items_evicted: 0,
        };

        Self { lru: LruMap::new(limiter) }
    }

    /// Merges `list` into the cache.
    ///
    /// Entries flagged as coming from the shared cache that are still present are only looked
    /// up with `get`; everything else is inserted. Processing stops early once enough keys were
    /// looked up this way or once more items than allowed were evicted.
    pub fn update(&mut self, list: impl IntoIterator<Item = (H, NodeCached<H>)>) {
        let mut access_count = 0;
        let mut add_count = 0;

        // Start a fresh eviction count and derive the allowance from the current length.
        self.lru.limiter_mut().items_evicted = 0;
        let eviction_allowance =
            self.lru.len() * 100 / super::SHARED_NODE_CACHE_MAX_REPLACE_PERCENT;
        self.lru.limiter_mut().max_items_evicted = eviction_allowance;

        for (key, cached_node) in list {
            if cached_node.is_from_shared_cache && self.lru.get(&key).is_some() {
                access_count += 1;

                if access_count >= super::SHARED_NODE_CACHE_MAX_PROMOTED_KEYS {
                    break
                }

                continue
            }

            self.lru.insert(key, cached_node.node);
            add_count += 1;

            let limiter = self.lru.limiter();
            if limiter.items_evicted > limiter.max_items_evicted {
                break
            }
        }

        tracing::debug!(
            target: super::LOG_TARGET,
            "Updated the shared node cache: {} accesses, {} new values, {}/{} evicted (length = {}, inline size={}/{}, heap size={}/{})",
            access_count,
            add_count,
            self.lru.limiter().items_evicted,
            self.lru.limiter().max_items_evicted,
            self.lru.len(),
            self.lru.memory_usage(),
            self.lru.limiter().max_inline_size,
            self.lru.limiter().heap_size,
            self.lru.limiter().max_heap_size,
        );
    }

    /// Removes every node from the cache.
    fn reset(&mut self) {
        self.lru.clear();
    }
}
/// A 64-bit hash wrapped in its own type.
///
/// The value cache stores one per key and uses it for lookups by hash.
#[derive(PartialEq, Eq, Clone, Copy, Hash)]
#[repr(transparent)]
pub struct ValueCacheKeyHash(u64);

impl ValueCacheKeyHash {
    /// Returns the wrapped `u64`.
    pub fn raw(self) -> u64 {
        let Self(value) = self;
        value
    }

    /// Feeds `storage_key` into `hasher` and wraps the resulting hash.
    ///
    /// Whatever was written to `hasher` beforehand contributes to the result as well.
    pub fn from_hasher_and_storage_key(
        mut hasher: impl std::hash::Hasher,
        storage_key: &[u8],
    ) -> Self {
        hasher.write(storage_key);
        let finished = hasher.finish();
        Self(finished)
    }
}
// Opt-in marker required by `nohash_hasher` for types that are used as keys with its hasher.
impl nohash_hasher::IsEnabled for ValueCacheKeyHash {}
/// The owned key of a value cache entry.
///
/// `Hash` and `PartialEq` are implemented by hand: hashing only feeds the precomputed `hash`
/// field, while equality compares the storage root and the storage key.
#[derive(Eq)]
pub(super) struct ValueCacheKey<H> {
/// The storage root the cached value belongs to.
pub storage_root: H,
/// The storage key of the cached value.
pub storage_key: Arc<[u8]>,
/// The hash computed from `storage_root` and `storage_key` (see `hash_data`).
pub hash: ValueCacheKeyHash,
}
/// A borrowed counterpart of [`ValueCacheKey`] that refers to the storage key instead of
/// owning it; it can be converted into a [`ValueCacheKey`] through `From`.
pub(super) struct ValueCacheRef<'a, H> {
/// The storage root the value belongs to.
pub storage_root: H,
/// The borrowed storage key.
pub storage_key: &'a [u8],
/// The hash computed from `storage_root` and `storage_key`.
pub hash: ValueCacheKeyHash,
}
impl<'a, H> ValueCacheRef<'a, H> {
    /// Builds a reference key for `storage_key` under `storage_root`, computing its hash
    /// with [`ValueCacheKey::hash_data`].
    pub fn new(storage_key: &'a [u8], storage_root: H) -> Self
    where
        H: AsRef<[u8]>,
    {
        Self {
            // Evaluated first, while `storage_root` is still only borrowed.
            hash: ValueCacheKey::<H>::hash_data(storage_key, &storage_root),
            storage_root,
            storage_key,
        }
    }
}
impl<'a, H> From<ValueCacheRef<'a, H>> for ValueCacheKey<H> {
fn from(value: ValueCacheRef<'a, H>) -> Self {
ValueCacheKey {
storage_root: value.storage_root,
storage_key: value.storage_key.into(),
hash: value.hash,
}
}
}
impl<'a, H: std::hash::Hash> std::hash::Hash for ValueCacheRef<'a, H> {
    /// Feeds only the precomputed `hash` field into `state`.
    fn hash<S: std::hash::Hasher>(&self, state: &mut S) {
        std::hash::Hash::hash(&self.hash, state)
    }
}
impl<'a, H> PartialEq<ValueCacheKey<H>> for ValueCacheRef<'a, H>
where
    H: AsRef<[u8]>,
{
    /// A reference equals an owned key when both the storage root bytes and the storage key
    /// bytes match; the `hash` fields are not compared.
    fn eq(&self, rhs: &ValueCacheKey<H>) -> bool {
        if self.storage_root.as_ref() != rhs.storage_root.as_ref() {
            return false
        }
        self.storage_key == &rhs.storage_key[..]
    }
}
impl<H> ValueCacheKey<H> {
#[cfg(test)] pub fn new_value(storage_key: impl Into<Arc<[u8]>>, storage_root: H) -> Self
where
H: AsRef<[u8]>,
{
let storage_key = storage_key.into();
let hash = Self::hash_data(&storage_key, &storage_root);
Self { storage_root, storage_key, hash }
}
pub fn hash_partial_data(storage_root: &H) -> impl std::hash::Hasher + Clone
where
H: AsRef<[u8]>,
{
let mut hasher = RANDOM_STATE.build_hasher();
hasher.write(storage_root.as_ref());
hasher
}
pub fn hash_data(key: &[u8], storage_root: &H) -> ValueCacheKeyHash
where
H: AsRef<[u8]>,
{
let hasher = Self::hash_partial_data(storage_root);
ValueCacheKeyHash::from_hasher_and_storage_key(hasher, key)
}
#[inline]
pub fn is_eq(&self, storage_root: &H, storage_key: &[u8]) -> bool
where
H: PartialEq,
{
self.storage_root == *storage_root && *self.storage_key == *storage_key
}
}
impl<H: std::hash::Hash> std::hash::Hash for ValueCacheKey<H> {
    /// Feeds only the precomputed `hash` field into `state`.
    fn hash<S: std::hash::Hasher>(&self, state: &mut S) {
        std::hash::Hash::hash(&self.hash, state)
    }
}
// Opt-in marker required by `nohash_hasher`; the `Hash` impl of `ValueCacheKey` only feeds the
// precomputed `hash` field into the hasher.
impl<H> nohash_hasher::IsEnabled for ValueCacheKey<H> {}
impl<H: PartialEq> PartialEq for ValueCacheKey<H> {
    /// Two keys are equal when storage root and storage key match; delegates to `is_eq`, so
    /// the `hash` field takes no part in the comparison.
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        let Self { storage_root, storage_key, .. } = other;
        self.is_eq(storage_root, storage_key)
    }
}
/// The LRU map backing the shared value cache: [`ValueCacheKey`]s mapped to cached values,
/// bounded by a [`SharedValueCacheLimiter`] and hashed through `nohash_hasher`'s build hasher.
type SharedValueCacheMap<H> = schnellru::LruMap<
ValueCacheKey<H>,
CachedValue<H>,
SharedValueCacheLimiter,
BuildNoHashHasher<ValueCacheKey<H>>,
>;
/// The shared value cache: a thin wrapper around the size-limited LRU map of cached values.
pub(super) struct SharedValueCache<H>
where
H: AsRef<[u8]>,
{
/// The cached values, ordered by recency of use.
pub(super) lru: SharedValueCacheMap<H>,
}
impl<H: Eq + std::hash::Hash + Clone + Copy + AsRef<[u8]>> SharedValueCache<H> {
    /// Creates an empty value cache with the given inline and heap size limits.
    fn new(max_inline_size: usize, max_heap_size: usize) -> Self {
        let limiter = SharedValueCacheLimiter {
            max_inline_size,
            max_heap_size,
            heap_size: 0,
            known_storage_keys: HashMap::with_hasher(RANDOM_STATE.clone()),
            items_evicted: 0,
            max_items_evicted: 0,
        };

        Self { lru: schnellru::LruMap::with_hasher(limiter, Default::default()) }
    }

    /// Merges a batch of changes into the cache.
    ///
    /// Every hash in `accessed` is looked up first, then the entries in `added` are inserted
    /// until they run out or more items than allowed were evicted.
    pub fn update(
        &mut self,
        added: impl IntoIterator<Item = (ValueCacheKey<H>, CachedValue<H>)>,
        accessed: impl IntoIterator<Item = ValueCacheKeyHash>,
    ) {
        let mut access_count = 0;
        let mut add_count = 0;

        for accessed_hash in accessed {
            // Only the hashes are compared here, so on a collision a different entry than the
            // one originally accessed may be the one that gets looked up.
            self.lru
                .get_by_hash(accessed_hash.raw(), |candidate, _| candidate.hash == accessed_hash);
            access_count += 1;
        }

        // Start a fresh eviction count and derive the allowance from the current length.
        self.lru.limiter_mut().items_evicted = 0;
        let eviction_allowance =
            self.lru.len() * 100 / super::SHARED_VALUE_CACHE_MAX_REPLACE_PERCENT;
        self.lru.limiter_mut().max_items_evicted = eviction_allowance;

        for (key, value) in added {
            self.lru.insert(key, value);
            add_count += 1;

            let limiter = self.lru.limiter();
            if limiter.items_evicted > limiter.max_items_evicted {
                break
            }
        }

        tracing::debug!(
            target: super::LOG_TARGET,
            "Updated the shared value cache: {} accesses, {} new values, {}/{} evicted (length = {}, known_storage_keys = {}, inline size={}/{}, heap size={}/{})",
            access_count,
            add_count,
            self.lru.limiter().items_evicted,
            self.lru.limiter().max_items_evicted,
            self.lru.len(),
            self.lru.limiter().known_storage_keys.len(),
            self.lru.memory_usage(),
            self.lru.limiter().max_inline_size,
            self.lru.limiter().heap_size,
            self.lru.limiter().max_heap_size
        );
    }

    /// Removes every value from the cache.
    fn reset(&mut self) {
        self.lru.clear();
    }
}
/// The data guarded by the lock inside [`SharedTrieCache`]: the node cache and the value
/// cache, both keyed with the hasher's output type.
pub(super) struct SharedTrieCacheInner<H: Hasher> {
/// Cache of trie nodes.
node_cache: SharedNodeCache<H::Out>,
/// Cache of values looked up by storage root and storage key.
value_cache: SharedValueCache<H::Out>,
}
impl<H: Hasher> SharedTrieCacheInner<H> {
    /// Shared access to the value cache (tests only).
    #[cfg(test)]
    pub(super) fn value_cache(&self) -> &SharedValueCache<H::Out> {
        let Self { value_cache, .. } = self;
        value_cache
    }

    /// Exclusive access to the value cache.
    pub(super) fn value_cache_mut(&mut self) -> &mut SharedValueCache<H::Out> {
        let Self { value_cache, .. } = self;
        value_cache
    }

    /// Shared access to the node cache (tests only).
    #[cfg(test)]
    pub(super) fn node_cache(&self) -> &SharedNodeCache<H::Out> {
        let Self { node_cache, .. } = self;
        node_cache
    }

    /// Exclusive access to the node cache.
    pub(super) fn node_cache_mut(&mut self) -> &mut SharedNodeCache<H::Out> {
        let Self { node_cache, .. } = self;
        node_cache
    }
}
/// A handle to the shared trie cache.
///
/// The caches live behind an `Arc<RwLock<..>>`, so cloning the handle yields another handle
/// to the same underlying caches.
pub struct SharedTrieCache<H: Hasher> {
/// The node and value caches, shared between all clones of this handle.
inner: Arc<RwLock<SharedTrieCacheInner<H>>>,
}
impl<H: Hasher> Clone for SharedTrieCache<H> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
impl<H: Hasher> SharedTrieCache<H> {
    /// Creates a shared cache whose total memory budget is `cache_size`.
    ///
    /// 20% of the budget goes to the value cache and the remainder to the node cache. For each
    /// cache, 70% of its budget is offered to the LRU map as inline memory; whatever the map
    /// does not claim of the cache's budget becomes that cache's heap limit.
    pub fn new(cache_size: CacheSize) -> Self {
        let total_budget = cache_size.0;

        let value_cache_budget = (total_budget as f32 * 0.20) as usize;
        let node_cache_budget = total_budget - value_cache_budget;

        // The share of a cache's budget offered to the map for inline storage.
        let inline_share = |budget: usize| (budget as f32 * 0.70) as usize;

        let value_cache_max_inline_size =
            SharedValueCacheMap::<H::Out>::memory_usage_for_memory_budget(inline_share(
                value_cache_budget,
            ));
        let node_cache_max_inline_size =
            SharedNodeCacheMap::<H::Out>::memory_usage_for_memory_budget(inline_share(
                node_cache_budget,
            ));

        let value_cache_max_heap_size = value_cache_budget - value_cache_max_inline_size;
        let node_cache_max_heap_size = node_cache_budget - node_cache_max_inline_size;

        tracing::debug!(
            target: super::LOG_TARGET,
            "Configured a shared trie cache with a budget of ~{} bytes (node_cache_max_inline_size = {}, node_cache_max_heap_size = {}, value_cache_max_inline_size = {}, value_cache_max_heap_size = {})",
            total_budget,
            node_cache_max_inline_size,
            node_cache_max_heap_size,
            value_cache_max_inline_size,
            value_cache_max_heap_size,
        );

        let node_cache = SharedNodeCache::new(node_cache_max_inline_size, node_cache_max_heap_size);
        let value_cache =
            SharedValueCache::new(value_cache_max_inline_size, value_cache_max_heap_size);

        Self { inner: Arc::new(RwLock::new(SharedTrieCacheInner { node_cache, value_cache })) }
    }

    /// Creates a local cache that is backed by this shared cache.
    ///
    /// The local node and value caches and the stats start out with their default values; the
    /// set tracking accessed shared values is bounded by
    /// `SHARED_VALUE_CACHE_MAX_PROMOTED_KEYS` entries.
    pub fn local_cache(&self) -> super::LocalTrieCache<H> {
        let access_set = super::ValueAccessSet::with_hasher(
            schnellru::ByLength::new(super::SHARED_VALUE_CACHE_MAX_PROMOTED_KEYS),
            Default::default(),
        );

        super::LocalTrieCache {
            shared: self.clone(),
            node_cache: Default::default(),
            value_cache: Default::default(),
            shared_value_cache_access: Mutex::new(access_set),
            stats: Default::default(),
        }
    }

    /// Returns a clone of the node cached under `key`, if any, using `peek` under a read lock.
    #[inline]
    pub fn peek_node(&self, key: &H::Out) -> Option<NodeOwned<H::Out>> {
        let inner = self.inner.read();
        let found = inner.node_cache.lru.peek(key).cloned();
        found
    }

    /// Returns a clone of the value cached for `storage_key` under `storage_root`, if any,
    /// using `peek_by_hash` under a read lock.
    ///
    /// `hash` locates the candidates; a candidate matches only if its storage root and storage
    /// key are equal to the given ones.
    pub fn peek_value_by_hash(
        &self,
        hash: ValueCacheKeyHash,
        storage_root: &H::Out,
        storage_key: &[u8],
    ) -> Option<CachedValue<H::Out>> {
        let inner = self.inner.read();
        let found = inner
            .value_cache
            .lru
            .peek_by_hash(hash.raw(), |candidate, _| candidate.is_eq(storage_root, storage_key))
            .cloned();
        found
    }

    /// Returns the memory currently accounted for by both caches: each map's inline memory
    /// usage plus the heap size tracked by its limiter.
    pub fn used_memory_size(&self) -> usize {
        let inner = self.inner.read();

        let value_lru = &inner.value_cache.lru;
        let node_lru = &inner.node_cache.lru;

        let value_cache_size = value_lru.memory_usage() + value_lru.limiter().heap_size;
        let node_cache_size = node_lru.memory_usage() + node_lru.limiter().heap_size;

        node_cache_size + value_cache_size
    }

    /// Clears the node cache.
    pub fn reset_node_cache(&self) {
        let mut inner = self.inner.write();
        inner.node_cache.reset();
    }

    /// Clears the value cache.
    pub fn reset_value_cache(&self) {
        let mut inner = self.inner.write();
        inner.value_cache.reset();
    }

    /// Clears the node cache and then the value cache, taking the write lock once for each.
    pub fn reset(&self) {
        self.reset_node_cache();
        self.reset_value_cache();
    }

    /// Takes the read lock on the inner caches (tests only).
    #[cfg(test)]
    pub(super) fn read_lock_inner(
        &self,
    ) -> parking_lot::RwLockReadGuard<'_, SharedTrieCacheInner<H>> {
        self.inner.read()
    }

    /// Tries to take the write lock on the inner caches, giving up with `None` if it could not
    /// be acquired within `SHARED_CACHE_WRITE_LOCK_TIMEOUT`.
    pub(super) fn write_lock_inner(&self) -> Option<RwLockWriteGuard<'_, SharedTrieCacheInner<H>>> {
        self.inner.try_write_for(super::SHARED_CACHE_WRITE_LOCK_TIMEOUT)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use sp_core::H256 as Hash;
// Exercises the shared value cache: storage key deduplication, promotion of accessed
// entries, heap accounting and eviction once the heap limit is reached.
#[test]
fn shared_value_cache_works() {
// Effectively unlimited inline size, and a heap limit that fits ten 10-byte storage keys.
let mut cache = SharedValueCache::<sp_core::H256>::new(usize::MAX, 10 * 10);
let key = vec![0; 10];
let root0 = Hash::repeat_byte(1);
let root1 = Hash::repeat_byte(2);
// Insert the same storage key under two different storage roots.
cache.update(
vec![
(ValueCacheKey::new_value(&key[..], root0), CachedValue::NonExisting),
(ValueCacheKey::new_value(&key[..], root1), CachedValue::NonExisting),
],
vec![],
);
// The storage key is memoized once and shared: one reference in `known_storage_keys` plus
// one in each of the two map entries.
assert_eq!(1, cache.lru.limiter_mut().known_storage_keys.len());
assert_eq!(
3, Arc::strong_count(
cache.lru.limiter_mut().known_storage_keys.get_key_value(&key[..]).unwrap().0
)
);
assert_eq!(key.len(), cache.lru.limiter().heap_size);
assert_eq!(cache.lru.len(), 2);
assert_eq!(cache.lru.peek_newest().unwrap().0.storage_root, root1);
assert_eq!(cache.lru.peek_oldest().unwrap().0.storage_root, root0);
assert!(cache.lru.limiter().heap_size <= cache.lru.limiter().max_heap_size);
assert_eq!(cache.lru.limiter().heap_size, 10);
// Report the `root0` entry as accessed; it must become the newest entry.
cache.update(vec![], vec![ValueCacheKey::hash_data(&key[..], &root0)]);
assert_eq!(1, cache.lru.limiter_mut().known_storage_keys.len());
assert_eq!(
3,
Arc::strong_count(
cache.lru.limiter_mut().known_storage_keys.get_key_value(&key[..]).unwrap().0
)
);
assert_eq!(key.len(), cache.lru.limiter().heap_size);
assert_eq!(cache.lru.len(), 2);
assert_eq!(cache.lru.peek_newest().unwrap().0.storage_root, root0);
assert_eq!(cache.lru.peek_oldest().unwrap().0.storage_root, root1);
assert!(cache.lru.limiter().heap_size <= cache.lru.limiter().max_heap_size);
assert_eq!(cache.lru.limiter().heap_size, 10);
// Insert both entries again: nothing is duplicated and nothing is evicted.
cache.update(
vec![
(ValueCacheKey::new_value(&key[..], root1), CachedValue::NonExisting),
(ValueCacheKey::new_value(&key[..], root0), CachedValue::NonExisting),
],
vec![],
);
assert_eq!(1, cache.lru.limiter_mut().known_storage_keys.len());
assert_eq!(
3,
Arc::strong_count(
cache.lru.limiter_mut().known_storage_keys.get_key_value(&key[..]).unwrap().0
)
);
assert_eq!(key.len(), cache.lru.limiter().heap_size);
assert_eq!(cache.lru.len(), 2);
assert_eq!(cache.lru.peek_newest().unwrap().0.storage_root, root0);
assert_eq!(cache.lru.peek_oldest().unwrap().0.storage_root, root1);
assert!(cache.lru.limiter().heap_size <= cache.lru.limiter().max_heap_size);
assert_eq!(cache.lru.limiter().items_evicted, 0);
assert_eq!(cache.lru.limiter().heap_size, 10);
// Insert ten new 10-byte storage keys; together they fill the heap limit exactly, so the
// two old entries have to be evicted.
cache.update(
(1..11)
.map(|i| vec![i; 10])
.map(|key| (ValueCacheKey::new_value(&key[..], root0), CachedValue::NonExisting)),
vec![],
);
assert_eq!(cache.lru.limiter().items_evicted, 2);
assert_eq!(10, cache.lru.len());
assert_eq!(10, cache.lru.limiter_mut().known_storage_keys.len());
// The original storage key is no longer memoized.
assert!(cache.lru.limiter_mut().known_storage_keys.get_key_value(&key[..]).is_none());
assert_eq!(key.len() * 10, cache.lru.limiter().heap_size);
assert_eq!(cache.lru.len(), 10);
assert!(cache.lru.limiter().heap_size <= cache.lru.limiter().max_heap_size);
assert_eq!(cache.lru.limiter().heap_size, 100);
// The new keys are present under `root0` only; the old entries are gone.
assert!(matches!(
cache.lru.peek(&ValueCacheKey::new_value(&[1; 10][..], root0)).unwrap(),
CachedValue::<Hash>::NonExisting
));
assert!(cache.lru.peek(&ValueCacheKey::new_value(&[1; 10][..], root1)).is_none(),);
assert!(cache.lru.peek(&ValueCacheKey::new_value(&key[..], root0)).is_none());
assert!(cache.lru.peek(&ValueCacheKey::new_value(&key[..], root1)).is_none());
// Inserting an entry that is already cached must not bring the evicted key back.
cache.update(
vec![(ValueCacheKey::new_value(vec![10; 10], root0), CachedValue::NonExisting)],
vec![],
);
assert!(cache.lru.limiter_mut().known_storage_keys.get_key_value(&key[..]).is_none());
}
}