kraken_async_rs/wss/
kraken_wss_client.rs

1//! Kraken WSS client and message streams
2use crate::wss::errors::WSSError;
3use crate::wss::Message;
4use futures_util::SinkExt;
5use serde::{Deserialize, Serialize};
6use std::fmt::Debug;
7use std::marker::PhantomData;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use tokio::net::TcpStream;
11use tokio_stream::Stream;
12use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
13use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
14use tracing::trace;
15use url::Url;
16
17pub const WS_KRAKEN: &str = "wss://ws.kraken.com/v2";
18pub const WS_KRAKEN_AUTH: &str = "wss://ws-auth.kraken.com/v2";
19
20type RawStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
21
22/// A client for connecting to Kraken websockets via the V2 protocol.
23#[derive(Debug, Clone)]
24pub struct KrakenWSSClient {
25    base_url: String,
26    auth_url: String,
27    trace_inbound: bool,
28    trace_outbound: bool,
29}
30
31impl Default for KrakenWSSClient {
32    fn default() -> Self {
33        KrakenWSSClient::new()
34    }
35}
36
37impl KrakenWSSClient {
38    /// Create a client using the default Kraken URLs.
39    pub fn new() -> KrakenWSSClient {
40        KrakenWSSClient::new_with_tracing(WS_KRAKEN, WS_KRAKEN_AUTH, false, false)
41    }
42
43    /// Create a client with custom URLs.
44    ///
45    /// This is most useful for use with a proxy, or for testing.
46    pub fn new_with_urls(base_url: impl ToString, auth_url: impl ToString) -> KrakenWSSClient {
47        KrakenWSSClient::new_with_tracing(base_url, auth_url, false, false)
48    }
49
50    pub fn new_with_tracing(
51        base_url: impl ToString,
52        auth_url: impl ToString,
53        trace_inbound: bool,
54        trace_outbound: bool,
55    ) -> KrakenWSSClient {
56        KrakenWSSClient {
57            base_url: base_url.to_string(),
58            auth_url: auth_url.to_string(),
59            trace_inbound,
60            trace_outbound,
61        }
62    }
63
64    /// Connect to the Kraken public websocket channel, returning a [`Result`] containing a
65    /// [`KrakenMessageStream`] of [`PublicMessage`]s.
66    pub async fn connect<T>(&mut self) -> Result<KrakenMessageStream<T>, WSSError>
67    where
68        T: for<'d> Deserialize<'d>,
69    {
70        self._connect(&self.base_url.clone()).await
71    }
72
73    /// Connect to the Kraken private websocket channel, returning a [`Result`] containing a
74    /// [`KrakenMessageStream`] of [`PrivateMessage`]s.
75    pub async fn connect_auth<T>(&mut self) -> Result<KrakenMessageStream<T>, WSSError>
76    where
77        T: for<'d> Deserialize<'d>,
78    {
79        self._connect(&self.auth_url.clone()).await
80    }
81
82    #[tracing::instrument(skip(self))]
83    async fn _connect<T>(&mut self, url: &str) -> Result<KrakenMessageStream<T>, WSSError>
84    where
85        T: for<'d> Deserialize<'d>,
86    {
87        let url = Url::parse(url)?;
88        let (raw_stream, _response) = connect_async(url.as_str()).await?;
89
90        Ok(KrakenMessageStream {
91            stream: raw_stream,
92            phantom: PhantomData,
93            trace_inbound: self.trace_inbound,
94            trace_outbound: self.trace_outbound,
95        })
96    }
97}
98
99/// A futures_core::[`Stream`] implementation that returns deserializable messages. Messages can be
100/// retrieved by awaiting `someStream.next()`.
101///
102/// # Example: Listening to Public Messages
103/// See the full example including subscribing to channels in examples/live_public_wss_listening.rs.
104/// ```ignore
105///let mut client = KrakenWSSClient::new();
106///let mut kraken_stream: KrakenMessageStream<PublicMessage> = client.connect().await.unwrap();
107///
108///while let Some(message) = kraken_stream.next().await {
109///    println!("{:?}", message.unwrap());
110///}
111/// ```
112pub struct KrakenMessageStream<T>
113where
114    T: for<'a> Deserialize<'a>,
115{
116    stream: RawStream,
117    phantom: PhantomData<T>,
118    trace_inbound: bool,
119    trace_outbound: bool,
120}
121
122impl<T> Unpin for KrakenMessageStream<T>
123where
124    T: for<'a> Deserialize<'a>,
125{
126    // required for stream to be borrow-mutable when polling
127}
128
129impl<T> KrakenMessageStream<T>
130where
131    T: for<'a> Deserialize<'a>,
132{
133    /// Send an arbitrary serializable message through the stream.
134    #[tracing::instrument(skip(self))]
135    pub async fn send<M>(&mut self, message: &Message<M>) -> Result<(), WSSError>
136    where
137        M: Serialize + Debug,
138    {
139        self.send_as_str(message).await
140    }
141
142    #[tracing::instrument(skip(self))]
143    async fn send_as_str<M>(&mut self, message: &Message<M>) -> Result<(), WSSError>
144    where
145        M: Serialize + Debug,
146    {
147        let message_json = serde_json::to_string(message)?;
148
149        if self.trace_outbound {
150            trace!("Sending: {}", message_json);
151        }
152
153        self.stream
154            .send(TungsteniteMessage::Binary(
155                message_json.as_bytes().to_vec().into(),
156            ))
157            .await?;
158        Ok(())
159    }
160}
161
162impl<T> Stream for KrakenMessageStream<T>
163where
164    T: for<'a> Deserialize<'a>,
165{
166    type Item = Result<T, WSSError>;
167
168    /// returns Poll:Ready with a message if available, otherwise Poll:Pending
169    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
170        if let Poll::Ready(Some(message)) = Pin::new(&mut self.stream).poll_next(cx)? {
171            if self.trace_inbound {
172                trace!("Received: {}", message.to_string());
173            }
174            let parsed: T = serde_json::from_str(message.to_text()?)?;
175            Poll::Ready(Some(Ok(parsed)))
176        } else {
177            Poll::Pending
178        }
179    }
180}
181
182#[cfg(test)]
183mod tests {
184    use super::*;
185    use crate::crypto::secrets::Token;
186    use crate::request_types::{TimeInForce, TimeInForceV2, TriggerType};
187    use crate::response_types::{BuySell, OrderStatusV2, OrderType, SystemStatus};
188    use crate::test_data::{
189        get_balances_subscription_response, get_book_subscription_response,
190        get_execution_subscription_response, get_expected_balances_message,
191        get_expected_balances_subscription, get_expected_book_message,
192        get_expected_book_subscription, get_expected_execution_message,
193        get_expected_execution_subscription, get_expected_instruments_message,
194        get_expected_instruments_subscription, get_expected_l3_message,
195        get_expected_l3_subscription, get_expected_ohlc_message, get_expected_ohlc_subscription,
196        get_expected_ping, get_expected_pong_message, get_expected_ticker_message,
197        get_expected_ticker_subscription, get_expected_trade_message,
198        get_expected_trade_subscription, get_instruments_subscription_response,
199        get_l3_subscription_response, get_ohlc_subscription_response, get_pong,
200        get_ticker_subscription_response, get_trade_subscription_response, parse_for_test,
201        CallResponseTest, ParseIncomingTest,
202    };
203    use crate::wss::ChannelMessage::{Heartbeat, Status};
204    use crate::wss::MethodMessage::{AddOrder, AmendOrder, CancelOrder, EditOrder};
205    use crate::wss::{
206        AddOrderParams, AddOrderResult, AmendOrderParams, AmendOrderResult, Asset, AssetStatus,
207        Balance, BalanceResponse, BalancesSubscription, BatchCancelParams, BatchCancelResponse,
208        BatchOrder, BatchOrderParams, BidAsk, BookSubscription, CancelAllOrdersParams,
209        CancelAllOrdersResult, CancelOnDisconnectParams, CancelOnDisconnectResult,
210        CancelOrderParams, CancelOrderResult, ChannelMessage, EditOrderParams, EditOrderResult,
211        ExecutionResult, ExecutionSubscription, ExecutionType, Fee, FeePreference, Instruments,
212        InstrumentsSubscription, L3BidAsk, L3BidAskUpdate, L3Orderbook, L3OrderbookUpdate,
213        LedgerCategory, LedgerEntryTypeV2, LedgerUpdate, MakerTaker, MarketDataResponse,
214        MarketLimit, MethodMessage, Ohlc, OhlcSubscription, Orderbook, OrderbookEvent,
215        OrderbookUpdate, Pair, PairStatus, PriceType, Response, ResultResponse, SingleResponse,
216        StatusUpdate, Ticker, TickerSubscription, Trade, TradesSubscription, TriggerDescription,
217        TriggerStatus, Wallet, WalletId, WalletType, WssMessage, L2, L3,
218    };
219    use rust_decimal_macros::dec;
220    use serde_json::{json, Number};
221    use std::str::FromStr;
222    use std::time::Duration;
223    use tokio::time::timeout;
224    use tokio_stream::StreamExt;
225    use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
226    use tracing_test::traced_test;
227    use ws_mock::matchers::Any;
228    use ws_mock::ws_mock_server::{WsMock, WsMockServer};
229
230    #[test]
231    fn test_wss_client_creates() {
232        let client = KrakenWSSClient::new();
233        assert_eq!(WS_KRAKEN, client.base_url);
234        assert_eq!(WS_KRAKEN_AUTH, client.auth_url);
235    }
236
237    #[test]
238    fn test_wss_default_creates_client() {
239        let client = KrakenWSSClient::default();
240        assert_eq!(WS_KRAKEN, client.base_url);
241        assert_eq!(WS_KRAKEN_AUTH, client.auth_url);
242    }
243
244    #[test]
245    fn test_wss_client_new_with_urls() {
246        let mock_url = "https://trades.com";
247        let mock_auth_url = "https://auth.trades.com";
248        let client =
249            KrakenWSSClient::new_with_urls(mock_url.to_string(), mock_auth_url.to_string());
250        assert_eq!(mock_url, client.base_url);
251        assert_eq!(mock_auth_url, client.auth_url);
252    }
253
254    #[traced_test]
255    #[tokio::test]
256    async fn test_tracing_flags_disabled_by_default() {
257        let mock_server = WsMockServer::start().await;
258        let uri = mock_server.uri().await;
259        let mut client = KrakenWSSClient::new_with_urls(uri.clone(), uri);
260
261        WsMock::new()
262            .matcher(Any::new())
263            .respond_with(TungsteniteMessage::text("response"))
264            .mount(&mock_server)
265            .await;
266
267        let mut stream = client.connect::<String>().await.unwrap();
268
269        stream.send(&Message::new_subscription(0, 0)).await.unwrap();
270
271        let _message = timeout(Duration::from_secs(1), stream.next())
272            .await
273            .unwrap();
274
275        assert!(!logs_contain("Sending:"));
276        assert!(!logs_contain("Received: response"));
277    }
278
279    #[traced_test]
280    #[tokio::test]
281    async fn test_tracing_flags_enabled() {
282        let mock_server = WsMockServer::start().await;
283        let uri = mock_server.uri().await;
284        let mut client = KrakenWSSClient::new_with_tracing(&uri, &uri, true, true);
285
286        WsMock::new()
287            .matcher(Any::new())
288            .respond_with(TungsteniteMessage::text("response"))
289            .mount(&mock_server)
290            .await;
291
292        let mut stream = client.connect::<String>().await.unwrap();
293
294        stream.send(&Message::new_subscription(0, 0)).await.unwrap();
295
296        let _message = timeout(Duration::from_secs(1), stream.next())
297            .await
298            .unwrap();
299
300        assert!(logs_contain(
301            r#"Sending: {"method":"subscribe","params":0,"req_id":0}"#
302        ));
303        assert!(logs_contain("Received: response"));
304    }
305
306    #[tokio::test]
307    async fn test_admin_messages() {
308        let heartbeat = r#"{"channel":"heartbeat"}"#.to_string();
309        let status_update = r#"{"channel":"status","data":[{"api_version":"v2","connection_id":12393906104898154338,"system":"online","version":"2.0.4"}],"type":"update"}"#.to_string();
310
311        let status_message = WssMessage::Channel(Status(SingleResponse {
312            data: StatusUpdate {
313                api_version: "v2".to_string(),
314                connection_id: Some(Number::from_str("12393906104898154338").unwrap()),
315                system: SystemStatus::Online,
316                version: "2.0.4".to_string(),
317            },
318        }));
319
320        ParseIncomingTest::new()
321            .with_incoming(heartbeat)
322            .expect_message(WssMessage::Channel(Heartbeat))
323            .with_incoming(status_update)
324            .expect_message(status_message)
325            .test()
326            .await;
327    }
328
329    #[tokio::test]
330    async fn test_ping_pong() {
331        let ping: Option<()> = None;
332
333        let message = Message {
334            method: "ping".to_string(),
335            params: ping,
336            req_id: 1,
337        };
338
339        CallResponseTest::builder()
340            .match_on(get_expected_ping())
341            .respond_with(get_pong())
342            .send(message)
343            .expect(get_expected_pong_message())
344            .build()
345            .test()
346            .await;
347    }
348
349    #[tokio::test]
350    async fn test_cloudflare_error() {
351        // a bare string isn't valid JSON, so this fails to parse rather than producing an error
352        let cloudflare_restarting = r#"CloudFlare WebSocket proxy restarting"#;
353
354        let result = parse_for_test(cloudflare_restarting).await;
355
356        assert!(matches!(result, Err(WSSError::Serde(..))));
357    }
358
359    #[tokio::test]
360    async fn test_error_messages() {
361        let unsupported_field = r#"{"error":"Unsupported field: 'params' for the given msg type: ping","method":"ping","req_id":0,"success":false,"time_in":"2024-05-19T19:58:40.170724Z","time_out":"2024-05-19T19:58:40.170758Z"}"#.to_string();
362
363        let expected_unsupported_field = WssMessage::Method(MethodMessage::Ping(ResultResponse {
364            result: None,
365            error: Some("Unsupported field: 'params' for the given msg type: ping".to_string()),
366            success: false,
367            req_id: 0,
368            time_in: "2024-05-19T19:58:40.170724Z".to_string(),
369            time_out: "2024-05-19T19:58:40.170758Z".to_string(),
370        }));
371
372        let unsupported_event = r#"{"error":"Unsupported event","method":"subscribe","req_id":0,"success":false,"time_in":"2024-05-19T20:02:10.316562Z","time_out":"2024-05-19T20:02:10.316592Z"}"#.to_string();
373
374        let expected_unsupported_event =
375            WssMessage::Method(MethodMessage::Subscription(ResultResponse {
376                result: None,
377                error: Some("Unsupported event".to_string()),
378                success: false,
379                req_id: 0,
380                time_in: "2024-05-19T20:02:10.316562Z".to_string(),
381                time_out: "2024-05-19T20:02:10.316592Z".to_string(),
382            }));
383
384        let invalid_arguments = r#"{"error":"EGeneral:Invalid arguments:no_mpp order option is only available when ordertype = market","method":"add_order","req_id":0,"success":false,"time_in":"2024-05-18T12:03:08.768086Z","time_out":"2024-05-18T12:03:08.768149Z"}"#.to_string();
385
386        let expected_invalid_arguments =
387            WssMessage::Method(MethodMessage::AddOrder(ResultResponse {
388                result: None,
389                error: Some("EGeneral:Invalid arguments:no_mpp order option is only available when ordertype = market".to_string()),
390                success: false,
391                req_id: 0,
392                time_in: "2024-05-18T12:03:08.768086Z".to_string(),
393                time_out: "2024-05-18T12:03:08.768149Z".to_string(),
394            }));
395
396        let add_order_failure = r#"{"error":"Cash_order_qty field must be a number_float","method":"add_order","req_id":7,"success":false,"time_in":"2024-05-18T12:00:03.886027Z","time_out":"2024-05-18T12:00:03.886141Z"}"#.to_string();
397
398        let expected_add_order_failure =
399            WssMessage::Method(MethodMessage::AddOrder(ResultResponse {
400                result: None,
401                error: Some("Cash_order_qty field must be a number_float".to_string()),
402                success: false,
403                req_id: 7,
404                time_in: "2024-05-18T12:00:03.886027Z".to_string(),
405                time_out: "2024-05-18T12:00:03.886141Z".to_string(),
406            }));
407
408        let permission_denied = r#"{"error":"EGeneral:Permission denied","method":"add_order","req_id":0,"success":false,"time_in":"2024-05-18T12:03:43.466650Z","time_out":"2024-05-18T12:03:43.471987Z"}"#.to_string();
409
410        let expected_permission_denied =
411            WssMessage::Method(MethodMessage::AddOrder(ResultResponse {
412                result: None,
413                error: Some("EGeneral:Permission denied".to_string()),
414                success: false,
415                req_id: 0,
416                time_in: "2024-05-18T12:03:43.466650Z".to_string(),
417                time_out: "2024-05-18T12:03:43.471987Z".to_string(),
418            }));
419
420        let no_token = r#"{"error":"Token(s) not found","method":"edit_order","req_id":0,"success":false,"time_in":"2024-05-18T13:04:41.754066Z","time_out":"2024-05-18T13:04:41.754113Z"}"#.to_string();
421
422        let expected_no_token = WssMessage::Method(MethodMessage::EditOrder(ResultResponse {
423            result: None,
424            error: Some("Token(s) not found".to_string()),
425            success: false,
426            req_id: 0,
427            time_in: "2024-05-18T13:04:41.754066Z".to_string(),
428            time_out: "2024-05-18T13:04:41.754113Z".to_string(),
429        }));
430
431        ParseIncomingTest::new()
432            .with_incoming(unsupported_field)
433            .expect_message(expected_unsupported_field)
434            .with_incoming(unsupported_event)
435            .expect_message(expected_unsupported_event)
436            .with_incoming(invalid_arguments)
437            .expect_message(expected_invalid_arguments)
438            .with_incoming(add_order_failure)
439            .expect_message(expected_add_order_failure)
440            .with_incoming(permission_denied)
441            .expect_message(expected_permission_denied)
442            .with_incoming(no_token)
443            .expect_message(expected_no_token)
444            .test()
445            .await;
446    }
447
448    #[tokio::test]
449    async fn test_ticker_snapshot() {
450        let ticker_snapshot = r#"{
451        "channel":"ticker",
452        "type":"snapshot",
453        "data":[{
454            "symbol":"BTC/USD",
455            "bid":65972.8,
456            "bid_qty":0.10000000,
457            "ask":65972.9,
458            "ask_qty":39.67506683,
459            "last":65972.9,
460            "volume":4216.61829370,
461            "vwap":64275.2,
462            "low":61325.4,
463            "high":66450.0,
464            "change":4412.1,
465            "change_pct":7.17
466        }]
467    }"#
468        .to_string();
469
470        let expected_snapshot = WssMessage::Channel(ChannelMessage::Ticker(SingleResponse {
471            data: Ticker {
472                ask: dec!(65972.9),
473                ask_quantity: dec!(39.67506683),
474                bid: dec!(65972.8),
475                bid_quantity: dec!(0.10000000),
476                change: dec!(4412.1),
477                change_pct: dec!(7.17),
478                high: dec!(66450.0),
479                last: dec!(65972.9),
480                low: dec!(61325.4),
481                symbol: "BTC/USD".to_string(),
482                volume: dec!(4216.61829370),
483                vwap: dec!(64275.2),
484            },
485        }));
486
487        ParseIncomingTest::new()
488            .with_incoming(ticker_snapshot)
489            .expect_message(expected_snapshot)
490            .test()
491            .await;
492    }
493
494    #[tokio::test]
495    async fn test_ticker_update() {
496        let ticker_update = r#"{
497        "channel":"ticker",
498        "type":"update",
499        "data":[{
500            "symbol":"BTC/USD",
501            "bid":65843.7,
502            "bid_qty":12.31628629,
503            "ask":65843.8,
504            "ask_qty":0.31232000,
505            "last":65843.7,
506            "volume":4182.59447976,
507            "vwap":64223.4,
508            "low":61325.4,
509            "high":66450.0,
510            "change":4213.8,
511            "change_pct":6.84
512        }]
513    }"#
514        .to_string();
515
516        let expected_update = WssMessage::Channel(ChannelMessage::Ticker(SingleResponse {
517            data: Ticker {
518                ask: dec!(65843.8),
519                ask_quantity: dec!(0.31232000),
520                bid: dec!(65843.7),
521                bid_quantity: dec!(12.31628629),
522                change: dec!(4213.8),
523                change_pct: dec!(6.84),
524                high: dec!(66450.0),
525                last: dec!(65843.7),
526                low: dec!(61325.4),
527                symbol: "BTC/USD".to_string(),
528                volume: dec!(4182.59447976),
529                vwap: dec!(64223.4),
530            },
531        }));
532
533        ParseIncomingTest::new()
534            .with_incoming(ticker_update)
535            .expect_message(expected_update)
536            .test()
537            .await;
538    }
539
540    #[tokio::test]
541    async fn test_book_snapshot() {
542        let book_snapshot = r#"{
543        "channel":"book",
544        "type":"snapshot",
545        "data":[{
546            "symbol":"BTC/USD",
547            "bids":[
548                {"price":66788.0,"qty":3.21926649},
549                {"price":66787.5,"qty":0.44916298},
550                {"price":66787.4,"qty":0.05992580},
551                {"price":66785.3,"qty":0.01496904},
552                {"price":66785.2,"qty":0.86989511}
553            ],
554            "asks":[
555                {"price":66788.1,"qty":1.67939137},
556                {"price":66788.4,"qty":1.49726637},
557                {"price":66790.0,"qty":1.49723133},
558                {"price":66791.1,"qty":0.01100000},
559                {"price":66792.6,"qty":1.49717197}
560            ],
561            "checksum":2330500275
562        }]
563    }"#
564        .to_string();
565
566        let expected_snapshot = WssMessage::Channel(ChannelMessage::Orderbook(SingleResponse {
567            data: L2::Orderbook(Orderbook {
568                symbol: "BTC/USD".to_string(),
569                checksum: 2330500275,
570                bids: vec![
571                    BidAsk {
572                        price: dec!(66788.0),
573                        quantity: dec!(3.21926649),
574                    },
575                    BidAsk {
576                        price: dec!(66787.5),
577                        quantity: dec!(0.44916298),
578                    },
579                    BidAsk {
580                        price: dec!(66787.4),
581                        quantity: dec!(0.05992580),
582                    },
583                    BidAsk {
584                        price: dec!(66785.3),
585                        quantity: dec!(0.01496904),
586                    },
587                    BidAsk {
588                        price: dec!(66785.2),
589                        quantity: dec!(0.86989511),
590                    },
591                ],
592                asks: vec![
593                    BidAsk {
594                        price: dec!(66788.1),
595                        quantity: dec!(1.67939137),
596                    },
597                    BidAsk {
598                        price: dec!(66788.4),
599                        quantity: dec!(1.49726637),
600                    },
601                    BidAsk {
602                        price: dec!(66790.0),
603                        quantity: dec!(1.49723133),
604                    },
605                    BidAsk {
606                        price: dec!(66791.1),
607                        quantity: dec!(0.01100000),
608                    },
609                    BidAsk {
610                        price: dec!(66792.6),
611                        quantity: dec!(1.49717197),
612                    },
613                ],
614            }),
615        }));
616
617        ParseIncomingTest::new()
618            .with_incoming(book_snapshot)
619            .expect_message(expected_snapshot)
620            .test()
621            .await;
622    }
623
624    #[tokio::test]
625    async fn test_book_update() {
626        let book_update = r#"{
627        "channel":"book",
628        "type":"update",
629        "data":[{
630            "symbol":"BTC/USD",
631            "bids":[
632                {"price":66786.5,"qty":0.00000000},
633                {"price":66784.5,"qty":0.01470022},
634                {"price":66787.7,"qty":0.12440000}
635            ],
636            "asks":[],
637            "checksum":902440905,
638            "timestamp":"2024-05-19T16:45:24.204654Z"
639        }]
640    }"#
641        .to_string();
642
643        let expected_update = WssMessage::Channel(ChannelMessage::Orderbook(SingleResponse {
644            data: L2::Update(OrderbookUpdate {
645                symbol: "BTC/USD".to_string(),
646                checksum: 902440905,
647                timestamp: "2024-05-19T16:45:24.204654Z".to_string(),
648                bids: vec![
649                    BidAsk {
650                        price: dec!(66786.5),
651                        quantity: dec!(0.00000000),
652                    },
653                    BidAsk {
654                        price: dec!(66784.5),
655                        quantity: dec!(0.01470022),
656                    },
657                    BidAsk {
658                        price: dec!(66787.7),
659                        quantity: dec!(0.12440000),
660                    },
661                ],
662                asks: vec![],
663            }),
664        }));
665
666        ParseIncomingTest::new()
667            .with_incoming(book_update)
668            .expect_message(expected_update)
669            .test()
670            .await;
671    }
672
673    #[tokio::test]
674    async fn test_l3_snapshot() {
675        let l3_snapshot = r#"{
676        "channel":"level3",
677        "type":"snapshot",
678        "data": [{
679        "symbol":"BTC/USD",
680        "checksum":1361442827,
681        "bids":[
682            {"order_id":"OZYA6B-OE3BH-YJ4PY5","limit_price":66579.2,"order_qty":1.35137590,"timestamp":"2024-05-19T18:55:20.910159752Z"},
683            {"order_id":"OIOQ7V-JT5S2-QLIEPO","limit_price":66579.2,"order_qty":0.47905712,"timestamp":"2024-05-19T18:55:20.910276406Z"},
684            {"order_id":"O34I4J-KIE3I-BOT6VC","limit_price":66579.2,"order_qty":0.03003941,"timestamp":"2024-05-19T18:55:23.001943740Z"},
685            {"order_id":"OUOCIK-GA6WX-DSZC2A","limit_price":66574.1,"order_qty":0.45057561,"timestamp":"2024-05-19T18:55:15.431184641Z"}
686        ],
687        "asks":[
688            {"order_id":"OUPTOY-CCUJG-BMAZ5S","limit_price":66579.3,"order_qty":0.07800000,"timestamp":"2024-05-19T18:55:22.531833732Z"},
689            {"order_id":"OFUNE7-IGNAY-5UATGI","limit_price":66581.5,"order_qty":1.50192021,"timestamp":"2024-05-19T18:55:25.967603045Z"},
690            {"order_id":"ORCUC4-UGIUC-MT5KBA","limit_price":66583.7,"order_qty":0.87745184,"timestamp":"2024-05-19T18:55:18.938264721Z"}
691        ]
692    }]}"#.to_string();
693
694        let expected_snapshot = WssMessage::Channel(ChannelMessage::L3(SingleResponse {
695            data: L3::Orderbook(L3Orderbook {
696                symbol: "BTC/USD".to_string(),
697                bids: vec![
698                    L3BidAsk {
699                        order_id: "OZYA6B-OE3BH-YJ4PY5".to_string(),
700                        limit_price: dec!(66579.2),
701                        order_quantity: dec!(1.35137590),
702                        timestamp: "2024-05-19T18:55:20.910159752Z".to_string(),
703                    },
704                    L3BidAsk {
705                        order_id: "OIOQ7V-JT5S2-QLIEPO".to_string(),
706                        limit_price: dec!(66579.2),
707                        order_quantity: dec!(0.47905712),
708                        timestamp: "2024-05-19T18:55:20.910276406Z".to_string(),
709                    },
710                    L3BidAsk {
711                        order_id: "O34I4J-KIE3I-BOT6VC".to_string(),
712                        limit_price: dec!(66579.2),
713                        order_quantity: dec!(0.03003941),
714                        timestamp: "2024-05-19T18:55:23.001943740Z".to_string(),
715                    },
716                    L3BidAsk {
717                        order_id: "OUOCIK-GA6WX-DSZC2A".to_string(),
718                        limit_price: dec!(66574.1),
719                        order_quantity: dec!(0.45057561),
720                        timestamp: "2024-05-19T18:55:15.431184641Z".to_string(),
721                    },
722                ],
723                asks: vec![
724                    L3BidAsk {
725                        order_id: "OUPTOY-CCUJG-BMAZ5S".to_string(),
726                        limit_price: dec!(66579.3),
727                        order_quantity: dec!(0.07800000),
728                        timestamp: "2024-05-19T18:55:22.531833732Z".to_string(),
729                    },
730                    L3BidAsk {
731                        order_id: "OFUNE7-IGNAY-5UATGI".to_string(),
732                        limit_price: dec!(66581.5),
733                        order_quantity: dec!(1.50192021),
734                        timestamp: "2024-05-19T18:55:25.967603045Z".to_string(),
735                    },
736                    L3BidAsk {
737                        order_id: "ORCUC4-UGIUC-MT5KBA".to_string(),
738                        limit_price: dec!(66583.7),
739                        order_quantity: dec!(0.87745184),
740                        timestamp: "2024-05-19T18:55:18.938264721Z".to_string(),
741                    },
742                ],
743                checksum: 1361442827,
744            }),
745        }));
746
747        ParseIncomingTest::new()
748            .with_incoming(l3_snapshot)
749            .expect_message(expected_snapshot)
750            .test()
751            .await;
752    }
753
754    #[tokio::test]
755    async fn test_l3_update() {
756        let l3_update = r#"{
757        "channel":"level3",
758        "type":"update",
759        "data":[{
760            "checksum":2143854316,
761            "symbol":"BTC/USD",
762            "bids":[
763                {
764                    "event":"delete",
765                    "order_id":"O7SO4Y-RHRAK-GGAHJE",
766                    "limit_price":66567.3,
767                    "order_qty":0.22540000,
768                    "timestamp":"2024-05-19T18:59:46.541105556Z"
769                },
770                {
771                    "event":"add",
772                    "order_id":"OI2XQ5-6JUYI-A5NI6J",
773                    "limit_price":66566.9,
774                    "order_qty":2.82230268,
775                    "timestamp":"2024-05-19T18:59:44.900460701Z"
776                }
777            ],
778            "asks":[]
779        }]
780    }"#
781        .to_string();
782
783        let expected_update = WssMessage::Channel(ChannelMessage::L3(SingleResponse {
784            data: L3::Update(L3OrderbookUpdate {
785                symbol: "BTC/USD".to_string(),
786                bids: vec![
787                    L3BidAskUpdate {
788                        event: OrderbookEvent::Delete,
789                        order_id: "O7SO4Y-RHRAK-GGAHJE".to_string(),
790                        limit_price: dec!(66567.3),
791                        order_quantity: dec!(0.22540000),
792                        timestamp: "2024-05-19T18:59:46.541105556Z".to_string(),
793                    },
794                    L3BidAskUpdate {
795                        event: OrderbookEvent::Add,
796                        order_id: "OI2XQ5-6JUYI-A5NI6J".to_string(),
797                        limit_price: dec!(66566.9),
798                        order_quantity: dec!(2.82230268),
799                        timestamp: "2024-05-19T18:59:44.900460701Z".to_string(),
800                    },
801                ],
802                asks: vec![],
803                checksum: 2143854316,
804            }),
805        }));
806
807        ParseIncomingTest::new()
808            .with_incoming(l3_update)
809            .expect_message(expected_update)
810            .test()
811            .await;
812    }
813
814    #[tokio::test]
815    async fn test_candles_snapshot() {
816        let candles_snapshot = r#"{
817        "channel":"ohlc",
818        "type":"snapshot",
819        "timestamp":"2024-05-17T11:21:16.318303322Z",
820        "data":[
821            {"symbol":"ETH/USD","open":3027.80,"high":3027.80,"low":3026.13,"close":3026.13,"trades":9,"volume":13.31603062,"vwap":3027.01,"interval_begin":"2024-05-17T11:12:00.000000000Z","interval":1,"timestamp":"2024-05-17T11:13:00.000000Z"},
822            {"symbol":"ETH/USD","open":3026.46,"high":3026.47,"low":3026.46,"close":3026.47,"trades":4,"volume":2.14044498,"vwap":3026.46,"interval_begin":"2024-05-17T11:13:00.000000000Z","interval":1,"timestamp":"2024-05-17T11:14:00.000000Z"}
823        ]
824    }"#
825            .to_string();
826
827        let expected_snapshot = WssMessage::Channel(ChannelMessage::Ohlc(MarketDataResponse {
828            data: vec![
829                Ohlc {
830                    symbol: "ETH/USD".to_string(),
831                    open: dec!(3027.80),
832                    high: dec!(3027.80),
833                    low: dec!(3026.13),
834                    close: dec!(3026.13),
835                    vwap: dec!(3027.01),
836                    trades: 9,
837                    volume: dec!(13.31603062),
838                    interval_begin: "2024-05-17T11:12:00.000000000Z".to_string(),
839                    interval: 1,
840                },
841                Ohlc {
842                    symbol: "ETH/USD".to_string(),
843                    open: dec!(3026.46),
844                    high: dec!(3026.47),
845                    low: dec!(3026.46),
846                    close: dec!(3026.47),
847                    vwap: dec!(3026.46),
848                    trades: 4,
849                    volume: dec!(2.14044498),
850                    interval_begin: "2024-05-17T11:13:00.000000000Z".to_string(),
851                    interval: 1,
852                },
853            ],
854        }));
855
856        ParseIncomingTest::new()
857            .with_incoming(candles_snapshot)
858            .expect_message(expected_snapshot)
859            .test()
860            .await;
861    }
862
863    #[tokio::test]
864    async fn test_trade_snapshot() {
865        let trade_snapshot = r#"{
866        "channel":"trade",
867        "type":"snapshot",
868        "data":[
869            {"symbol":"BTC/USD","side":"sell","price":68466.9,"qty":0.01919415,"ord_type":"market","trade_id":70635251,"timestamp":"2024-05-27T12:33:10.826003Z"},
870            {"symbol":"BTC/USD","side":"buy","price":68471.2,"qty":0.00007723,"ord_type":"limit","trade_id":70635252,"timestamp":"2024-05-27T12:33:10.980704Z"}
871        ]
872    }"#.to_string();
873
874        let expected_snapshot = WssMessage::Channel(ChannelMessage::Trade(MarketDataResponse {
875            data: vec![
876                Trade {
877                    symbol: "BTC/USD".to_string(),
878                    side: BuySell::Sell,
879                    quantity: dec!(0.01919415),
880                    price: dec!(68466.9),
881                    order_type: MarketLimit::Market,
882                    trade_id: 70635251,
883                    timestamp: "2024-05-27T12:33:10.826003Z".to_string(),
884                },
885                Trade {
886                    symbol: "BTC/USD".to_string(),
887                    side: BuySell::Buy,
888                    quantity: dec!(0.00007723),
889                    price: dec!(68471.2),
890                    order_type: MarketLimit::Limit,
891                    trade_id: 70635252,
892                    timestamp: "2024-05-27T12:33:10.980704Z".to_string(),
893                },
894            ],
895        }));
896
897        ParseIncomingTest::new()
898            .with_incoming(trade_snapshot)
899            .expect_message(expected_snapshot)
900            .test()
901            .await;
902    }
903
904    #[tokio::test]
905    async fn test_trade_update() {
906        let trade_update = r#"{
907        "channel":"trade",
908        "type":"update",
909        "data":[
910            {"symbol":"BTC/USD","side":"buy","price":68500.0,"qty":0.01044926,"ord_type":"limit","trade_id":70635299,"timestamp":"2024-05-27T12:43:11.798009Z"},
911            {"symbol":"BTC/USD","side":"buy","price":68500.0,"qty":0.00483192,"ord_type":"limit","trade_id":70635300,"timestamp":"2024-05-27T12:43:11.798009Z"}
912        ]
913    }"#.to_string();
914
915        let expected_update = WssMessage::Channel(ChannelMessage::Trade(MarketDataResponse {
916            data: vec![
917                Trade {
918                    symbol: "BTC/USD".to_string(),
919                    side: BuySell::Buy,
920                    quantity: dec!(0.01044926),
921                    price: dec!(68500.0),
922                    order_type: MarketLimit::Limit,
923                    trade_id: 70635299,
924                    timestamp: "2024-05-27T12:43:11.798009Z".to_string(),
925                },
926                Trade {
927                    symbol: "BTC/USD".to_string(),
928                    side: BuySell::Buy,
929                    quantity: dec!(0.00483192),
930                    price: dec!(68500.0),
931                    order_type: MarketLimit::Limit,
932                    trade_id: 70635300,
933                    timestamp: "2024-05-27T12:43:11.798009Z".to_string(),
934                },
935            ],
936        }));
937
938        ParseIncomingTest::new()
939            .with_incoming(trade_update)
940            .expect_message(expected_update)
941            .test()
942            .await;
943    }
944
945    #[tokio::test]
946    async fn test_instruments_snapshot() {
947        let instrument_snapshot = r#"{
948        "channel":"instrument",
949        "type":"snapshot",
950        "data":{
951            "assets":[
952                {"id":"USD","status":"enabled","precision":4,"precision_display":2,"borrowable":true,"collateral_value":1.00,"margin_rate":0.025000},
953                {"id":"EUR","status":"enabled","precision":4,"precision_display":2,"borrowable":true,"collateral_value":1.00,"margin_rate":0.020000},
954                {"id":"ETH","status":"enabled","precision":10,"precision_display":5,"borrowable":true,"collateral_value":1.00,"margin_rate":0.020000}
955            ],
956            "pairs": [
957                {"symbol":"EUR/USD","base":"EUR","quote":"USD","status":"online","qty_precision":8,"qty_increment":0.00000001,"price_precision":5,"cost_precision":5,"marginable":false,"has_index":true,"cost_min":0.50,"tick_size":0.00001,"price_increment":0.00001,"qty_min":0.50000000},
958                {"symbol":"ETH/BTC","base":"ETH","quote":"BTC","status":"online","qty_precision":8,"qty_increment":0.00000001,"price_precision":5,"cost_precision":10,"marginable":true,"has_index":true,"cost_min":0.00002,"margin_initial":0.20,"position_limit_long":1000,"position_limit_short":600,"tick_size":0.00001,"price_increment":0.00001,"qty_min":0.00200000}
959            ]
960        }
961    }"#.to_string();
962
963        let expected_snapshot =
964            WssMessage::Channel(ChannelMessage::Instrument(MarketDataResponse {
965                data: Instruments {
966                    assets: vec![
967                        Asset {
968                            id: "USD".to_string(),
969                            margin_rate: Some(dec!(0.025000)),
970                            precision: 4,
971                            precision_display: 2,
972                            status: AssetStatus::Enabled,
973                            borrowable: true,
974                            collateral_value: dec!(1.0),
975                        },
976                        Asset {
977                            id: "EUR".to_string(),
978                            margin_rate: Some(dec!(0.020000)),
979                            precision: 4,
980                            precision_display: 2,
981                            status: AssetStatus::Enabled,
982                            borrowable: true,
983                            collateral_value: dec!(1.0),
984                        },
985                        Asset {
986                            id: "ETH".to_string(),
987                            margin_rate: Some(dec!(0.020000)),
988                            precision: 10,
989                            precision_display: 5,
990                            status: AssetStatus::Enabled,
991                            borrowable: true,
992                            collateral_value: dec!(1.0),
993                        },
994                    ],
995                    pairs: vec![
996                        Pair {
997                            base: "EUR".to_string(),
998                            quote: "USD".to_string(),
999                            cost_min: dec!(0.50),
1000                            cost_precision: 5,
1001                            has_index: true,
1002                            margin_initial: None,
1003                            marginable: false,
1004                            position_limit_long: None,
1005                            position_limit_short: None,
1006                            price_increment: dec!(0.00001),
1007                            price_precision: 5,
1008                            quantity_increment: dec!(0.00000001),
1009                            quantity_min: dec!(0.50),
1010                            quantity_precision: 8,
1011                            status: PairStatus::Online,
1012                            symbol: "EUR/USD".to_string(),
1013                        },
1014                        Pair {
1015                            base: "ETH".to_string(),
1016                            quote: "BTC".to_string(),
1017                            cost_min: dec!(0.00002),
1018                            cost_precision: 10,
1019                            has_index: true,
1020                            margin_initial: Some(dec!(0.2)),
1021                            marginable: true,
1022                            position_limit_long: Some(1000),
1023                            position_limit_short: Some(600),
1024                            price_increment: dec!(0.00001),
1025                            price_precision: 5,
1026                            quantity_increment: dec!(0.00000001),
1027                            quantity_min: dec!(0.002),
1028                            quantity_precision: 8,
1029                            status: PairStatus::Online,
1030                            symbol: "ETH/BTC".to_string(),
1031                        },
1032                    ],
1033                },
1034            }));
1035
1036        ParseIncomingTest::new()
1037            .with_incoming(instrument_snapshot)
1038            .expect_message(expected_snapshot)
1039            .test()
1040            .await;
1041    }
1042
1043    #[tokio::test]
1044    async fn test_execution_subscription() {
1045        let mut execution_params = ExecutionSubscription::new(Token::new("someToken".to_string()));
1046        execution_params.snapshot_trades = Some(true);
1047        execution_params.snapshot_orders = Some(true);
1048
1049        let subscription = Message::new_subscription(execution_params, 0);
1050
1051        CallResponseTest::builder()
1052            .match_on(get_expected_execution_subscription())
1053            .respond_with(get_execution_subscription_response())
1054            .send(subscription)
1055            .expect(get_expected_execution_message())
1056            .build()
1057            .test()
1058            .await;
1059    }
1060
1061    #[tokio::test]
1062    async fn test_balances_subscription() {
1063        let mut balances_params = BalancesSubscription::new(Token::new("anotherToken".to_string()));
1064        balances_params.snapshot = Some(true);
1065
1066        let subscription = Message::new_subscription(balances_params, 10312008);
1067
1068        CallResponseTest::builder()
1069            .match_on(get_expected_balances_subscription())
1070            .respond_with(get_balances_subscription_response())
1071            .send(subscription)
1072            .expect(get_expected_balances_message())
1073            .build()
1074            .test()
1075            .await;
1076    }
1077
1078    #[tokio::test]
1079    async fn test_ticker_subscription() {
1080        let ticker_params = TickerSubscription::new(vec!["BTC/USD".into()]);
1081
1082        let subscription = Message::new_subscription(ticker_params, 42);
1083
1084        CallResponseTest::builder()
1085            .match_on(get_expected_ticker_subscription())
1086            .respond_with(get_ticker_subscription_response())
1087            .send(subscription)
1088            .expect(get_expected_ticker_message())
1089            .build()
1090            .test()
1091            .await;
1092    }
1093
1094    #[tokio::test]
1095    async fn test_book_subscription() {
1096        let mut book_params = BookSubscription::new(vec!["BTC/USD".into()]);
1097        book_params.depth = Some(10);
1098        book_params.snapshot = Some(true);
1099
1100        let subscription = Message::new_subscription(book_params, 11);
1101
1102        CallResponseTest::builder()
1103            .match_on(get_expected_book_subscription())
1104            .respond_with(get_book_subscription_response())
1105            .send(subscription)
1106            .expect(get_expected_book_message())
1107            .build()
1108            .test()
1109            .await;
1110    }
1111
1112    #[tokio::test]
1113    async fn test_l3_subscription() {
1114        let mut book_params =
1115            BookSubscription::new_l3(vec!["BTC/USD".into()], Token::new("someToken".to_string()));
1116        book_params.snapshot = Some(true);
1117
1118        let subscription = Message::new_subscription(book_params, 99);
1119
1120        CallResponseTest::builder()
1121            .match_on(get_expected_l3_subscription())
1122            .respond_with(get_l3_subscription_response())
1123            .send(subscription)
1124            .expect(get_expected_l3_message())
1125            .build()
1126            .test()
1127            .await;
1128    }
1129
1130    #[tokio::test]
1131    async fn test_ohlc_subscription() {
1132        let ohlc_params = OhlcSubscription::new(vec!["ETH/USD".into()], 60);
1133
1134        let subscription = Message::new_subscription(ohlc_params, 121);
1135
1136        CallResponseTest::builder()
1137            .match_on(get_expected_ohlc_subscription())
1138            .respond_with(get_ohlc_subscription_response())
1139            .send(subscription)
1140            .expect(get_expected_ohlc_message())
1141            .build()
1142            .test()
1143            .await;
1144    }
1145
1146    #[tokio::test]
1147    async fn test_trade_subscription() {
1148        let trade_params = TradesSubscription::new(vec!["BTC/USD".into()]);
1149
1150        let subscription = Message::new_subscription(trade_params, 0);
1151
1152        CallResponseTest::builder()
1153            .match_on(get_expected_trade_subscription())
1154            .respond_with(get_trade_subscription_response())
1155            .send(subscription)
1156            .expect(get_expected_trade_message())
1157            .build()
1158            .test()
1159            .await;
1160    }
1161
1162    #[tokio::test]
1163    async fn test_instruments_subscription() {
1164        let instruments_params = InstrumentsSubscription::new(true);
1165
1166        let subscription = Message::new_subscription(instruments_params, 0);
1167
1168        CallResponseTest::builder()
1169            .match_on(get_expected_instruments_subscription())
1170            .respond_with(get_instruments_subscription_response())
1171            .send(subscription)
1172            .expect(get_expected_instruments_message())
1173            .build()
1174            .test()
1175            .await;
1176    }
1177
1178    #[tokio::test]
1179    async fn test_add_order() {
1180        let expected_request = json!({"method":"add_order","params":{"order_type":"limit","side":"buy","symbol":"USDC/USD","limit_price":0.95,"time_in_force":"ioc","order_qty":5.0,"post_only":false,"fee_preference":"quote","token":"aToken","cl_ord_id":"client-zero","sender_sub_id":"sender-one"},"req_id":0});
1181        let response = r#"{"method":"add_order","req_id":0,"result":{"order_id":"OPS23M-VS41G-DDE5Z2","cl_ord_id":"client-zero"},"success":true,"time_in":"2024-05-18T12:05:50.293682Z","time_out":"2024-05-18T12:05:50.300542Z"}"#.to_string();
1182        let expected_response = WssMessage::Method(AddOrder(ResultResponse {
1183            result: Some(AddOrderResult {
1184                order_id: "OPS23M-VS41G-DDE5Z2".to_string(),
1185                order_user_ref: None,
1186                warning: None,
1187                client_order_id: Some("client-zero".to_string()),
1188            }),
1189            error: None,
1190            success: true,
1191            req_id: 0,
1192            time_in: "2024-05-18T12:05:50.293682Z".to_string(),
1193            time_out: "2024-05-18T12:05:50.300542Z".to_string(),
1194        }));
1195
1196        let add_order: AddOrderParams = AddOrderParams {
1197            order_type: OrderType::Limit,
1198            side: BuySell::Buy,
1199            symbol: "USDC/USD".to_string(),
1200            limit_price: Some(dec!(0.95)),
1201            limit_price_type: None,
1202            triggers: None,
1203            time_in_force: Some(TimeInForceV2::IOC),
1204            order_quantity: dec!(5.0),
1205            margin: None,
1206            post_only: Some(false),
1207            reduce_only: None,
1208            effective_time: None,
1209            expire_time: None,
1210            deadline: None,
1211            order_user_ref: None,
1212            conditional: None,
1213            display_quantity: None,
1214            fee_preference: Some(FeePreference::Quote),
1215            no_market_price_protection: None,
1216            stp_type: None,
1217            cash_order_quantity: None,
1218            validate: None,
1219            sender_sub_id: Some("sender-one".to_string()),
1220            token: Token::new("aToken".to_string()),
1221            client_order_id: Some("client-zero".to_string()),
1222        };
1223
1224        let message = Message {
1225            method: "add_order".to_string(),
1226            params: add_order,
1227            req_id: 0,
1228        };
1229
1230        CallResponseTest::builder()
1231            .match_on(expected_request)
1232            .respond_with(response)
1233            .send(message)
1234            .expect(expected_response)
1235            .build()
1236            .test()
1237            .await;
1238    }
1239
1240    #[tokio::test]
1241    async fn test_amend_order() {
1242        let expected_request = json!({"method":"amend_order","params":{"order_id":"BQS60L-EGW18-UPAK9U","order_qty":5.1,"limit_price":0.96,"post_only":false,"token":"aToken"},"req_id":0});
1243        let response = r#"{"method":"amend_order","req_id":0,"result":{"amend_id":"1M2JV8-OEJZD-G5GSBF","order_id":"BQS60L-EGW18-UPAK9U"},"success":true,"time_in":"2024-10-11T12:12:21.003873Z","time_out":"2024-10-11T12:12:21.005064Z"}"#.to_string();
1244        let expected_response = WssMessage::Method(AmendOrder(ResultResponse {
1245            result: Some(AmendOrderResult {
1246                amend_id: "1M2JV8-OEJZD-G5GSBF".to_string(),
1247                order_id: Some("BQS60L-EGW18-UPAK9U".to_string()),
1248                client_order_id: None,
1249                warnings: None,
1250            }),
1251            error: None,
1252            success: true,
1253            req_id: 0,
1254            time_in: "2024-10-11T12:12:21.003873Z".to_string(),
1255            time_out: "2024-10-11T12:12:21.005064Z".to_string(),
1256        }));
1257
1258        let amend_order = AmendOrderParams {
1259            order_id: Some("BQS60L-EGW18-UPAK9U".to_string()),
1260            limit_price: Some(dec!(0.96)),
1261            limit_price_type: None,
1262            post_only: Some(false),
1263            trigger_price: None,
1264            trigger_price_type: None,
1265            deadline: None,
1266            token: Token::new("aToken".to_string()),
1267            client_order_id: None,
1268            order_quantity: dec!(5.1),
1269            display_quantity: None,
1270        };
1271
1272        let message = Message {
1273            method: "amend_order".to_string(),
1274            params: amend_order,
1275            req_id: 0,
1276        };
1277
1278        CallResponseTest::builder()
1279            .match_on(expected_request)
1280            .respond_with(response)
1281            .send(message)
1282            .expect(expected_response)
1283            .build()
1284            .test()
1285            .await;
1286    }
1287
1288    #[tokio::test]
1289    async fn test_amend_order_error_response() {
1290        let response = r#"{"error":"Limit_price field must be a number_float","method":"amend_order","req_id":0,"success":false,"time_in":"2024-10-13T13:31:28.636431Z","time_out":"2024-10-13T13:31:28.636488Z"}"#;
1291
1292        let expected_message = WssMessage::Method(AmendOrder(ResultResponse {
1293            result: None,
1294            error: Some("Limit_price field must be a number_float".to_string()),
1295            success: false,
1296            req_id: 0,
1297            time_in: "2024-10-13T13:31:28.636431Z".to_string(),
1298            time_out: "2024-10-13T13:31:28.636488Z".to_string(),
1299        }));
1300
1301        ParseIncomingTest::new()
1302            .with_incoming(response.to_string())
1303            .expect_message(expected_message)
1304            .test()
1305            .await;
1306    }
1307
1308    #[tokio::test]
1309    async fn test_edit_order() {
1310        let expected_request = json!({"method":"edit_order","params":{"limit_price":0.93,"order_id":"K1FF7H-A13AR-Q1S9Z6","order_qty":6.1,"symbol":"USDC/USD","token":"someToken"},"req_id":0});
1311        let response = r#"{"method":"edit_order","req_id":0,"result":{"order_id":"7FIK6B-S15X0-DPJJTH","original_order_id":"K1FF7H-A13AR-Q1S9Z6"},"success":true,"time_in":"2024-05-19T12:12:30.171615Z","time_out":"2024-05-19T12:12:30.173877Z"}"#.to_string();
1312        let expected_response = WssMessage::Method(EditOrder(ResultResponse {
1313            result: Some(EditOrderResult {
1314                order_id: "7FIK6B-S15X0-DPJJTH".to_string(),
1315                original_order_id: "K1FF7H-A13AR-Q1S9Z6".to_string(),
1316                warning: None,
1317            }),
1318            error: None,
1319            success: true,
1320            req_id: 0,
1321            time_in: "2024-05-19T12:12:30.171615Z".to_string(),
1322            time_out: "2024-05-19T12:12:30.173877Z".to_string(),
1323        }));
1324
1325        let edit_order = EditOrderParams {
1326            symbol: "USDC/USD".to_string(),
1327            limit_price: Some(dec!(0.93)),
1328            triggers: None,
1329            order_quantity: Some(dec!(6.1)),
1330            post_only: None,
1331            reduce_only: None,
1332            deadline: None,
1333            order_user_ref: None,
1334            display_quantity: None,
1335            fee_preference: None,
1336            no_market_price_protection: None,
1337            validate: None,
1338            token: Token::new("someToken".to_string()),
1339            order_id: "K1FF7H-A13AR-Q1S9Z6".to_string(),
1340        };
1341
1342        let message = Message {
1343            method: "edit_order".to_string(),
1344            params: edit_order,
1345            req_id: 0,
1346        };
1347
1348        CallResponseTest::builder()
1349            .match_on(expected_request)
1350            .respond_with(response)
1351            .send(message)
1352            .expect(expected_response)
1353            .build()
1354            .test()
1355            .await;
1356    }
1357
1358    #[tokio::test]
1359    async fn test_cancel_order() {
1360        let expected_request = json!({"method":"cancel_order","params":{"order_id":["1V7PZA-L5RIM-RX2G6B"],"token":"thatToken"},"req_id":0});
1361        let response = r#"{"method":"cancel_order","req_id":0,"result":{"order_id":"1V7PZA-L5RIM-RX2G6B"},"success":true,"time_in":"2024-05-19T19:18:44.987402Z","time_out":"2024-05-19T19:18:44.989756Z"}"#.to_string();
1362        let expected_response = WssMessage::Method(CancelOrder(ResultResponse {
1363            result: Some(CancelOrderResult {
1364                order_id: Some("1V7PZA-L5RIM-RX2G6B".to_string()),
1365                warning: None,
1366                client_order_id: None,
1367            }),
1368            error: None,
1369            success: true,
1370            req_id: 0,
1371            time_in: "2024-05-19T19:18:44.987402Z".to_string(),
1372            time_out: "2024-05-19T19:18:44.989756Z".to_string(),
1373        }));
1374
1375        let cancel_order = CancelOrderParams {
1376            order_id: Some(vec!["1V7PZA-L5RIM-RX2G6B".into()]),
1377            client_order_id: None,
1378            order_user_ref: None,
1379            token: Token::new("thatToken".to_string()),
1380        };
1381
1382        let message = Message {
1383            method: "cancel_order".to_string(),
1384            params: cancel_order,
1385            req_id: 0,
1386        };
1387
1388        CallResponseTest::builder()
1389            .match_on(expected_request)
1390            .respond_with(response)
1391            .send(message)
1392            .expect(expected_response)
1393            .build()
1394            .test()
1395            .await;
1396    }
1397
1398    #[tokio::test]
1399    async fn test_cancel_order_by_client_order_id() {
1400        let expected_request = json!({"method":"cancel_order","params":{"cl_ord_id":["a-uuid"],"token":"thatToken"},"req_id":0});
1401        let response = r#"{"method":"cancel_order","req_id":0,"result":{"cl_ord_id":"a-uuid"},"success":true,"time_in":"2024-05-19T19:18:44.987402Z","time_out":"2024-05-19T19:18:44.989756Z"}"#.to_string();
1402        let expected_response = WssMessage::Method(CancelOrder(ResultResponse {
1403            result: Some(CancelOrderResult {
1404                order_id: None,
1405                warning: None,
1406                client_order_id: Some("a-uuid".to_string()),
1407            }),
1408            error: None,
1409            success: true,
1410            req_id: 0,
1411            time_in: "2024-05-19T19:18:44.987402Z".to_string(),
1412            time_out: "2024-05-19T19:18:44.989756Z".to_string(),
1413        }));
1414
1415        let cancel_order = CancelOrderParams {
1416            order_id: None,
1417            client_order_id: Some(vec!["a-uuid".to_string()]),
1418            order_user_ref: None,
1419            token: Token::new("thatToken".to_string()),
1420        };
1421
1422        let message = Message {
1423            method: "cancel_order".to_string(),
1424            params: cancel_order,
1425            req_id: 0,
1426        };
1427
1428        CallResponseTest::builder()
1429            .match_on(expected_request)
1430            .respond_with(response)
1431            .send(message)
1432            .expect(expected_response)
1433            .build()
1434            .test()
1435            .await;
1436    }
1437
1438    #[tokio::test]
1439    async fn test_cancel_all_orders() {
1440        let expected_request =
1441            json!({"method":"cancel_all","params":{"token":"thisToken"},"req_id":0});
1442        let response = r#"{"method":"cancel_all","req_id":0,"result":{"count":0},"success":true,"time_in":"2024-05-19T11:42:13.815662Z","time_out":"2024-05-19T11:42:13.824053Z"}"#.to_string();
1443        let expected_response =
1444            WssMessage::Method(MethodMessage::CancelAllOrders(ResultResponse {
1445                result: Some(CancelAllOrdersResult {
1446                    count: 0,
1447                    warning: None,
1448                }),
1449                error: None,
1450                success: true,
1451                req_id: 0,
1452                time_in: "2024-05-19T11:42:13.815662Z".to_string(),
1453                time_out: "2024-05-19T11:42:13.824053Z".to_string(),
1454            }));
1455
1456        let cancel_all = CancelAllOrdersParams {
1457            token: Token::new("thisToken".to_string()),
1458        };
1459
1460        let message = Message {
1461            method: "cancel_all".to_string(),
1462            params: cancel_all,
1463            req_id: 0,
1464        };
1465
1466        CallResponseTest::builder()
1467            .match_on(expected_request)
1468            .respond_with(response)
1469            .send(message)
1470            .expect(expected_response)
1471            .build()
1472            .test()
1473            .await;
1474    }
1475
1476    #[tokio::test]
1477    async fn test_cancel_on_disconnect() {
1478        let expected_request = json!({"method":"cancel_all_orders_after","params":{"timeout":5,"token":"yourToken"},"req_id":0});
1479        let response = r#"{"method":"cancel_all_orders_after","req_id":0,"result":{"currentTime":"2024-05-19T19:22:20Z","triggerTime":"2024-05-19T19:22:25Z"},"success":true,"time_in":"2024-05-19T19:22:19.975239Z","time_out":"2024-05-19T19:22:19.981369Z"}"#.to_string();
1480        let expected_response =
1481            WssMessage::Method(MethodMessage::CancelOnDisconnect(ResultResponse {
1482                result: Some(CancelOnDisconnectResult {
1483                    current_time: "2024-05-19T19:22:20Z".into(),
1484                    warning: None,
1485                    trigger_time: "2024-05-19T19:22:25Z".into(),
1486                }),
1487                error: None,
1488                success: true,
1489                req_id: 0,
1490                time_in: "2024-05-19T19:22:19.975239Z".to_string(),
1491                time_out: "2024-05-19T19:22:19.981369Z".to_string(),
1492            }));
1493
1494        let cancel_on_disconnect = CancelOnDisconnectParams {
1495            timeout: 5,
1496            token: Token::new("yourToken".to_string()),
1497        };
1498
1499        let message = Message {
1500            method: "cancel_all_orders_after".to_string(),
1501            params: cancel_on_disconnect,
1502            req_id: 0,
1503        };
1504
1505        CallResponseTest::builder()
1506            .match_on(expected_request)
1507            .respond_with(response)
1508            .send(message)
1509            .expect(expected_response)
1510            .build()
1511            .test()
1512            .await;
1513    }
1514
1515    #[tokio::test]
1516    async fn test_batch_add() {
1517        let expected_request = json!({"method":"batch_add","params":{"symbol":"USDC/USD","token":"myToken","orders":[{"order_type":"limit","side":"buy","limit_price":0.99,"order_qty":5.0,"post_only":true,"fee_preference":"quote"},{"order_type":"limit","side":"buy","limit_price":0.95,"order_qty":5.0,"post_only":true,"fee_preference":"base"}]},"req_id":0});
1518        let response = r#"{"method":"batch_add","req_id":0,"result":[{"order_id":"JQDNTT-MZEIZ-OZKUDD"},{"order_id":"X67GEK-3VQWM-HPNQ89"}],"success":true,"time_in":"2024-05-19T19:23:21.134538Z","time_out":"2024-05-19T19:23:21.141229Z"}"#.to_string();
1519        let expected_response = WssMessage::Method(MethodMessage::BatchOrder(ResultResponse {
1520            result: Some(vec![
1521                AddOrderResult {
1522                    order_id: "JQDNTT-MZEIZ-OZKUDD".to_string(),
1523                    order_user_ref: None,
1524                    warning: None,
1525                    client_order_id: None,
1526                },
1527                AddOrderResult {
1528                    order_id: "X67GEK-3VQWM-HPNQ89".to_string(),
1529                    order_user_ref: None,
1530                    warning: None,
1531
1532                    client_order_id: None,
1533                },
1534            ]),
1535            error: None,
1536            success: true,
1537            req_id: 0,
1538            time_in: "2024-05-19T19:23:21.134538Z".to_string(),
1539            time_out: "2024-05-19T19:23:21.141229Z".to_string(),
1540        }));
1541
1542        let batch_add = BatchOrderParams {
1543            deadline: None,
1544            symbol: "USDC/USD".to_string(),
1545            token: Token::new("myToken".to_string()),
1546            validate: None,
1547            orders: vec![
1548                BatchOrder {
1549                    order_type: OrderType::Limit,
1550                    side: BuySell::Buy,
1551                    limit_price: Some(dec!(0.99)),
1552                    limit_price_type: None,
1553                    triggers: None,
1554                    time_in_force: None,
1555                    order_quantity: dec!(5.0),
1556                    margin: None,
1557                    post_only: Some(true),
1558                    reduce_only: None,
1559                    effective_time: None,
1560                    expire_time: None,
1561                    order_user_ref: None,
1562                    conditional: None,
1563                    display_quantity: None,
1564                    fee_preference: Some(FeePreference::Quote),
1565                    no_market_price_protection: None,
1566                    stp_type: None,
1567                    cash_order_quantity: None,
1568                    client_order_id: None,
1569                },
1570                BatchOrder {
1571                    order_type: OrderType::Limit,
1572                    side: BuySell::Buy,
1573                    limit_price: Some(dec!(0.95)),
1574                    limit_price_type: None,
1575                    triggers: None,
1576                    time_in_force: None,
1577                    order_quantity: dec!(5.0),
1578                    margin: None,
1579                    post_only: Some(true),
1580                    reduce_only: None,
1581                    effective_time: None,
1582                    expire_time: None,
1583                    order_user_ref: None,
1584                    conditional: None,
1585                    display_quantity: None,
1586                    fee_preference: Some(FeePreference::Base),
1587                    no_market_price_protection: None,
1588                    stp_type: None,
1589                    cash_order_quantity: None,
1590                    client_order_id: None,
1591                },
1592            ],
1593        };
1594
1595        let message = Message {
1596            method: "batch_add".to_string(),
1597            params: batch_add,
1598            req_id: 0,
1599        };
1600
1601        CallResponseTest::builder()
1602            .match_on(expected_request)
1603            .respond_with(response)
1604            .send(message)
1605            .expect(expected_response)
1606            .build()
1607            .test()
1608            .await;
1609    }
1610
1611    #[tokio::test]
1612    async fn test_batch_cancel() {
1613        let expected_request = json!({"method":"batch_cancel","params":{"orders":["IY8YF6-Y6LCR-AMZD7P","XR6VND-GLY6K-DL33TB"],"token":"theirToken"},"req_id":0});
1614        let response = r#"{"method":"batch_cancel","orders_cancelled":2,"req_id":0,"success":true,"time_in":"2024-05-19T19:29:58.063754Z","time_out":"2024-05-19T19:29:58.071569Z"}"#.to_string();
1615        let expected_response =
1616            WssMessage::Method(MethodMessage::BatchCancel(BatchCancelResponse {
1617                orders_cancelled: 2,
1618                error: None,
1619                success: true,
1620                req_id: 0,
1621                time_in: "2024-05-19T19:29:58.063754Z".to_string(),
1622                time_out: "2024-05-19T19:29:58.071569Z".to_string(),
1623                client_order_id: None,
1624            }));
1625
1626        let batch_cancel = BatchCancelParams {
1627            orders: vec!["IY8YF6-Y6LCR-AMZD7P".into(), "XR6VND-GLY6K-DL33TB".into()],
1628            token: Token::new("theirToken".to_string()),
1629            client_order_id: None,
1630        };
1631
1632        let message = Message {
1633            method: "batch_cancel".to_string(),
1634            params: batch_cancel,
1635            req_id: 0,
1636        };
1637
1638        CallResponseTest::builder()
1639            .match_on(expected_request)
1640            .respond_with(response)
1641            .send(message)
1642            .expect(expected_response)
1643            .build()
1644            .test()
1645            .await;
1646    }
1647
1648    #[tokio::test]
1649    async fn test_execution_trades_snapshot() {
1650        let trades_snapshot = r#"{
1651        "channel":"executions",
1652        "type":"snapshot",
1653        "data":[
1654            {"order_id":"NG6PUE-C7MXN-CFCAMC","order_userref":0,"exec_id":"B1Y0D9-6JIJG-W1IB7L","exec_type":"trade","trade_id":37496584,"symbol":"BTC/USD","side":"sell","last_qty":0.00016000,"last_price":63377.2,"liquidity_ind":"t","cost":10.12445,"order_status":"filled","order_type":"limit","sender_sub_id":"some-uuid","timestamp":"2024-04-16T10:54:38.243302Z","fee_usd_equiv":0.04050,"fees":[{"asset":"USD","qty":0.04051}]},
1655            {"order_id":"8G1X9R-F6HH0-R2FYZ0","order_userref":0,"exec_id":"0CVSSH-KVM0J-TCXLSQ","exec_type":"trade","trade_id":2125408,"symbol":"FET/USD","side":"buy","last_qty":25.00000000,"last_price":0.6017,"liquidity_ind":"m","margin":true,"margin_borrow":true,"liquidated":true,"cost":14.013500,"order_status":"filled","order_type":"limit","timestamp":"2024-01-28T21:03:18.167719Z","fee_usd_equiv":0.024028,"fees":[{"asset":"USD","qty":0.024038}]},
1656            {"order_id":"MQUCYY-SX33Q-KX7KCT","order_userref":0,"exec_id":"QEP2P0-DVAJN-VF1UTF","exec_type":"trade","trade_id":35272682,"symbol":"ETH/USD","side":"sell","last_qty":0.01500000,"last_price":2392.41,"liquidity_ind":"t","cost":35.37130,"order_status":"filled","order_type":"market","timestamp":"2024-01-13T12:24:42.541293Z","fee_usd_equiv":0.09327,"fees":[{"asset":"USD","qty":0.09337}]},
1657            {"order_id":"MMNB64-U9T0S-U8W0PJ","order_userref":0,"exec_id":"NG6PUE-C7MXN-CFCAMC","exec_type":"trade","trade_id":112396,"symbol":"BRICK/USD","side":"buy","last_qty":153.25931,"last_price":0.06404,"liquidity_ind":"m","cost":9.262299496,"order_status":"filled","order_type":"limit","timestamp":"2024-01-10T07:14:14.485774Z","fee_usd_equiv":0.015460799,"fees":[{"asset":"USD","qty":0.014460799}]}
1658        ],
1659        "sequence":1
1660    }"#.to_string();
1661
1662        let expected_trades_snapshot = WssMessage::Channel(ChannelMessage::Execution(Response {
1663            data: vec![
1664                ExecutionResult {
1665                    amended: None,
1666                    execution_type: ExecutionType::Trade,
1667                    cash_order_quantity: None,
1668                    contingent: None,
1669                    cost: Some(dec!(10.12445)),
1670                    execution_id: Some("B1Y0D9-6JIJG-W1IB7L".to_string()),
1671                    fees: Some(vec![Fee {
1672                        asset: "USD".to_string(),
1673                        quantity: dec!(0.04051),
1674                    }]),
1675                    liquidity_indicator: Some(MakerTaker::Taker),
1676                    last_price: Some(dec!(63377.2)),
1677                    last_quantity: Some(dec!(0.00016000)),
1678                    average_price: None,
1679                    reason: None,
1680                    cumulative_cost: None,
1681                    cumulative_quantity: None,
1682                    display_quantity: None,
1683                    effective_time: None,
1684                    expire_time: None,
1685                    ext_ord_id: None,
1686                    ext_exec_id: None,
1687                    fee_preference: None,
1688                    fee_usd_equivalent: Some(dec!(0.04050)),
1689                    limit_price: None,
1690                    limit_price_type: None,
1691                    liquidated: None,
1692                    margin: None,
1693                    margin_borrow: None,
1694                    no_market_price_protection: None,
1695                    order_ref_id: None,
1696                    order_id: "NG6PUE-C7MXN-CFCAMC".to_string(),
1697                    order_quantity: None,
1698                    order_type: Some(OrderType::Limit),
1699                    order_status: OrderStatusV2::Filled,
1700                    order_user_ref: Some(0),
1701                    post_only: None,
1702                    position_status: None,
1703                    reduce_only: None,
1704                    sender_sub_id: Some("some-uuid".to_string()),
1705                    side: Some(BuySell::Sell),
1706                    symbol: Some("BTC/USD".to_string()),
1707                    time_in_force: None,
1708                    timestamp: "2024-04-16T10:54:38.243302Z".to_string(),
1709                    trade_id: Some(37496584),
1710                    triggers: None,
1711                    client_order_id: None,
1712                },
1713                ExecutionResult {
1714                    amended: None,
1715                    execution_type: ExecutionType::Trade,
1716                    cash_order_quantity: None,
1717                    contingent: None,
1718                    cost: Some(dec!(14.013500)),
1719                    execution_id: Some("0CVSSH-KVM0J-TCXLSQ".to_string()),
1720                    fees: Some(vec![Fee {
1721                        asset: "USD".to_string(),
1722                        quantity: dec!(0.024038),
1723                    }]),
1724                    liquidity_indicator: Some(MakerTaker::Maker),
1725                    last_price: Some(dec!(0.6017)),
1726                    last_quantity: Some(dec!(25.00000000)),
1727                    average_price: None,
1728                    reason: None,
1729                    cumulative_cost: None,
1730                    cumulative_quantity: None,
1731                    display_quantity: None,
1732                    effective_time: None,
1733                    expire_time: None,
1734                    ext_ord_id: None,
1735                    ext_exec_id: None,
1736                    fee_preference: None,
1737                    fee_usd_equivalent: Some(dec!(0.024028)),
1738                    limit_price: None,
1739                    limit_price_type: None,
1740                    liquidated: Some(true),
1741                    margin: Some(true),
1742                    margin_borrow: Some(true),
1743                    no_market_price_protection: None,
1744                    order_ref_id: None,
1745                    order_id: "8G1X9R-F6HH0-R2FYZ0".to_string(),
1746                    order_quantity: None,
1747                    order_type: Some(OrderType::Limit),
1748                    order_status: OrderStatusV2::Filled,
1749                    order_user_ref: Some(0),
1750                    post_only: None,
1751                    position_status: None,
1752                    reduce_only: None,
1753                    sender_sub_id: None,
1754                    side: Some(BuySell::Buy),
1755                    symbol: Some("FET/USD".to_string()),
1756                    time_in_force: None,
1757                    timestamp: "2024-01-28T21:03:18.167719Z".to_string(),
1758                    trade_id: Some(2125408),
1759                    triggers: None,
1760                    client_order_id: None,
1761                },
1762                ExecutionResult {
1763                    amended: None,
1764                    execution_type: ExecutionType::Trade,
1765                    cash_order_quantity: None,
1766                    contingent: None,
1767                    cost: Some(dec!(35.37130)),
1768                    execution_id: Some("QEP2P0-DVAJN-VF1UTF".to_string()),
1769                    fees: Some(vec![Fee {
1770                        asset: "USD".to_string(),
1771                        quantity: dec!(0.09337),
1772                    }]),
1773                    liquidity_indicator: Some(MakerTaker::Taker),
1774                    last_price: Some(dec!(2392.41)),
1775                    last_quantity: Some(dec!(0.01500000)),
1776                    average_price: None,
1777                    reason: None,
1778                    cumulative_cost: None,
1779                    cumulative_quantity: None,
1780                    display_quantity: None,
1781                    effective_time: None,
1782                    expire_time: None,
1783                    ext_ord_id: None,
1784                    ext_exec_id: None,
1785                    fee_preference: None,
1786                    fee_usd_equivalent: Some(dec!(0.09327)),
1787                    limit_price: None,
1788                    limit_price_type: None,
1789                    liquidated: None,
1790                    margin: None,
1791                    margin_borrow: None,
1792                    no_market_price_protection: None,
1793                    order_ref_id: None,
1794                    order_id: "MQUCYY-SX33Q-KX7KCT".to_string(),
1795                    order_quantity: None,
1796                    order_type: Some(OrderType::Market),
1797                    order_status: OrderStatusV2::Filled,
1798                    order_user_ref: Some(0),
1799                    post_only: None,
1800                    position_status: None,
1801                    reduce_only: None,
1802                    sender_sub_id: None,
1803                    side: Some(BuySell::Sell),
1804                    symbol: Some("ETH/USD".to_string()),
1805                    time_in_force: None,
1806                    timestamp: "2024-01-13T12:24:42.541293Z".to_string(),
1807                    trade_id: Some(35272682),
1808                    triggers: None,
1809                    client_order_id: None,
1810                },
1811                ExecutionResult {
1812                    amended: None,
1813                    execution_type: ExecutionType::Trade,
1814                    cash_order_quantity: None,
1815                    contingent: None,
1816                    cost: Some(dec!(9.262299496)),
1817                    execution_id: Some("NG6PUE-C7MXN-CFCAMC".to_string()),
1818                    fees: Some(vec![Fee {
1819                        asset: "USD".to_string(),
1820                        quantity: dec!(0.014460799),
1821                    }]),
1822                    liquidity_indicator: Some(MakerTaker::Maker),
1823                    last_price: Some(dec!(0.06404)),
1824                    last_quantity: Some(dec!(153.25931)),
1825                    average_price: None,
1826                    reason: None,
1827                    cumulative_cost: None,
1828                    cumulative_quantity: None,
1829                    display_quantity: None,
1830                    effective_time: None,
1831                    expire_time: None,
1832                    ext_ord_id: None,
1833                    ext_exec_id: None,
1834                    fee_preference: None,
1835                    fee_usd_equivalent: Some(dec!(0.015460799)),
1836                    limit_price: None,
1837                    limit_price_type: None,
1838                    liquidated: None,
1839                    margin: None,
1840                    margin_borrow: None,
1841                    no_market_price_protection: None,
1842                    order_ref_id: None,
1843                    order_id: "MMNB64-U9T0S-U8W0PJ".to_string(),
1844                    order_quantity: None,
1845                    order_type: Some(OrderType::Limit),
1846                    order_status: OrderStatusV2::Filled,
1847                    order_user_ref: Some(0),
1848                    post_only: None,
1849                    position_status: None,
1850                    reduce_only: None,
1851                    sender_sub_id: None,
1852                    side: Some(BuySell::Buy),
1853                    symbol: Some("BRICK/USD".to_string()),
1854                    time_in_force: None,
1855                    timestamp: "2024-01-10T07:14:14.485774Z".to_string(),
1856                    trade_id: Some(112396),
1857                    triggers: None,
1858                    client_order_id: None,
1859                },
1860            ],
1861            sequence: 1,
1862        }));
1863
1864        ParseIncomingTest::new()
1865            .with_incoming(trades_snapshot)
1866            .expect_message(expected_trades_snapshot)
1867            .test()
1868            .await;
1869    }
1870
1871    #[tokio::test]
1872    async fn test_execution_order_update_cancelled() {
1873        let cancel = r#"{"channel":"executions","type":"update","data":[{"timestamp":"2024-05-18T12:58:40.165132Z",
1874    "order_status":"canceled","exec_type":"canceled","cum_qty":0.00000000,"cum_cost":0.000000,"fee_usd_equiv":0.000000,
1875    "avg_price":0.00000,"order_userref":0,"cancel_reason":"User requested","reason":"User requested",
1876    "order_id":"KIUEL4-G3PWU-HOJTYU"}],"sequence":143}"#.to_string();
1877
1878        let expected_update_cancel = WssMessage::Channel(ChannelMessage::Execution(Response {
1879            data: vec![ExecutionResult {
1880                amended: None,
1881                execution_type: ExecutionType::Canceled,
1882                cash_order_quantity: None,
1883                contingent: None,
1884                cost: None,
1885                execution_id: None,
1886                fees: None,
1887                liquidity_indicator: None,
1888                last_price: None,
1889                last_quantity: None,
1890                average_price: Some(dec!(0.0)),
1891                reason: Some("User requested".to_string()),
1892                cumulative_cost: Some(dec!(0.0)),
1893                cumulative_quantity: Some(dec!(0.0)),
1894                display_quantity: None,
1895                effective_time: None,
1896                expire_time: None,
1897                ext_ord_id: None,
1898                ext_exec_id: None,
1899                fee_preference: None,
1900                fee_usd_equivalent: Some(dec!(0.0)),
1901                limit_price: None,
1902                limit_price_type: None,
1903                liquidated: None,
1904                margin: None,
1905                margin_borrow: None,
1906                no_market_price_protection: None,
1907                order_ref_id: None,
1908                order_id: "KIUEL4-G3PWU-HOJTYU".to_string(),
1909                order_quantity: None,
1910                order_type: None,
1911                order_status: OrderStatusV2::Canceled,
1912                order_user_ref: Some(0),
1913                post_only: None,
1914                position_status: None,
1915                reduce_only: None,
1916                sender_sub_id: None,
1917                side: None,
1918                symbol: None,
1919                time_in_force: None,
1920                timestamp: "2024-05-18T12:58:40.165132Z".to_string(),
1921                trade_id: None,
1922                triggers: None,
1923                client_order_id: None,
1924            }],
1925            sequence: 143,
1926        }));
1927
1928        ParseIncomingTest::new()
1929            .with_incoming(cancel)
1930            .expect_message(expected_update_cancel)
1931            .test()
1932            .await;
1933    }
1934
1935    #[tokio::test]
1936    async fn test_execution_limit_order_update_pending() {
1937        let pending_new = r#"{"channel":"executions","type":"update","data":[{"order_id":"AHOJQ8-1E72C-8M2VQH","symbol":"ADX/USD",
1938    "order_qty":81.36256082,"cum_cost":0.0000000,"time_in_force":"GTC","exec_type":"pending_new","side":"buy","order_type":"limit",
1939    "order_userref":0,"limit_price_type":"static","limit_price":0.18328,"stop_price":0.00000,"order_status":"pending_new",
1940    "fee_usd_equiv":0.0000000,"fee_ccy_pref":"fciq","timestamp":"2024-05-18T12:01:56.165888Z"}],"sequence":120}"#.to_string();
1941
1942        let expected_update_pending = WssMessage::Channel(ChannelMessage::Execution(Response {
1943            data: vec![ExecutionResult {
1944                amended: None,
1945                execution_type: ExecutionType::PendingNew,
1946                cash_order_quantity: None,
1947                contingent: None,
1948                cost: None,
1949                execution_id: None,
1950                fees: None,
1951                liquidity_indicator: None,
1952                last_price: None,
1953                last_quantity: None,
1954                average_price: None,
1955                reason: None,
1956                cumulative_cost: Some(dec!(0.0)),
1957                cumulative_quantity: None,
1958                display_quantity: None,
1959                effective_time: None,
1960                expire_time: None,
1961                ext_ord_id: None,
1962                ext_exec_id: None,
1963                fee_preference: Some(FeePreference::Quote),
1964                fee_usd_equivalent: Some(dec!(0.0)),
1965                limit_price: Some(dec!(0.18328)),
1966                limit_price_type: Some(PriceType::Static),
1967                liquidated: None,
1968                margin: None,
1969                margin_borrow: None,
1970                no_market_price_protection: None,
1971                order_ref_id: None,
1972                order_id: "AHOJQ8-1E72C-8M2VQH".to_string(),
1973                order_quantity: Some(dec!(81.36256082)),
1974                order_type: Some(OrderType::Limit),
1975                order_status: OrderStatusV2::PendingNew,
1976                order_user_ref: Some(0),
1977                post_only: None,
1978                position_status: None,
1979                reduce_only: None,
1980                sender_sub_id: None,
1981                side: Some(BuySell::Buy),
1982                symbol: Some("ADX/USD".to_string()),
1983                time_in_force: Some(TimeInForce::GTC),
1984                timestamp: "2024-05-18T12:01:56.165888Z".to_string(),
1985                trade_id: None,
1986                triggers: None,
1987                client_order_id: None,
1988            }],
1989            sequence: 120,
1990        }));
1991
1992        ParseIncomingTest::new()
1993            .with_incoming(pending_new)
1994            .expect_message(expected_update_pending)
1995            .test()
1996            .await;
1997    }
1998
1999    #[tokio::test]
2000    async fn test_execution_stop_loss_limit_order_update_pending() {
2001        let pending_new = r#"{"channel":"executions","type":"update","data":[{"order_id":"AHOJQ8-1E72C-8M2VQH","symbol":"ADX/USD",
2002    "order_qty":81.36256082,"cum_cost":0,"time_in_force":"GTC","exec_type":"pending_new","side":"buy","order_type":"stop-loss-limit",
2003    "order_userref":0,"limit_price_type":"static","triggers":{"price":0.2,"price_type":"static","reference":"index","status":"untriggered"},
2004    "stop_price":0.2,"limit_price":0.2,"trigger":"index","order_status":"pending_new","fee_usd_equiv":0,"fee_ccy_pref":"fciq",
2005    "timestamp":"2024-05-18T12:01:56.165888Z"}],"sequence":120}"#.to_string();
2006
2007        let expected_update_pending = WssMessage::Channel(ChannelMessage::Execution(Response {
2008            data: vec![ExecutionResult {
2009                amended: None,
2010                execution_type: ExecutionType::PendingNew,
2011                cash_order_quantity: None,
2012                contingent: None,
2013                cost: None,
2014                execution_id: None,
2015                fees: None,
2016                liquidity_indicator: None,
2017                last_price: None,
2018                last_quantity: None,
2019                average_price: None,
2020                reason: None,
2021                cumulative_cost: Some(dec!(0.0)),
2022                cumulative_quantity: None,
2023                display_quantity: None,
2024                effective_time: None,
2025                expire_time: None,
2026                ext_ord_id: None,
2027                ext_exec_id: None,
2028                fee_preference: Some(FeePreference::Quote),
2029                fee_usd_equivalent: Some(dec!(0.0)),
2030                limit_price: Some(dec!(0.2)),
2031                limit_price_type: Some(PriceType::Static),
2032                liquidated: None,
2033                margin: None,
2034                margin_borrow: None,
2035                no_market_price_protection: None,
2036                order_ref_id: None,
2037                order_id: "AHOJQ8-1E72C-8M2VQH".to_string(),
2038                order_quantity: Some(dec!(81.36256082)),
2039                order_type: Some(OrderType::StopLossLimit),
2040                order_status: OrderStatusV2::PendingNew,
2041                order_user_ref: Some(0),
2042                post_only: None,
2043                position_status: None,
2044                reduce_only: None,
2045                sender_sub_id: None,
2046                side: Some(BuySell::Buy),
2047                symbol: Some("ADX/USD".to_string()),
2048                time_in_force: Some(TimeInForce::GTC),
2049                timestamp: "2024-05-18T12:01:56.165888Z".to_string(),
2050                trade_id: None,
2051                triggers: Some(TriggerDescription {
2052                    reference: TriggerType::Index,
2053                    price: dec!(0.2),
2054                    price_type: PriceType::Static,
2055                    actual_price: None,
2056                    peak_price: None,
2057                    last_price: None,
2058                    status: TriggerStatus::Untriggered,
2059                    timestamp: None,
2060                }),
2061                client_order_id: None,
2062            }],
2063            sequence: 120,
2064        }));
2065
2066        ParseIncomingTest::new()
2067            .with_incoming(pending_new)
2068            .expect_message(expected_update_pending)
2069            .test()
2070            .await;
2071    }
2072
2073    #[tokio::test]
2074    async fn test_execution_order_update_new() {
2075        let new = r#"{"channel":"executions","type":"update","data":[{"timestamp":"2024-05-18T12:58:51.121515Z",
2076    "order_status":"new","exec_type":"new","order_userref":0,"order_id":"7J91XK-XMFEL-348VPM"}],"sequence":148}"#.to_string();
2077
2078        let expected_update_new = WssMessage::Channel(ChannelMessage::Execution(Response {
2079            data: vec![ExecutionResult {
2080                amended: None,
2081                execution_type: ExecutionType::New,
2082                cash_order_quantity: None,
2083                contingent: None,
2084                cost: None,
2085                execution_id: None,
2086                fees: None,
2087                liquidity_indicator: None,
2088                last_price: None,
2089                last_quantity: None,
2090                average_price: None,
2091                reason: None,
2092                cumulative_cost: None,
2093                cumulative_quantity: None,
2094                display_quantity: None,
2095                effective_time: None,
2096                expire_time: None,
2097                ext_ord_id: None,
2098                ext_exec_id: None,
2099                fee_preference: None,
2100                fee_usd_equivalent: None,
2101                limit_price: None,
2102                limit_price_type: None,
2103                liquidated: None,
2104                margin: None,
2105                margin_borrow: None,
2106                no_market_price_protection: None,
2107                order_ref_id: None,
2108                order_id: "7J91XK-XMFEL-348VPM".to_string(),
2109                order_quantity: None,
2110                order_type: None,
2111                order_status: OrderStatusV2::New,
2112                order_user_ref: Some(0),
2113                post_only: None,
2114                position_status: None,
2115                reduce_only: None,
2116                sender_sub_id: None,
2117                side: None,
2118                symbol: None,
2119                time_in_force: None,
2120                timestamp: "2024-05-18T12:58:51.121515Z".to_string(),
2121                trade_id: None,
2122                triggers: None,
2123                client_order_id: None,
2124            }],
2125            sequence: 148,
2126        }));
2127
2128        ParseIncomingTest::new()
2129            .with_incoming(new)
2130            .expect_message(expected_update_new)
2131            .test()
2132            .await;
2133    }
2134
2135    #[tokio::test]
2136    async fn test_execution_order_amended() {
2137        let amend = r#"{
2138        "channel":"executions",
2139        "type":"update",
2140        "data":[
2141            {
2142                "timestamp":"2024-10-13T13:38:39.273886Z",
2143                "exec_type":"amended",
2144                "order_status":"new",
2145                "cum_qty":0.00000000,
2146                "reason":"User requested",
2147                "amended":true,
2148                "order_qty":5.10000000,
2149                "limit_price":0.9600,
2150                "limit_price_type":"static",
2151                "order_userref":0,
2152                "order_id":"6LYQGW-FH922-U6JTUM"
2153            }
2154        ],
2155        "sequence":20
2156    }"#;
2157
2158        let expected_execution = WssMessage::Channel(ChannelMessage::Execution(Response {
2159            data: vec![ExecutionResult {
2160                amended: Some(true),
2161                execution_type: ExecutionType::Amended,
2162                cash_order_quantity: None,
2163                client_order_id: None,
2164                contingent: None,
2165                cost: None,
2166                execution_id: None,
2167                fees: None,
2168                liquidity_indicator: None,
2169                last_price: None,
2170                last_quantity: None,
2171                average_price: None,
2172                reason: Some("User requested".to_string()),
2173                cumulative_cost: None,
2174                cumulative_quantity: Some(dec!(0)),
2175                display_quantity: None,
2176                effective_time: None,
2177                expire_time: None,
2178                ext_ord_id: None,
2179                ext_exec_id: None,
2180                fee_preference: None,
2181                fee_usd_equivalent: None,
2182                limit_price: Some(dec!(0.9600)),
2183                limit_price_type: Some(PriceType::Static),
2184                liquidated: None,
2185                margin: None,
2186                margin_borrow: None,
2187                no_market_price_protection: None,
2188                order_ref_id: None,
2189                order_id: "6LYQGW-FH922-U6JTUM".to_string(),
2190                order_quantity: Some(dec!(5.10000000)),
2191                order_type: None,
2192                order_status: OrderStatusV2::New,
2193                order_user_ref: Some(0),
2194                post_only: None,
2195                position_status: None,
2196                reduce_only: None,
2197                sender_sub_id: None,
2198                side: None,
2199                symbol: None,
2200                time_in_force: None,
2201                timestamp: "2024-10-13T13:38:39.273886Z".to_string(),
2202                trade_id: None,
2203                triggers: None,
2204            }],
2205            sequence: 20,
2206        }));
2207
2208        ParseIncomingTest::new()
2209            .with_incoming(amend.to_string())
2210            .expect_message(expected_execution)
2211            .test()
2212            .await;
2213    }
2214
2215    #[tokio::test]
2216    async fn test_balances_snapshot() {
2217        let balances_snapshot = r#"{
2218        "channel":"balances",
2219        "type":"snapshot",
2220        "data":[
2221            {"asset":"BRICK","asset_class":"currency","balance":439.9736, "wallets": []},
2222            {"asset":"KAR","asset_class":"currency","balance":774.6366982600, "wallets": []},
2223            {"asset":"KEEP","asset_class":"currency","balance":622.3962481300, "wallets": []},
2224            {"asset":"MULTI","asset_class":"currency","balance":5.5971035500, "wallets": []},
2225            {"asset":"USD","asset_class":"currency","balance":160.2405, "wallets": [{
2226                "type": "spot",
2227                "id": "main",
2228                "balance": 1.34
2229            }]}
2230        ],
2231        "sequence":1
2232    }
2233    "#
2234        .to_string();
2235
2236        let expected_snapshot = WssMessage::Channel(ChannelMessage::Balance(Response {
2237            data: BalanceResponse::Snapshot(vec![
2238                Balance {
2239                    asset: "BRICK".to_string(),
2240                    balance: dec!(439.9736),
2241                    wallets: vec![],
2242                },
2243                Balance {
2244                    asset: "KAR".to_string(),
2245                    balance: dec!(774.6366982600),
2246                    wallets: vec![],
2247                },
2248                Balance {
2249                    asset: "KEEP".to_string(),
2250                    balance: dec!(622.3962481300),
2251                    wallets: vec![],
2252                },
2253                Balance {
2254                    asset: "MULTI".to_string(),
2255                    balance: dec!(5.5971035500),
2256                    wallets: vec![],
2257                },
2258                Balance {
2259                    asset: "USD".to_string(),
2260                    balance: dec!(160.2405),
2261                    wallets: vec![Wallet {
2262                        balance: dec!(1.34),
2263                        wallet_type: WalletType::Spot,
2264                        id: WalletId::Main,
2265                    }],
2266                },
2267            ]),
2268            sequence: 1,
2269        }));
2270
2271        ParseIncomingTest::new()
2272            .with_incoming(balances_snapshot)
2273            .expect_message(expected_snapshot)
2274            .test()
2275            .await;
2276    }
2277
2278    #[tokio::test]
2279    async fn test_balances_updates() {
2280        let usd_update = r#"{
2281        "channel":"balances",
2282        "type":"update",
2283        "data":[{
2284            "ledger_id":"DATKX6-PEHL1-HZKND8",
2285            "ref_id":"LKAKN2-N0N12-VKQNLN",
2286            "timestamp":"2024-05-24T14:01:53.526524Z",
2287            "type":"trade",
2288            "asset":"USD",
2289            "asset_class":"currency",
2290            "category":"trade",
2291            "wallet_type":"spot",
2292            "wallet_id":"main",
2293            "amount":-19.9743,
2294            "fee":0.0499,
2295            "balance":118.0677
2296        }],
2297        "sequence":4
2298    }"#
2299        .to_string();
2300
2301        let expected_usd_update = WssMessage::Channel(ChannelMessage::Balance(Response {
2302            data: BalanceResponse::Update(vec![LedgerUpdate {
2303                asset: "USD".to_string(),
2304                amount: dec!(-19.9743),
2305                balance: dec!(118.0677),
2306                fee: dec!(0.0499),
2307                ledger_id: "DATKX6-PEHL1-HZKND8".to_string(),
2308                ref_id: "LKAKN2-N0N12-VKQNLN".to_string(),
2309                timestamp: "2024-05-24T14:01:53.526524Z".to_string(),
2310                asset_class: "currency".to_string(),
2311                ledger_type: LedgerEntryTypeV2::Trade,
2312                sub_type: None,
2313                category: LedgerCategory::Trade,
2314                wallet_type: WalletType::Spot,
2315                wallet_id: WalletId::Main,
2316            }]),
2317            sequence: 4,
2318        }));
2319
2320        let base_update = r#"{
2321        "channel":"balances",
2322        "type":"update",
2323        "data":[{
2324            "ledger_id":"9K6IR4-X9PQJ-OMBG73",
2325            "ref_id":"WLINKJ-1TZZW-M3HCOY",
2326            "timestamp":"2024-05-12T12:11:57.525134Z",
2327            "type":"trade",
2328            "asset":"ADX",
2329            "asset_class":"currency",
2330            "category":"trade",
2331            "wallet_type":"spot",
2332            "wallet_id":"main",
2333            "amount":111.0857412800,
2334            "fee":0.0000000000,
2335            "balance":147.1906006900
2336        }],
2337        "sequence":5
2338    }"#
2339        .to_string();
2340
2341        let expected_base_update = WssMessage::Channel(ChannelMessage::Balance(Response {
2342            data: BalanceResponse::Update(vec![LedgerUpdate {
2343                asset: "ADX".to_string(),
2344                amount: dec!(111.0857412800),
2345                balance: dec!(147.1906006900),
2346                fee: dec!(0.0),
2347                ledger_id: "9K6IR4-X9PQJ-OMBG73".to_string(),
2348                ref_id: "WLINKJ-1TZZW-M3HCOY".to_string(),
2349                timestamp: "2024-05-12T12:11:57.525134Z".to_string(),
2350                asset_class: "currency".to_string(),
2351                ledger_type: LedgerEntryTypeV2::Trade,
2352                sub_type: None,
2353                category: LedgerCategory::Trade,
2354                wallet_type: WalletType::Spot,
2355                wallet_id: WalletId::Main,
2356            }]),
2357            sequence: 5,
2358        }));
2359
2360        ParseIncomingTest::new()
2361            .with_incoming(usd_update)
2362            .expect_message(expected_usd_update)
2363            .with_incoming(base_update)
2364            .expect_message(expected_base_update)
2365            .test()
2366            .await;
2367    }
2368}