// kraken_async_rs/rate_limiting/trading_rate_limits.rs
use crate::rate_limiting::ttl_cache::{TtlCache, TtlEntry};
2use crate::request_types::{AddBatchedOrderRequest, EditOrderRequest};
3use crate::response_types::VerificationTier;
4use async_rate_limit::limiters::VariableCostRateLimiter;
5use async_rate_limit::token_bucket::{TokenBucketRateLimiter, TokenBucketState};
6use std::sync::Arc;
7use std::time::Duration;
8use time::OffsetDateTime;
9use tokio::sync::Mutex;
10
/// Time-to-live handed to every cache entry created in this file: 300 seconds, expressed in
/// microseconds (hence the `_US` suffix).
const ORDER_TTL_US: i128 = 300 * 1_000_000;
13
14#[derive(Debug, Clone)]
16pub struct KrakenTradingRateLimiter {
17 ttl_ref_id_cache: Arc<Mutex<TtlCache<String, i64>>>,
18 ttl_user_ref_cache: Arc<Mutex<TtlCache<i64, i64>>>,
19 rate_limiter: TokenBucketRateLimiter,
20}
21
22impl KrakenTradingRateLimiter {
31 pub fn new(user_verification: VerificationTier) -> KrakenTradingRateLimiter {
33 KrakenTradingRateLimiter {
34 ttl_ref_id_cache: Default::default(),
35 ttl_user_ref_cache: Default::default(),
36 rate_limiter: Self::get_rate_limiter(user_verification),
37 }
38 }
39
40 pub async fn add_order(&mut self) {
42 self.rate_limiter.wait_with_cost(100).await;
43 }
44
45 pub async fn add_order_batch(&mut self, add_batched_order_request: &AddBatchedOrderRequest) {
49 let cost = 1.0 + (add_batched_order_request.orders.len() as f64 / 2.0);
50 self.rate_limiter
51 .wait_with_cost((cost * 100.0) as usize)
52 .await;
53 }
54
55 pub async fn amend_order(&mut self, tx_id: &Option<String>, client_order_id: &Option<String>) {
59 let now_seconds = OffsetDateTime::now_utc().unix_timestamp();
60
61 let request_id = tx_id
65 .clone()
66 .or(client_order_id.clone())
67 .unwrap_or("default_order".to_string());
68
69 let order_lifetime = self
70 .ttl_ref_id_cache
71 .lock()
72 .await
73 .get(&request_id)
74 .map(|ttl_entry| now_seconds - ttl_entry.data)
75 .unwrap_or(i64::MAX);
76
77 let penalty = Self::amend_order_penalty(order_lifetime);
78 let cost = (penalty + 1) * 100;
79
80 self.rate_limiter.wait_with_cost(cost as usize).await
81 }
82
83 pub async fn edit_order(&mut self, edit_order_request: &EditOrderRequest) {
87 let now_seconds = OffsetDateTime::now_utc().unix_timestamp();
88 let tx_id = edit_order_request.tx_id.clone();
89
90 let order_lifetime = self
91 .ttl_ref_id_cache
92 .lock()
93 .await
94 .get(&tx_id)
95 .map(|ttl_entry| now_seconds - ttl_entry.data)
96 .unwrap_or(i64::MAX);
97
98 let penalty = Self::edit_order_penalty(order_lifetime);
99 let cost = (penalty + 1) * 100;
100
101 self.rate_limiter.wait_with_cost(cost as usize).await
102 }
103
104 pub async fn cancel_order_tx_id(&mut self, id: &String) {
108 let now_seconds = OffsetDateTime::now_utc().unix_timestamp();
109
110 let mut cache_guard = self.ttl_ref_id_cache.lock().await;
111 let order_lifetime = cache_guard
112 .get(id)
113 .map(|ttl_entry| now_seconds - ttl_entry.data)
114 .unwrap_or(i64::MAX);
115 drop(cache_guard);
116
117 self.cancel_with_penalty(order_lifetime).await;
118 }
119
120 pub async fn cancel_order_user_ref(&mut self, id: &i64) {
124 let now_seconds = OffsetDateTime::now_utc().unix_timestamp();
125
126 let order_lifetime = self
127 .ttl_user_ref_cache
128 .lock()
129 .await
130 .get(id)
131 .map(|ttl_entry| now_seconds - ttl_entry.data)
132 .unwrap_or(i64::MAX);
133
134 self.cancel_with_penalty(order_lifetime).await;
135 }
136
137 async fn cancel_with_penalty(&mut self, order_lifetime: i64) {
138 let penalty = Self::cancel_order_penalty(order_lifetime);
139 let cost = penalty * 100;
140
141 self.rate_limiter.wait_with_cost(cost as usize).await
142 }
143
144 pub async fn notify_add_order(
149 &mut self,
150 tx_id: String,
151 placement_time: i64,
152 user_ref: Option<i64>,
153 client_order_id: &Option<String>,
154 ) {
155 let ttl_ref_entry = TtlEntry::new(tx_id, ORDER_TTL_US, placement_time);
156
157 self.ttl_ref_id_cache.lock().await.insert(ttl_ref_entry);
158
159 if let Some(user_ref) = user_ref {
160 let ttl_user_ref_entry = TtlEntry::new(user_ref, ORDER_TTL_US, placement_time);
161 self.ttl_user_ref_cache
162 .lock()
163 .await
164 .insert(ttl_user_ref_entry);
165 }
166
167 if let Some(client_id) = client_order_id {
168 let ttl_client_entry = TtlEntry::new(client_id.clone(), ORDER_TTL_US, placement_time);
169 self.ttl_ref_id_cache.lock().await.insert(ttl_client_entry);
170 }
171 }
172
173 pub async fn notify_amend_order(
175 &mut self,
176 tx_id: &Option<String>,
177 placement_time: i64,
178 client_order_id: &Option<String>,
179 ) {
180 if let Some(id) = tx_id {
181 let ttl_client_entry = TtlEntry::new(id.clone(), ORDER_TTL_US, placement_time);
182 self.ttl_ref_id_cache.lock().await.insert(ttl_client_entry);
183 }
184 if let Some(client_id) = client_order_id {
185 let ttl_client_entry = TtlEntry::new(client_id.clone(), ORDER_TTL_US, placement_time);
186 self.ttl_ref_id_cache.lock().await.insert(ttl_client_entry);
187 }
188 }
189
190 fn amend_order_penalty(lifetime_seconds: i64) -> i64 {
191 if lifetime_seconds < 5 {
192 3
193 } else if lifetime_seconds < 10 {
194 2
195 } else if lifetime_seconds < 15 {
196 1
197 } else {
198 0
199 }
200 }
201
202 fn edit_order_penalty(lifetime_seconds: i64) -> i64 {
203 if lifetime_seconds < 5 {
204 6
205 } else if lifetime_seconds < 10 {
206 5
207 } else if lifetime_seconds < 15 {
208 4
209 } else if lifetime_seconds < 45 {
210 3
211 } else if lifetime_seconds < 90 {
212 2
213 } else {
214 0
215 }
216 }
217
218 fn cancel_order_penalty(lifetime_seconds: i64) -> i64 {
219 if lifetime_seconds < 5 {
220 8
221 } else if lifetime_seconds < 10 {
222 6
223 } else if lifetime_seconds < 15 {
224 5
225 } else if lifetime_seconds < 45 {
226 4
227 } else if lifetime_seconds < 90 {
228 2
229 } else if lifetime_seconds < 300 {
230 1
231 } else {
232 0
233 }
234 }
235
236 fn get_rate_limiter(user_verification: VerificationTier) -> TokenBucketRateLimiter {
237 match user_verification {
239 VerificationTier::Intermediate => {
240 let token_bucket_state = TokenBucketState::new(12500, 234, Duration::from_secs(1));
241 TokenBucketRateLimiter::new(Arc::new(Mutex::new(token_bucket_state)))
242 }
243 VerificationTier::Pro => {
244 let token_bucket_state = TokenBucketState::new(18000, 375, Duration::from_secs(1));
245 TokenBucketRateLimiter::new(Arc::new(Mutex::new(token_bucket_state)))
246 }
247 }
248 }
249}
250
#[cfg(test)]
mod tests {
    use crate::rate_limiting::trading_rate_limits::KrakenTradingRateLimiter;
    use crate::response_types::VerificationTier::{Intermediate, Pro};
    use std::time::Duration;
    use tokio::time::{pause, Instant};

