kraken_async_rs/rate_limiting/
trading_rate_limits.rsuse crate::rate_limiting::ttl_cache::{TtlCache, TtlEntry};
use crate::request_types::{AddBatchedOrderRequest, EditOrderRequest};
use crate::response_types::VerificationTier;
use async_rate_limit::limiters::VariableCostRateLimiter;
use async_rate_limit::token_bucket::{TokenBucketRateLimiter, TokenBucketState};
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::Mutex;
const ORDER_TTL_US: i128 = 300_i128 * 10_i128.pow(6);
#[derive(Debug, Clone)]
pub struct KrakenTradingRateLimiter {
ttl_ref_id_cache: Arc<Mutex<TtlCache<String, i64>>>,
ttl_user_ref_cache: Arc<Mutex<TtlCache<i64, i64>>>,
rate_limiter: TokenBucketRateLimiter,
}
impl KrakenTradingRateLimiter {
pub fn new(user_verification: VerificationTier) -> KrakenTradingRateLimiter {
KrakenTradingRateLimiter {
ttl_ref_id_cache: Default::default(),
ttl_user_ref_cache: Default::default(),
rate_limiter: Self::get_rate_limiter(user_verification),
}
}
pub async fn add_order(&mut self) {
self.rate_limiter.wait_with_cost(100).await;
}
pub async fn add_order_batch(&mut self, add_batched_order_request: &AddBatchedOrderRequest) {
let cost = 1.0 + (add_batched_order_request.orders.len() as f64 / 2.0);
self.rate_limiter
.wait_with_cost((cost * 100.0) as usize)
.await;
}
pub async fn amend_order(&mut self, tx_id: &Option<String>, client_order_id: &Option<String>) {
let now_seconds = OffsetDateTime::now_utc().unix_timestamp();
let request_id = tx_id
.clone()
.or(client_order_id.clone())
.unwrap_or("default_order".to_string());
let order_lifetime = self
.ttl_ref_id_cache
.lock()
.await
.get(&request_id)
.map(|ttl_entry| now_seconds - ttl_entry.data)
.unwrap_or(i64::MAX);
let penalty = Self::amend_order_penalty(order_lifetime);
let cost = (penalty + 1) * 100;
self.rate_limiter.wait_with_cost(cost as usize).await
}
pub async fn edit_order(&mut self, edit_order_request: &EditOrderRequest) {
let now_seconds = OffsetDateTime::now_utc().unix_timestamp();
let tx_id = edit_order_request.tx_id.clone();
let order_lifetime = self
.ttl_ref_id_cache
.lock()
.await
.get(&tx_id)
.map(|ttl_entry| now_seconds - ttl_entry.data)
.unwrap_or(i64::MAX);
let penalty = Self::edit_order_penalty(order_lifetime);
let cost = (penalty + 1) * 100;
self.rate_limiter.wait_with_cost(cost as usize).await
}
pub async fn cancel_order_tx_id(&mut self, id: &String) {
let now_seconds = OffsetDateTime::now_utc().unix_timestamp();
let mut cache_guard = self.ttl_ref_id_cache.lock().await;
let order_lifetime = cache_guard
.get(id)
.map(|ttl_entry| now_seconds - ttl_entry.data)
.unwrap_or(i64::MAX);
drop(cache_guard);
self.cancel_with_penalty(order_lifetime).await;
}
pub async fn cancel_order_user_ref(&mut self, id: &i64) {
let now_seconds = OffsetDateTime::now_utc().unix_timestamp();
let order_lifetime = self
.ttl_user_ref_cache
.lock()
.await
.get(id)
.map(|ttl_entry| now_seconds - ttl_entry.data)
.unwrap_or(i64::MAX);
self.cancel_with_penalty(order_lifetime).await;
}
async fn cancel_with_penalty(&mut self, order_lifetime: i64) {
let penalty = Self::cancel_order_penalty(order_lifetime);
let cost = penalty * 100;
self.rate_limiter.wait_with_cost(cost as usize).await
}
pub async fn notify_add_order(
&mut self,
tx_id: String,
placement_time: i64,
user_ref: Option<i64>,
client_order_id: &Option<String>,
) {
let ttl_ref_entry = TtlEntry::new(tx_id, ORDER_TTL_US, placement_time);
self.ttl_ref_id_cache.lock().await.insert(ttl_ref_entry);
if let Some(user_ref) = user_ref {
let ttl_user_ref_entry = TtlEntry::new(user_ref, ORDER_TTL_US, placement_time);
self.ttl_user_ref_cache
.lock()
.await
.insert(ttl_user_ref_entry);
}
if let Some(client_id) = client_order_id {
let ttl_client_entry = TtlEntry::new(client_id.clone(), ORDER_TTL_US, placement_time);
self.ttl_ref_id_cache.lock().await.insert(ttl_client_entry);
}
}
pub async fn notify_amend_order(
&mut self,
tx_id: &Option<String>,
placement_time: i64,
client_order_id: &Option<String>,
) {
if let Some(id) = tx_id {
let ttl_client_entry = TtlEntry::new(id.clone(), ORDER_TTL_US, placement_time);
self.ttl_ref_id_cache.lock().await.insert(ttl_client_entry);
}
if let Some(client_id) = client_order_id {
let ttl_client_entry = TtlEntry::new(client_id.clone(), ORDER_TTL_US, placement_time);
self.ttl_ref_id_cache.lock().await.insert(ttl_client_entry);
}
}
fn amend_order_penalty(lifetime_seconds: i64) -> i64 {
if lifetime_seconds < 5 {
3
} else if lifetime_seconds < 10 {
2
} else if lifetime_seconds < 15 {
1
} else {
0
}
}
fn edit_order_penalty(lifetime_seconds: i64) -> i64 {
if lifetime_seconds < 5 {
6
} else if lifetime_seconds < 10 {
5
} else if lifetime_seconds < 15 {
4
} else if lifetime_seconds < 45 {
3
} else if lifetime_seconds < 90 {
2
} else {
0
}
}
fn cancel_order_penalty(lifetime_seconds: i64) -> i64 {
if lifetime_seconds < 5 {
8
} else if lifetime_seconds < 10 {
6
} else if lifetime_seconds < 15 {
5
} else if lifetime_seconds < 45 {
4
} else if lifetime_seconds < 90 {
2
} else if lifetime_seconds < 300 {
1
} else {
0
}
}
fn get_rate_limiter(user_verification: VerificationTier) -> TokenBucketRateLimiter {
match user_verification {
VerificationTier::Intermediate => {
let token_bucket_state = TokenBucketState::new(12500, 234, Duration::from_secs(1));
TokenBucketRateLimiter::new(Arc::new(Mutex::new(token_bucket_state)))
}
VerificationTier::Pro => {
let token_bucket_state = TokenBucketState::new(18000, 375, Duration::from_secs(1));
TokenBucketRateLimiter::new(Arc::new(Mutex::new(token_bucket_state)))
}
}
}
}
#[cfg(test)]
mod tests {
use crate::rate_limiting::trading_rate_limits::KrakenTradingRateLimiter;
use crate::response_types::VerificationTier::{Intermediate, Pro};
use std::time::Duration;
use tokio::time::{pause, Instant};
#[tokio::test]
async fn test_trading_rate_limiter_intermediate_add_order_limit() {
pause();
let mut limiter = KrakenTradingRateLimiter::new(Intermediate);
let start = Instant::now();
for _ in 0..126 {
limiter.add_order().await;
}
let end = Instant::now();
let elapsed = end - start;
assert!(elapsed > Duration::from_secs(1));
assert!(elapsed < Duration::from_secs(2));
}
#[tokio::test]
async fn test_trading_rate_limiter_pro_add_order_limit() {
pause();
let mut limiter = KrakenTradingRateLimiter::new(Pro);
let start = Instant::now();
for _ in 0..181 {
limiter.add_order().await;
}
let end = Instant::now();
let elapsed = end - start;
assert!(elapsed > Duration::from_secs(1));
assert!(elapsed < Duration::from_secs(2));
}
#[tokio::test]
async fn test_trading_rate_limiter_intermediate_add_order_limit_replenish() {
pause();
let mut limiter = KrakenTradingRateLimiter::new(Intermediate);
let start = Instant::now();
for _ in 0..(126 + 15) {
limiter.add_order().await;
}
let end = Instant::now();
let elapsed = end - start;
assert!(elapsed > Duration::from_secs(7));
assert!(elapsed < Duration::from_secs(8));
}
#[tokio::test]
async fn test_trading_rate_limiter_pro_add_order_limit_replenish() {
pause();
let mut limiter = KrakenTradingRateLimiter::new(Pro);
let start = Instant::now();
for _ in 0..(181 + (4 * 3)) {
limiter.add_order().await;
}
let end = Instant::now();
let elapsed = end - start;
assert!(elapsed > Duration::from_secs(4));
assert!(elapsed < Duration::from_secs(5));
}
#[test]
fn test_amend_order_penalties() {
let cases = vec![
(0, 3),
(4, 3),
(5, 2),
(9, 2),
(10, 1),
(14, 1),
(15, 0),
(i64::MAX, 0),
];
for (lifetime, expected) in cases {
assert_eq!(
expected,
KrakenTradingRateLimiter::amend_order_penalty(lifetime)
);
}
}
#[test]
fn test_edit_order_penalties() {
let cases = vec![
(0, 6),
(4, 6),
(5, 5),
(9, 5),
(10, 4),
(14, 4),
(15, 3),
(44, 3),
(45, 2),
(89, 2),
(90, 0),
(i64::MAX, 0),
];
for (lifetime, expected) in cases {
assert_eq!(
expected,
KrakenTradingRateLimiter::edit_order_penalty(lifetime)
);
}
}
#[test]
fn test_cancel_order_penalties() {
let cases = vec![
(0, 8),
(4, 8),
(5, 6),
(9, 6),
(10, 5),
(14, 5),
(15, 4),
(44, 4),
(45, 2),
(89, 2),
(90, 1),
(299, 1),
(300, 0),
(i64::MAX, 0),
];
for (lifetime, expected) in cases {
assert_eq!(
expected,
KrakenTradingRateLimiter::cancel_order_penalty(lifetime)
);
}
}
}