safecoin_client/
pubsub_client.rs

1use {
2    crate::{
3        rpc_config::{
4            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
5            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
6            RpcTransactionLogsFilter,
7        },
8        rpc_filter,
9        rpc_response::{
10            Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
11            RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
12        },
13    },
14    crossbeam_channel::{unbounded, Receiver, Sender},
15    log::*,
16    serde::de::DeserializeOwned,
17    serde_json::{
18        json,
19        value::Value::{Number, Object},
20        Map, Value,
21    },
22    safecoin_account_decoder::UiAccount,
23    solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
24    std::{
25        marker::PhantomData,
26        net::TcpStream,
27        sync::{
28            atomic::{AtomicBool, Ordering},
29            Arc, RwLock,
30        },
31        thread::{sleep, JoinHandle},
32        time::Duration,
33    },
34    thiserror::Error,
35    tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
36    url::{ParseError, Url},
37};
38
/// Errors surfaced by the pubsub client while connecting, subscribing, or
/// decoding messages.
#[derive(Debug, Error)]
pub enum PubsubClientError {
    /// The supplied URL string could not be parsed.
    #[error("url parse error")]
    UrlParseError(#[from] ParseError),

    /// Any `tungstenite::Error`. Because of the `#[from]` conversion this also
    /// covers failures while reading from or writing to an already-open
    /// socket, not only failures to connect.
    #[error("unable to connect to server")]
    ConnectionError(#[from] tungstenite::Error),

    /// A message could not be parsed as JSON.
    #[error("json parse error")]
    JsonParseError(#[from] serde_json::error::Error),

    /// A message was valid JSON but did not have the expected fields; the
    /// payload is the debug rendering of the parsed message.
    #[error("unexpected message format: {0}")]
    UnexpectedMessageError(String),

    /// A request could not be prepared or its answer could not be interpreted
    /// (for example an unparsable cluster version or a filter-mapping failure).
    #[error("request error: {0}")]
    RequestError(String),
}
56
/// Handle for one active websocket subscription delivering messages of type `T`.
///
/// Dropping the handle sends the matching unsubscribe request and closes the
/// socket; `shutdown` stops and joins the background receive thread.
pub struct PubsubClientSubscription<T>
where
    T: DeserializeOwned,
{
    // Ties the handle to the notification payload type without storing one.
    message_type: PhantomData<T>,
    // Method-name prefix (e.g. "account"); "Unsubscribe" is appended when unsubscribing.
    operation: &'static str,
    // Socket shared with the background receive thread.
    socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
    // Identifier taken from the "result" field of the subscribe response.
    subscription_id: u64,
    // Join handle of the receive thread; `None` once `shutdown` has taken it.
    t_cleanup: Option<JoinHandle<()>>,
    // Flag polled by the receive loop; set to `true` to ask it to stop.
    exit: Arc<AtomicBool>,
}
68
69impl<T> Drop for PubsubClientSubscription<T>
70where
71    T: DeserializeOwned,
72{
73    fn drop(&mut self) {
74        self.send_unsubscribe()
75            .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
76        self.socket
77            .write()
78            .unwrap()
79            .close(None)
80            .unwrap_or_else(|_| warn!("unable to close websocket"));
81    }
82}
83
84impl<T> PubsubClientSubscription<T>
85where
86    T: DeserializeOwned,
87{
88    fn send_subscribe(
89        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
90        body: String,
91    ) -> Result<u64, PubsubClientError> {
92        writable_socket
93            .write()
94            .unwrap()
95            .write_message(Message::Text(body))?;
96        let message = writable_socket.write().unwrap().read_message()?;
97        Self::extract_subscription_id(message)
98    }
99
100    fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
101        let message_text = &message.into_text()?;
102        let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
103
104        if let Some(Number(x)) = json_msg.get("result") {
105            if let Some(x) = x.as_u64() {
106                return Ok(x);
107            }
108        }
109        // TODO: Add proper JSON RPC response/error handling...
110        Err(PubsubClientError::UnexpectedMessageError(format!(
111            "{:?}",
112            json_msg
113        )))
114    }
115
116    pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
117        let method = format!("{}Unsubscribe", self.operation);
118        self.socket
119            .write()
120            .unwrap()
121            .write_message(Message::Text(
122                json!({
123                "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
124                })
125                .to_string(),
126            ))
127            .map_err(|err| err.into())
128    }
129
130    fn get_version(
131        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
132    ) -> Result<semver::Version, PubsubClientError> {
133        writable_socket
134            .write()
135            .unwrap()
136            .write_message(Message::Text(
137                json!({
138                    "jsonrpc":"2.0","id":1,"method":"getVersion",
139                })
140                .to_string(),
141            ))?;
142        let message = writable_socket.write().unwrap().read_message()?;
143        let message_text = &message.into_text()?;
144        let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
145
146        if let Some(Object(version_map)) = json_msg.get("result") {
147            if let Some(node_version) = version_map.get("safecoin-core") {
148                let node_version = semver::Version::parse(
149                    node_version.as_str().unwrap_or_default(),
150                )
151                .map_err(|e| {
152                    PubsubClientError::RequestError(format!(
153                        "failed to parse cluster version: {}",
154                        e
155                    ))
156                })?;
157                return Ok(node_version);
158            }
159        }
160        // TODO: Add proper JSON RPC response/error handling...
161        Err(PubsubClientError::UnexpectedMessageError(format!(
162            "{:?}",
163            json_msg
164        )))
165    }
166
167    fn read_message(
168        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
169    ) -> Result<T, PubsubClientError> {
170        let message = writable_socket.write().unwrap().read_message()?;
171        let message_text = &message.into_text().unwrap();
172        let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
173
174        if let Some(Object(params)) = json_msg.get("params") {
175            if let Some(result) = params.get("result") {
176                let x: T = serde_json::from_value::<T>(result.clone()).unwrap();
177                return Ok(x);
178            }
179        }
180
181        // TODO: Add proper JSON RPC response/error handling...
182        Err(PubsubClientError::UnexpectedMessageError(format!(
183            "{:?}",
184            json_msg
185        )))
186    }
187
188    pub fn shutdown(&mut self) -> std::thread::Result<()> {
189        if self.t_cleanup.is_some() {
190            info!("websocket thread - shutting down");
191            self.exit.store(true, Ordering::Relaxed);
192            let x = self.t_cleanup.take().unwrap().join();
193            info!("websocket thread - shut down.");
194            x
195        } else {
196            warn!("websocket thread - already shut down.");
197            Ok(())
198        }
199    }
200}
201
/// Subscription handle whose notifications are `RpcResponse<RpcLogsResponse>`.
pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
/// Handle plus receiving end of the channel, as returned by `logs_subscribe`.
pub type LogsSubscription = (
    PubsubLogsClientSubscription,
    Receiver<RpcResponse<RpcLogsResponse>>,
);

/// Subscription handle whose notifications are `SlotInfo`.
pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
/// Handle plus receiving end of the channel, as returned by `slot_subscribe`.
pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);

