pub struct MokaCache<K, V, S = RandomState> { /* private fields */ }
Available on crate feature manager-moka only.
A thread-safe, futures-aware concurrent in-memory cache.
Cache supports full concurrency of retrievals and a high expected concurrency for updates. It utilizes a lock-free concurrent hash table as the central key-value storage. It performs a best-effort bounding of the map using an entry replacement algorithm to determine which entries to evict when the capacity is exceeded.
To use this cache, enable a crate feature called “future”.
§Table of Contents
- Example: insert, get and invalidate
- Avoiding cloning the value at get
- Sharing a cache across asynchronous tasks
- Hashing Algorithm
- Example: Size-based Eviction
- Example: Time-based Expirations
- Example: Eviction Listener
§Example: insert, get and invalidate
Cache entries are manually added using the insert or get_with method, and are stored in the cache until either evicted or manually invalidated.
Here’s an example of reading and updating a cache by using multiple asynchronous tasks with Tokio runtime:
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
// futures-util = "0.3"
use moka::future::Cache;
#[tokio::main]
async fn main() {
const NUM_TASKS: usize = 16;
const NUM_KEYS_PER_TASK: usize = 64;
fn value(n: usize) -> String {
format!("value {n}")
}
// Create a cache that can store up to 10,000 entries.
let cache = Cache::new(10_000);
// Spawn async tasks and write to and read from the cache.
let tasks: Vec<_> = (0..NUM_TASKS)
.map(|i| {
// To share the same cache across the async tasks, clone it.
// This is a cheap operation.
let my_cache = cache.clone();
let start = i * NUM_KEYS_PER_TASK;
let end = (i + 1) * NUM_KEYS_PER_TASK;
tokio::spawn(async move {
// Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
for key in start..end {
my_cache.insert(key, value(key)).await;
// get() returns Option<String>, a clone of the stored value.
assert_eq!(my_cache.get(&key).await, Some(value(key)));
}
// Invalidate every 4th element of the inserted entries.
for key in (start..end).step_by(4) {
my_cache.invalidate(&key).await;
}
})
})
.collect();
// Wait for all tasks to complete.
futures_util::future::join_all(tasks).await;
// Verify the result.
for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
if key % 4 == 0 {
assert_eq!(cache.get(&key).await, None);
} else {
assert_eq!(cache.get(&key).await, Some(value(key)));
}
}
}
If you want to atomically initialize and insert a value when the key is not present, you might want to check other insertion methods get_with and try_get_with.
§Avoiding cloning the value at get
The return type of the get method is Option<V> instead of Option<&V>. Every time get is called for an existing key, it creates a clone of the stored value V and returns it. This is because the Cache allows concurrent updates from threads so a value stored in the cache can be dropped or replaced at any time by any other thread. get cannot return a reference &V as it is impossible to guarantee the value outlives the reference.
If you want to store values that will be expensive to clone, wrap them in std::sync::Arc before storing them in a cache. Arc is a thread-safe reference-counted pointer and its clone() method is cheap.
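The following std-only sketch illustrates why wrapping a value in Arc makes the clone performed by get cheap. Payload, make_shared_payload and clone_handle are hypothetical names introduced for illustration; they are not part of the cache API.

```rust
use std::sync::Arc;

/// Stand-in for a value that is expensive to clone (hypothetical type).
#[derive(Debug)]
pub struct Payload {
    pub bytes: Vec<u8>,
}

/// Builds a shared payload of `len` zero bytes.
pub fn make_shared_payload(len: usize) -> Arc<Payload> {
    Arc::new(Payload { bytes: vec![0u8; len] })
}

/// Mimics what a `get` call does for an `Arc<V>` value: it clones the
/// pointer (bumping a reference count), not the underlying bytes.
pub fn clone_handle(stored: &Arc<Payload>) -> Arc<Payload> {
    Arc::clone(stored)
}
```

With a cache of type Cache<K, Arc<Payload>>, each get would return another handle to the same allocation rather than a copy of the byte buffer.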
§Sharing a cache across asynchronous tasks
To share a cache across async tasks (or OS threads), do one of the following:
- Create a clone of the cache by calling its clone method and pass it to the other task.
- If you are using a web application framework such as Actix Web or Axum, you can store a cache in Actix Web’s web::Data or Axum’s shared state, and access it from each request handler.
- Wrap the cache in a sync::OnceCell or sync::Lazy from the once_cell crate, and set it to a static variable.
Cloning is a cheap operation for Cache as it only creates thread-safe reference-counted pointers to the internal data structures.
§No lock is needed
Don’t wrap a Cache in a lock such as Mutex or RwLock. All methods provided by the Cache are considered thread-safe, and can be safely called by multiple async tasks at the same time. No lock is needed.
§Hashing Algorithm
By default, Cache uses a hashing algorithm selected to provide resistance against HashDoS attacks. It will be the same one used by std::collections::HashMap, which is currently SipHash 1-3.
While SipHash’s performance is very competitive for medium sized keys, other hashing algorithms will outperform it for small keys such as integers as well as large keys such as long strings. However, those algorithms will typically not protect against attacks such as HashDoS.
The hashing algorithm can be replaced on a per-Cache basis using the build_with_hasher method of the CacheBuilder. Many alternative algorithms are available on crates.io, such as the AHash crate.
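As a rough illustration of what a replacement hasher looks like, the sketch below implements 64-bit FNV-1a with only the standard library and plugs it into a std HashMap. Fnv1a, FnvBuildHasher and count_words are names made up for this example. It is assumed, not shown here, that a BuildHasher value like this can be handed to build_with_hasher in the same way; note that FNV-1a offers no HashDoS resistance.

```rust
use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

/// 64-bit FNV-1a: fast for short keys, but NOT resistant to HashDoS.
pub struct Fnv1a(u64);

impl Default for Fnv1a {
    fn default() -> Self {
        Fnv1a(0xcbf2_9ce4_8422_2325) // FNV-1a 64-bit offset basis
    }
}

impl Hasher for Fnv1a {
    fn write(&mut self, bytes: &[u8]) {
        for &b in bytes {
            self.0 ^= u64::from(b);
            self.0 = self.0.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
        }
    }

    fn finish(&self) -> u64 {
        self.0
    }
}

/// A `BuildHasher` that creates a fresh `Fnv1a` for every key.
pub type FnvBuildHasher = BuildHasherDefault<Fnv1a>;

/// Demonstrates plugging the hasher into a std `HashMap`.
pub fn count_words(words: &[&str]) -> HashMap<String, usize, FnvBuildHasher> {
    let mut counts = HashMap::with_hasher(FnvBuildHasher::default());
    for w in words {
        *counts.entry((*w).to_string()).or_insert(0) += 1;
    }
    counts
}
```

In practice a maintained crate such as AHash is likely a better choice than a hand-written hasher; the point here is only the shape of the types involved.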
§Example: Size-based Eviction
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
// futures-util = "0.3"
use moka::future::Cache;
#[tokio::main]
async fn main() {
// Evict based on the number of entries in the cache.
let cache = Cache::builder()
// Up to 10,000 entries.
.max_capacity(10_000)
// Create the cache.
.build();
cache.insert(1, "one".to_string()).await;
// Evict based on the byte length of strings in the cache.
let cache = Cache::builder()
// A weigher closure takes &K and &V and returns a u32
// representing the relative size of the entry.
.weigher(|_key, value: &String| -> u32 {
value.len().try_into().unwrap_or(u32::MAX)
})
// This cache will hold up to 32MiB of values.
.max_capacity(32 * 1024 * 1024)
.build();
cache.insert(2, "two".to_string()).await;
}
If your cache should not grow beyond a certain size, use the max_capacity method of the CacheBuilder to set the upper bound. The cache will try to evict entries that have not been used recently or very often.
At the cache creation time, a weigher closure can be set by the weigher method of the CacheBuilder. A weigher closure takes &K and &V as the arguments and returns a u32 representing the relative size of the entry:
- If the weigher is not set, the cache will treat each entry as having the same size of 1. This means the cache will be bounded by the number of entries.
- If the weigher is set, the cache will call the weigher to calculate the weighted size (relative size) of an entry. This means the cache will be bounded by the total weighted size of entries.
Note that weighted sizes are not used when making eviction selections.
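The weigher can also be written as a named function instead of an inline closure. The sketch below is std-only; weigh_by_len and total_weight are hypothetical helpers, and total_weight is only a simplified model of the quantity that max_capacity bounds when a weigher is set.

```rust
/// Weigher for `String` values: the entry's weight is its byte length,
/// saturating at `u32::MAX` for very large strings.
pub fn weigh_by_len(_key: &u32, value: &String) -> u32 {
    value.len().min(u32::MAX as usize) as u32
}

/// Sums the weights of all entries (simplified model of the total
/// weighted size of a cache).
pub fn total_weight(entries: &[(u32, String)]) -> u64 {
    entries
        .iter()
        .map(|(k, v)| u64::from(weigh_by_len(k, v)))
        .sum()
}
```

A function with this signature matches the &K, &V -> u32 shape described above for a cache keyed by u32 and holding String values.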
§Example: Time-based Expirations
§Cache-level TTL and TTI policies
Cache supports the following cache-level expiration policies:
- Time to live (TTL): A cached entry will be expired after the specified duration has passed since insert.
- Time to idle (TTI): A cached entry will be expired after the specified duration has passed since get or insert.
These are cache-level expiration policies; all entries in the cache will have the same TTL and/or TTI durations. If you want to set different expiration durations for different entries, see the next section.
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
// futures-util = "0.3"
use moka::future::Cache;
use std::time::Duration;
#[tokio::main]
async fn main() {
let cache = Cache::builder()
// Time to live (TTL): 30 minutes
.time_to_live(Duration::from_secs(30 * 60))
// Time to idle (TTI): 5 minutes
.time_to_idle(Duration::from_secs( 5 * 60))
// Create the cache.
.build();
// This entry will expire after 5 minutes (TTI) if there is no get().
cache.insert(0, "zero").await;
// This get() will extend the entry life for another 5 minutes.
cache.get(&0).await;
// Even though we keep calling get(), the entry will expire
// after 30 minutes (TTL) from the insert().
}
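How TTL and TTI combine can be modelled with a few lines of std-only code. This is a simplified sketch of the rule described above (an entry expires at whichever deadline comes first), not the cache's actual implementation; expires_at is a hypothetical helper, and all times are offsets measured from the moment of insertion.

```rust
use std::time::Duration;

/// Returns the offset (from insertion) at which an entry expires, given
/// the offset of its most recent `get` or `insert` and the two policies.
/// `None` means the entry never expires.
pub fn expires_at(
    last_access: Duration,
    ttl: Option<Duration>,
    tti: Option<Duration>,
) -> Option<Duration> {
    // The idle deadline moves forward with every access.
    let by_tti = tti.map(|d| last_access + d);
    match (ttl, by_tti) {
        // Both policies set: the earlier deadline wins.
        (Some(a), Some(b)) => Some(a.min(b)),
        // At most one policy set: use whichever exists.
        (a, b) => a.or(b),
    }
}
```

With a 30-minute TTL and a 5-minute TTI, an entry last read 4 minutes after insertion expires at the 9-minute mark, while one read at 29 minutes still expires at 30 minutes because the TTL caps its lifetime.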
§Per-entry expiration policy
Cache supports per-entry expiration policy through the Expiry trait.
The Expiry trait provides three callback methods: expire_after_create, expire_after_read and expire_after_update. When a cache entry is inserted, read or updated, one of these methods is called. These methods return an Option<Duration>, which is used as the expiration duration of the entry.
The Expiry trait provides default implementations of these methods, so you need to implement only the methods you want to customize.
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros", "time" ] }
use moka::{future::Cache, Expiry};
use std::time::{Duration, Instant};
// In this example, we will create a `future::Cache` with `u32` as the key, and
// `(Expiration, String)` as the value. `Expiration` is an enum to represent the
// expiration of the value, and `String` is the application data of the value.
/// An enum to represent the expiration of a value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Expiration {
/// The value never expires.
Never,
/// The value expires after a short time. (5 seconds in this example)
AfterShortTime,
/// The value expires after a long time. (15 seconds in this example)
AfterLongTime,
}
impl Expiration {
/// Returns the duration of this expiration.
pub fn as_duration(&self) -> Option<Duration> {
match self {
Expiration::Never => None,
Expiration::AfterShortTime => Some(Duration::from_secs(5)),
Expiration::AfterLongTime => Some(Duration::from_secs(15)),
}
}
}
/// An expiry that implements `moka::Expiry` trait. `Expiry` trait provides the
/// default implementations of three callback methods `expire_after_create`,
/// `expire_after_read`, and `expire_after_update`.
///
/// In this example, we only override the `expire_after_create` method.
pub struct MyExpiry;
impl Expiry<u32, (Expiration, String)> for MyExpiry {
/// Returns the duration of the expiration of the value that was just
/// created.
fn expire_after_create(
&self,
_key: &u32,
value: &(Expiration, String),
_current_time: Instant,
) -> Option<Duration> {
let duration = value.0.as_duration();
println!("MyExpiry: expire_after_create called with key {_key} and value {value:?}. Returning {duration:?}.");
duration
}
}
#[tokio::main]
async fn main() {
// Create a `Cache<u32, (Expiration, String)>` with an expiry `MyExpiry` and
// eviction listener.
let expiry = MyExpiry;
let eviction_listener = |key, _value, cause| {
println!("Evicted key {key}. Cause: {cause:?}");
};
let cache = Cache::builder()
.max_capacity(100)
.expire_after(expiry)
.eviction_listener(eviction_listener)
.build();
// Insert some entries into the cache with different expirations.
cache
.get_with(0, async { (Expiration::AfterShortTime, "a".to_string()) })
.await;
cache
.get_with(1, async { (Expiration::AfterLongTime, "b".to_string()) })
.await;
cache
.get_with(2, async { (Expiration::Never, "c".to_string()) })
.await;
// Verify that all the inserted entries exist.
assert!(cache.contains_key(&0));
assert!(cache.contains_key(&1));
assert!(cache.contains_key(&2));
// Sleep for 6 seconds. Key 0 should expire.
println!("\nSleeping for 6 seconds...\n");
tokio::time::sleep(Duration::from_secs(6)).await;
cache.run_pending_tasks().await;
println!("Entry count: {}", cache.entry_count());
// Verify that key 0 has been evicted.
assert!(!cache.contains_key(&0));
assert!(cache.contains_key(&1));
assert!(cache.contains_key(&2));
// Sleep for 10 more seconds. Key 1 should expire.
println!("\nSleeping for 10 seconds...\n");
tokio::time::sleep(Duration::from_secs(10)).await;
cache.run_pending_tasks().await;
println!("Entry count: {}", cache.entry_count());
// Verify that key 1 has been evicted.
assert!(!cache.contains_key(&1));
assert!(cache.contains_key(&2));
// Manually invalidate key 2.
cache.invalidate(&2).await;
assert!(!cache.contains_key(&2));
println!("\nSleeping for a second...\n");
tokio::time::sleep(Duration::from_secs(1)).await;
cache.run_pending_tasks().await;
println!("Entry count: {}", cache.entry_count());
println!("\nDone!");
}
§Example: Eviction Listener
A Cache can be configured with an eviction listener, a closure that is called every time there is a cache eviction. The listener takes three parameters: the key and value of the evicted entry, and the RemovalCause to indicate why the entry was evicted.
An eviction listener can be used to keep other data structures in sync with the cache, for example.
The following example demonstrates how to use an eviction listener with time-to-live expiration to manage the lifecycle of temporary files on a filesystem. The cache stores the paths of the files, and when one of them has expired, the eviction listener will be called with the path, so it can remove the file from the filesystem.
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// anyhow = "1.0"
// uuid = { version = "1.1", features = ["v4"] }
// tokio = { version = "1.18", features = ["fs", "macros", "rt-multi-thread", "sync", "time"] }
use moka::{future::Cache, notification::ListenerFuture};
// FutureExt trait provides the boxed method.
use moka::future::FutureExt;
use anyhow::{anyhow, Context};
use std::{
io,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use tokio::{fs, sync::RwLock};
use uuid::Uuid;
/// The DataFileManager writes, reads and removes data files.
struct DataFileManager {
base_dir: PathBuf,
file_count: usize,
}
impl DataFileManager {
fn new(base_dir: PathBuf) -> Self {
Self {
base_dir,
file_count: 0,
}
}
async fn write_data_file(
&mut self,
key: impl AsRef<str>,
contents: String
) -> io::Result<PathBuf> {
// Use the key as a part of the filename.
let mut path = self.base_dir.to_path_buf();
path.push(key.as_ref());
assert!(!path.exists(), "Path already exists: {path:?}");
// create the file at the path and write the contents to the file.
fs::write(&path, contents).await?;
self.file_count += 1;
println!("Created a data file at {path:?} (file count: {})", self.file_count);
Ok(path)
}
async fn read_data_file(&self, path: impl AsRef<Path>) -> io::Result<String> {
// Reads the contents of the file at the path, and return the contents.
fs::read_to_string(path).await
}
async fn remove_data_file(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
// Remove the file at the path.
fs::remove_file(path.as_ref()).await?;
self.file_count -= 1;
println!(
"Removed a data file at {:?} (file count: {})",
path.as_ref(),
self.file_count
);
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Create an instance of the DataFileManager and wrap it with
// Arc<RwLock<_>> so it can be shared across threads.
let mut base_dir = std::env::temp_dir();
base_dir.push(Uuid::new_v4().as_hyphenated().to_string());
println!("base_dir: {base_dir:?}");
std::fs::create_dir(&base_dir)?;
let file_mgr = DataFileManager::new(base_dir);
let file_mgr = Arc::new(RwLock::new(file_mgr));
let file_mgr1 = Arc::clone(&file_mgr);
// Create an eviction listener closure.
let eviction_listener = move |k, v: PathBuf, cause| -> ListenerFuture {
println!("\n== An entry has been evicted. k: {k:?}, v: {v:?}, cause: {cause:?}");
let file_mgr2 = Arc::clone(&file_mgr1);
// Create a Future that removes the data file at the path `v`.
async move {
// Acquire the write lock of the DataFileManager.
let mut mgr = file_mgr2.write().await;
// Remove the data file. We must handle error cases here to
// prevent the listener from panicking.
if let Err(_e) = mgr.remove_data_file(v.as_path()).await {
eprintln!("Failed to remove a data file at {v:?}");
}
}
// Convert the regular Future into ListenerFuture. This method is
// provided by moka::future::FutureExt trait.
.boxed()
};
// Create the cache. Set time to live for two seconds and set the
// eviction listener.
let cache = Cache::builder()
.max_capacity(100)
.time_to_live(Duration::from_secs(2))
.async_eviction_listener(eviction_listener)
.build();
// Insert an entry to the cache.
// This will create and write a data file for the key "user1", store the
// path of the file to the cache, and return it.
println!("== try_get_with()");
let key = "user1";
let path = cache
.try_get_with(key, async {
let mut mgr = file_mgr.write().await;
let path = mgr
.write_data_file(key, "user data".into())
.await
.context("Failed to create a data file")?;
Ok(path) as anyhow::Result<_>
})
.await
.map_err(|e| anyhow!("{e}"))?;
// Read the data file at the path and print the contents.
println!("\n== read_data_file()");
{
let mgr = file_mgr.read().await;
let contents = mgr
.read_data_file(path.as_path())
.await
.with_context(|| format!("Failed to read data from {path:?}"))?;
println!("contents: {contents}");
}
// Sleep for five seconds. While sleeping, the cache entry for key "user1"
// will be expired and evicted, so the eviction listener will be called to
// remove the file.
tokio::time::sleep(Duration::from_secs(5)).await;
cache.run_pending_tasks().await;
Ok(())
}
§An eviction listener should not panic
It is very important that an eviction listener closure does not panic. Otherwise, the cache will stop calling the listener after a panic. This is intended behavior because the cache cannot know whether it is memory safe or not to call the panicked listener again.
When a listener panics, the cache will swallow the panic and disable the listener. If you want to know when a listener panics and the reason for the panic, you can enable an optional logging feature of Moka and check error-level logs.
To enable the logging, do the following:
- In Cargo.toml, add the crate feature logging for moka.
- Set the logging level for moka to error or any lower levels (warn, info, …):
  - If you are using the env_logger crate, you can achieve this by setting the RUST_LOG environment variable to moka=error.
- If you have more than one cache, you may want to set a distinct name for each cache by using the cache builder’s name method. The name will appear in the log.
§Implementations
impl<K, V, S> Cache<K, V, S>
pub fn policy(&self) -> Policy
Returns a read-only cache policy of this cache.
At this time, cache policy cannot be modified after cache creation. A future version may support modifying it.
pub fn entry_count(&self) -> u64
Returns an approximate number of entries in this cache.
The value returned is an estimate; the actual count may differ if there are concurrent insertions or removals, or if some entries are pending removal due to expiration. This inaccuracy can be mitigated by calling run_pending_tasks first.
§Example
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;
#[tokio::main]
async fn main() {
let cache = Cache::new(10);
cache.insert('n', "Netherland Dwarf").await;
cache.insert('l', "Lop Eared").await;
cache.insert('d', "Dutch").await;
// Ensure an entry exists.
assert!(cache.contains_key(&'n'));
// However, the following may print stale zeros instead of threes.
println!("{}", cache.entry_count()); // -> 0
println!("{}", cache.weighted_size()); // -> 0
// To mitigate the inaccuracy, call `run_pending_tasks` to run pending
// internal tasks.
cache.run_pending_tasks().await;
// The following will print the actual numbers.
println!("{}", cache.entry_count()); // -> 3
println!("{}", cache.weighted_size()); // -> 3
}
pub fn weighted_size(&self) -> u64
Returns an approximate total weighted size of entries in this cache.
The value returned is an estimate; the actual size may differ if there are concurrent insertions or removals, or if some entries are pending removal due to expiration. This inaccuracy can be mitigated by calling run_pending_tasks first. See entry_count for sample code.
impl<K, V> Cache<K, V>
pub fn new(max_capacity: u64) -> Cache<K, V>
Constructs a new Cache<K, V> that will store up to the max_capacity.
To adjust various configuration knobs such as initial_capacity or time_to_live, use the CacheBuilder.
pub fn builder() -> CacheBuilder<K, V, Cache<K, V>>
Returns a CacheBuilder, which can build a Cache with various configuration knobs.
impl<K, V, S> Cache<K, V, S>
pub fn contains_key<Q>(&self, key: &Q) -> bool
Returns true if the cache contains a value for the key.
Unlike the get method, this method is not considered a cache read operation, so it does not update the historic popularity estimator or reset the idle timer for the key.
The key may be any borrowed form of the cache’s key type, but Hash and Eq on the borrowed form must match those for the key type.
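The borrowed-form rule is the same one that std::collections::HashMap uses, so it can be illustrated without the cache itself. In this std-only sketch, lookup and hash_of are hypothetical helpers: a String-keyed map is queried with a &str, which works because String and str produce the same hash and compare equal.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Looks up a `String`-keyed map with a borrowed `&str`, avoiding an
/// allocation for the lookup key.
pub fn lookup<'a>(map: &'a HashMap<String, u32>, key: &str) -> Option<&'a u32> {
    map.get(key)
}

/// Hashes any value with a fixed-key hasher so two hashes can be compared.
pub fn hash_of<T: Hash + ?Sized>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}
```

A key type whose borrowed form hashed differently from the owned form would silently miss on lookups, which is why the requirement is stated for every method that accepts &Q.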
pub async fn get<Q>(&self, key: &Q) -> Option<V>
Returns a clone of the value corresponding to the key.
If you want to store values that will be expensive to clone, wrap them in std::sync::Arc before storing them in a cache. Arc is a thread-safe reference-counted pointer and its clone() method is cheap.
The key may be any borrowed form of the cache’s key type, but Hash and Eq on the borrowed form must match those for the key type.
pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
Takes a key K and returns an OwnedKeyEntrySelector that can be used to select or insert an entry.
§Example
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;
#[tokio::main]
async fn main() {
let cache: Cache<String, u32> = Cache::new(100);
let key = "key1".to_string();
let entry = cache.entry(key.clone()).or_insert(3).await;
assert!(entry.is_fresh());
assert_eq!(entry.key(), &key);
assert_eq!(entry.into_value(), 3);
let entry = cache.entry(key).or_insert(6).await;
// Not fresh because the value was already in the cache.
assert!(!entry.is_fresh());
assert_eq!(entry.into_value(), 3);
}
pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
Takes a reference &Q of a key and returns a RefKeyEntrySelector that can be used to select or insert an entry.
§Example
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;
#[tokio::main]
async fn main() {
let cache: Cache<String, u32> = Cache::new(100);
let key = "key1".to_string();
let entry = cache.entry_by_ref(&key).or_insert(3).await;
assert!(entry.is_fresh());
assert_eq!(entry.key(), &key);
assert_eq!(entry.into_value(), 3);
let entry = cache.entry_by_ref(&key).or_insert(6).await;
// Not fresh because the value was already in the cache.
assert!(!entry.is_fresh());
assert_eq!(entry.into_value(), 3);
}
pub async fn get_with(&self, key: K, init: impl Future<Output = V>) -> V
Returns a clone of the value corresponding to the key. If the value does not exist, resolves the init future and inserts the output.
§Concurrent calls on the same key
This method guarantees that concurrent calls on the same non-existent key are coalesced into one evaluation of the init future. Only one of the calls evaluates its future, and other calls wait for that future to resolve.
The following code snippet demonstrates this behavior:
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// futures-util = "0.3"
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;
use std::sync::Arc;
#[tokio::main]
async fn main() {
const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
let cache = Cache::new(100);
// Spawn four async tasks.
let tasks: Vec<_> = (0..4_u8)
.map(|task_id| {
let my_cache = cache.clone();
tokio::spawn(async move {
println!("Task {task_id} started.");
// Insert and get the value for key1. Although all four async
// tasks will call `get_with` at the same time, the `init`
// async block must be resolved only once.
let value = my_cache
.get_with("key1", async move {
println!("Task {task_id} inserting a value.");
Arc::new(vec![0u8; TEN_MIB])
})
.await;
// Ensure the value exists now.
assert_eq!(value.len(), TEN_MIB);
assert!(my_cache.get(&"key1").await.is_some());
println!("Task {task_id} got the value. (len: {})", value.len());
})
})
.collect();
// Run all tasks concurrently and wait for them to complete.
futures_util::future::join_all(tasks).await;
}
A Sample Result
- The init future (async block) was resolved exactly once by task 3.
- Other tasks were blocked until task 3 inserted the value.
Task 0 started.
Task 3 started.
Task 1 started.
Task 2 started.
Task 3 inserting a value.
Task 3 got the value. (len: 10485760)
Task 0 got the value. (len: 10485760)
Task 1 got the value. (len: 10485760)
Task 2 got the value. (len: 10485760)
§Panics
This method panics when the init future has panicked. When it happens, only the caller whose init future panicked will get the panic (e.g. only task 3 in the above sample). If there are other calls in progress (e.g. task 0, 1 and 2 above), this method will restart and resolve one of the remaining init futures.
pub async fn get_with_by_ref<Q>(&self, key: &Q, init: impl Future<Output = V>) -> V
Similar to get_with, but instead of passing an owned key, you can pass a reference to the key. If the key does not exist in the cache, the key will be cloned to create a new entry in the cache.
pub async fn get_with_if(&self, key: K, init: impl Future<Output = V>, replace_if: impl FnMut(&V) -> bool + Send) -> V
Deprecated since 0.10.0: Replaced with entry().or_insert_with_if()
TODO: Remove this in v0.13.0.
pub async fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
Returns a clone of the value corresponding to the key. If the value does not exist, resolves the init future, and inserts the value if Some(value) was returned. If None was returned from the future, this method does not insert a value and returns None.
§Concurrent calls on the same key
This method guarantees that concurrent calls on the same non-existent key are coalesced into one evaluation of the init future. Only one of the calls evaluates its future, and other calls wait for that future to resolve.
The following code snippet demonstrates this behavior:
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// futures-util = "0.3"
// reqwest = "0.11"
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;
// This async function tries to get HTML from the given URI.
async fn get_html(task_id: u8, uri: &str) -> Option<String> {
println!("get_html() called by task {task_id}.");
reqwest::get(uri).await.ok()?.text().await.ok()
}
#[tokio::main]
async fn main() {
let cache = Cache::new(100);
// Spawn four async tasks.
let tasks: Vec<_> = (0..4_u8)
.map(|task_id| {
let my_cache = cache.clone();
tokio::spawn(async move {
println!("Task {task_id} started.");
// Try to insert and get the value for key1. Although
// all four async tasks will call `optionally_get_with`
// at the same time, get_html() must be called only once.
let value = my_cache
.optionally_get_with(
"key1",
get_html(task_id, "https://www.rust-lang.org"),
).await;
// Ensure the value exists now.
assert!(value.is_some());
assert!(my_cache.get(&"key1").await.is_some());
println!(
"Task {task_id} got the value. (len: {})",
value.unwrap().len()
);
})
})
.collect();
// Run all tasks concurrently and wait for them to complete.
futures_util::future::join_all(tasks).await;
}
A Sample Result
- get_html() was called exactly once by task 2.
- Other tasks were blocked until task 2 inserted the value.
Task 1 started.
Task 0 started.
Task 2 started.
Task 3 started.
get_html() called by task 2.
Task 2 got the value. (len: 19419)
Task 1 got the value. (len: 19419)
Task 0 got the value. (len: 19419)
Task 3 got the value. (len: 19419)
§Panics
This method panics when the init future has panicked. When it happens, only the caller whose init future panicked will get the panic (e.g. only task 2 in the above sample). If there are other calls in progress (e.g. task 0, 1 and 3 above), this method will restart and resolve one of the remaining init futures.
pub async fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
Similar to optionally_get_with, but instead of passing an owned key, you can pass a reference to the key. If the key does not exist in the cache, the key will be cloned to create a new entry in the cache.
pub async fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
Returns a clone of the value corresponding to the key. If the value does not exist, resolves the init future, and inserts the value if Ok(value) was returned. If Err(_) was returned from the future, this method does not insert a value and returns the Err wrapped in std::sync::Arc.
§Concurrent calls on the same key
This method guarantees that concurrent calls on the same non-existent key are coalesced into one evaluation of the init future (as long as these futures return the same error type). Only one of the calls evaluates its future, and other calls wait for that future to resolve.
The following code snippet demonstrates this behavior:
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// futures-util = "0.3"
// reqwest = "0.11"
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;
// This async function tries to get HTML from the given URI.
async fn get_html(task_id: u8, uri: &str) -> Result<String, reqwest::Error> {
println!("get_html() called by task {task_id}.");
reqwest::get(uri).await?.text().await
}
#[tokio::main]
async fn main() {
let cache = Cache::new(100);
// Spawn four async tasks.
let tasks: Vec<_> = (0..4_u8)
.map(|task_id| {
let my_cache = cache.clone();
tokio::spawn(async move {
println!("Task {task_id} started.");
// Try to insert and get the value for key1. Although
// all four async tasks will call `try_get_with`
// at the same time, get_html() must be called only once.
let value = my_cache
.try_get_with(
"key1",
get_html(task_id, "https://www.rust-lang.org"),
).await;
// Ensure the value exists now.
assert!(value.is_ok());
assert!(my_cache.get(&"key1").await.is_some());
println!(
"Task {task_id} got the value. (len: {})",
value.unwrap().len()
);
})
})
.collect();
// Run all tasks concurrently and wait for them to complete.
futures_util::future::join_all(tasks).await;
}
A Sample Result
- get_html() was called exactly once by task 2.
- Other tasks were blocked until task 2 inserted the value.
Task 1 started.
Task 0 started.
Task 2 started.
Task 3 started.
get_html() called by task 2.
Task 2 got the value. (len: 19419)
Task 1 got the value. (len: 19419)
Task 0 got the value. (len: 19419)
Task 3 got the value. (len: 19419)
§Panics
This method panics when the init future has panicked. When it happens, only the caller whose init future panicked will get the panic (e.g. only task 2 in the above sample). If there are other calls in progress (e.g. task 0, 1 and 3 above), this method will restart and resolve one of the remaining init futures.
pub async fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
Similar to try_get_with, but instead of passing an owned key, you can pass a reference to the key. If the key does not exist in the cache, the key will be cloned to create a new entry in the cache.
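The clone-only-on-miss behavior described above can be modelled without the cache itself. The following is a minimal, synchronous, std-only sketch and not moka's implementation: Key, KEY_CLONES, try_get_or_init_by_ref and demo are hypothetical names, a plain HashMap stands in for the cache, and errors are returned as-is rather than wrapped in Arc.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};

// Counts how many times a `Key` is cloned (illustration only).
static KEY_CLONES: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical key type whose `Clone` impl is instrumented.
#[derive(PartialEq, Eq, Hash)]
struct Key(String);

impl Clone for Key {
    fn clone(&self) -> Self {
        KEY_CLONES.fetch_add(1, Ordering::Relaxed);
        Key(self.0.clone())
    }
}

/// Std-only stand-in for a "get or try to initialize, by reference" call.
fn try_get_or_init_by_ref<E>(
    map: &mut HashMap<Key, u32>,
    key: &Key,
    init: impl FnOnce() -> Result<u32, E>,
) -> Result<u32, E> {
    // Hit: the lookup is answered through the reference, no clone.
    if let Some(value) = map.get(key) {
        return Ok(*value);
    }
    // Miss: run the initializer; in this sketch nothing is stored on error.
    let value = init()?;
    // Only now is an owned key needed, so this is the single clone.
    map.insert(key.clone(), value);
    Ok(value)
}

/// Returns the three call results and the number of key clones performed.
fn demo() -> (Result<u32, String>, Result<u32, String>, Result<u32, String>, usize) {
    let before = KEY_CLONES.load(Ordering::Relaxed);
    let mut map = HashMap::new();
    let key = Key("key1".to_string());

    let failed = try_get_or_init_by_ref(&mut map, &key, || Err("boom".to_string()));
    let created = try_get_or_init_by_ref(&mut map, &key, || Ok(42));
    let cached = try_get_or_init_by_ref(&mut map, &key, || Ok(7));

    let clones = KEY_CLONES.load(Ordering::Relaxed) - before;
    (failed, created, cached, clones)
}
```

In this sketch, calling demo() performs three lookups with the same borrowed key, yet the key is cloned only once, at the moment the entry is created.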
pub async fn insert(&self, key: K, value: V)
Inserts a key-value pair into the cache.
If the cache has this key present, the value is updated.
pub async fn invalidate<Q>(&self, key: &Q)
Discards any cached value for the key.
If you need to get the value that has been discarded, use the remove method instead.
The key may be any borrowed form of the cache’s key type, but Hash and Eq on the borrowed form must match those for the key type.
pub async fn remove<Q>(&self, key: &Q) -> Option<V>
Discards any cached value for the key and returns a clone of the value.
If you do not need to get the value that has been discarded, use the invalidate method instead.
The key may be any borrowed form of the cache’s key type, but Hash and Eq on the borrowed form must match those for the key type.
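The borrowed-form rule is the same contract that std::collections::HashMap places on its lookup methods, so it can be illustrated with the standard library alone. The sketch below is only an illustration under that assumption: UserName, remove_by_str and demo are hypothetical names, and a HashMap stands in for the cache.

```rust
use std::borrow::Borrow;
use std::collections::HashMap;

/// Hypothetical key type wrapping a `String`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UserName(String);

// `UserName` derives `Hash` and `Eq` from its single `String` field, and
// `String` hashes and compares exactly like `str`. Lending out a `&str`
// therefore keeps `Hash` and `Eq` consistent with the key type.
impl Borrow<str> for UserName {
    fn borrow(&self) -> &str {
        &self.0
    }
}

/// Removes `name` using only a borrowed `&str`; no `UserName` is built.
fn remove_by_str(map: &mut HashMap<UserName, u32>, name: &str) -> Option<u32> {
    map.remove(name)
}

/// Returns (result for a present key, result for an absent key, entries left).
fn demo() -> (Option<u32>, Option<u32>, usize) {
    let mut map = HashMap::new();
    map.insert(UserName("alice".to_string()), 1);
    map.insert(UserName("bob".to_string()), 2);

    let hit = remove_by_str(&mut map, "alice");
    let miss = remove_by_str(&mut map, "carol");
    (hit, miss, map.len())
}
```

If the Borrow implementation returned something that hashed or compared differently from the key (for example, a lower-cased copy), lookups through the borrowed form could silently miss entries, which is why the two must agree.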
pub fn invalidate_all(&self)
Discards all cached values.
This method returns immediately, and a background thread will evict all the cached values inserted before the time when this method was called. The get method is guaranteed not to return these invalidated values, even if they have not yet been evicted.
Like the invalidate method, this method does not clear the historic popularity estimator of keys, so it retains the client activities of trying to retrieve an item.
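One way to picture how invalidated values can be hidden from get before they are physically evicted is a cut-off timestamp that reads compare against. The following is a simplified, single-threaded, std-only model and not moka's implementation: ToyCache, its fields, sweep and demo are hypothetical, and a logical counter stands in for real time.

```rust
use std::collections::HashMap;

/// Toy single-threaded cache; every name here is hypothetical.
struct ToyCache {
    // Value plus the logical time at which it was inserted.
    entries: HashMap<String, (u64, u32)>,
    // Logical clock, bumped on every insert and invalidate_all.
    clock: u64,
    // Entries inserted at or before this time are treated as invalid.
    valid_after: u64,
}

impl ToyCache {
    fn new() -> Self {
        ToyCache { entries: HashMap::new(), clock: 0, valid_after: 0 }
    }

    fn insert(&mut self, key: &str, value: u32) {
        self.clock += 1;
        self.entries.insert(key.to_owned(), (self.clock, value));
    }

    /// Returns immediately: only records the cut-off, removes nothing.
    fn invalidate_all(&mut self) {
        self.clock += 1;
        self.valid_after = self.clock;
    }

    /// Hides entries older than the cut-off even though they are still stored.
    fn get(&self, key: &str) -> Option<u32> {
        match self.entries.get(key) {
            Some(&(inserted_at, value)) if inserted_at > self.valid_after => Some(value),
            _ => None,
        }
    }

    /// Stand-in for the deferred eviction pass; drops stale entries.
    fn sweep(&mut self) {
        let cutoff = self.valid_after;
        self.entries.retain(|_, (inserted_at, _)| *inserted_at > cutoff);
    }

    fn stored_len(&self) -> usize {
        self.entries.len()
    }
}

/// Returns (get "a" after invalidation, entries still stored at that point,
/// get "b" inserted afterwards, entries stored after the sweep).
fn demo() -> (Option<u32>, usize, Option<u32>, usize) {
    let mut cache = ToyCache::new();
    cache.insert("a", 1);
    cache.invalidate_all();
    let hidden = cache.get("a");
    let still_stored = cache.stored_len();

    cache.insert("b", 2);
    cache.sweep();
    (hidden, still_stored, cache.get("b"), cache.stored_len())
}
```

In this model, "a" stops being visible as soon as invalidate_all returns, although it still occupies storage until sweep runs; "b", inserted after the call, is unaffected.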
pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<String, PredicateError>
Discards cached values that satisfy a predicate.
invalidate_entries_if takes a closure that returns true or false. This method returns immediately, and a background thread will apply the closure to each cached value inserted before the time when invalidate_entries_if was called. If the closure returns true on a value, that value will be evicted from the cache.
The get method also applies the closure to a value to determine whether it should have been invalidated. Therefore, the get method is guaranteed not to return invalidated values.
Note that you must call CacheBuilder::support_invalidation_closures at cache creation time, as the cache needs to maintain additional internal data structures to support this method. Otherwise, calling this method will fail with a PredicateError::InvalidationClosuresDisabled.
Like the invalidate method, this method does not clear the historic popularity estimator of keys, so it retains the client activities of trying to retrieve an item.
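The read-side check described above (get applying the registered closure to entries inserted before the call) can be modelled in a few lines. This is a simplified, single-threaded, std-only sketch and not moka's implementation: PredicateCache, Predicate and demo are hypothetical names, a logical counter stands in for real time, and the predicate takes a &str key and a &u32 value purely for illustration.

```rust
use std::collections::HashMap;

/// Boxed invalidation predicate over a key and a value.
type Predicate = Box<dyn Fn(&str, &u32) -> bool>;

/// Toy single-threaded model; every name here is hypothetical.
struct PredicateCache {
    // Value plus the logical time at which it was inserted.
    entries: HashMap<String, (u64, u32)>,
    // Predicates paired with the logical time at which they were registered.
    predicates: Vec<(u64, Predicate)>,
    clock: u64,
}

impl PredicateCache {
    fn new() -> Self {
        PredicateCache { entries: HashMap::new(), predicates: Vec::new(), clock: 0 }
    }

    fn insert(&mut self, key: &str, value: u32) {
        self.clock += 1;
        self.entries.insert(key.to_owned(), (self.clock, value));
    }

    /// Returns immediately: the predicate is only recorded, nothing is scanned.
    fn invalidate_entries_if(&mut self, predicate: impl Fn(&str, &u32) -> bool + 'static) {
        self.clock += 1;
        self.predicates.push((self.clock, Box::new(predicate)));
    }

    /// Hides an entry if a predicate registered after its insertion matches it.
    fn get(&self, key: &str) -> Option<u32> {
        let &(inserted_at, value) = self.entries.get(key)?;
        let invalidated = self
            .predicates
            .iter()
            .any(|(registered_at, p)| inserted_at < *registered_at && p(key, &value));
        if invalidated { None } else { Some(value) }
    }
}

/// Returns the visible values for "small", "large" and "later".
fn demo() -> (Option<u32>, Option<u32>, Option<u32>) {
    let mut cache = PredicateCache::new();
    cache.insert("small", 3);
    cache.insert("large", 300);
    cache.invalidate_entries_if(|_key, value| *value > 100);
    // Inserted after the call, so the predicate does not apply to it.
    cache.insert("later", 500);
    (cache.get("small"), cache.get("large"), cache.get("later"))
}
```

Only "large" is hidden: "small" does not satisfy the predicate, and "later" was inserted after the predicate was registered.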
pub fn iter(&self) -> Iter<'_, K, V>
Creates an iterator visiting all key-value pairs in arbitrary order. The iterator element type is (Arc<K>, V), where V is a clone of a stored value.
Iterators do not block concurrent reads and writes on the cache. An entry can be inserted into, invalidated or evicted from a cache while iterators are alive on the same cache.
Unlike the get method, visiting entries via an iterator does not update the historic popularity estimator or reset idle timers for keys.
§Guarantees
In order to allow concurrent access to the cache, the iterator’s next method does not guarantee the following:
- It does not guarantee to return a key-value pair (an entry) if its key has been inserted into the cache after the iterator was created.
  - Such an entry may or may not be returned depending on the key’s hash and timing.
The next method does guarantee the following:
- It guarantees not to return the same entry more than once.
- It guarantees not to return an entry if it has been removed from the cache after the iterator was created.
  - Note: An entry can be removed for the following reasons:
    - Manually invalidated.
    - Expired (e.g. time-to-live).
    - Evicted because the cache capacity was exceeded.
§Examples
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.12", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache = Cache::new(100);
    cache.insert("Julia", 14).await;

    let mut iter = cache.iter();
    let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
    assert_eq!(*k, "Julia");
    assert_eq!(v, 14);

    assert!(iter.next().is_none());
}
pub async fn run_pending_tasks(&self)
Performs any pending maintenance operations needed by the cache.
Trait Implementations§
impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
impl<K, V, S> Send for Cache<K, V, S>
impl<K, V, S> Sync for Cache<K, V, S>
Auto Trait Implementations§
impl<K, V, S> Freeze for Cache<K, V, S>
impl<K, V, S = RandomState> !RefUnwindSafe for Cache<K, V, S>
impl<K, V, S> Unpin for Cache<K, V, S>
where
    V: Unpin,
impl<K, V, S = RandomState> !UnwindSafe for Cache<K, V, S>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
unsafe fn clone_to_uninit(&self, dst: *mut T)
This is a nightly-only experimental API. (clone_to_uninit)
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.