kraken_async_rs/wss/messages/
base_messages.rs

1use crate::wss::StatusUpdate;
2use crate::wss::{
3    AddOrderResult, AmendOrderResult, BatchCancelResponse, CancelAllOrdersResult,
4    CancelOnDisconnectResult, CancelOrderResult, EditOrderResult,
5};
6use crate::wss::{BalanceResponse, ExecutionResult, SubscriptionResult};
7use crate::wss::{Instruments, Ohlc, Ticker, Trade, L2, L3};
8use serde::{de, Deserialize, Deserializer, Serialize};
9use serde_json::Value::Null;
10use std::collections::VecDeque;
11use std::fmt::Debug;
12
13#[derive(Debug, Deserialize, PartialEq)]
14#[serde(untagged)]
15pub enum WssMessage {
16    Channel(ChannelMessage),
17    Method(MethodMessage),
18    Error(ErrorResponse),
19}
20
21#[derive(Debug, Deserialize, PartialEq)]
22#[serde(tag = "method")]
23pub enum MethodMessage {
24    #[serde(rename = "add_order")]
25    AddOrder(ResultResponse<AddOrderResult>),
26    #[serde(rename = "edit_order")]
27    EditOrder(ResultResponse<EditOrderResult>),
28    #[serde(rename = "amend_order")]
29    AmendOrder(ResultResponse<AmendOrderResult>),
30    #[serde(rename = "cancel_order")]
31    CancelOrder(ResultResponse<CancelOrderResult>),
32    #[serde(rename = "cancel_all")]
33    CancelAllOrders(ResultResponse<CancelAllOrdersResult>),
34    #[serde(rename = "cancel_all_orders_after")]
35    CancelOnDisconnect(ResultResponse<CancelOnDisconnectResult>),
36    #[serde(rename = "batch_add")]
37    BatchOrder(ResultResponse<Vec<AddOrderResult>>),
38    #[serde(rename = "batch_cancel")]
39    BatchCancel(BatchCancelResponse),
40    #[serde(rename = "subscribe")]
41    Subscription(ResultResponse<SubscriptionResult>),
42    #[serde(alias = "ping")]
43    Ping(ResultResponse<Option<()>>),
44    #[serde(rename = "pong")]
45    Pong(PongResponse),
46}
47
48#[derive(Debug, Deserialize, PartialEq)]
49#[serde(tag = "channel")]
50pub enum ChannelMessage {
51    #[serde(rename = "heartbeat")]
52    Heartbeat,
53    #[serde(rename = "status")]
54    Status(SingleResponse<StatusUpdate>),
55    #[serde(rename = "executions")]
56    Execution(Response<Vec<ExecutionResult>>),
57    #[serde(rename = "balances")]
58    Balance(Response<BalanceResponse>),
59    #[serde(rename = "trade")]
60    Trade(MarketDataResponse<Vec<Trade>>),
61    #[serde(rename = "ticker")]
62    Ticker(SingleResponse<Ticker>),
63    #[serde(rename = "ohlc")]
64    Ohlc(MarketDataResponse<Vec<Ohlc>>),
65    #[serde(rename = "instrument")]
66    Instrument(MarketDataResponse<Instruments>),
67    #[serde(rename = "book")]
68    Orderbook(SingleResponse<L2>),
69    #[serde(rename = "level3")]
70    L3(SingleResponse<L3>),
71}
72
73#[derive(Debug, Serialize, Deserialize, Clone)]
74pub struct Message<T>
75where
76    T: Debug,
77{
78    pub method: String,
79    #[serde(skip_serializing_if = "is_none")]
80    pub params: T,
81    pub req_id: i64,
82}
83
84impl<T> Message<T>
85where
86    T: Debug,
87{
88    pub fn new_subscription(params: T, req_id: i64) -> Self {
89        Message {
90            method: "subscribe".to_string(),
91            params,
92            req_id,
93        }
94    }
95}
96
97// this is required to not serialize None for generic type parameters
98//  (skip_serializing_none fails there)
99fn is_none<T: Serialize>(t: T) -> bool {
100    serde_json::to_value(t).unwrap_or(Null).is_null()
101}
102
103#[derive(Debug, Deserialize, PartialEq)]
104pub struct Pong {
105    pub warning: Vec<String>,
106}
107
108#[derive(Debug, Deserialize, PartialEq)]
109pub struct Response<T> {
110    pub data: T,
111    pub sequence: i64,
112}
113
114#[derive(Debug, Deserialize, PartialEq)]
115pub struct MarketDataResponse<T> {
116    pub data: T,
117}
118
119#[derive(Debug, Deserialize, PartialEq)]
120pub struct SingleResponse<T>
121where
122    T: for<'a> Deserialize<'a>,
123{
124    #[serde(deserialize_with = "flatten_vec")]
125    pub data: T,
126}
127
128fn flatten_vec<'de, D, T>(deserializer: D) -> Result<T, D::Error>
129where
130    D: Deserializer<'de>,
131    T: for<'a> Deserialize<'a>,
132{
133    let mut vec: VecDeque<T> = de::Deserialize::deserialize(deserializer)?;
134    vec.pop_front()
135        .ok_or(de::Error::custom("Expected Vec with at least one element"))
136}
137
138#[derive(Debug, Deserialize, PartialEq)]
139#[serde(deny_unknown_fields)]
140pub struct ResultResponse<T> {
141    pub result: Option<T>,
142    pub error: Option<String>,
143    pub success: bool,
144    pub req_id: i64,
145    pub time_in: String,
146    pub time_out: String,
147}
148
149#[derive(Debug, Deserialize, PartialEq)]
150#[serde(deny_unknown_fields)]
151pub struct ErrorResponse {
152    pub error: Option<String>,
153    pub method: String,
154    pub status: String,
155    pub success: bool,
156    pub req_id: i64,
157    pub time_in: String,
158    pub time_out: String,
159}
160
161#[derive(Debug, Deserialize, PartialEq)]
162#[serde(deny_unknown_fields)]
163pub struct PongResponse {
164    pub error: Option<String>,
165    pub req_id: i64,
166    pub time_in: String,
167    pub time_out: String,
168}
169
#[cfg(test)]
mod tests {
    use crate::response_types::SystemStatus;
    use crate::wss::StatusUpdate;
    use crate::wss::{ChannelMessage, ErrorResponse, SingleResponse, WssMessage};
    use serde_json::Number;
    use std::str::FromStr;

