use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_common::{DataFusionError, Result};
use log::debug;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
used: AtomicUsize,
}
impl MemoryPool for UnboundedMemoryPool {
fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
self.used.fetch_add(additional, Ordering::Relaxed);
}
fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
self.used.fetch_sub(shrink, Ordering::Relaxed);
}
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
self.grow(reservation, additional);
Ok(())
}
fn reserved(&self) -> usize {
self.used.load(Ordering::Relaxed)
}
}
#[derive(Debug)]
pub struct GreedyMemoryPool {
pool_size: usize,
used: AtomicUsize,
}
impl GreedyMemoryPool {
pub fn new(pool_size: usize) -> Self {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
pool_size,
used: AtomicUsize::new(0),
}
}
}
impl MemoryPool for GreedyMemoryPool {
fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
self.used.fetch_add(additional, Ordering::Relaxed);
}
fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
self.used.fetch_sub(shrink, Ordering::Relaxed);
}
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
self.used
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
let new_used = used + additional;
(new_used <= self.pool_size).then_some(new_used)
})
.map_err(|used| {
insufficient_capacity_err(
reservation,
additional,
self.pool_size.saturating_sub(used),
)
})?;
Ok(())
}
fn reserved(&self) -> usize {
self.used.load(Ordering::Relaxed)
}
}
#[derive(Debug)]
pub struct FairSpillPool {
pool_size: usize,
state: Mutex<FairSpillPoolState>,
}
#[derive(Debug)]
struct FairSpillPoolState {
num_spill: usize,
spillable: usize,
unspillable: usize,
}
impl FairSpillPool {
pub fn new(pool_size: usize) -> Self {
debug!("Created new FairSpillPool(pool_size={pool_size})");
Self {
pool_size,
state: Mutex::new(FairSpillPoolState {
num_spill: 0,
spillable: 0,
unspillable: 0,
}),
}
}
}
impl MemoryPool for FairSpillPool {
fn register(&self, consumer: &MemoryConsumer) {
if consumer.can_spill {
self.state.lock().num_spill += 1;
}
}
fn unregister(&self, consumer: &MemoryConsumer) {
if consumer.can_spill {
let mut state = self.state.lock();
state.num_spill = state.num_spill.checked_sub(1).unwrap();
}
}
fn grow(&self, reservation: &MemoryReservation, additional: usize) {
let mut state = self.state.lock();
match reservation.registration.consumer.can_spill {
true => state.spillable += additional,
false => state.unspillable += additional,
}
}
fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
let mut state = self.state.lock();
match reservation.registration.consumer.can_spill {
true => state.spillable -= shrink,
false => state.unspillable -= shrink,
}
}
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
let mut state = self.state.lock();
match reservation.registration.consumer.can_spill {
true => {
let spill_available = self.pool_size.saturating_sub(state.unspillable);
let available = spill_available
.checked_div(state.num_spill)
.unwrap_or(spill_available);
if reservation.size + additional > available {
return Err(insufficient_capacity_err(
reservation,
additional,
available,
));
}
state.spillable += additional;
}
false => {
let available = self
.pool_size
.saturating_sub(state.unspillable + state.spillable);
if available < additional {
return Err(insufficient_capacity_err(
reservation,
additional,
available,
));
}
state.unspillable += additional;
}
}
Ok(())
}
fn reserved(&self) -> usize {
let state = self.state.lock();
state.spillable + state.unspillable
}
}
fn insufficient_capacity_err(
reservation: &MemoryReservation,
additional: usize,
available: usize,
) -> DataFusionError {
DataFusionError::ResourcesExhausted(format!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}", additional, reservation.registration.consumer.name, reservation.size, available))
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
#[test]
fn test_fair() {
let pool = Arc::new(FairSpillPool::new(100)) as _;
let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
r1.grow(2000);
assert_eq!(pool.reserved(), 2000);
let mut r2 = MemoryConsumer::new("r2")
.with_can_spill(true)
.register(&pool);
r2.grow(2000);
assert_eq!(pool.reserved(), 4000);
let err = r2.try_grow(1).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0");
let err = r2.try_grow(1).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0");
r1.shrink(1990);
r2.shrink(2000);
assert_eq!(pool.reserved(), 10);
r1.try_grow(10).unwrap();
assert_eq!(pool.reserved(), 20);
r2.try_grow(80).unwrap();
assert_eq!(pool.reserved(), 100);
r2.shrink(70);
assert_eq!(r1.size(), 20);
assert_eq!(r2.size(), 10);
assert_eq!(pool.reserved(), 30);
let mut r3 = MemoryConsumer::new("r3")
.with_can_spill(true)
.register(&pool);
let err = r3.try_grow(70).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40");
r2.free();
let err = r3.try_grow(70).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40");
drop(r2);
assert_eq!(pool.reserved(), 20);
r3.try_grow(80).unwrap();
assert_eq!(pool.reserved(), 100);
r1.free();
assert_eq!(pool.reserved(), 80);
let mut r4 = MemoryConsumer::new("s4").register(&pool);
let err = r4.try_grow(30).unwrap_err().strip_backtrace();
assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20");
}
}