kraken_async_rs/wss/messages/
base_messages.rs1use crate::wss::StatusUpdate;
2use crate::wss::{
3 AddOrderResult, AmendOrderResult, BatchCancelResponse, CancelAllOrdersResult,
4 CancelOnDisconnectResult, CancelOrderResult, EditOrderResult,
5};
6use crate::wss::{BalanceResponse, ExecutionResult, SubscriptionResult};
7use crate::wss::{Instruments, Ohlc, Ticker, Trade, L2, L3};
8use serde::{de, Deserialize, Deserializer, Serialize};
9use serde_json::Value::Null;
10use std::collections::VecDeque;
11use std::fmt::Debug;
12
13#[derive(Debug, Deserialize, PartialEq)]
14#[serde(untagged)]
15pub enum WssMessage {
16 Channel(ChannelMessage),
17 Method(MethodMessage),
18 Error(ErrorResponse),
19}
20
21#[derive(Debug, Deserialize, PartialEq)]
22#[serde(tag = "method")]
23pub enum MethodMessage {
24 #[serde(rename = "add_order")]
25 AddOrder(ResultResponse<AddOrderResult>),
26 #[serde(rename = "edit_order")]
27 EditOrder(ResultResponse<EditOrderResult>),
28 #[serde(rename = "amend_order")]
29 AmendOrder(ResultResponse<AmendOrderResult>),
30 #[serde(rename = "cancel_order")]
31 CancelOrder(ResultResponse<CancelOrderResult>),
32 #[serde(rename = "cancel_all")]
33 CancelAllOrders(ResultResponse<CancelAllOrdersResult>),
34 #[serde(rename = "cancel_all_orders_after")]
35 CancelOnDisconnect(ResultResponse<CancelOnDisconnectResult>),
36 #[serde(rename = "batch_add")]
37 BatchOrder(ResultResponse<Vec<AddOrderResult>>),
38 #[serde(rename = "batch_cancel")]
39 BatchCancel(BatchCancelResponse),
40 #[serde(rename = "subscribe")]
41 Subscription(ResultResponse<SubscriptionResult>),
42 #[serde(alias = "ping")]
43 Ping(ResultResponse<Option<()>>),
44 #[serde(rename = "pong")]
45 Pong(PongResponse),
46}
47
48#[derive(Debug, Deserialize, PartialEq)]
49#[serde(tag = "channel")]
50pub enum ChannelMessage {
51 #[serde(rename = "heartbeat")]
52 Heartbeat,
53 #[serde(rename = "status")]
54 Status(SingleResponse<StatusUpdate>),
55 #[serde(rename = "executions")]
56 Execution(Response<Vec<ExecutionResult>>),
57 #[serde(rename = "balances")]
58 Balance(Response<BalanceResponse>),
59 #[serde(rename = "trade")]
60 Trade(MarketDataResponse<Vec<Trade>>),
61 #[serde(rename = "ticker")]
62 Ticker(SingleResponse<Ticker>),
63 #[serde(rename = "ohlc")]
64 Ohlc(MarketDataResponse<Vec<Ohlc>>),
65 #[serde(rename = "instrument")]
66 Instrument(MarketDataResponse<Instruments>),
67 #[serde(rename = "book")]
68 Orderbook(SingleResponse<L2>),
69 #[serde(rename = "level3")]
70 L3(SingleResponse<L3>),
71}
72
73#[derive(Debug, Serialize, Deserialize, Clone)]
74pub struct Message<T>
75where
76 T: Debug,
77{
78 pub method: String,
79 #[serde(skip_serializing_if = "is_none")]
80 pub params: T,
81 pub req_id: i64,
82}
83
84impl<T> Message<T>
85where
86 T: Debug,
87{
88 pub fn new_subscription(params: T, req_id: i64) -> Self {
89 Message {
90 method: "subscribe".to_string(),
91 params,
92 req_id,
93 }
94 }
95}
96
97fn is_none<T: Serialize>(t: T) -> bool {
100 serde_json::to_value(t).unwrap_or(Null).is_null()
101}
102
103#[derive(Debug, Deserialize, PartialEq)]
104pub struct Pong {
105 pub warning: Vec<String>,
106}
107
108#[derive(Debug, Deserialize, PartialEq)]
109pub struct Response<T> {
110 pub data: T,
111 pub sequence: i64,
112}
113
114#[derive(Debug, Deserialize, PartialEq)]
115pub struct MarketDataResponse<T> {
116 pub data: T,
117}
118
119#[derive(Debug, Deserialize, PartialEq)]
120pub struct SingleResponse<T>
121where
122 T: for<'a> Deserialize<'a>,
123{
124 #[serde(deserialize_with = "flatten_vec")]
125 pub data: T,
126}
127
128fn flatten_vec<'de, D, T>(deserializer: D) -> Result<T, D::Error>
129where
130 D: Deserializer<'de>,
131 T: for<'a> Deserialize<'a>,
132{
133 let mut vec: VecDeque<T> = de::Deserialize::deserialize(deserializer)?;
134 vec.pop_front()
135 .ok_or(de::Error::custom("Expected Vec with at least one element"))
136}
137
138#[derive(Debug, Deserialize, PartialEq)]
139#[serde(deny_unknown_fields)]
140pub struct ResultResponse<T> {
141 pub result: Option<T>,
142 pub error: Option<String>,
143 pub success: bool,
144 pub req_id: i64,
145 pub time_in: String,
146 pub time_out: String,
147}
148
149#[derive(Debug, Deserialize, PartialEq)]
150#[serde(deny_unknown_fields)]
151pub struct ErrorResponse {
152 pub error: Option<String>,
153 pub method: String,
154 pub status: String,
155 pub success: bool,
156 pub req_id: i64,
157 pub time_in: String,
158 pub time_out: String,
159}
160
161#[derive(Debug, Deserialize, PartialEq)]
162#[serde(deny_unknown_fields)]
163pub struct PongResponse {
164 pub error: Option<String>,
165 pub req_id: i64,
166 pub time_in: String,
167 pub time_out: String,
168}
169
170#[cfg(test)]
171mod tests {
172 use crate::response_types::SystemStatus;
173 use crate::wss::StatusUpdate;
174 use crate::wss::{ChannelMessage, ErrorResponse, SingleResponse, WssMessage};
175 use serde_json::Number;
176 use std::str::FromStr;
177
178 #[test]
179 fn test_deserializing_status_update() {
180 let message = r#"{"channel":"status","data":[{"api_version":"v2","connection_id":18266300427528990701,"system":"online","version":"2.0.4"}],"type":"update"}"#;
181 let expected = WssMessage::Channel(ChannelMessage::Status(SingleResponse {
182 data: StatusUpdate {
183 api_version: "v2".to_string(),
184 connection_id: Some(Number::from_str("18266300427528990701").unwrap()),
185 system: SystemStatus::Online,
186 version: "2.0.4".to_string(),
187 },
188 }));
189
190 let parsed = serde_json::from_str::<WssMessage>(message).unwrap();
191
192 assert_eq!(expected, parsed);
193 }
194
195 #[test]
196 fn test_deserializing_maintenance_status_update() {
197 let message = r#"{"channel":"status","data":[{"api_version":"v2","system":"maintenance","version":"2.0.6"}],"type":"update"}"#;
198 let expected = WssMessage::Channel(ChannelMessage::Status(SingleResponse {
199 data: StatusUpdate {
200 api_version: "v2".to_string(),
201 connection_id: None,
202 system: SystemStatus::Maintenance,
203 version: "2.0.6".to_string(),
204 },
205 }));
206
207 let parsed = serde_json::from_str::<WssMessage>(message).unwrap();
208
209 assert_eq!(expected, parsed);
210 }
211
212 #[test]
213 fn test_deserializing_l2_update() {
214 let raw = r#"{"channel":"book","type":"update","data":[{"symbol":"BTC/USD","bids":[],"asks":[{"price":66732.5,"qty":5.48256063}],"checksum":2855135483,"timestamp":"2024-05-19T16:32:26.777454Z"}]}"#;
215 let _parsed = serde_json::from_str::<ChannelMessage>(raw).unwrap();
216 }
217
218 #[test]
219 fn test_deserializing_error_message() {
220 let raw = r#"{"error":"ESession:Invalid session","method":"subscribe","req_id":42,"status":"error","success":false,"time_in":"2023-04-19T12:04:41.320119Z","time_out":"2023-04-19T12:04:41.980119Z"}"#;
221
222 let expected = WssMessage::Error(ErrorResponse {
223 error: Some("ESession:Invalid session".to_string()),
224 method: "subscribe".to_string(),
225 status: "error".to_string(),
226 success: false,
227 req_id: 42,
228 time_in: "2023-04-19T12:04:41.320119Z".to_string(),
229 time_out: "2023-04-19T12:04:41.980119Z".to_string(),
230 });
231
232 let parsed = serde_json::from_str::<WssMessage>(raw).unwrap();
233 assert_eq!(expected, parsed);
234 }
235}