solana_pubsub_client/
pubsub_client.rs

1//! A client for subscribing to messages from the RPC server.
2//!
3//! The [`PubsubClient`] implements [Solana WebSocket event
4//! subscriptions][spec].
5//!
6//! [spec]: https://solana.com/docs/rpc/websocket
7//!
8//! This is a blocking API. For a non-blocking API use the asynchronous client
9//! in [`crate::nonblocking::pubsub_client`].
10//!
11//! `PubsubClient` contains static methods to subscribe to events, like
12//! [`PubsubClient::account_subscribe`]. These methods each return their own
13//! subscription type, like [`AccountSubscription`], that are typedefs of
14//! tuples, the first element being a handle to the subscription, like
15//! [`AccountSubscription`], the second a [`Receiver`] of [`RpcResponse`] of
16//! whichever type is appropriate for the subscription. The subscription handle
17//! is a typedef of [`PubsubClientSubscription`], and it must remain live for
18//! the receiver to continue receiving messages.
19//!
20//! Because this is a blocking API, with blocking receivers, a reasonable
21//! pattern for using this API is to move each event receiver to its own thread
22//! to block on messages, while holding all subscription handles on a single
23//! primary thread.
24//!
25//! While `PubsubClientSubscription` contains methods for shutting down,
26//! [`PubsubClientSubscription::send_unsubscribe`], and
27//! [`PubsubClientSubscription::shutdown`], because its internal receivers block
28//! on events from the server, these subscriptions cannot actually be shutdown
29//! reliably. For a non-blocking, cancelable API, use the asynchronous client
30//! in [`crate::nonblocking::pubsub_client`].
31//!
32//! By default the [`block_subscribe`] and [`vote_subscribe`] events are
33//! disabled on RPC nodes. They can be enabled by passing
34//! `--rpc-pubsub-enable-block-subscription` and
35//! `--rpc-pubsub-enable-vote-subscription` to `agave-validator`. When these
36//! methods are disabled, the RPC server will return a "Method not found" error
37//! message.
38//!
39//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe
40//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe
41//!
42//! # Examples
43//!
44//! This example subscribes to account events and then loops forever receiving
45//! them.
46//!
47//! ```
48//! use anyhow::Result;
49//! use solana_commitment_config::CommitmentConfig;
50//! use solana_pubkey::Pubkey;
51//! use solana_pubsub_client::pubsub_client::PubsubClient;
52//! use solana_rpc_client_api::config::RpcAccountInfoConfig;
53//! use std::thread;
54//!
55//! fn get_account_updates(account_pubkey: Pubkey) -> Result<()> {
56//!     let url = "wss://api.devnet.solana.com/";
57//!
58//!     let (mut account_subscription_client, account_subscription_receiver) =
59//!         PubsubClient::account_subscribe(
60//!             url,
61//!             &account_pubkey,
62//!             Some(RpcAccountInfoConfig {
63//!                 encoding: None,
64//!                 data_slice: None,
65//!                 commitment: Some(CommitmentConfig::confirmed()),
66//!                 min_context_slot: None,
67//!             }),
68//!         )?;
69//!
70//!     loop {
71//!         match account_subscription_receiver.recv() {
72//!             Ok(response) => {
73//!                 println!("account subscription response: {:?}", response);
74//!             }
75//!             Err(e) => {
76//!                 println!("account subscription error: {:?}", e);
77//!                 break;
78//!             }
79//!         }
80//!     }
81//!
82//!     Ok(())
83//! }
84//! #
85//! # get_account_updates(solana_pubkey::new_rand());
86//! # Ok::<(), anyhow::Error>(())
87//! ```
88
89pub use crate::nonblocking::pubsub_client::PubsubClientError;
90use {
91    crossbeam_channel::{unbounded, Receiver, Sender},
92    log::*,
93    serde::de::DeserializeOwned,
94    serde_json::{
95        json,
96        value::Value::{Number, Object},
97        Map, Value,
98    },
99    solana_account_decoder_client_types::UiAccount,
100    solana_clock::Slot,
101    solana_pubkey::Pubkey,
102    solana_rpc_client_api::{
103        config::{
104            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
105            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
106            RpcTransactionLogsFilter,
107        },
108        response::{
109            Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
110            RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
111        },
112    },
113    solana_signature::Signature,
114    std::{
115        marker::PhantomData,
116        net::TcpStream,
117        sync::{
118            atomic::{AtomicBool, Ordering},
119            Arc, RwLock,
120        },
121        thread::{sleep, JoinHandle},
122        time::Duration,
123    },
124    tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
125    url::Url,
126};
127
128/// A subscription.
129///
130/// The subscription is unsubscribed on drop, and note that unsubscription (and
131/// thus drop) time is unbounded. See
132/// [`PubsubClientSubscription::send_unsubscribe`].
133pub struct PubsubClientSubscription<T>
134where
135    T: DeserializeOwned,
136{
137    message_type: PhantomData<T>,
138    operation: &'static str,
139    socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
140    subscription_id: u64,
141    t_cleanup: Option<JoinHandle<()>>,
142    exit: Arc<AtomicBool>,
143}
144
145impl<T> Drop for PubsubClientSubscription<T>
146where
147    T: DeserializeOwned,
148{
149    fn drop(&mut self) {
150        self.send_unsubscribe()
151            .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
152        self.socket
153            .write()
154            .unwrap()
155            .close(None)
156            .unwrap_or_else(|_| warn!("unable to close websocket"));
157    }
158}
159
160impl<T> PubsubClientSubscription<T>
161where
162    T: DeserializeOwned,
163{
164    fn send_subscribe(
165        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
166        body: String,
167    ) -> Result<u64, PubsubClientError> {
168        writable_socket.write().unwrap().send(Message::Text(body))?;
169        let message = writable_socket.write().unwrap().read()?;
170        Self::extract_subscription_id(message)
171    }
172
173    fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
174        let message_text = &message.into_text()?;
175
176        if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
177            if let Some(Number(x)) = json_msg.get("result") {
178                if let Some(x) = x.as_u64() {
179                    return Ok(x);
180                }
181            }
182        }
183
184        Err(PubsubClientError::UnexpectedSubscriptionResponse(format!(
185            "msg={message_text}"
186        )))
187    }
188
189    /// Send an unsubscribe message to the server.
190    ///
191    /// Note that this will block as long as the internal subscription receiver
192    /// is waiting on messages from the server, and this can take an unbounded
193    /// amount of time if the server does not send any messages.
194    ///
195    /// If a pubsub client needs to shutdown reliably it should use
196    /// the async client in [`crate::nonblocking::pubsub_client`].
197    pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
198        let method = format!("{}Unsubscribe", self.operation);
199        self.socket
200            .write()
201            .unwrap()
202            .send(Message::Text(
203                json!({
204                "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
205                })
206                .to_string(),
207            ))
208            .map_err(|err| err.into())
209    }
210
211    fn read_message(
212        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
213    ) -> Result<Option<T>, PubsubClientError> {
214        let message = writable_socket.write().unwrap().read()?;
215        if message.is_ping() {
216            return Ok(None);
217        }
218        let message_text = &message.into_text()?;
219        if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
220            if let Some(Object(params)) = json_msg.get("params") {
221                if let Some(result) = params.get("result") {
222                    if let Ok(x) = serde_json::from_value::<T>(result.clone()) {
223                        return Ok(Some(x));
224                    }
225                }
226            }
227        }
228
229        Err(PubsubClientError::UnexpectedMessageError(format!(
230            "msg={message_text}"
231        )))
232    }
233
234    /// Shutdown the internel message receiver and wait for its thread to exit.
235    ///
236    /// Note that this will block as long as the subscription receiver is
237    /// waiting on messages from the server, and this can take an unbounded
238    /// amount of time if the server does not send any messages.
239    ///
240    /// If a pubsub client needs to shutdown reliably it should use
241    /// the async client in [`crate::nonblocking::pubsub_client`].
242    pub fn shutdown(&mut self) -> std::thread::Result<()> {
243        if self.t_cleanup.is_some() {
244            info!("websocket thread - shutting down");
245            self.exit.store(true, Ordering::Relaxed);
246            let x = self.t_cleanup.take().unwrap().join();
247            info!("websocket thread - shut down.");
248            x
249        } else {
250            warn!("websocket thread - already shut down.");
251            Ok(())
252        }
253    }
254}
255
256pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
257pub type LogsSubscription = (
258    PubsubLogsClientSubscription,
259    Receiver<RpcResponse<RpcLogsResponse>>,
260);
261
262pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
263pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);
264
265pub type PubsubSignatureClientSubscription =
266    PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
267pub type SignatureSubscription = (
268    PubsubSignatureClientSubscription,
269    Receiver<RpcResponse<RpcSignatureResult>>,
270);
271
272pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
273pub type BlockSubscription = (
274    PubsubBlockClientSubscription,
275    Receiver<RpcResponse<RpcBlockUpdate>>,
276);
277
278pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
279pub type ProgramSubscription = (
280    PubsubProgramClientSubscription,
281    Receiver<RpcResponse<RpcKeyedAccount>>,
282);
283
284pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
285pub type AccountSubscription = (
286    PubsubAccountClientSubscription,
287    Receiver<RpcResponse<UiAccount>>,
288);
289
290pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
291pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);
292
293pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
294pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
295
296/// A client for subscribing to messages from the RPC server.
297///
298/// See the [module documentation][self].
299pub struct PubsubClient {}
300
301fn connect_with_retry(
302    url: Url,
303) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
304    let mut connection_retries = 5;
305    loop {
306        let result = connect(url.clone()).map(|(socket, _)| socket);
307        if let Err(tungstenite::Error::Http(response)) = &result {
308            if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0
309            {
310                let mut duration = Duration::from_millis(500);
311                if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) {
312                    if let Ok(retry_after) = retry_after.to_str() {
313                        if let Ok(retry_after) = retry_after.parse::<u64>() {
314                            if retry_after < 120 {
315                                duration = Duration::from_secs(retry_after);
316                            }
317                        }
318                    }
319                }
320
321                connection_retries -= 1;
322                debug!(
323                    "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
324                    response, connection_retries, duration
325                );
326
327                sleep(duration);
328                continue;
329            }
330        }
331        return result;
332    }
333}
334
335impl PubsubClient {
336    /// Subscribe to account events.
337    ///
338    /// Receives messages of type [`UiAccount`] when an account's lamports or data changes.
339    ///
340    /// # RPC Reference
341    ///
342    /// This method corresponds directly to the [`accountSubscribe`] RPC method.
343    ///
344    /// [`accountSubscribe`]: https://solana.com/docs/rpc/websocket/accountsubscribe
345    pub fn account_subscribe(
346        url: &str,
347        pubkey: &Pubkey,
348        config: Option<RpcAccountInfoConfig>,
349    ) -> Result<AccountSubscription, PubsubClientError> {
350        let url = Url::parse(url)?;
351        let socket = connect_with_retry(url)?;
352        let (sender, receiver) = unbounded();
353
354        let socket = Arc::new(RwLock::new(socket));
355        let socket_clone = socket.clone();
356        let exit = Arc::new(AtomicBool::new(false));
357        let exit_clone = exit.clone();
358        let body = json!({
359            "jsonrpc":"2.0",
360            "id":1,
361            "method":"accountSubscribe",
362            "params":[
363                pubkey.to_string(),
364                config
365            ]
366        })
367        .to_string();
368        let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
369
370        let t_cleanup = std::thread::spawn(move || {
371            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
372        });
373
374        let result = PubsubClientSubscription {
375            message_type: PhantomData,
376            operation: "account",
377            socket,
378            subscription_id,
379            t_cleanup: Some(t_cleanup),
380            exit,
381        };
382
383        Ok((result, receiver))
384    }
385
386    /// Subscribe to block events.
387    ///
388    /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized.
389    ///
390    /// This method is disabled by default. It can be enabled by passing
391    /// `--rpc-pubsub-enable-block-subscription` to `agave-validator`.
392    ///
393    /// # RPC Reference
394    ///
395    /// This method corresponds directly to the [`blockSubscribe`] RPC method.
396    ///
397    /// [`blockSubscribe`]: https://solana.com/docs/rpc/websocket/blocksubscribe
398    pub fn block_subscribe(
399        url: &str,
400        filter: RpcBlockSubscribeFilter,
401        config: Option<RpcBlockSubscribeConfig>,
402    ) -> Result<BlockSubscription, PubsubClientError> {
403        let url = Url::parse(url)?;
404        let socket = connect_with_retry(url)?;
405        let (sender, receiver) = unbounded();
406
407        let socket = Arc::new(RwLock::new(socket));
408        let socket_clone = socket.clone();
409        let exit = Arc::new(AtomicBool::new(false));
410        let exit_clone = exit.clone();
411        let body = json!({
412            "jsonrpc":"2.0",
413            "id":1,
414            "method":"blockSubscribe",
415            "params":[filter, config]
416        })
417        .to_string();
418
419        let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
420
421        let t_cleanup = std::thread::spawn(move || {
422            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
423        });
424
425        let result = PubsubClientSubscription {
426            message_type: PhantomData,
427            operation: "block",
428            socket,
429            subscription_id,
430            t_cleanup: Some(t_cleanup),
431            exit,
432        };
433
434        Ok((result, receiver))
435    }
436
437    /// Subscribe to transaction log events.
438    ///
439    /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed.
440    ///
441    /// # RPC Reference
442    ///
443    /// This method corresponds directly to the [`logsSubscribe`] RPC method.
444    ///
445    /// [`logsSubscribe`]: https://solana.com/docs/rpc/websocket/logssubscribe
446    pub fn logs_subscribe(
447        url: &str,
448        filter: RpcTransactionLogsFilter,
449        config: RpcTransactionLogsConfig,
450    ) -> Result<LogsSubscription, PubsubClientError> {
451        let url = Url::parse(url)?;
452        let socket = connect_with_retry(url)?;
453        let (sender, receiver) = unbounded();
454
455        let socket = Arc::new(RwLock::new(socket));
456        let socket_clone = socket.clone();
457        let exit = Arc::new(AtomicBool::new(false));
458        let exit_clone = exit.clone();
459        let body = json!({
460            "jsonrpc":"2.0",
461            "id":1,
462            "method":"logsSubscribe",
463            "params":[filter, config]
464        })
465        .to_string();
466
467        let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
468
469        let t_cleanup = std::thread::spawn(move || {
470            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
471        });
472
473        let result = PubsubClientSubscription {
474            message_type: PhantomData,
475            operation: "logs",
476            socket,
477            subscription_id,
478            t_cleanup: Some(t_cleanup),
479            exit,
480        };
481
482        Ok((result, receiver))
483    }
484
485    /// Subscribe to program account events.
486    ///
487    /// Receives messages of type [`RpcKeyedAccount`] when an account owned
488    /// by the given program changes.
489    ///
490    /// # RPC Reference
491    ///
492    /// This method corresponds directly to the [`programSubscribe`] RPC method.
493    ///
494    /// [`programSubscribe`]: https://solana.com/docs/rpc/websocket/programsubscribe
495    pub fn program_subscribe(
496        url: &str,
497        pubkey: &Pubkey,
498        config: Option<RpcProgramAccountsConfig>,
499    ) -> Result<ProgramSubscription, PubsubClientError> {
500        let url = Url::parse(url)?;
501        let socket = connect_with_retry(url)?;
502        let (sender, receiver) = unbounded();
503
504        let socket = Arc::new(RwLock::new(socket));
505        let socket_clone = socket.clone();
506        let exit = Arc::new(AtomicBool::new(false));
507        let exit_clone = exit.clone();
508
509        let body = json!({
510            "jsonrpc":"2.0",
511            "id":1,
512            "method":"programSubscribe",
513            "params":[
514                pubkey.to_string(),
515                config
516            ]
517        })
518        .to_string();
519        let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
520
521        let t_cleanup = std::thread::spawn(move || {
522            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
523        });
524
525        let result = PubsubClientSubscription {
526            message_type: PhantomData,
527            operation: "program",
528            socket,
529            subscription_id,
530            t_cleanup: Some(t_cleanup),
531            exit,
532        };
533
534        Ok((result, receiver))
535    }
536
537    /// Subscribe to vote events.
538    ///
539    /// Receives messages of type [`RpcVote`] when a new vote is observed. These
540    /// votes are observed prior to confirmation and may never be confirmed.
541    ///
542    /// This method is disabled by default. It can be enabled by passing
543    /// `--rpc-pubsub-enable-vote-subscription` to `agave-validator`.
544    ///
545    /// # RPC Reference
546    ///
547    /// This method corresponds directly to the [`voteSubscribe`] RPC method.
548    ///
549    /// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket/votesubscribe
550    pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
551        let url = Url::parse(url)?;
552        let socket = connect_with_retry(url)?;
553        let (sender, receiver) = unbounded();
554
555        let socket = Arc::new(RwLock::new(socket));
556        let socket_clone = socket.clone();
557        let exit = Arc::new(AtomicBool::new(false));
558        let exit_clone = exit.clone();
559        let body = json!({
560            "jsonrpc":"2.0",
561            "id":1,
562            "method":"voteSubscribe",
563        })
564        .to_string();
565        let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
566
567        let t_cleanup = std::thread::spawn(move || {
568            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
569        });
570
571        let result = PubsubClientSubscription {
572            message_type: PhantomData,
573            operation: "vote",
574            socket,
575            subscription_id,
576            t_cleanup: Some(t_cleanup),
577            exit,
578        };
579
580        Ok((result, receiver))
581    }
582
583    /// Subscribe to root events.
584    ///
585    /// Receives messages of type [`Slot`] when a new [root] is set by the
586    /// validator.
587    ///
588    /// [root]: https://solana.com/docs/terminology#root
589    ///
590    /// # RPC Reference
591    ///
592    /// This method corresponds directly to the [`rootSubscribe`] RPC method.
593    ///
594    /// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket/rootsubscribe
595    pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
596        let url = Url::parse(url)?;
597        let socket = connect_with_retry(url)?;
598        let (sender, receiver) = unbounded();
599
600        let socket = Arc::new(RwLock::new(socket));
601        let socket_clone = socket.clone();
602        let exit = Arc::new(AtomicBool::new(false));
603        let exit_clone = exit.clone();
604        let body = json!({
605            "jsonrpc":"2.0",
606            "id":1,
607            "method":"rootSubscribe",
608        })
609        .to_string();
610        let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
611
612        let t_cleanup = std::thread::spawn(move || {
613            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
614        });
615
616        let result = PubsubClientSubscription {
617            message_type: PhantomData,
618            operation: "root",
619            socket,
620            subscription_id,
621            t_cleanup: Some(t_cleanup),
622            exit,
623        };
624
625        Ok((result, receiver))
626    }
627
628    /// Subscribe to transaction confirmation events.
629    ///
630    /// Receives messages of type [`RpcSignatureResult`] when a transaction
631    /// with the given signature is committed.
632    ///
633    /// This is a subscription to a single notification. It is automatically
634    /// cancelled by the server once the notification is sent.
635    ///
636    /// # RPC Reference
637    ///
638    /// This method corresponds directly to the [`signatureSubscribe`] RPC method.
639    ///
640    /// [`signatureSubscribe`]: https://solana.com/docs/rpc/websocket/signaturesubscribe
641    pub fn signature_subscribe(
642        url: &str,
643        signature: &Signature,
644        config: Option<RpcSignatureSubscribeConfig>,
645    ) -> Result<SignatureSubscription, PubsubClientError> {
646        let url = Url::parse(url)?;
647        let socket = connect_with_retry(url)?;
648        let (sender, receiver) = unbounded();
649
650        let socket = Arc::new(RwLock::new(socket));
651        let socket_clone = socket.clone();
652        let exit = Arc::new(AtomicBool::new(false));
653        let exit_clone = exit.clone();
654        let body = json!({
655            "jsonrpc":"2.0",
656            "id":1,
657            "method":"signatureSubscribe",
658            "params":[
659                signature.to_string(),
660                config
661            ]
662        })
663        .to_string();
664        let subscription_id =
665            PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
666
667        let t_cleanup = std::thread::spawn(move || {
668            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
669        });
670
671        let result = PubsubClientSubscription {
672            message_type: PhantomData,
673            operation: "signature",
674            socket,
675            subscription_id,
676            t_cleanup: Some(t_cleanup),
677            exit,
678        };
679
680        Ok((result, receiver))
681    }
682
683    /// Subscribe to slot events.
684    ///
685    /// Receives messages of type [`SlotInfo`] when a slot is processed.
686    ///
687    /// # RPC Reference
688    ///
689    /// This method corresponds directly to the [`slotSubscribe`] RPC method.
690    ///
691    /// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket/slotsubscribe
692    pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
693        let url = Url::parse(url)?;
694        let socket = connect_with_retry(url)?;
695        let (sender, receiver) = unbounded::<SlotInfo>();
696
697        let socket = Arc::new(RwLock::new(socket));
698        let socket_clone = socket.clone();
699        let exit = Arc::new(AtomicBool::new(false));
700        let exit_clone = exit.clone();
701        let body = json!({
702            "jsonrpc":"2.0",
703            "id":1,
704            "method":"slotSubscribe",
705            "params":[]
706        })
707        .to_string();
708        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
709
710        let t_cleanup = std::thread::spawn(move || {
711            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
712        });
713
714        let result = PubsubClientSubscription {
715            message_type: PhantomData,
716            operation: "slot",
717            socket,
718            subscription_id,
719            t_cleanup: Some(t_cleanup),
720            exit,
721        };
722
723        Ok((result, receiver))
724    }
725
726    /// Subscribe to slot update events.
727    ///
728    /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur.
729    ///
730    /// Note that this method operates differently than other subscriptions:
731    /// instead of sending the message to a receiver on a channel, it accepts a
732    /// `handler` callback that processes the message directly. This processing
733    /// occurs on another thread.
734    ///
735    /// # RPC Reference
736    ///
737    /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method.
738    ///
739    /// [`slotUpdatesSubscribe`]: https://solana.com/docs/rpc/websocket/slotsupdatessubscribe
740    pub fn slot_updates_subscribe(
741        url: &str,
742        handler: impl Fn(SlotUpdate) + Send + 'static,
743    ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
744        let url = Url::parse(url)?;
745        let socket = connect_with_retry(url)?;
746
747        let socket = Arc::new(RwLock::new(socket));
748        let socket_clone = socket.clone();
749        let exit = Arc::new(AtomicBool::new(false));
750        let exit_clone = exit.clone();
751        let body = json!({
752            "jsonrpc":"2.0",
753            "id":1,
754            "method":"slotsUpdatesSubscribe",
755            "params":[]
756        })
757        .to_string();
758        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
759
760        let t_cleanup = std::thread::spawn(move || {
761            Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
762        });
763
764        Ok(PubsubClientSubscription {
765            message_type: PhantomData,
766            operation: "slotsUpdates",
767            socket,
768            subscription_id,
769            t_cleanup: Some(t_cleanup),
770            exit,
771        })
772    }
773
774    fn cleanup_with_sender<T>(
775        exit: Arc<AtomicBool>,
776        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
777        sender: Sender<T>,
778    ) where
779        T: DeserializeOwned + Send + 'static,
780    {
781        let handler = move |message| match sender.send(message) {
782            Ok(_) => (),
783            Err(err) => {
784                info!("receive error: {:?}", err);
785            }
786        };
787        Self::cleanup_with_handler(exit, socket, handler);
788    }
789
790    fn cleanup_with_handler<T, F>(
791        exit: Arc<AtomicBool>,
792        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
793        handler: F,
794    ) where
795        T: DeserializeOwned,
796        F: Fn(T) + Send + 'static,
797    {
798        loop {
799            if exit.load(Ordering::Relaxed) {
800                break;
801            }
802
803            match PubsubClientSubscription::read_message(socket) {
804                Ok(Some(message)) => handler(message),
805                Ok(None) => {
806                    // Nothing useful, means we received a ping message
807                }
808                Err(err) => {
809                    info!("receive error: {:?}", err);
810                    break;
811                }
812            }
813        }
814
815        info!("websocket - exited receive loop");
816    }
817}
818
819#[cfg(test)]
820mod tests {
821    // see client-test/test/client.rs
822}