    /// A `status` message that includes a `connection_id` parses into the `Status` channel
    /// variant with the single `data` element unwrapped.
    #[test]
    fn test_deserializing_status_update() {
        let raw = r#"{"channel":"status","data":[{"api_version":"v2","connection_id":18266300427528990701,"system":"online","version":"2.0.4"}],"type":"update"}"#;

        let parsed = serde_json::from_str::<WssMessage>(raw).unwrap();

        let status = StatusUpdate {
            api_version: "v2".to_string(),
            connection_id: Some(Number::from_str("18266300427528990701").unwrap()),
            system: SystemStatus::Online,
            version: "2.0.4".to_string(),
        };
        let expected = WssMessage::Channel(ChannelMessage::Status(SingleResponse { data: status }));

        assert_eq!(expected, parsed);
    }

    /// A `status` message without a `connection_id` parses with that field set to `None`.
    #[test]
    fn test_deserializing_maintenance_status_update() {
        let raw = r#"{"channel":"status","data":[{"api_version":"v2","system":"maintenance","version":"2.0.6"}],"type":"update"}"#;

        let parsed = serde_json::from_str::<WssMessage>(raw).unwrap();

        let status = StatusUpdate {
            api_version: "v2".to_string(),
            connection_id: None,
            system: SystemStatus::Maintenance,
            version: "2.0.6".to_string(),
        };
        let expected = WssMessage::Channel(ChannelMessage::Status(SingleResponse { data: status }));

        assert_eq!(expected, parsed);
    }

    /// A `book` update deserializes as a `ChannelMessage`; only successful parsing is checked.
    #[test]
    fn test_deserializing_l2_update() {
        let payload = r#"{"channel":"book","type":"update","data":[{"symbol":"BTC/USD","bids":[],"asks":[{"price":66732.5,"qty":5.48256063}],"checksum":2855135483,"timestamp":"2024-05-19T16:32:26.777454Z"}]}"#;
        let _book = serde_json::from_str::<ChannelMessage>(payload).unwrap();
    }

    /// A message with `status` and `method` fields ends up in the `Error` variant of the
    /// untagged envelope.
    #[test]
    fn test_deserializing_error_message() {
        let payload = r#"{"error":"ESession:Invalid session","method":"subscribe","req_id":42,"status":"error","success":false,"time_in":"2023-04-19T12:04:41.320119Z","time_out":"2023-04-19T12:04:41.980119Z"}"#;

        let parsed = serde_json::from_str::<WssMessage>(payload).unwrap();

        let error_response = ErrorResponse {
            error: Some("ESession:Invalid session".to_string()),
            method: "subscribe".to_string(),
            status: "error".to_string(),
            success: false,
            req_id: 42,
            time_in: "2023-04-19T12:04:41.320119Z".to_string(),
            time_out: "2023-04-19T12:04:41.980119Z".to_string(),
        };
        let expected = WssMessage::Error(error_response);

        assert_eq!(expected, parsed);
    }
}