solana_pubsub_client/
pubsub_client.rs

1//! A client for subscribing to messages from the RPC server.
2//!
3//! The [`PubsubClient`] implements [Solana WebSocket event
4//! subscriptions][spec].
5//!
6//! [spec]: https://solana.com/docs/rpc/websocket
7//!
8//! This is a blocking API. For a non-blocking API use the asynchronous client
9//! in [`crate::nonblocking::pubsub_client`].
10//!
11//! `PubsubClient` contains static methods to subscribe to events, like
12//! [`PubsubClient::account_subscribe`]. These methods each return their own
13//! subscription type, like [`AccountSubscription`], that are typedefs of
14//! tuples, the first element being a handle to the subscription, like
//! [`PubsubAccountClientSubscription`], the second a [`Receiver`] of [`RpcResponse`] of
16//! whichever type is appropriate for the subscription. The subscription handle
17//! is a typedef of [`PubsubClientSubscription`], and it must remain live for
18//! the receiver to continue receiving messages.
19//!
20//! Because this is a blocking API, with blocking receivers, a reasonable
21//! pattern for using this API is to move each event receiver to its own thread
22//! to block on messages, while holding all subscription handles on a single
23//! primary thread.
24//!
25//! While `PubsubClientSubscription` contains methods for shutting down,
26//! [`PubsubClientSubscription::send_unsubscribe`], and
27//! [`PubsubClientSubscription::shutdown`], because its internal receivers block
//! on events from the server, these subscriptions cannot actually be shut down
29//! reliably. For a non-blocking, cancelable API, use the asynchronous client
30//! in [`crate::nonblocking::pubsub_client`].
31//!
32//! By default the [`block_subscribe`] and [`vote_subscribe`] events are
33//! disabled on RPC nodes. They can be enabled by passing
34//! `--rpc-pubsub-enable-block-subscription` and
35//! `--rpc-pubsub-enable-vote-subscription` to `agave-validator`. When these
36//! methods are disabled, the RPC server will return a "Method not found" error
37//! message.
38//!
39//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe
40//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe
41//!
42//! # Examples
43//!
44//! This example subscribes to account events and then loops forever receiving
45//! them.
46//!
47//! ```
48//! use anyhow::Result;
49//! use solana_sdk::commitment_config::CommitmentConfig;
50//! use solana_pubsub_client::pubsub_client::PubsubClient;
51//! use solana_rpc_client_api::config::RpcAccountInfoConfig;
52//! use solana_sdk::pubkey::Pubkey;
53//! use std::thread;
54//!
55//! fn get_account_updates(account_pubkey: Pubkey) -> Result<()> {
56//!     let url = "wss://api.devnet.solana.com/";
57//!
58//!     let (mut account_subscription_client, account_subscription_receiver) =
59//!         PubsubClient::account_subscribe(
60//!             url,
61//!             &account_pubkey,
62//!             Some(RpcAccountInfoConfig {
63//!                 encoding: None,
64//!                 data_slice: None,
65//!                 commitment: Some(CommitmentConfig::confirmed()),
66//!                 min_context_slot: None,
67//!             }),
68//!         )?;
69//!
70//!     loop {
71//!         match account_subscription_receiver.recv() {
72//!             Ok(response) => {
73//!                 println!("account subscription response: {:?}", response);
74//!             }
75//!             Err(e) => {
76//!                 println!("account subscription error: {:?}", e);
77//!                 break;
78//!             }
79//!         }
80//!     }
81//!
82//!     Ok(())
83//! }
84//! #
85//! # get_account_updates(solana_sdk::pubkey::new_rand());
86//! # Ok::<(), anyhow::Error>(())
87//! ```
88
89pub use crate::nonblocking::pubsub_client::PubsubClientError;
90use {
91    crossbeam_channel::{unbounded, Receiver, Sender},
92    log::*,
93    serde::de::DeserializeOwned,
94    serde_json::{
95        json,
96        value::Value::{Number, Object},
97        Map, Value,
98    },
99    solana_account_decoder::UiAccount,
100    solana_rpc_client_api::{
101        config::{
102            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
103            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
104            RpcTransactionLogsFilter,
105        },
106        response::{
107            Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
108            RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
109        },
110    },
111    solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
112    std::{
113        marker::PhantomData,
114        net::TcpStream,
115        sync::{
116            atomic::{AtomicBool, Ordering},
117            Arc, RwLock,
118        },
119        thread::{sleep, JoinHandle},
120        time::Duration,
121    },
122    tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
123    url::Url,
124};
125
126/// A subscription.
127///
128/// The subscription is unsubscribed on drop, and note that unsubscription (and
129/// thus drop) time is unbounded. See
130/// [`PubsubClientSubscription::send_unsubscribe`].
131pub struct PubsubClientSubscription<T>
132where
133    T: DeserializeOwned,
134{
135    message_type: PhantomData<T>,
136    operation: &'static str,
137    socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
138    subscription_id: u64,
139    t_cleanup: Option<JoinHandle<()>>,
140    exit: Arc<AtomicBool>,
141}
142
143impl<T> Drop for PubsubClientSubscription<T>
144where
145    T: DeserializeOwned,
146{
147    fn drop(&mut self) {
148        self.send_unsubscribe()
149            .unwrap_or_else(|_| warn!("unable to unsubscribe from websocket"));
150        self.socket
151            .write()
152            .unwrap()
153            .close(None)
154            .unwrap_or_else(|_| warn!("unable to close websocket"));
155    }
156}
157
158impl<T> PubsubClientSubscription<T>
159where
160    T: DeserializeOwned,
161{
162    fn send_subscribe(
163        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
164        body: String,
165    ) -> Result<u64, PubsubClientError> {
166        writable_socket.write().unwrap().send(Message::Text(body))?;
167        let message = writable_socket.write().unwrap().read()?;
168        Self::extract_subscription_id(message)
169    }
170
171    fn extract_subscription_id(message: Message) -> Result<u64, PubsubClientError> {
172        let message_text = &message.into_text()?;
173
174        if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
175            if let Some(Number(x)) = json_msg.get("result") {
176                if let Some(x) = x.as_u64() {
177                    return Ok(x);
178                }
179            }
180        }
181
182        Err(PubsubClientError::UnexpectedSubscriptionResponse(format!(
183            "msg={message_text}"
184        )))
185    }
186
187    /// Send an unsubscribe message to the server.
188    ///
189    /// Note that this will block as long as the internal subscription receiver
190    /// is waiting on messages from the server, and this can take an unbounded
191    /// amount of time if the server does not send any messages.
192    ///
193    /// If a pubsub client needs to shutdown reliably it should use
194    /// the async client in [`crate::nonblocking::pubsub_client`].
195    pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
196        let method = format!("{}Unsubscribe", self.operation);
197        self.socket
198            .write()
199            .unwrap()
200            .send(Message::Text(
201                json!({
202                "jsonrpc":"2.0","id":1,"method":method,"params":[self.subscription_id]
203                })
204                .to_string(),
205            ))
206            .map_err(|err| err.into())
207    }
208
209    fn read_message(
210        writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
211    ) -> Result<Option<T>, PubsubClientError> {
212        let message = writable_socket.write().unwrap().read()?;
213        if message.is_ping() {
214            return Ok(None);
215        }
216        let message_text = &message.into_text()?;
217        if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
218            if let Some(Object(params)) = json_msg.get("params") {
219                if let Some(result) = params.get("result") {
220                    if let Ok(x) = serde_json::from_value::<T>(result.clone()) {
221                        return Ok(Some(x));
222                    }
223                }
224            }
225        }
226
227        Err(PubsubClientError::UnexpectedMessageError(format!(
228            "msg={message_text}"
229        )))
230    }
231
232    /// Shutdown the internel message receiver and wait for its thread to exit.
233    ///
234    /// Note that this will block as long as the subscription receiver is
235    /// waiting on messages from the server, and this can take an unbounded
236    /// amount of time if the server does not send any messages.
237    ///
238    /// If a pubsub client needs to shutdown reliably it should use
239    /// the async client in [`crate::nonblocking::pubsub_client`].
240    pub fn shutdown(&mut self) -> std::thread::Result<()> {
241        if self.t_cleanup.is_some() {
242            info!("websocket thread - shutting down");
243            self.exit.store(true, Ordering::Relaxed);
244            let x = self.t_cleanup.take().unwrap().join();
245            info!("websocket thread - shut down.");
246            x
247        } else {
248            warn!("websocket thread - already shut down.");
249            Ok(())
250        }
251    }
252}
253
/// Subscription handle returned by [`PubsubClient::logs_subscribe`].
pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
/// Handle and message receiver pair returned by [`PubsubClient::logs_subscribe`].
pub type LogsSubscription = (
    PubsubLogsClientSubscription,
    Receiver<RpcResponse<RpcLogsResponse>>,
);