/// Subscription handle whose notifications are `RpcResponse<RpcSignatureResult>`.
pub type PubsubSignatureClientSubscription =
    PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
/// Handle plus receiving end of the channel, as returned by `signature_subscribe`.
pub type SignatureSubscription = (
    PubsubSignatureClientSubscription,
    Receiver<RpcResponse<RpcSignatureResult>>,
);

/// Subscription handle whose notifications are `RpcResponse<RpcBlockUpdate>`.
pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
/// Handle plus receiving end of the channel, as returned by `block_subscribe`.
pub type BlockSubscription = (
    PubsubBlockClientSubscription,
    Receiver<RpcResponse<RpcBlockUpdate>>,
);

/// Subscription handle whose notifications are `RpcResponse<RpcKeyedAccount>`.
pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
/// Handle plus receiving end of the channel, as returned by `program_subscribe`.
pub type ProgramSubscription = (
    PubsubProgramClientSubscription,
    Receiver<RpcResponse<RpcKeyedAccount>>,
);

/// Subscription handle whose notifications are `RpcResponse<UiAccount>`.
pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
/// Handle plus receiving end of the channel, as returned by `account_subscribe`.
pub type AccountSubscription = (
    PubsubAccountClientSubscription,
    Receiver<RpcResponse<UiAccount>>,
);

