kraken_async_rs/rate_limiting/
trading_rate_limits.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
use crate::rate_limiting::ttl_cache::{TtlCache, TtlEntry};
use crate::request_types::{AddBatchedOrderRequest, EditOrderRequest};
use crate::response_types::VerificationTier;
use async_rate_limit::limiters::VariableCostRateLimiter;
use async_rate_limit::token_bucket::{TokenBucketRateLimiter, TokenBucketState};
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::Mutex;

// 300 seconds in microseconds
const ORDER_TTL_US: i128 = 300_i128 * 10_i128.pow(6);

/// An implementation of the most accurate trading rate limits given by Kraken
#[derive(Debug, Clone)]
pub struct KrakenTradingRateLimiter {
    ttl_ref_id_cache: Arc<Mutex<TtlCache<String, i64>>>,
    ttl_user_ref_cache: Arc<Mutex<TtlCache<i64, i64>>>,
    rate_limiter: TokenBucketRateLimiter,
}

/// Implements the Advanced rate limiting scheme that requires knowing each order's lifetime.
///
/// Detailed documentation is available from several locations, including the [overview rate-limiting page],
/// [api rate-limiting page] and [trading rate-limiting page].
///
/// [overview rate-limiting page]: https://docs.kraken.com/api/docs/guides/spot-rest-ratelimits
/// [api rate-limiting page]: https://support.kraken.com/hc/en-us/articles/206548367-What-are-the-API-rate-limits-#3
/// [trading rate-limiting page]: https://docs.kraken.com/api/docs/guides/spot-ratelimits/
impl KrakenTradingRateLimiter {
    /// Create a new instance for a user with the given [VerificationTier]
    pub fn new(user_verification: VerificationTier) -> KrakenTradingRateLimiter {
        KrakenTradingRateLimiter {
            ttl_ref_id_cache: Default::default(),
            ttl_user_ref_cache: Default::default(),
            rate_limiter: Self::get_rate_limiter(user_verification),
        }
    }

    /// Wait for the fixed cost of placing an order
    pub async fn add_order(&mut self) {
        self.rate_limiter.wait_with_cost(100).await;
    }

    /// Determine the cost and wait appropriately for the given [AddBatchedOrderRequest].
    ///
    /// The cost of a batch is n / 2, where n is the number of orders in the batch.
    pub async fn add_order_batch(&mut self, add_batched_order_request: &AddBatchedOrderRequest) {
        let cost = 1.0 + (add_batched_order_request.orders.len() as f64 / 2.0);
        self.rate_limiter
            .wait_with_cost((cost * 100.0) as usize)
            .await;
    }

    /// Determine the cost of amending an order and wait if necessary
    ///
    /// This is inclusive of penalties for orders amended soon after creation or their last amendment.
    pub async fn amend_order(&mut self, tx_id: &Option<String>, client_order_id: &Option<String>) {
        let now_seconds = OffsetDateTime::now_utc().unix_timestamp();

        // any request should have a tx_id or client_order_id, but should one not have it,
        //  "default_order" is used, which would penalize very conservatively by treating all orders
        //  like a single order
        let request_id = tx_id
            .clone()
            .or(client_order_id.clone())
            .unwrap_or("default_order".to_string());

        let order_lifetime = self
            .ttl_ref_id_cache
            .lock()
            .await
            .get(&request_id)
            .map(|ttl_entry| now_seconds - ttl_entry.data)
            .unwrap_or(i64::MAX);

        let penalty = Self::amend_order_penalty(order_lifetime);
        let cost = (penalty + 1) * 100;

        self.rate_limiter.wait_with_cost(cost as usize).await
    }

    /// Determine the cost of editing an order and wait if necessary
    ///
    /// This is inclusive of penalties for orders edited soon after creation.
    pub async fn edit_order(&mut self, edit_order_request: &EditOrderRequest) {
        let now_seconds = OffsetDateTime::now_utc().unix_timestamp();
        let tx_id = edit_order_request.tx_id.clone();

        let order_lifetime = self
            .ttl_ref_id_cache
            .lock()
            .await
            .get(&tx_id)
            .map(|ttl_entry| now_seconds - ttl_entry.data)
            .unwrap_or(i64::MAX);

        let penalty = Self::edit_order_penalty(order_lifetime);
        let cost = (penalty + 1) * 100;

        self.rate_limiter.wait_with_cost(cost as usize).await
    }

    /// Determine the cost of cancelling the provided order id and wait appropriately
    ///
    /// This is inclusive of penalties for orders cancelled soon after creation.
    pub async fn cancel_order_tx_id(&mut self, id: &String) {
        let now_seconds = OffsetDateTime::now_utc().unix_timestamp();

        let mut cache_guard = self.ttl_ref_id_cache.lock().await;
        let order_lifetime = cache_guard
            .get(id)
            .map(|ttl_entry| now_seconds - ttl_entry.data)
            .unwrap_or(i64::MAX);
        drop(cache_guard);

        self.cancel_with_penalty(order_lifetime).await;
    }

