#![allow(dead_code)]
use crate::client::retries::RetryPartition;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::debug;
#[non_exhaustive]
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct ClientRateLimiterPartition {
retry_partition: RetryPartition,
}
impl ClientRateLimiterPartition {
pub fn new(retry_partition: RetryPartition) -> Self {
Self { retry_partition }
}
}
const RETRY_COST: f64 = 5.0;
const RETRY_TIMEOUT_COST: f64 = RETRY_COST * 2.0;
const INITIAL_REQUEST_COST: f64 = 1.0;
const MIN_FILL_RATE: f64 = 0.5;
const MIN_CAPACITY: f64 = 1.0;
const SMOOTH: f64 = 0.8;
const BETA: f64 = 0.7;
const SCALE_CONSTANT: f64 = 0.4;
#[derive(Clone, Debug)]
pub struct ClientRateLimiter {
inner: Arc<Mutex<Inner>>,
}
#[derive(Debug)]
pub(crate) struct Inner {
fill_rate: f64,
max_capacity: f64,
current_capacity: f64,
last_timestamp: Option<f64>,
enabled: bool,
measured_tx_rate: f64,
last_tx_rate_bucket: f64,
request_count: u64,
last_max_rate: f64,
time_of_last_throttle: f64,
}
pub(crate) enum RequestReason {
Retry,
RetryTimeout,
InitialRequest,
}
impl ClientRateLimiter {
pub fn new(seconds_since_unix_epoch: f64) -> Self {
Self::builder()
.tokens_retrieved_per_second(MIN_FILL_RATE)
.time_of_last_throttle(seconds_since_unix_epoch)
.previous_time_bucket(seconds_since_unix_epoch.floor())
.build()
}
fn builder() -> Builder {
Builder::new()
}
pub(crate) fn acquire_permission_to_send_a_request(
&self,
seconds_since_unix_epoch: f64,
kind: RequestReason,
) -> Result<(), Duration> {
let mut it = self.inner.lock().unwrap();
if !it.enabled {
return Ok(());
}
let amount = match kind {
RequestReason::Retry => RETRY_COST,
RequestReason::RetryTimeout => RETRY_TIMEOUT_COST,
RequestReason::InitialRequest => INITIAL_REQUEST_COST,
};
it.refill(seconds_since_unix_epoch);
let res = if amount > it.current_capacity {
let sleep_time = (amount - it.current_capacity) / it.fill_rate;
debug!(
amount,
it.current_capacity,
it.fill_rate,
sleep_time,
"client rate limiter delayed a request"
);
Err(Duration::from_secs_f64(sleep_time))
} else {
Ok(())
};
it.current_capacity -= amount;
res
}
pub(crate) fn update_rate_limiter(
&self,
seconds_since_unix_epoch: f64,
is_throttling_error: bool,
) {
let mut it = self.inner.lock().unwrap();
it.update_tokens_retrieved_per_second(seconds_since_unix_epoch);
let calculated_rate;
if is_throttling_error {
let rate_to_use = if it.enabled {
f64::min(it.measured_tx_rate, it.fill_rate)
} else {
it.measured_tx_rate
};
it.last_max_rate = rate_to_use;
it.calculate_time_window();
it.time_of_last_throttle = seconds_since_unix_epoch;
calculated_rate = cubic_throttle(rate_to_use);
it.enable_token_bucket();
} else {
it.calculate_time_window();
calculated_rate = it.cubic_success(seconds_since_unix_epoch);
}
let new_rate = f64::min(calculated_rate, 2.0 * it.measured_tx_rate);
it.update_bucket_refill_rate(seconds_since_unix_epoch, new_rate);
}
}
impl Inner {
fn refill(&mut self, seconds_since_unix_epoch: f64) {
if let Some(last_timestamp) = self.last_timestamp {
let fill_amount = (seconds_since_unix_epoch - last_timestamp) * self.fill_rate;
self.current_capacity =
f64::min(self.max_capacity, self.current_capacity + fill_amount);
debug!(
fill_amount,
self.current_capacity, self.max_capacity, "refilling client rate limiter tokens"
);
}
self.last_timestamp = Some(seconds_since_unix_epoch);
}
fn update_bucket_refill_rate(&mut self, seconds_since_unix_epoch: f64, new_fill_rate: f64) {
self.refill(seconds_since_unix_epoch);
self.fill_rate = f64::max(new_fill_rate, MIN_FILL_RATE);
self.max_capacity = f64::max(new_fill_rate, MIN_CAPACITY);
debug!(
fill_rate = self.fill_rate,
max_capacity = self.max_capacity,
current_capacity = self.current_capacity,
measured_tx_rate = self.measured_tx_rate,
"client rate limiter state has been updated"
);
self.current_capacity = f64::min(self.current_capacity, self.max_capacity);
}
fn enable_token_bucket(&mut self) {
if !self.enabled {
debug!("client rate limiting has been enabled");
}
self.enabled = true;
}
fn update_tokens_retrieved_per_second(&mut self, seconds_since_unix_epoch: f64) {
let next_time_bucket = (seconds_since_unix_epoch * 2.0).floor() / 2.0;
self.request_count += 1;
if next_time_bucket > self.last_tx_rate_bucket {
let current_rate =
self.request_count as f64 / (next_time_bucket - self.last_tx_rate_bucket);
self.measured_tx_rate = current_rate * SMOOTH + self.measured_tx_rate * (1.0 - SMOOTH);
self.request_count = 0;
self.last_tx_rate_bucket = next_time_bucket;
}
}
fn calculate_time_window(&self) -> f64 {
let base = (self.last_max_rate * (1.0 - BETA)) / SCALE_CONSTANT;
base.powf(1.0 / 3.0)
}
fn cubic_success(&self, seconds_since_unix_epoch: f64) -> f64 {
let dt =
seconds_since_unix_epoch - self.time_of_last_throttle - self.calculate_time_window();
(SCALE_CONSTANT * dt.powi(3)) + self.last_max_rate
}
}
fn cubic_throttle(rate_to_use: f64) -> f64 {
rate_to_use * BETA
}
#[derive(Clone, Debug, Default)]
struct Builder {
token_refill_rate: Option<f64>,
maximum_bucket_capacity: Option<f64>,
current_bucket_capacity: Option<f64>,
time_of_last_refill: Option<f64>,
tokens_retrieved_per_second: Option<f64>,
previous_time_bucket: Option<f64>,
request_count: Option<u64>,
enable_throttling: Option<bool>,
tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
time_of_last_throttle: Option<f64>,
}
impl Builder {
fn new() -> Self {
Builder::default()
}
fn set_token_refill_rate(&mut self, token_refill_rate: Option<f64>) -> &mut Self {
self.token_refill_rate = token_refill_rate;
self
}
fn token_refill_rate(mut self, token_refill_rate: f64) -> Self {
self.token_refill_rate = Some(token_refill_rate);
self
}
fn set_maximum_bucket_capacity(&mut self, maximum_bucket_capacity: Option<f64>) -> &mut Self {
self.maximum_bucket_capacity = maximum_bucket_capacity;
self
}
fn maximum_bucket_capacity(mut self, maximum_bucket_capacity: f64) -> Self {
self.maximum_bucket_capacity = Some(maximum_bucket_capacity);
self
}
fn set_current_bucket_capacity(&mut self, current_bucket_capacity: Option<f64>) -> &mut Self {
self.current_bucket_capacity = current_bucket_capacity;
self
}
fn current_bucket_capacity(mut self, current_bucket_capacity: f64) -> Self {
self.current_bucket_capacity = Some(current_bucket_capacity);
self
}
fn set_time_of_last_refill(&mut self, time_of_last_refill: Option<f64>) -> &mut Self {
self.time_of_last_refill = time_of_last_refill;
self
}
fn time_of_last_refill(mut self, time_of_last_refill: f64) -> Self {
self.time_of_last_refill = Some(time_of_last_refill);
self
}
fn set_tokens_retrieved_per_second(
&mut self,
tokens_retrieved_per_second: Option<f64>,
) -> &mut Self {
self.tokens_retrieved_per_second = tokens_retrieved_per_second;
self
}
fn tokens_retrieved_per_second(mut self, tokens_retrieved_per_second: f64) -> Self {
self.tokens_retrieved_per_second = Some(tokens_retrieved_per_second);
self
}
fn set_previous_time_bucket(&mut self, previous_time_bucket: Option<f64>) -> &mut Self {
self.previous_time_bucket = previous_time_bucket;
self
}
fn previous_time_bucket(mut self, previous_time_bucket: f64) -> Self {
self.previous_time_bucket = Some(previous_time_bucket);
self
}
fn set_request_count(&mut self, request_count: Option<u64>) -> &mut Self {
self.request_count = request_count;
self
}
fn request_count(mut self, request_count: u64) -> Self {
self.request_count = Some(request_count);
self
}
fn set_enable_throttling(&mut self, enable_throttling: Option<bool>) -> &mut Self {
self.enable_throttling = enable_throttling;
self
}
fn enable_throttling(mut self, enable_throttling: bool) -> Self {
self.enable_throttling = Some(enable_throttling);
self
}
fn set_tokens_retrieved_per_second_at_time_of_last_throttle(
&mut self,
tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
) -> &mut Self {
self.tokens_retrieved_per_second_at_time_of_last_throttle =
tokens_retrieved_per_second_at_time_of_last_throttle;
self
}
fn tokens_retrieved_per_second_at_time_of_last_throttle(
mut self,
tokens_retrieved_per_second_at_time_of_last_throttle: f64,
) -> Self {
self.tokens_retrieved_per_second_at_time_of_last_throttle =
Some(tokens_retrieved_per_second_at_time_of_last_throttle);
self
}
fn set_time_of_last_throttle(&mut self, time_of_last_throttle: Option<f64>) -> &mut Self {
self.time_of_last_throttle = time_of_last_throttle;
self
}
fn time_of_last_throttle(mut self, time_of_last_throttle: f64) -> Self {
self.time_of_last_throttle = Some(time_of_last_throttle);
self
}
fn build(self) -> ClientRateLimiter {
ClientRateLimiter {
inner: Arc::new(Mutex::new(Inner {
fill_rate: self.token_refill_rate.unwrap_or_default(),
max_capacity: self.maximum_bucket_capacity.unwrap_or(f64::MAX),
current_capacity: self.current_bucket_capacity.unwrap_or_default(),
last_timestamp: self.time_of_last_refill,
enabled: self.enable_throttling.unwrap_or_default(),
measured_tx_rate: self.tokens_retrieved_per_second.unwrap_or_default(),
last_tx_rate_bucket: self.previous_time_bucket.unwrap_or_default(),
request_count: self.request_count.unwrap_or_default(),
last_max_rate: self
.tokens_retrieved_per_second_at_time_of_last_throttle
.unwrap_or_default(),
time_of_last_throttle: self.time_of_last_throttle.unwrap_or_default(),
})),
}
}
}
#[cfg(test)]
mod tests {
use super::{cubic_throttle, ClientRateLimiter};
use crate::client::retries::client_rate_limiter::RequestReason;
use approx::assert_relative_eq;
use aws_smithy_async::rt::sleep::AsyncSleep;
use aws_smithy_async::test_util::instant_time_and_sleep;
use std::time::{Duration, SystemTime};
const ONE_SECOND: Duration = Duration::from_secs(1);
const TWO_HUNDRED_MILLISECONDS: Duration = Duration::from_millis(200);
#[test]
fn should_match_beta_decrease() {
let new_rate = cubic_throttle(10.0);
assert_relative_eq!(new_rate, 7.0);
let rate_limiter = ClientRateLimiter::builder()
.tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
.time_of_last_throttle(1.0)
.build();
rate_limiter.inner.lock().unwrap().calculate_time_window();
let new_rate = rate_limiter.inner.lock().unwrap().cubic_success(1.0);
assert_relative_eq!(new_rate, 7.0);
}
#[tokio::test]
async fn throttling_is_enabled_once_throttling_error_is_received() {
let rate_limiter = ClientRateLimiter::builder()
.previous_time_bucket(0.0)
.time_of_last_throttle(0.0)
.build();
assert!(
!rate_limiter.inner.lock().unwrap().enabled,
"rate_limiter should be disabled by default"
);
rate_limiter.update_rate_limiter(0.0, true);
assert!(
rate_limiter.inner.lock().unwrap().enabled,
"rate_limiter should be enabled after throttling error"
);
}
#[tokio::test]
async fn test_calculated_rate_with_successes() {
let rate_limiter = ClientRateLimiter::builder()
.time_of_last_throttle(5.0)
.tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
.build();
struct Attempt {
seconds_since_unix_epoch: f64,
expected_calculated_rate: f64,
}
let attempts = [
Attempt {
seconds_since_unix_epoch: 5.0,
expected_calculated_rate: 7.0,
},
Attempt {
seconds_since_unix_epoch: 6.0,
expected_calculated_rate: 9.64893600966,
},
Attempt {
seconds_since_unix_epoch: 7.0,
expected_calculated_rate: 10.000030849917364,
},
Attempt {
seconds_since_unix_epoch: 8.0,
expected_calculated_rate: 10.453284520772092,
},
Attempt {
seconds_since_unix_epoch: 9.0,
expected_calculated_rate: 13.408697022224185,
},
Attempt {
seconds_since_unix_epoch: 10.0,
expected_calculated_rate: 21.26626835427364,
},
Attempt {
seconds_since_unix_epoch: 11.0,
expected_calculated_rate: 36.425998516920465,
},
];
for attempt in attempts {
rate_limiter.inner.lock().unwrap().calculate_time_window();
let calculated_rate = rate_limiter
.inner
.lock()
.unwrap()
.cubic_success(attempt.seconds_since_unix_epoch);
assert_relative_eq!(attempt.expected_calculated_rate, calculated_rate);
}
}
#[tokio::test]
async fn test_calculated_rate_with_throttles() {
let rate_limiter = ClientRateLimiter::builder()
.tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
.time_of_last_throttle(5.0)
.build();
struct Attempt {
throttled: bool,
seconds_since_unix_epoch: f64,
expected_calculated_rate: f64,
}
let attempts = [
Attempt {
throttled: false,
seconds_since_unix_epoch: 5.0,
expected_calculated_rate: 7.0,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 6.0,
expected_calculated_rate: 9.64893600966,
},
Attempt {
throttled: true,
seconds_since_unix_epoch: 7.0,
expected_calculated_rate: 6.754255206761999,
},
Attempt {
throttled: true,
seconds_since_unix_epoch: 8.0,
expected_calculated_rate: 4.727978644733399,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 9.0,
expected_calculated_rate: 4.670125557970046,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 10.0,
expected_calculated_rate: 4.770870456867401,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 11.0,
expected_calculated_rate: 6.011819748005445,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 12.0,
expected_calculated_rate: 10.792973431384178,
},
];
let mut calculated_rate = 0.0;
for attempt in attempts {
let mut inner = rate_limiter.inner.lock().unwrap();
inner.calculate_time_window();
if attempt.throttled {
calculated_rate = cubic_throttle(calculated_rate);
inner.time_of_last_throttle = attempt.seconds_since_unix_epoch;
inner.last_max_rate = calculated_rate;
} else {
calculated_rate = inner.cubic_success(attempt.seconds_since_unix_epoch);
};
assert_relative_eq!(attempt.expected_calculated_rate, calculated_rate);
}
}
#[tokio::test]
async fn test_client_sending_rates() {
let (_, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
let rate_limiter = ClientRateLimiter::builder().build();
struct Attempt {
throttled: bool,
seconds_since_unix_epoch: f64,
expected_tokens_retrieved_per_second: f64,
expected_token_refill_rate: f64,
}
let attempts = [
Attempt {
throttled: false,
seconds_since_unix_epoch: 0.2,
expected_tokens_retrieved_per_second: 0.000000,
expected_token_refill_rate: 0.500000,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 0.4,
expected_tokens_retrieved_per_second: 0.000000,
expected_token_refill_rate: 0.500000,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 0.6,
expected_tokens_retrieved_per_second: 4.800000000000001,
expected_token_refill_rate: 0.500000,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 0.8,
expected_tokens_retrieved_per_second: 4.800000000000001,
expected_token_refill_rate: 0.500000,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 1.0,
expected_tokens_retrieved_per_second: 4.160000,
expected_token_refill_rate: 0.500000,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 1.2,
expected_tokens_retrieved_per_second: 4.160000,
expected_token_refill_rate: 0.691200,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 1.4,
expected_tokens_retrieved_per_second: 4.160000,
expected_token_refill_rate: 1.0975999999999997,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 1.6,
expected_tokens_retrieved_per_second: 5.632000000000001,
expected_token_refill_rate: 1.6384000000000005,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 1.8,
expected_tokens_retrieved_per_second: 5.632000000000001,
expected_token_refill_rate: 2.332800,
},
Attempt {
throttled: true,
seconds_since_unix_epoch: 2.0,
expected_tokens_retrieved_per_second: 4.326400,
expected_token_refill_rate: 3.0284799999999996,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 2.2,
expected_tokens_retrieved_per_second: 4.326400,
expected_token_refill_rate: 3.48663917347026,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 2.4,
expected_tokens_retrieved_per_second: 4.326400,
expected_token_refill_rate: 3.821874416040255,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 2.6,
expected_tokens_retrieved_per_second: 5.665280,
expected_token_refill_rate: 4.053385727709987,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 2.8,
expected_tokens_retrieved_per_second: 5.665280,
expected_token_refill_rate: 4.200373108479454,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 3.0,
expected_tokens_retrieved_per_second: 4.333056,
expected_token_refill_rate: 4.282036558348658,
},
Attempt {
throttled: true,
seconds_since_unix_epoch: 3.2,
expected_tokens_retrieved_per_second: 4.333056,
expected_token_refill_rate: 2.99742559084406,
},
Attempt {
throttled: false,
seconds_since_unix_epoch: 3.4,
expected_tokens_retrieved_per_second: 4.333056,
expected_token_refill_rate: 3.4522263943863463,
},
];
for attempt in attempts {
sleep_impl.sleep(TWO_HUNDRED_MILLISECONDS).await;
assert_eq!(
attempt.seconds_since_unix_epoch,
sleep_impl.total_duration().as_secs_f64()
);
rate_limiter.update_rate_limiter(attempt.seconds_since_unix_epoch, attempt.throttled);
assert_relative_eq!(
attempt.expected_tokens_retrieved_per_second,
rate_limiter.inner.lock().unwrap().measured_tx_rate
);
assert_relative_eq!(
attempt.expected_token_refill_rate,
rate_limiter.inner.lock().unwrap().fill_rate
);
}
}
#[tokio::test]
async fn test_when_throttling_is_enabled_requests_can_still_be_sent() {
let (time_source, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
let crl = ClientRateLimiter::builder()
.time_of_last_throttle(0.0)
.previous_time_bucket(0.0)
.build();
crl.update_rate_limiter(0.0, true);
for _i in 0..100 {
let duration = Duration::from_secs_f64(fastrand::f64());
sleep_impl.sleep(duration).await;
if let Err(delay) = crl.acquire_permission_to_send_a_request(
time_source.seconds_since_unix_epoch(),
RequestReason::InitialRequest,
) {
sleep_impl.sleep(delay).await;
}
crl.update_rate_limiter(time_source.seconds_since_unix_epoch(), false);
}
let inner = crl.inner.lock().unwrap();
assert!(inner.enabled, "the rate limiter should still be enabled");
assert_relative_eq!(
inner.last_timestamp.unwrap(),
sleep_impl.total_duration().as_secs_f64(),
max_relative = 0.0001
);
}
}