/// Subscription handle returned by [`PubsubClient::slot_subscribe`].
pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
/// Handle and message receiver pair returned by [`PubsubClient::slot_subscribe`].
pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);

/// Subscription handle returned by [`PubsubClient::signature_subscribe`].
pub type PubsubSignatureClientSubscription =
    PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
/// Handle and message receiver pair returned by [`PubsubClient::signature_subscribe`].
pub type SignatureSubscription = (
    PubsubSignatureClientSubscription,
    Receiver<RpcResponse<RpcSignatureResult>>,
);

/// Subscription handle returned by [`PubsubClient::block_subscribe`].
pub type PubsubBlockClientSubscription = PubsubClientSubscription<RpcResponse<RpcBlockUpdate>>;
/// Handle and message receiver pair returned by [`PubsubClient::block_subscribe`].
pub type BlockSubscription = (
    PubsubBlockClientSubscription,
    Receiver<RpcResponse<RpcBlockUpdate>>,
);

/// Subscription handle returned by [`PubsubClient::program_subscribe`].
pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
/// Handle and message receiver pair returned by [`PubsubClient::program_subscribe`].
pub type ProgramSubscription = (
    PubsubProgramClientSubscription,
    Receiver<RpcResponse<RpcKeyedAccount>>,
);

/// Subscription handle returned by [`PubsubClient::account_subscribe`].
pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
/// Handle and message receiver pair returned by [`PubsubClient::account_subscribe`].
pub type AccountSubscription = (
    PubsubAccountClientSubscription,
    Receiver<RpcResponse<UiAccount>>,
);