    /// Sends `order_count` add-order requests through `limiter` and returns how much time
    /// passed on tokio's clock, which each caller pauses beforehand.
    async fn time_add_orders(
        mut limiter: KrakenTradingRateLimiter,
        order_count: usize,
    ) -> Duration {
        let started = Instant::now();
        for _ in 0..order_count {
            limiter.add_order().await;
        }
        Instant::now() - started
    }

    /// 126 orders on the Intermediate tier take between one and two seconds.
    #[tokio::test]
    async fn test_trading_rate_limiter_intermediate_add_order_limit() {
        pause();

        let waited = time_add_orders(KrakenTradingRateLimiter::new(Intermediate), 126).await;

        assert!(waited > Duration::from_secs(1));
        assert!(waited < Duration::from_secs(2));
    }

    /// 181 orders on the Pro tier take between one and two seconds.
    #[tokio::test]
    async fn test_trading_rate_limiter_pro_add_order_limit() {
        pause();

        let waited = time_add_orders(KrakenTradingRateLimiter::new(Pro), 181).await;

        assert!(waited > Duration::from_secs(1));
        assert!(waited < Duration::from_secs(2));
    }

    /// 15 further orders on the Intermediate tier push the total wait to between 7 and 8 seconds.
    #[tokio::test]
    async fn test_trading_rate_limiter_intermediate_add_order_limit_replenish() {
        pause();

        let waited = time_add_orders(KrakenTradingRateLimiter::new(Intermediate), 126 + 15).await;

        assert!(waited > Duration::from_secs(7));
        assert!(waited < Duration::from_secs(8));
    }