    /// Determine the cost of cancelling the provided user ref and wait appropriately
    ///
    /// This is inclusive of penalties for orders cancelled soon after creation.
    pub async fn cancel_order_user_ref(&mut self, id: &i64) {
        let now_seconds = OffsetDateTime::now_utc().unix_timestamp();

        let order_lifetime = self
            .ttl_user_ref_cache
            .lock()
            .await
            .get(id)
            .map(|ttl_entry| now_seconds - ttl_entry.data)
            .unwrap_or(i64::MAX);

        self.cancel_with_penalty(order_lifetime).await;
    }

    async fn cancel_with_penalty(&mut self, order_lifetime: i64) {
        let penalty = Self::cancel_order_penalty(order_lifetime);
        let cost = penalty * 100;

        self.rate_limiter.wait_with_cost(cost as usize).await
    }

    /// Notify the rate limiter of a new order being created -- this is essential to the rate limiting scheme!
    ///
    /// Order lifetimes must be known in order to determine the penalties for editing or cancelling
    /// orders that were placed less than 300s ago.
    pub async fn notify_add_order(
        &mut self,
        tx_id: String,
        placement_time: i64,
        user_ref: Option<i64>,
        client_order_id: &Option<String>,
    ) {
        let ttl_ref_entry = TtlEntry::new(tx_id, ORDER_TTL_US, placement_time);

        self.ttl_ref_id_cache.lock().await.insert(ttl_ref_entry);

        if let Some(user_ref) = user_ref {
            let ttl_user_ref_entry = TtlEntry::new(user_ref, ORDER_TTL_US, placement_time);
            self.ttl_user_ref_cache
                .lock()
                .await
                .insert(ttl_user_ref_entry);
        }

        if let Some(client_id) = client_order_id {
            let ttl_client_entry = TtlEntry::new(client_id.clone(), ORDER_TTL_US, placement_time);
            self.ttl_ref_id_cache.lock().await.insert(ttl_client_entry);
        }
    }

    /// Notify the cache that an order was amended at the given time -- this is essential to the rate limiting scheme!
    pub async fn notify_amend_order(
        &mut self,
        tx_id: &Option<String>,
        placement_time: i64,
        client_order_id: &Option<String>,
    ) {
        if let Some(id) = tx_id {
            let ttl_client_entry = TtlEntry::new(id.clone(), ORDER_TTL_US, placement_time);
            self.ttl_ref_id_cache.lock().await.insert(ttl_client_entry);
        }
        if let Some(client_id) = client_order_id {
            let ttl_client_entry = TtlEntry::new(client_id.clone(), ORDER_TTL_US, placement_time);
            self.ttl_ref_id_cache.lock().await.insert(ttl_client_entry);
        }
    }

    fn amend_order_penalty(lifetime_seconds: i64) -> i64 {
        if lifetime_seconds < 5 {
            3
        } else if lifetime_seconds < 10 {
            2
        } else if lifetime_seconds < 15 {
            1
        } else {
            0
        }
    }

    fn edit_order_penalty(lifetime_seconds: i64) -> i64 {
        if lifetime_seconds < 5 {
            6
        } else if lifetime_seconds < 10 {
            5
        } else if lifetime_seconds < 15 {
            4
        } else if lifetime_seconds < 45 {
            3
        } else if lifetime_seconds < 90 {
            2
        } else {
            0
        }
    }

    fn cancel_order_penalty(lifetime_seconds: i64) -> i64 {
        if lifetime_seconds < 5 {
            8
        } else if lifetime_seconds < 10 {
            6
        } else if lifetime_seconds < 15 {
            5
        } else if lifetime_seconds < 45 {
            4
        } else if lifetime_seconds < 90 {
            2
        } else if lifetime_seconds < 300 {
            1
        } else {
            0
        }
    }