/// Subscription handle returned by [`PubsubClient::vote_subscribe`].
pub type PubsubVoteClientSubscription = PubsubClientSubscription<RpcVote>;
/// Handle and message receiver pair returned by [`PubsubClient::vote_subscribe`].
pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver<RpcVote>);

/// Subscription handle returned by [`PubsubClient::root_subscribe`].
pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
/// Handle and message receiver pair returned by [`PubsubClient::root_subscribe`].
pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
293
/// A client for subscribing to messages from the RPC server.
///
/// The type has no fields; every subscription method is an associated
/// function that takes the websocket URL as its first argument.
///
/// See the [module documentation][self].
pub struct PubsubClient {}
298
299fn connect_with_retry(
300    url: Url,
301) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
302    let mut connection_retries = 5;
303    loop {
304        let result = connect(url.clone()).map(|(socket, _)| socket);
305        if let Err(tungstenite::Error::Http(response)) = &result {
306            if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS && connection_retries > 0
307            {
308                let mut duration = Duration::from_millis(500);
309                if let Some(retry_after) = response.headers().get(reqwest::header::RETRY_AFTER) {
310                    if let Ok(retry_after) = retry_after.to_str() {
311                        if let Ok(retry_after) = retry_after.parse::<u64>() {
312                            if retry_after < 120 {
313                                duration = Duration::from_secs(retry_after);
314                            }
315                        }
316                    }
317                }
318
319                connection_retries -= 1;
320                debug!(
321                    "Too many requests: server responded with {:?}, {} retries left, pausing for {:?}",
322                    response, connection_retries, duration
323                );
324
325                sleep(duration);
326                continue;
327            }
328        }
329        return result;
330    }
331}
332
333impl PubsubClient {
334    /// Subscribe to account events.
335    ///
336    /// Receives messages of type [`UiAccount`] when an account's lamports or data changes.
337    ///
338    /// # RPC Reference
339    ///
340    /// This method corresponds directly to the [`accountSubscribe`] RPC method.
341    ///
342    /// [`accountSubscribe`]: https://solana.com/docs/rpc/websocket/accountsubscribe
343    pub fn account_subscribe(
344        url: &str,
345        pubkey: &Pubkey,
346        config: Option<RpcAccountInfoConfig>,
347    ) -> Result<AccountSubscription, PubsubClientError> {
348        let url = Url::parse(url)?;
349        let socket = connect_with_retry(url)?;
350        let (sender, receiver) = unbounded();
351
352        let socket = Arc::new(RwLock::new(socket));
353        let socket_clone = socket.clone();
354        let exit = Arc::new(AtomicBool::new(false));
355        let exit_clone = exit.clone();
356        let body = json!({
357            "jsonrpc":"2.0",
358            "id":1,
359            "method":"accountSubscribe",
360            "params":[
361                pubkey.to_string(),
362                config
363            ]
364        })
365        .to_string();
366        let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
367
368        let t_cleanup = std::thread::spawn(move || {
369            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
370        });
371
372        let result = PubsubClientSubscription {
373            message_type: PhantomData,
374            operation: "account",
375            socket,
376            subscription_id,
377            t_cleanup: Some(t_cleanup),
378            exit,
379        };
380
381        Ok((result, receiver))
382    }
383
384    /// Subscribe to block events.
385    ///
386    /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized.
387    ///
388    /// This method is disabled by default. It can be enabled by passing
389    /// `--rpc-pubsub-enable-block-subscription` to `agave-validator`.
390    ///
391    /// # RPC Reference
392    ///
393    /// This method corresponds directly to the [`blockSubscribe`] RPC method.
394    ///
395    /// [`blockSubscribe`]: https://solana.com/docs/rpc/websocket/blocksubscribe
396    pub fn block_subscribe(
397        url: &str,
398        filter: RpcBlockSubscribeFilter,
399        config: Option<RpcBlockSubscribeConfig>,
400    ) -> Result<BlockSubscription, PubsubClientError> {
401        let url = Url::parse(url)?;
402        let socket = connect_with_retry(url)?;
403        let (sender, receiver) = unbounded();
404
405        let socket = Arc::new(RwLock::new(socket));
406        let socket_clone = socket.clone();
407        let exit = Arc::new(AtomicBool::new(false));
408        let exit_clone = exit.clone();
409        let body = json!({
410            "jsonrpc":"2.0",
411            "id":1,
412            "method":"blockSubscribe",
413            "params":[filter, config]
414        })
415        .to_string();
416
417        let subscription_id = PubsubBlockClientSubscription::send_subscribe(&socket_clone, body)?;
418
419        let t_cleanup = std::thread::spawn(move || {
420            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
421        });
422
423        let result = PubsubClientSubscription {
424            message_type: PhantomData,
425            operation: "block",
426            socket,
427            subscription_id,
428            t_cleanup: Some(t_cleanup),
429            exit,
430        };
431
432        Ok((result, receiver))
433    }
434
435    /// Subscribe to transaction log events.
436    ///
437    /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed.
438    ///
439    /// # RPC Reference
440    ///
441    /// This method corresponds directly to the [`logsSubscribe`] RPC method.
442    ///
443    /// [`logsSubscribe`]: https://solana.com/docs/rpc/websocket/logssubscribe
444    pub fn logs_subscribe(
445        url: &str,
446        filter: RpcTransactionLogsFilter,
447        config: RpcTransactionLogsConfig,
448    ) -> Result<LogsSubscription, PubsubClientError> {
449        let url = Url::parse(url)?;
450        let socket = connect_with_retry(url)?;
451        let (sender, receiver) = unbounded();
452
453        let socket = Arc::new(RwLock::new(socket));
454        let socket_clone = socket.clone();
455        let exit = Arc::new(AtomicBool::new(false));
456        let exit_clone = exit.clone();
457        let body = json!({
458            "jsonrpc":"2.0",
459            "id":1,
460            "method":"logsSubscribe",
461            "params":[filter, config]
462        })
463        .to_string();
464
465        let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
466
467        let t_cleanup = std::thread::spawn(move || {
468            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
469        });
470
471        let result = PubsubClientSubscription {
472            message_type: PhantomData,
473            operation: "logs",
474            socket,
475            subscription_id,
476            t_cleanup: Some(t_cleanup),
477            exit,
478        };
479
480        Ok((result, receiver))
481    }
482
483    /// Subscribe to program account events.
484    ///
485    /// Receives messages of type [`RpcKeyedAccount`] when an account owned
486    /// by the given program changes.
487    ///
488    /// # RPC Reference
489    ///
490    /// This method corresponds directly to the [`programSubscribe`] RPC method.
491    ///
492    /// [`programSubscribe`]: https://solana.com/docs/rpc/websocket/programsubscribe
493    pub fn program_subscribe(
494        url: &str,
495        pubkey: &Pubkey,
496        config: Option<RpcProgramAccountsConfig>,
497    ) -> Result<ProgramSubscription, PubsubClientError> {
498        let url = Url::parse(url)?;
499        let socket = connect_with_retry(url)?;
500        let (sender, receiver) = unbounded();
501
502        let socket = Arc::new(RwLock::new(socket));
503        let socket_clone = socket.clone();
504        let exit = Arc::new(AtomicBool::new(false));
505        let exit_clone = exit.clone();
506
507        let body = json!({
508            "jsonrpc":"2.0",
509            "id":1,
510            "method":"programSubscribe",
511            "params":[
512                pubkey.to_string(),
513                config
514            ]
515        })
516        .to_string();
517        let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
518
519        let t_cleanup = std::thread::spawn(move || {
520            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
521        });
522
523        let result = PubsubClientSubscription {
524            message_type: PhantomData,
525            operation: "program",
526            socket,
527            subscription_id,
528            t_cleanup: Some(t_cleanup),
529            exit,
530        };
531
532        Ok((result, receiver))
533    }
534
535    /// Subscribe to vote events.
536    ///
537    /// Receives messages of type [`RpcVote`] when a new vote is observed. These
538    /// votes are observed prior to confirmation and may never be confirmed.
539    ///
540    /// This method is disabled by default. It can be enabled by passing
541    /// `--rpc-pubsub-enable-vote-subscription` to `agave-validator`.
542    ///
543    /// # RPC Reference
544    ///
545    /// This method corresponds directly to the [`voteSubscribe`] RPC method.
546    ///
547    /// [`voteSubscribe`]: https://solana.com/docs/rpc/websocket/votesubscribe
548    pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
549        let url = Url::parse(url)?;
550        let socket = connect_with_retry(url)?;
551        let (sender, receiver) = unbounded();
552
553        let socket = Arc::new(RwLock::new(socket));
554        let socket_clone = socket.clone();
555        let exit = Arc::new(AtomicBool::new(false));
556        let exit_clone = exit.clone();
557        let body = json!({
558            "jsonrpc":"2.0",
559            "id":1,
560            "method":"voteSubscribe",
561        })
562        .to_string();
563        let subscription_id = PubsubVoteClientSubscription::send_subscribe(&socket_clone, body)?;
564
565        let t_cleanup = std::thread::spawn(move || {
566            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
567        });
568
569        let result = PubsubClientSubscription {
570            message_type: PhantomData,
571            operation: "vote",
572            socket,
573            subscription_id,
574            t_cleanup: Some(t_cleanup),
575            exit,
576        };
577
578        Ok((result, receiver))
579    }
580
581    /// Subscribe to root events.
582    ///
583    /// Receives messages of type [`Slot`] when a new [root] is set by the
584    /// validator.
585    ///
586    /// [root]: https://solana.com/docs/terminology#root
587    ///
588    /// # RPC Reference
589    ///
590    /// This method corresponds directly to the [`rootSubscribe`] RPC method.
591    ///
592    /// [`rootSubscribe`]: https://solana.com/docs/rpc/websocket/rootsubscribe
593    pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
594        let url = Url::parse(url)?;
595        let socket = connect_with_retry(url)?;
596        let (sender, receiver) = unbounded();
597
598        let socket = Arc::new(RwLock::new(socket));
599        let socket_clone = socket.clone();
600        let exit = Arc::new(AtomicBool::new(false));
601        let exit_clone = exit.clone();
602        let body = json!({
603            "jsonrpc":"2.0",
604            "id":1,
605            "method":"rootSubscribe",
606        })
607        .to_string();
608        let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
609
610        let t_cleanup = std::thread::spawn(move || {
611            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
612        });
613
614        let result = PubsubClientSubscription {
615            message_type: PhantomData,
616            operation: "root",
617            socket,
618            subscription_id,
619            t_cleanup: Some(t_cleanup),
620            exit,
621        };
622
623        Ok((result, receiver))
624    }
625
626    /// Subscribe to transaction confirmation events.
627    ///
628    /// Receives messages of type [`RpcSignatureResult`] when a transaction
629    /// with the given signature is committed.
630    ///
631    /// This is a subscription to a single notification. It is automatically
632    /// cancelled by the server once the notification is sent.
633    ///
634    /// # RPC Reference
635    ///
636    /// This method corresponds directly to the [`signatureSubscribe`] RPC method.
637    ///
638    /// [`signatureSubscribe`]: https://solana.com/docs/rpc/websocket/signaturesubscribe
639    pub fn signature_subscribe(
640        url: &str,
641        signature: &Signature,
642        config: Option<RpcSignatureSubscribeConfig>,
643    ) -> Result<SignatureSubscription, PubsubClientError> {
644        let url = Url::parse(url)?;
645        let socket = connect_with_retry(url)?;
646        let (sender, receiver) = unbounded();
647
648        let socket = Arc::new(RwLock::new(socket));
649        let socket_clone = socket.clone();
650        let exit = Arc::new(AtomicBool::new(false));
651        let exit_clone = exit.clone();
652        let body = json!({
653            "jsonrpc":"2.0",
654            "id":1,
655            "method":"signatureSubscribe",
656            "params":[
657                signature.to_string(),
658                config
659            ]
660        })
661        .to_string();
662        let subscription_id =
663            PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
664
665        let t_cleanup = std::thread::spawn(move || {
666            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
667        });
668
669        let result = PubsubClientSubscription {
670            message_type: PhantomData,
671            operation: "signature",
672            socket,
673            subscription_id,
674            t_cleanup: Some(t_cleanup),
675            exit,
676        };
677
678        Ok((result, receiver))
679    }
680
681    /// Subscribe to slot events.
682    ///
683    /// Receives messages of type [`SlotInfo`] when a slot is processed.
684    ///
685    /// # RPC Reference
686    ///
687    /// This method corresponds directly to the [`slotSubscribe`] RPC method.
688    ///
689    /// [`slotSubscribe`]: https://solana.com/docs/rpc/websocket/slotsubscribe
690    pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
691        let url = Url::parse(url)?;
692        let socket = connect_with_retry(url)?;
693        let (sender, receiver) = unbounded::<SlotInfo>();
694
695        let socket = Arc::new(RwLock::new(socket));
696        let socket_clone = socket.clone();
697        let exit = Arc::new(AtomicBool::new(false));
698        let exit_clone = exit.clone();
699        let body = json!({
700            "jsonrpc":"2.0",
701            "id":1,
702            "method":"slotSubscribe",
703            "params":[]
704        })
705        .to_string();
706        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
707
708        let t_cleanup = std::thread::spawn(move || {
709            Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
710        });
711
712        let result = PubsubClientSubscription {
713            message_type: PhantomData,
714            operation: "slot",
715            socket,
716            subscription_id,
717            t_cleanup: Some(t_cleanup),
718            exit,
719        };
720
721        Ok((result, receiver))
722    }
723
724    /// Subscribe to slot update events.
725    ///
726    /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur.
727    ///
728    /// Note that this method operates differently than other subscriptions:
729    /// instead of sending the message to a receiver on a channel, it accepts a
730    /// `handler` callback that processes the message directly. This processing
731    /// occurs on another thread.
732    ///
733    /// # RPC Reference
734    ///
735    /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method.
736    ///
737    /// [`slotUpdatesSubscribe`]: https://solana.com/docs/rpc/websocket/slotsupdatessubscribe
738    pub fn slot_updates_subscribe(
739        url: &str,
740        handler: impl Fn(SlotUpdate) + Send + 'static,
741    ) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
742        let url = Url::parse(url)?;
743        let socket = connect_with_retry(url)?;
744
745        let socket = Arc::new(RwLock::new(socket));
746        let socket_clone = socket.clone();
747        let exit = Arc::new(AtomicBool::new(false));
748        let exit_clone = exit.clone();
749        let body = json!({
750            "jsonrpc":"2.0",
751            "id":1,
752            "method":"slotsUpdatesSubscribe",
753            "params":[]
754        })
755        .to_string();
756        let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
757
758        let t_cleanup = std::thread::spawn(move || {
759            Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
760        });
761
762        Ok(PubsubClientSubscription {
763            message_type: PhantomData,
764            operation: "slotsUpdates",
765            socket,
766            subscription_id,
767            t_cleanup: Some(t_cleanup),
768            exit,
769        })
770    }
771
772    fn cleanup_with_sender<T>(
773        exit: Arc<AtomicBool>,
774        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
775        sender: Sender<T>,
776    ) where
777        T: DeserializeOwned + Send + 'static,
778    {
779        let handler = move |message| match sender.send(message) {
780            Ok(_) => (),
781            Err(err) => {
782                info!("receive error: {:?}", err);
783            }
784        };
785        Self::cleanup_with_handler(exit, socket, handler);
786    }
787
788    fn cleanup_with_handler<T, F>(
789        exit: Arc<AtomicBool>,
790        socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
791        handler: F,
792    ) where
793        T: DeserializeOwned,
794        F: Fn(T) + Send + 'static,
795    {
796        loop {
797            if exit.load(Ordering::Relaxed) {
798                break;
799            }
800
801            match PubsubClientSubscription::read_message(socket) {
802                Ok(Some(message)) => handler(message),
803                Ok(None) => {
804                    // Nothing useful, means we received a ping message
805                }
806                Err(err) => {
807                    info!("receive error: {:?}", err);
808                    break;
809                }
810            }
811        }
812
813        info!("websocket - exited receive loop");
814    }
815}
816
817#[cfg(test)]
818mod tests {
819    // see client-test/test/client.rs
820}