    /// 12 further orders on the Pro tier push the total wait to between 4 and 5 seconds.
    #[tokio::test]
    async fn test_trading_rate_limiter_pro_add_order_limit_replenish() {
        pause();

        let waited = time_add_orders(KrakenTradingRateLimiter::new(Pro), 181 + (4 * 3)).await;

        assert!(waited > Duration::from_secs(4));
        assert!(waited < Duration::from_secs(5));
    }

    /// Checks both edges of every amend penalty tier.
    #[test]
    fn test_amend_order_penalties() {
        let cases = [
            (0, 3),
            (4, 3),
            (5, 2),
            (9, 2),
            (10, 1),
            (14, 1),
            (15, 0),
            (i64::MAX, 0),
        ];

        for (lifetime, expected) in cases {
            let actual = KrakenTradingRateLimiter::amend_order_penalty(lifetime);
            assert_eq!(expected, actual);
        }
    }

    /// Checks both edges of every edit penalty tier.
    #[test]
    fn test_edit_order_penalties() {
        let cases = [
            (0, 6),
            (4, 6),
            (5, 5),
            (9, 5),
            (10, 4),
            (14, 4),
            (15, 3),
            (44, 3),
            (45, 2),
            (89, 2),
            (90, 0),
            (i64::MAX, 0),
        ];

        for (lifetime, expected) in cases {
            let actual = KrakenTradingRateLimiter::edit_order_penalty(lifetime);
            assert_eq!(expected, actual);
        }
    }

    /// Checks both edges of every cancel penalty tier.
    #[test]
    fn test_cancel_order_penalties() {
        let cases = [
            (0, 8),
            (4, 8),
            (5, 6),
            (9, 6),
            (10, 5),
            (14, 5),
            (15, 4),
            (44, 4),
            (45, 2),
            (89, 2),
            (90, 1),
            (299, 1),
            (300, 0),
            (i64::MAX, 0),
        ];

        for (lifetime, expected) in cases {
            let actual = KrakenTradingRateLimiter::cancel_order_penalty(lifetime);
            assert_eq!(expected, actual);
        }
    }
}