/// Subscription handle whose notifications are `RpcVote`.
pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
/// Handle plus receiving end of the channel, as returned by `vote_subscribe`.
pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);

/// Subscription handle whose notifications are `Slot`.
pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
/// Handle plus receiving end of the channel, as returned by `root_subscribe`.
pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
241
/// Stateless namespace for the `*_subscribe` associated functions, each of
/// which opens its own websocket connection.
pub struct PubsubClient {}
243
244fn connect_with_retry(
245    url: Url,
246) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
247    let mut connection_retries = 5;
248    loop {
249        let result = connect(url.clone()).map(|(socket, _)| socket);
250        if let Err(tungstenite::Error::Http(response)) = &result {
251            if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0
252            {
253                let mut duration = Duration::from_millis(500);
254                if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) {
255                    if let Ok(retry_after) = retry_after.to_str() {
256                        if let Ok(retry_after) = retry_after.parse::<u64>() {
257                            if retry_after < 120 {
258                                duration = Duration::from_secs(retry_after);
259                            }
260                        }
261                    }
262                }
263
264                connection_retries -= 1;
265                debug!(
266                    "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
267                    response, connection_retries, duration
268                );
269
270                sleep(duration);
271                continue;
272            }
273        }
274        return result;
275    }
276}
277
278impl PubsubClient {
279    pub fn account_subscribe(
280        url: &str,
281        pubkey: &Pubkey,
282        config: Option<RpcAccountInfoConfig>,
283    ) -> Result<AccountSubscription, PubsubClientError> {
284        let url = Url::parse(url)?;
285        let socket = connect_with_retry(url)?;
286        let (sender, receiver) = unbounded();
287
288        let socket = Arc::new(RwLock::new(socket));
289        let socket_clone = socket.clone();
290        let exit = Arc::new(AtomicBool::new(false));
291        let exit_clone = exit.clone();
292        let body = json!({
293            "jsonrpc":"2.0",
294            "id":1,
295            "method":"accountSubscribe",
296            "params":[
297                pubkey.to_string(),
298                config
299            ]
300        })
301        .to_string();
302        let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
303
304        let t_cleanup = std::thread::spawn(move || {
305            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
306        });
307
308        let result = PubsubClientSubscription {
309            message_type: PhantomData,
310            operation: "account",
311            socket,
312            subscription_id,
313            t_cleanup: Some(t_cleanup),
314            exit,
315        };
316
317        Ok((result, receiver))
318    }
319
320    pub fn block_subscribe(
321        url: &str,
322        filter: RpcBlockSubscribeFilter,
323        config: Option<RpcBlockSubscribeConfig>,
324    ) -> Result<BlockSubscription, PubsubClientError> {
325        let url = Url::parse(url)?;
326        let socket = connect_with_retry(url)?;
327        let (sender, receiver) = unbounded();
328
329        let socket = Arc::new(RwLock::new(socket));
330        let socket_clone = socket.clone();
331        let exit = Arc::new(AtomicBool::new(false));
332        let exit_clone = exit.clone();
333        let body = json!({
334            "jsonrpc":"2.0",
335            "id":1,
336            "method":"blockSubscribe",
337            "params":[filter, config]
338        })
339        .to_string();
340
341        let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
342
343        let t_cleanup = std::thread::spawn(move || {
344            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
345        });
346
347        let result = PubsubClientSubscription {
348            message_type: PhantomData,
349            operation: "block",
350            socket,
351            subscription_id,
352            t_cleanup: Some(t_cleanup),
353            exit,
354        };
355
356        Ok((result, receiver))
357    }
358
359    pub fn logs_subscribe(
360        url: &str,
361        filter: RpcTransactionLogsFilter,
362        config: RpcTransactionLogsConfig,
363    ) -> Result<LogsSubscription, PubsubClientError> {
364        let url = Url::parse(url)?;
365        let socket = connect_with_retry(url)?;
366        let (sender, receiver) = unbounded();
367
368        let socket = Arc::new(RwLock::new(socket));
369        let socket_clone = socket.clone();
370        let exit = Arc::new(AtomicBool::new(false));
371        let exit_clone = exit.clone();
372        let body = json!({
373            "jsonrpc":"2.0",
374            "id":1,
375            "method":"logsSubscribe",
376            "params":[filter, config]
377        })
378        .to_string();
379
380        let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
381
382        let t_cleanup = std::thread::spawn(move || {
383            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
384        });
385
386        let result = PubsubClientSubscription {
387            message_type: PhantomData,
388            operation: "logs",
389            socket,
390            subscription_id,
391            t_cleanup: Some(t_cleanup),
392            exit,
393        };
394
395        Ok((result, receiver))
396    }
397
398    pub fn program_subscribe(
399        url: &str,
400        pubkey: &Pubkey,
401        mut config: Option<RpcProgramAccountsConfig>,
402    ) -> Result<ProgramSubscription, PubsubClientError> {
403        let url = Url::parse(url)?;
404        let socket = connect_with_retry(url)?;
405        let (sender, receiver) = unbounded();
406
407        let socket = Arc::new(RwLock::new(socket));
408        let socket_clone = socket.clone();
409        let exit = Arc::new(AtomicBool::new(false));
410        let exit_clone = exit.clone();
411
412        if let Some(ref mut config) = config {
413            if let Some(ref mut filters) = config.filters {
414                let node_version = PubsubProgramClientSubscription::get_version(&socket_clone).ok();
415                // If node does not support the pubsub `getVersion` method, assume version is old
416                // and filters should be mapped (node_version.is_none()).
417                rpc_filter::maybe_map_filters(node_version, filters)
418                    .map_err(PubsubClientError::RequestError)?;
419            }
420        }
421
422        let body = json!({
423            "jsonrpc":"2.0",
424            "id":1,
425            "method":"programSubscribe",
426            "params":[
427                pubkey.to_string(),
428                config
429            ]
430        })
431        .to_string();
432        let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
433
434        let t_cleanup = std::thread::spawn(move || {
435            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
436        });
437
438        let result = PubsubClientSubscription {
439            message_type: PhantomData,
440            operation: "program",
441            socket,
442            subscription_id,
443            t_cleanup: Some(t_cleanup),
444            exit,
445        };
446
447        Ok((result, receiver))
448    }
449
450    pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
451        let url = Url::parse(url)?;
452        let socket = connect_with_retry(url)?;
453        let (sender, receiver) = unbounded();
454
455        let socket = Arc::new(RwLock::new(socket));
456        let socket_clone = socket.clone();
457        let exit = Arc::new(AtomicBool::new(false));
458        let exit_clone = exit.clone();
459        let body = json!({
460            "jsonrpc":"2.0",
461            "id":1,
462            "method":"voteSubscribe",
463        })
464        .to_string();
465        let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
466
467        let t_cleanup = std::thread::spawn(move || {
468            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
469        });
470
471        let result = PubsubClientSubscription {
472            message_type: PhantomData,
473            operation: "vote",
474            socket,
475            subscription_id,
476            t_cleanup: Some(t_cleanup),
477            exit,
478        };
479
480        Ok((result, receiver))
481    }
482
483    pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
484        let url = Url::parse(url)?;
485        let socket = connect_with_retry(url)?;
486        let (sender, receiver) = unbounded();
487
488        let socket = Arc::new(RwLock::new(socket));
489        let socket_clone = socket.clone();
490        let exit = Arc::new(AtomicBool::new(false));
491        let exit_clone = exit.clone();
492        let body = json!({
493            "jsonrpc":"2.0",
494            "id":1,
495            "method":"rootSubscribe",
496        })
497        .to_string();
498        let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
499
500        let t_cleanup = std::thread::spawn(move || {
501            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
502        });
503
504        let result = PubsubClientSubscription {
505            message_type: PhantomData,
506            operation: "root",
507            socket,
508            subscription_id,
509            t_cleanup: Some(t_cleanup),
510            exit,
511        };
512
513        Ok((result, receiver))
514    }
515
516    pub fn signature_subscribe(
517        url: &str,
518        signature: &Signature,
519        config: Option<RpcSignatureSubscribeConfig>,
520    ) -> Result<SignatureSubscription, PubsubClientError> {
521        let url = Url::parse(url)?;
522        let socket = connect_with_retry(url)?;
523        let (sender, receiver) = unbounded();
524
525        let socket = Arc::new(RwLock::new(socket));
526        let socket_clone = socket.clone();
527        let exit = Arc::new(AtomicBool::new(false));
528        let exit_clone = exit.clone();
529        let body = json!({
530            "jsonrpc":"2.0",
531            "id":1,
532            "method":"signatureSubscribe",
533            "params":[
534                signature.to_string(),
535                config
536            ]
537        })
538        .to_string();
539        let subscription_id =
540            PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
541
542        let t_cleanup = std::thread::spawn(move || {
543            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
544        });
545
546        let result = PubsubClientSubscription {
547            message_type: PhantomData,
548            operation: "signature",
549            socket,
550            subscription_id,
551            t_cleanup: Some(t_cleanup),
552            exit,
553        };
554
555        Ok((result, receiver))
556    }
557
558    pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
559        let url = Url::parse(url)?;
560        let socket = connect_with_retry(url)?;
561        let (sender, receiver) = unbounded::<SlotInfo>();
562
563        let socket = Arc::new(RwLock::new(socket));
564        let socket_clone = socket.clone();
565        let exit = Arc::new(AtomicBool::new(false));
566        let exit_clone = exit.clone();
567        let body = json!({
568            "jsonrpc":"2.0",
569            "id":1,
570            "method":"slotSubscribe",
571            "params":[]
572        })
573        .to_string();
574        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
575
576        let t_cleanup = std::thread::spawn(move || {
577            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
578        });
579
580        let result = PubsubClientSubscription {
581            message_type: PhantomData,
582            operation: "slot",
583            socket,
584            subscription_id,
585            t_cleanup: Some(t_cleanup),
586            exit,
587        };
588
589        Ok((result, receiver))
590    }
591
592    pub fn slot_updates_subscribe(
593        url: &str,
594        handler: impl Fn(SlotUpdate) + Send + 'static,
595    ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
596        let url = Url::parse(url)?;
597        let socket = connect_with_retry(url)?;
598
599        let socket = Arc::new(RwLock::new(socket));
600        let socket_clone = socket.clone();
601        let exit = Arc::new(AtomicBool::new(false));
602        let exit_clone = exit.clone();
603        let body = json!({
604            "jsonrpc":"2.0",
605            "id":1,
606            "method":"slotsUpdatesSubscribe",
607            "params":[]
608        })
609        .to_string();
610        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
611
612        let t_cleanup = std::thread::spawn(move || {
613            Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
614        });
615
616        Ok(PubsubClientSubscription {
617            message_type: PhantomData,
618            operation: "slotsUpdates",
619            socket,
620            subscription_id,
621            t_cleanup: Some(t_cleanup),
622            exit,
623        })
624    }
625
626    fn cleanup_with_sender<T>(
627        exit: Arc<AtomicBool>,
628        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
629        sender: Sender<T>,
630    ) where
631        T: DeserializeOwned + Send + 'static,
632    {
633        let handler = move |message| match sender.send(message) {
634            Ok(_) => (),
635            Err(err) => {
636                info!("receive error: {:?}", err);
637            }
638        };
639        Self::cleanup_with_handler(exit, socket, handler);
640    }
641
642    fn cleanup_with_handler<T, F>(
643        exit: Arc<AtomicBool>,
644        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
645        handler: F,
646    ) where
647        T: DeserializeOwned,
648        F: Fn(T) + Send + 'static,
649    {
650        loop {
651            if exit.load(Ordering::Relaxed) {
652                break;
653            }
654
655            match PubsubClientSubscription::read_message(socket) {
656                Ok(message) => handler(message),
657                Err(err) => {
658                    info!("receive error: {:?}", err);
659                    break;
660                }
661            }
662        }
663
664        info!("websocket - exited receive loop");
665    }
666}
667
668#[cfg(test)]
669mod tests {
670    // see client-test/test/client.rs
671}