    fn get_rate_limiter(user_verification: VerificationTier) -> TokenBucketRateLimiter {
        // tokens are scaled 100x from Kraken's floating-point method to keep as integers
        match user_verification {
            VerificationTier::Intermediate => {
                let token_bucket_state = TokenBucketState::new(12500, 234, Duration::from_secs(1));
                TokenBucketRateLimiter::new(Arc::new(Mutex::new(token_bucket_state)))
            }
            VerificationTier::Pro => {
                let token_bucket_state = TokenBucketState::new(18000, 375, Duration::from_secs(1));
                TokenBucketRateLimiter::new(Arc::new(Mutex::new(token_bucket_state)))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::rate_limiting::trading_rate_limits::KrakenTradingRateLimiter;
    /// Tests use Tokio's pause() functionality to have instantaneous testing that relies on Tokio
    /// keeping track of time elapsed by fast-forwarding when there are no pending tasks on the
    /// event loop.
    ///
    /// Tests are done at a high enough level that execution time of Rust is thought to be negligible.
    ///
    use crate::response_types::VerificationTier::{Intermediate, Pro};
    use std::time::Duration;
    use tokio::time::{pause, Instant};

    #[tokio::test]
    async fn test_trading_rate_limiter_intermediate_add_order_limit() {
        pause();

        let mut limiter = KrakenTradingRateLimiter::new(Intermediate);

        let start = Instant::now();
        // 126 calls should push limiter over the 12500 limit, requiring waiting 1s
        for _ in 0..126 {
            limiter.add_order().await;
        }

        let end = Instant::now();
        let elapsed = end - start;

        assert!(elapsed > Duration::from_secs(1));
        assert!(elapsed < Duration::from_secs(2));
    }

    #[tokio::test]
    async fn test_trading_rate_limiter_pro_add_order_limit() {
        pause();

        let mut limiter = KrakenTradingRateLimiter::new(Pro);

        let start = Instant::now();
        // 181 calls should push limiter over the 18000 limit, requiring waiting 1s
        for _ in 0..181 {
            limiter.add_order().await;
        }

        let end = Instant::now();
        let elapsed = end - start;

        assert!(elapsed > Duration::from_secs(1));
        assert!(elapsed < Duration::from_secs(2));
    }

    #[tokio::test]
    async fn test_trading_rate_limiter_intermediate_add_order_limit_replenish() {
        pause();

        let mut limiter = KrakenTradingRateLimiter::new(Intermediate);

        let start = Instant::now();
        // 126 calls should push limiter over the 12500 limit, requiring waiting 1s.
        //  Replenishing at 234/s means that an additional 15 orders (costing 100 * 15 = 1500 total) should
        //  take another (1500 / 234 = ) 6.4s wait
        for _ in 0..(126 + 15) {
            limiter.add_order().await;
        }

        let end = Instant::now();
        let elapsed = end - start;

        // expect that the first 126 orders took 1s
        //  the remaining 15 should take another ~6.4s
        assert!(elapsed > Duration::from_secs(7));
        assert!(elapsed < Duration::from_secs(8));
    }

    #[tokio::test]
    async fn test_trading_rate_limiter_pro_add_order_limit_replenish() {
        pause();

        let mut limiter = KrakenTradingRateLimiter::new(Pro);

        let start = Instant::now();
        // 181 calls should push limiter over the 18000 limit, requiring waiting 1s.
        //  Replenishing at 375/s means that each additional 4 orders (costing 400 total) should
        //  take another 1s wait for that batch of 4
        for _ in 0..(181 + (4 * 3)) {
            limiter.add_order().await;
        }

        let end = Instant::now();
        let elapsed = end - start;

        // expect that the first 181 orders took 1s
        //  the remaining 3 sets of 4 should take another 3s
        assert!(elapsed > Duration::from_secs(4));
        assert!(elapsed < Duration::from_secs(5));
    }

    #[test]
    fn test_amend_order_penalties() {
        let cases = vec![
            (0, 3),
            (4, 3),
            (5, 2),
            (9, 2),
            (10, 1),
            (14, 1),
            (15, 0),
            (i64::MAX, 0),
        ];

        for (lifetime, expected) in cases {
            assert_eq!(
                expected,
                KrakenTradingRateLimiter::amend_order_penalty(lifetime)
            );
        }
    }

    #[test]
    fn test_edit_order_penalties() {
        let cases = vec![
            (0, 6),
            (4, 6),
            (5, 5),
            (9, 5),
            (10, 4),
            (14, 4),
            (15, 3),
            (44, 3),
            (45, 2),
            (89, 2),
            (90, 0),
            (i64::MAX, 0),
        ];

        for (lifetime, expected) in cases {
            assert_eq!(
                expected,
                KrakenTradingRateLimiter::edit_order_penalty(lifetime)
            );
        }
    }

    #[test]
    fn test_cancel_order_penalties() {
        let cases = vec![
            (0, 8),
            (4, 8),
            (5, 6),
            (9, 6),
            (10, 5),
            (14, 5),
            (15, 4),
            (44, 4),
            (45, 2),
            (89, 2),
            (90, 1),
            (299, 1),
            (300, 0),
            (i64::MAX, 0),
        ];

        for (lifetime, expected) in cases {
            assert_eq!(
                expected,
                KrakenTradingRateLimiter::cancel_order_penalty(lifetime)
            );
        }
    }
}